pydatatask.task module

A Task is a unit of execution which can act on multiple repositories.

You define a task by instantiating a Task subclass and passing it to a Pipeline object.

Tasks are related to Repositories by Links. Links are created by Task.link("my_link_name", my_repository, **disposition). The main disposition kwargs you’ll want to use are is_input and is_output. See Task.link for more information.

For a shortcut for linking the output of one task as the input of another task, see Task.plug.

pydatatask.task.STDOUT = <pydatatask.task._StderrIsStdout object>

class pydatatask.task.Link

Bases: object

The dataclass for holding linked repositories and their disposition metadata. Don’t create these manually, instead use Task.link.

repo: Repository
is_input: bool = False
is_output: bool = False
is_status: bool = False
inhibits_start: bool = False
required_for_start: bool = False
inhibits_output: bool = False
required_for_output: bool = False

class pydatatask.task.Task(name: str, ready: Repository | None = None, disabled: bool = False)

Bases: object

The Task base class.

property ready

Return the repository whose job membership is used to determine whether a task instance should be launched. If an override is provided to the constructor, this is that override; otherwise it is AND(*required_for_start, NOT(OR(*inhibits_start))).
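
The default expression can be read as set algebra over job identifiers. The following is a minimal sketch, not the library's implementation: plain Python sets stand in for repositories, and default_ready is a hypothetical helper name. It assumes at least one required_for_start set is given.

```python
def default_ready(required_for_start, inhibits_start):
    """Model AND(*required_for_start, NOT(OR(*inhibits_start))) with sets.

    Both arguments are lists of sets of job IDs standing in for repositories.
    Assumes required_for_start is non-empty (hypothetical restriction).
    """
    required = set.intersection(*required_for_start)  # AND(*required_for_start)
    inhibited = set().union(*inhibits_start)          # OR(*inhibits_start)
    return required - inhibited                       # AND(required, NOT(inhibited))


inputs = {"a", "b", "c"}   # jobs with input data available
done = {"a"}               # jobs that already finished
live = {"b"}               # jobs currently running
ready = default_ready([inputs], [done, live])
```

Here only job "c" is ready: "a" is inhibited by the finished set and "b" by the live set.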

link(name, repo, **disposition)

Create a link between this task and a repository.

Parameters:
  • name – The name of the link. Used only in the admin interface.

  • repo – The repository to link.

  • is_input – Whether this repository contains data which is consumed by the task. Default False.

  • is_output – Whether this repository is populated by the task. Default False.

  • is_status – Whether this repository is populated with task-ephemeral data. Default False.

  • inhibits_start – Whether membership in this repository should be used in the default ready repository to prevent jobs from being launched. Default False.

  • required_for_start – Whether membership in this repository should be used in the default ready repository to allow jobs to be launched. If unspecified, defaults to is_input.

  • inhibits_output – Whether this repository should become inhibits_start in tasks this task is plugged into. Default False.

  • required_for_output – Whether this repository should become required_for_start in tasks this task is plugged into. If unspecified, defaults to is_output.
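
The two "if unspecified" defaults above can be illustrated with a small stand-in. This is a sketch only: LinkSketch and make_link are hypothetical names, and using None as the "unspecified" marker is an assumption rather than something the documentation states.

```python
from dataclasses import dataclass


@dataclass
class LinkSketch:
    """Stand-in for the link dataclass; field names follow the documentation."""
    repo: object
    is_input: bool = False
    is_output: bool = False
    is_status: bool = False
    inhibits_start: bool = False
    required_for_start: bool = False
    inhibits_output: bool = False
    required_for_output: bool = False


def make_link(repo, is_input=False, is_output=False, is_status=False,
              inhibits_start=False, required_for_start=None,
              inhibits_output=False, required_for_output=None):
    # Assumption: None means "unspecified", so fall back to the documented default.
    if required_for_start is None:
        required_for_start = is_input
    if required_for_output is None:
        required_for_output = is_output
    return LinkSketch(repo, is_input, is_output, is_status, inhibits_start,
                      required_for_start, inhibits_output, required_for_output)


inp = make_link("repo_input", is_input=True)
out = make_link("repo_output", is_output=True, required_for_output=False)
```

An input link becomes required_for_start without being asked, while an explicit required_for_output=False overrides the is_output default.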

plug(output: Task, output_links: Iterable[str] | None = None, meta: bool = True, translator: Repository | None = None, translate_allow_deletes=False, translate_prefetch_lookup=True)

Link the output repositories from output as inputs to this task.

Parameters:
  • output – The task to plug into this one.

  • output_links – Optional: An iterable allowlist of links to only use.

  • meta – Whether to transfer repository inhibit- and required_for- dispositions. Default True.

  • translator – Optional: A repository used to transform job identifiers. If this is provided, a job will be said to be present in the resulting linked repository if the source repository contains the key await translator.info(job).

  • translate_allow_deletes – If translator is provided, this controls whether attempts to delete from the translated repository will do anything. Default False.

  • translate_prefetch_lookup – If translator is provided, this controls whether the translated repository will pre-fetch the list of translations on first access. Default True.
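
As a rough illustration of how dispositions might carry over when one task is plugged into another, the sketch below maps upstream link dispositions to the dispositions a consuming task would apply. It is not the library's code: plug_dispositions is a hypothetical helper, links are modelled as plain dicts, and it assumes the output_links allowlist filters only the is_output links, which the documentation does not confirm.

```python
def plug_dispositions(upstream_links, output_links=None, meta=True):
    """Return {link name: dispositions} for the consuming task.

    upstream_links maps link names to dicts of disposition flags.
    """
    result = {}
    for name, link in upstream_links.items():
        attrs = {}
        # Outputs of the upstream task become inputs of this task.
        if link.get("is_output") and (output_links is None or name in output_links):
            attrs["is_input"] = True
        # With meta=True, the inhibit- and required_for- dispositions transfer too.
        if meta and link.get("inhibits_output"):
            attrs["inhibits_start"] = True
        if meta and link.get("required_for_output"):
            attrs["required_for_start"] = True
        if attrs:
            result[name] = attrs
    return result


upstream = {
    "out": {"is_output": True, "required_for_output": True},
    "live": {"is_status": True, "inhibits_output": True},
    "scratch": {"is_status": True},
}
with_meta = plug_dispositions(upstream)
without_meta = plug_dispositions(upstream, meta=False)
```

With meta enabled, the consumer waits for "out" and is held back while the upstream job is still "live"; with meta disabled, only the input link is carried over.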

property input

A mapping from link name to repository for all links marked is_input.

property output

A mapping from link name to repository for all links marked is_output.

property status

A mapping from link name to repository for all links marked is_status.

property inhibits_start

A mapping from link name to repository for all links marked inhibits_start.

property required_for_start

A mapping from link name to repository for all links marked required_for_start.

property inhibits_output

A mapping from link name to repository for all links marked inhibits_output.

property required_for_output

A mapping from link name to repository for all links marked required_for_output.

async validate()

Raise an exception if for any reason the task is misconfigured. This is guaranteed to be called exactly once per pipeline, so it is safe to use for setup and initialization in an async context.

abstract async update() → bool

Part one of the pipeline maintenance loop. Override this to perform any maintenance operations on the set of live tasks. Typically, this entails reaping finished processes.

Returns True if literally anything interesting happened, or if there are any live tasks.

abstract async launch(job)

Launch a job. Override this to begin execution of the provided job. Error handling will be done for you.
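
The relationship between update and launch can be pictured as a loop that alternates between reaping and launching. The sketch below is an assumption-laden stand-in rather than the real pipeline: SketchTask and run_until_idle are hypothetical names, job state is kept in plain sets, and every job is treated as finished by the next call to update.

```python
import asyncio


class SketchTask:
    """Stand-in task: a launched job is considered finished at the next update."""

    def __init__(self, ready):
        self.ready = set(ready)  # job IDs waiting to be launched
        self.live = set()        # job IDs currently running
        self.done = set()        # job IDs that have been reaped

    async def update(self):
        # Reap everything that was live and report whether anything happened.
        reaped = bool(self.live)
        self.done |= self.live
        self.live.clear()
        return reaped

    async def launch(self, job):
        self.ready.discard(job)
        self.live.add(job)


async def run_until_idle(task):
    """Alternate update and launch until a full round does nothing."""
    rounds = 0
    while True:
        rounds += 1
        active = await task.update()
        for job in sorted(task.ready):  # sorted() copies, so launch may mutate
            await task.launch(job)
            active = True
        if not active:
            return rounds


task = SketchTask(["1", "2"])
rounds = asyncio.run(run_until_idle(task))
```

The loop takes three rounds here: one to launch both jobs, one to reap them, and a final quiet round that signals there is nothing left to do.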

class pydatatask.task.KubeTask(name: str, podman: Callable[[], PodManager], resources: ResourceManager, template: str | Path, logs: BlobRepository | None, done: MetadataRepository | None, window: timedelta = datetime.timedelta(seconds=60), timeout: timedelta | None = None, env: Dict[str, Any] | None = None, ready: Repository | None = None)

Bases: Task

A task which runs a kubernetes pod.

Automatically links a LiveKubeRepository as “live” with inhibits_start, inhibits_output, and is_status.

Parameters:
  • name – The name of the task.

  • podman – A callable returning a PodManager to use to connect to the cluster.

  • resources – A ResourceManager instance. Tasks launched will contribute to its quota and be denied if they would break the quota.

  • template – YAML markup for a pod manifest template, either as a string or a path to a file.

  • logs – Optional: A BlobRepository to dump pod logs to on completion. Linked as “logs” with inhibits_start, required_for_output, is_status.

  • done – A MetadataRepository in which to dump some information about the pod’s lifetime and termination on completion. Linked as “done” with inhibits_start, required_for_output, is_status.

  • window – Optional: How far back into the past to look in order to determine whether we have recently launched too many pods too quickly.

  • timeout – Optional: When a pod is found to have been running continuously for this amount of time, it will be timed out and stopped. The method handle_timeout will be called in-process.

  • env – Optional: Additional keys to add to the template environment.

  • ready – Optional: A repository from which to read task-ready status.

It is highly recommended to provide one or more of done or logs so that at least one link is present with inhibits_start.

property podman: PodManager

The pod manager instance for this task. Will raise an error if the manager is provided by an unopened session.

async launch(job)
async delete(pod: V1Pod)

Kill a pod and relinquish its resources without marking the task as complete.

async update()
async handle_timeout(pod: V1Pod)

You may override this method in a subclass, and it will be called whenever a pod times out. You can use this method to e.g. scrape in-progress data out of the pod via an exec.
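
The timeout hook can be pictured with a stand-in class that needs no cluster. Everything here is hypothetical: TimeoutSketch, SalvagingSketch, and check are illustrative names, pods are represented by plain strings, and treating a pod as timed out once its age reaches the timeout exactly is an assumption.

```python
import asyncio
from datetime import datetime, timedelta, timezone


class TimeoutSketch:
    """Stand-in for a task with an overridable timeout hook."""

    def __init__(self, timeout=None):
        self.timeout = timeout  # timedelta, or None to disable timeouts

    async def handle_timeout(self, pod_name):
        """Hook called when a pod times out; does nothing by default."""

    async def check(self, pod_name, started_at, now):
        # Assumption: a pod whose age has reached the timeout counts as timed out.
        if self.timeout is None or now - started_at < self.timeout:
            return False
        await self.handle_timeout(pod_name)
        return True


class SalvagingSketch(TimeoutSketch):
    """Subclass that records which pods it was asked to salvage."""

    def __init__(self, timeout=None):
        super().__init__(timeout)
        self.salvaged = []

    async def handle_timeout(self, pod_name):
        # A real override might exec into the pod and copy partial results out.
        self.salvaged.append(pod_name)


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
task = SalvagingSketch(timeout=timedelta(minutes=5))
early = asyncio.run(task.check("pod-a", start, start + timedelta(minutes=4)))
late = asyncio.run(task.check("pod-a", start, start + timedelta(minutes=6)))
```

The hook only fires on the second check, when the pod has been running for longer than the five-minute timeout.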

class pydatatask.task.ProcessTask(name: str, manager: Callable[[], AbstractProcessManager], resource_manager: ResourceManager, job_resources: Resources, pids: MetadataRepository, template: str, window: timedelta = datetime.timedelta(seconds=60), environ: Dict[str, str] | None = None, done: MetadataRepository | None = None, stdin: BlobRepository | None = None, stdout: BlobRepository | None = None, stderr: BlobRepository | _StderrIsStdout | None = None, ready: Repository | None = None)

Bases: Task

A task that runs a script. The interpreter is specified by the shebang, or is the default shell if no shebang is present. The execution environment for the task is defined by the ProcessManager instance provided as an argument.

Parameters:
  • name – The name of this task.

  • manager – A callable returning the process manager with control over the target execution environment.

  • resource_manager – A ResourceManager instance. Tasks launched will contribute to its quota and be denied if they would break the quota.

  • job_resources – The amount of resources an individual job should contribute to the quota. Note that this is currently not enforced target-side, so jobs may actually take up more resources than assigned.

  • pids – A metadata repository used to store the current live-status of processes. Will automatically be linked as “pids” with is_status, inhibits_start, inhibits_output.

  • template – YAML markup for the template of a script to run, either as a string or a path to a file.

  • environ – Additional environment variables to set on the target machine before running the task.

  • window – How recently a process must have been launched in order to contribute to the process rate-limiting.

  • done – Optional: A metadata repository in which to dump some information about the process’s lifetime and termination on completion. Linked as “done” with inhibits_start, required_for_output, is_status.

  • stdin – Optional: A blob repository from which to source the process’ standard input. The content will be preloaded and transferred to the target environment, so the target does not need to be authenticated to this repository. Linked as “stdin” with is_input.

  • stdout – Optional: A blob repository into which to dump the process’ standard output. The content will be transferred from the target environment on completion, so the target does not need to be authenticated to this repository. Linked as “stdout” with is_output.

  • stderr – Optional: A blob repository into which to dump the process’ standard error, or the constant pydatatask.task.STDOUT to indicate that the stream should be interleaved with stdout. Otherwise, the content will be transferred from the target environment on completion, so the target does not need to be authenticated to this repository. Linked as “stderr” with is_output.

  • ready – Optional: A repository from which to read task-ready status.

It is highly recommended to provide at least one of done, stdout, or stderr, so that at least one link is present with inhibits_start.
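
The resource_manager and job_resources parameters describe quota accounting: each launched job adds its resources to a shared total and is denied if the total would exceed the quota. A minimal sketch of that idea follows. ResourcesSketch and QuotaSketch are hypothetical stand-ins, not the library's Resources or ResourceManager, and the choice of CPU millicores and MiB as units is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ResourcesSketch:
    cpu_millis: int = 0  # CPU in millicores (assumed unit)
    mem_mib: int = 0     # memory in MiB (assumed unit)


class QuotaSketch:
    """Tracks reserved resources against a fixed limit."""

    def __init__(self, limit):
        self.limit = limit
        self.used = ResourcesSketch()

    def try_reserve(self, request):
        cpu = self.used.cpu_millis + request.cpu_millis
        mem = self.used.mem_mib + request.mem_mib
        if cpu > self.limit.cpu_millis or mem > self.limit.mem_mib:
            return False  # launching this job would break the quota
        self.used = ResourcesSketch(cpu, mem)
        return True

    def release(self, request):
        self.used = ResourcesSketch(
            self.used.cpu_millis - request.cpu_millis,
            self.used.mem_mib - request.mem_mib,
        )


quota = QuotaSketch(ResourcesSketch(cpu_millis=1000, mem_mib=1024))
job = ResourcesSketch(cpu_millis=400, mem_mib=256)
results = [quota.try_reserve(job) for _ in range(3)]
quota.release(job)
after_release = quota.try_reserve(job)
```

The third reservation is denied because it would need 1200 millicores against a limit of 1000; once one job releases its share, a new job fits again.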

property manager: AbstractProcessManager

The process manager for this task. Will raise an error if the manager comes from a session which is closed.

property stderr: BlobRepository | None

The repository into which stderr will be dumped, or None if it will go to the null device.
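
The STDOUT constant follows the sentinel pattern also used by subprocess.STDOUT in the standard library: a unique object that means "merge with standard output" rather than naming a destination. The sketch below shows one way the three cases could be resolved. The class and function names are hypothetical, and returning the stdout repository for the merged case is an assumption about the property's behavior.

```python
class _MergeWithStdoutSketch:
    """Sentinel type; its single instance means 'interleave stderr with stdout'."""

    def __repr__(self):
        return "STDOUT"


STDOUT = _MergeWithStdoutSketch()


def resolve_stderr(stdout_repo, stderr):
    """Return where stderr ends up, or None if it is discarded."""
    if stderr is None:
        return None         # no destination: goes to the null device
    if stderr is STDOUT:
        return stdout_repo  # assumption: merged output lands in the stdout repo
    return stderr           # an explicit, separate repository


discarded = resolve_stderr("stdout_repo", None)
merged = resolve_stderr("stdout_repo", STDOUT)
separate = resolve_stderr("stdout_repo", "stderr_repo")
```

Comparing with "is" rather than "==" is what makes the sentinel unambiguous: no repository object can be mistaken for it.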

property basedir: Path

The path in the target environment that will be used to store information about this task.

async update()
async launch(job)

class pydatatask.task.InProcessSyncTask(name: str, done: MetadataRepository, ready: Repository | None = None, func: FunctionTaskProtocol | None = None)

Bases: Task

A task which runs in-process. Typical usage of this task might look like the following:

@pydatatask.InProcessSyncTask("my_task", done_repo)
async def my_task(job: str, inp: pydatatask.MetadataRepository, out: pydatatask.MetadataRepository):
    await out.dump(job, await inp.info(job))

my_task.link("inp", repo_input, is_input=True)
my_task.link("out", repo_output, is_output=True)

Parameters:
  • name – The name of this task.

  • done – A metadata repository to store some information about a job’s runtime and termination on completion.

  • ready – Optional: A repository from which to read task-ready status.

  • func – Optional: The async function to run as the task body, if you don’t want to use this task as a decorator.
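
The decorator usage works because the task instance itself is callable: applying it to a function stores the function as the task body and returns the task. The sketch below shows that mechanism with a hypothetical DecoratorTaskSketch class. It is not the library's implementation, and plain dicts stand in for the linked repositories.

```python
import asyncio


class DecoratorTaskSketch:
    """Stand-in task that can wrap its body either via func= or as a decorator."""

    def __init__(self, name, func=None):
        self.name = name
        self.func = func

    def __call__(self, func):
        # Used as a decorator: remember the body and return the task itself,
        # so the decorated name refers to the task, not the bare function.
        self.func = func
        return self

    async def launch(self, job, **repos):
        if self.func is None:
            raise ValueError(f"task {self.name!r} has no body")
        return await self.func(job, **repos)


@DecoratorTaskSketch("double")
async def double(job, inp, out):
    out[job] = inp[job] * 2


results = {}
asyncio.run(double.launch("j1", inp={"j1": 21}, out=results))
```

After decoration, double is the task object, which is why the documented example can call my_task.link(...) on the decorated name.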

async validate()
async update()
async launch(job)

class pydatatask.task.ExecutorTask(name: str, executor: Executor, done: MetadataRepository, ready: Repository | None = None, func: Callable | None = None)

Bases: Task

A task which runs python functions in a concurrent.futures.Executor. This has not been tested on anything but the concurrent.futures.ThreadPoolExecutor, so beware!

See InProcessSyncTask for information on how to use instances of this class as decorators for their bodies.

It is expected that the executor will perform all necessary resource quota management.

Parameters:
  • name – The name of this task.

  • executor – The executor to run jobs in.

  • done – A metadata repository to store some information about a job’s runtime and termination on completion.

  • ready – Optional: A repository from which to read task-ready status.

  • func – Optional: The async function to run as the task body, if you don’t want to use this task as a decorator.

async update()
async validate()
async launch(job)
async cancel(job)

Stop the current job from running, or do nothing if it is not running.
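
The launch, update, and cancel cycle maps naturally onto concurrent.futures futures. The sketch below is a simplified stand-in, not the library's code: ExecutorSketch is a hypothetical class, the methods are synchronous for brevity, and results are recorded in a plain dict instead of a metadata repository. Note that Future.cancel() only succeeds for work that has not started running yet.

```python
from concurrent.futures import ThreadPoolExecutor


class ExecutorSketch:
    """Tracks one future per job and records outcomes when they finish."""

    def __init__(self, executor):
        self.executor = executor
        self.futures = {}  # job ID -> Future for jobs still in flight
        self.done = {}     # job ID -> outcome record

    def launch(self, job, func, *args):
        self.futures[job] = self.executor.submit(func, *args)

    def cancel(self, job):
        # Do nothing if the job is not running.
        future = self.futures.pop(job, None)
        if future is not None:
            future.cancel()

    def update(self):
        # Reap finished futures; return True while any job is still live.
        for job, future in list(self.futures.items()):
            if not future.done():
                continue
            del self.futures[job]
            if future.cancelled():
                continue
            error = future.exception()
            self.done[job] = {
                "success": error is None,
                "result": None if error else future.result(),
            }
        return bool(self.futures)


with ThreadPoolExecutor(max_workers=1) as pool:
    task = ExecutorSketch(pool)
    task.launch("a", pow, 2, 5)
    task.futures["a"].result()  # wait for completion so update() can reap it
    still_live = task.update()
    task.cancel("missing")      # cancelling an unknown job is a no-op
```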

class pydatatask.task.KubeFunctionTask(name: str, podman: Callable[[], PodManager], resources: ResourceManager, template: str | Path, logs: BlobRepository | None = None, kube_done: MetadataRepository | None = None, func_done: MetadataRepository | None = None, env: Dict[str, Any] | None = None, func: Callable | None = None)

Bases: KubeTask

A task which runs a Python function on a Kubernetes cluster. Requires a pod template which will execute a Python script calling pydatatask.main.main. This works by running python3 main.py launch [task] [job] --sync --force.

Sample usage:

@KubeFunctionTask(
    "my_task",
    podman,
    resman,
    '''
        apiVersion: v1
        kind: Pod
        spec:
          containers:
            - name: leader
              image: "docker.example.com/my/image"
              command:
                - python3
                - {{argv0}}
                - launch
                - "{{task}}"
                - "{{job}}"
                - "--force"
                - "--sync"
              resources:
                requests:
                  cpu: 100m
                  memory: 256Mi
    ''',
    repo_logs,
    repo_done,
    repo_func_done,
)
async def my_task(job: str, inp: pydatatask.MetadataRepository, out: pydatatask.MetadataRepository):
    await out.dump(job, await inp.info(job))

my_task.link("inp", repo_input, is_input=True)
my_task.link("out", repo_output, is_output=True)

Parameters:
  • name – The name of this task.

  • podman – A callable returning a PodManager to use to connect to the cluster.

  • resources – A ResourceManager instance. Tasks launched will contribute to its quota and be denied if they would break the quota.

  • template – YAML markup for a pod manifest template that will run pydatatask.main.main as python3 main.py launch [task] [job] --sync --force, either as a string or a path to a file.

  • logs – Optional: A BlobRepository to dump pod logs to on completion. Linked as “logs” with inhibits_start, required_for_output, is_status.

  • kube_done – Optional: A MetadataRepository in which to dump some information about the pod’s lifetime and termination on completion. Linked as “done” with inhibits_start, required_for_output, is_status.

  • func_done – Optional: A MetadataRepository in which to dump some information about the function’s lifetime and termination on completion. Linked as “func_done” with inhibits_start, required_for_output, is_status.

  • env – Optional: Additional keys to add to the template environment.

  • ready – Optional: A repository from which to read task-ready status.

  • func – Optional: The async function to run as the task body, if you don’t want to use this task as a decorator.

It is highly recommended to provide at least one of kube_done, func_done, or logs, so that at least one link is present with inhibits_start.
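
To see what the container command in the sample template expands to, the sketch below substitutes the {{argv0}}, {{task}}, and {{job}} placeholders. It uses a small regular-expression stand-in instead of the library's actual template engine, render is a hypothetical helper, and the substituted values (main.py, my_task, 42) are illustrative only.

```python
import re


def render(template, **env):
    """Replace {{name}} placeholders with values from env (stand-in renderer)."""
    def substitute(match):
        return str(env[match.group(1)])
    return re.sub(r"\{\{\s*(\w+)\s*\}\}", substitute, template)


# The command list from the sample pod template, one entry per argument.
COMMAND = ["python3", "{{argv0}}", "launch", "{{task}}", "{{job}}", "--force", "--sync"]

argv = [render(part, argv0="main.py", task="my_task", job="42") for part in COMMAND]
```

The rendered list corresponds to the python3 main.py launch [task] [job] invocation described above, with the task and job identifiers filled in per pod.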

async validate()
async launch(job)