pydatatask.task module
A Task is a unit of execution which can act on multiple repositories.
You define a task by instantiating a Task subclass and passing it to a Pipeline object.
Tasks are related to Repositories by Links. Links are created with Task.link("my_link_name", my_repository, **disposition). The main disposition kwargs you’ll want to use are is_input and is_output. See Task.link for more information.
For a shortcut that links the output of one task as the input of another, see Task.plug.
- pydatatask.task.STDOUT = <pydatatask.task._StderrIsStdout object>
- class pydatatask.task.Link(repo: Repository, is_input: bool = False, is_output: bool = False, is_status: bool = False, inhibits_start: bool = False, required_for_start: bool = False, inhibits_output: bool = False, required_for_output: bool = False)
Bases: object
The dataclass for holding linked repositories and their disposition metadata. Don’t create these manually; instead, use Task.link.
- repo: Repository
- class pydatatask.task.Task(name: str, ready: Repository | None = None, disabled: bool = False)
Bases: object
The Task base class.
- property ready
Return the repository whose job membership is used to determine whether a task instance should be launched. If an override is provided to the constructor, this is that; otherwise, it is AND(*required_for_start, NOT(OR(*inhibits_start))).
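To make the default ready expression concrete, here is a minimal sketch that models each repository as a plain Python set of job IDs. This is an assumption for illustration only: the real task works with Repository objects, and the helper name default_ready is hypothetical.

```python
from typing import Iterable, Set


def default_ready(required_for_start: Iterable[Set[str]],
                  inhibits_start: Iterable[Set[str]]) -> Set[str]:
    """Return jobs present in every required repo and absent from every inhibiting repo."""
    required = list(required_for_start)
    if not required:
        # Assumption for this sketch: with no required repos, no job is ready.
        return set()
    ready = set.intersection(*required)  # AND(*required_for_start), as a new set
    for inhibitor in inhibits_start:     # NOT(OR(*inhibits_start))
        ready -= inhibitor
    return ready


inputs = {"job1", "job2", "job3"}
done = {"job2"}
print(sorted(default_ready([inputs], [done])))  # ['job1', 'job3']
```

A job that has already produced a "done" record is thus excluded, while jobs that only have input data remain eligible to launch.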
- link(name: str, repo: Repository, is_input: bool = False, is_output: bool = False, is_status: bool = False, inhibits_start: bool = False, required_for_start: bool | None = None, inhibits_output: bool = False, required_for_output: bool | None = None)
Create a link between this task and a repository.
- Parameters:
name – The name of the link. Used only in the admin interface.
repo – The repository to link.
is_input – Whether this repository contains data which is consumed by the task. Default False.
is_output – Whether this repository is populated by the task. Default False.
is_status – Whether this repository is populated with task-ephemeral data. Default False.
inhibits_start – Whether membership in this repository should be used in the default ready repository to prevent jobs from being launched. Default False.
required_for_start – Whether membership in this repository should be used in the default ready repository to allow jobs to be launched. If unspecified, defaults to is_input.
inhibits_output – Whether this repository should become inhibits_start in tasks this task is plugged into. Default False.
required_for_output – Whether this repository should become required_for_start in tasks this task is plugged into. If unspecified, defaults to is_output.
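The defaulting rules for required_for_start and required_for_output can be sketched as follows. LinkSketch and make_link are hypothetical stand-ins for Link and Task.link, with the repository reduced to a name string; only the fallback logic mirrors the documented behavior.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class LinkSketch:
    # Hypothetical stand-in for pydatatask.task.Link; repo is just a name here.
    repo: str
    is_input: bool = False
    is_output: bool = False
    is_status: bool = False
    inhibits_start: bool = False
    required_for_start: bool = False
    inhibits_output: bool = False
    required_for_output: bool = False


def make_link(repo: str, is_input: bool = False, is_output: bool = False,
              is_status: bool = False, inhibits_start: bool = False,
              required_for_start: Optional[bool] = None, inhibits_output: bool = False,
              required_for_output: Optional[bool] = None) -> LinkSketch:
    # Unspecified required_for_* flags fall back to is_input / is_output.
    if required_for_start is None:
        required_for_start = is_input
    if required_for_output is None:
        required_for_output = is_output
    return LinkSketch(repo=repo, is_input=is_input, is_output=is_output,
                      is_status=is_status, inhibits_start=inhibits_start,
                      required_for_start=required_for_start,
                      inhibits_output=inhibits_output,
                      required_for_output=required_for_output)


inp = make_link("repo_input", is_input=True)
out = make_link("repo_output", is_output=True)
print(inp.required_for_start, out.required_for_output)  # True True
```

Passing an explicit value (for example required_for_start=False) overrides the fallback, which is why the signature uses None rather than False as the default.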
- plug(output: Task, output_links: Iterable[str] | None = None, meta: bool = True, translator: Repository | None = None, translate_allow_deletes=False, translate_prefetch_lookup=True)
Link the output repositories from output as inputs to this task.
- Parameters:
output – The task to plug into this one.
output_links – Optional: An iterable allowlist of link names; only these links will be used.
meta – Whether to transfer repository inhibit- and required_for- dispositions. Default True.
translator – Optional: A repository used to transform job identifiers. If this is provided, a job is considered present in the resulting linked repository if the source repository contains the key await translator.info(job).
translate_allow_deletes – If translator is provided, this controls whether attempts to delete from the translated repository will do anything. Default False.
translate_prefetch_lookup – If translator is provided, this controls whether the translated repository will pre-fetch the list of translations on first access. Default True.
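A minimal sketch of the translator rule, assuming the source repository can be modeled as a set of keys and the translator as an object with an async info(job) method that returns the upstream key. DictTranslator and translated_contains are hypothetical names, not part of pydatatask.

```python
import asyncio
from typing import Dict, List, Set


class DictTranslator:
    """Hypothetical translator backed by a dict: downstream job ID -> upstream key."""

    def __init__(self, mapping: Dict[str, str]) -> None:
        self.mapping = mapping

    async def info(self, job: str) -> str:
        return self.mapping[job]


async def translated_contains(source_jobs: Set[str], translator: DictTranslator,
                              job: str) -> bool:
    # The job counts as present iff the source contains the key the translator returns.
    return (await translator.info(job)) in source_jobs


async def demo() -> List[bool]:
    source = {"upstream-1", "upstream-2"}
    translator = DictTranslator({"a": "upstream-1", "b": "upstream-9"})
    return [await translated_contains(source, translator, job) for job in ("a", "b")]


print(asyncio.run(demo()))  # [True, False]
```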
- property input
A mapping from link name to repository for all links marked is_input.
- property output
A mapping from link name to repository for all links marked is_output.
- property status
A mapping from link name to repository for all links marked is_status.
- property inhibits_start
A mapping from link name to repository for all links marked inhibits_start.
- property required_for_start
A mapping from link name to repository for all links marked required_for_start.
- property inhibits_output
A mapping from link name to repository for all links marked inhibits_output.
- property required_for_output
A mapping from link name to repository for all links marked required_for_output.
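Each of these properties is a filtered view of the task's links. A minimal sketch, assuming links are stored as a dict from link name to a record of flags (a hypothetical layout, not pydatatask's internal representation); the "done" flags follow the dispositions documented for KubeTask.

```python
from typing import Dict

# Hypothetical link table: link name -> repository name plus disposition flags.
links: Dict[str, dict] = {
    "inp": {"repo": "repo_input", "is_input": True, "required_for_start": True},
    "out": {"repo": "repo_output", "is_output": True, "required_for_output": True},
    "done": {"repo": "repo_done", "is_status": True, "inhibits_start": True,
             "required_for_output": True},
}


def links_marked(links: Dict[str, dict], flag: str) -> Dict[str, str]:
    """Map link name -> repository for every link whose `flag` is set."""
    return {name: link["repo"] for name, link in links.items() if link.get(flag, False)}


print(links_marked(links, "inhibits_start"))       # {'done': 'repo_done'}
print(links_marked(links, "required_for_output"))  # {'out': 'repo_output', 'done': 'repo_done'}
```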
- async validate()
Raise an exception if for any reason the task is misconfigured. This is guaranteed to be called exactly once per pipeline, so it is safe to use for setup and initialization in an async context.
- abstract async update() → bool
Part one of the pipeline maintenance loop. Override this to perform any maintenance operations on the set of live tasks. Typically, this entails reaping finished processes.
Returns True if anything interesting happened, or if there are any live tasks.
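The contract of update() can be illustrated with a toy class that tracks live jobs as asyncio tasks. ToyTask is hypothetical and does not subclass the real Task; it only shows reaping finished jobs and returning the documented value.

```python
import asyncio
from typing import Dict, List


class ToyTask:
    """Hypothetical stand-in for a Task subclass that tracks live jobs as asyncio tasks."""

    def __init__(self) -> None:
        self.live: Dict[str, asyncio.Task] = {}

    async def update(self) -> bool:
        # Reap finished jobs.
        finished = [job for job, fut in self.live.items() if fut.done()]
        for job in finished:
            del self.live[job]
        # Report True if something was reaped or if jobs are still live.
        return bool(finished) or bool(self.live)


async def demo() -> List[bool]:
    task = ToyTask()
    task.live["job1"] = asyncio.create_task(asyncio.sleep(0))
    results = [await task.update()]       # job1 is still live
    await asyncio.sleep(0.01)             # let job1 finish
    results.append(await task.update())   # job1 is reaped
    results.append(await task.update())   # nothing live, nothing happened
    return results


print(asyncio.run(demo()))  # [True, True, False]
```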
- class pydatatask.task.KubeTask(name: str, podman: Callable[[], PodManager], resources: ResourceManager, template: str | Path, logs: BlobRepository | None, done: MetadataRepository | None, window: timedelta = datetime.timedelta(seconds=60), timeout: timedelta | None = None, env: Dict[str, Any] | None = None, ready: Repository | None = None)
Bases: Task
A task which runs a Kubernetes pod.
It will automatically link a LiveKubeRepository as “live” with inhibits_start, inhibits_output, is_status.
- Parameters:
name – The name of the task.
podman – A callable returning a PodManager to use to connect to the cluster.
resources – A ResourceManager instance. Tasks launched will contribute to its quota and be denied if they would break the quota.
template – YAML markup for a pod manifest template, either as a string or a path to a file.
logs – Optional: A BlobRepository to dump pod logs to on completion. Linked as “logs” with inhibits_start, required_for_output, is_status.
done – A MetadataRepository in which to dump some information about the pod’s lifetime and termination on completion. Linked as “done” with inhibits_start, required_for_output, is_status.
window – Optional: How far back into the past to look in order to determine whether we have recently launched too many pods too quickly.
timeout – Optional: When a pod is found to have been running continuously for this amount of time, it will be timed out and stopped. The method handle_timeout will be called in-process.
env – Optional: Additional keys to add to the template environment.
ready – Optional: A repository from which to read task-ready status.
It is highly recommended to provide one or more of done or logs so that at least one link is present with inhibits_start.
- property podman: PodManager
The pod manager instance for this task. Will raise an error if the manager is provided by an unopened session.
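The window and timeout parameters describe two time-based checks. The sketch below is one plausible reading of them using plain datetime arithmetic; the function names and the max_launches threshold are hypothetical, since the documentation does not state how many launches per window are allowed.

```python
from datetime import datetime, timedelta
from typing import List, Optional


def launches_in_window(launch_times: List[datetime], now: datetime,
                       window: timedelta) -> int:
    """Count launches that happened within `window` before `now`."""
    return sum(1 for t in launch_times if now - t <= window)


def may_launch(launch_times: List[datetime], now: datetime, window: timedelta,
               max_launches: int) -> bool:
    # max_launches is a hypothetical limit; the real threshold is not documented here.
    return launches_in_window(launch_times, now, window) < max_launches


def is_timed_out(started: datetime, now: datetime, timeout: Optional[timedelta]) -> bool:
    """True if a pod has been running continuously for at least `timeout`."""
    return timeout is not None and now - started >= timeout


now = datetime(2024, 1, 1, 12, 0, 0)
launches = [now - timedelta(seconds=s) for s in (5, 30, 90)]
print(launches_in_window(launches, now, timedelta(seconds=60)))            # 2
print(may_launch(launches, now, timedelta(seconds=60), max_launches=2))    # False
print(is_timed_out(now - timedelta(minutes=10), now, timedelta(minutes=5)))  # True
```

With the default window of 60 seconds, the launch 90 seconds ago no longer counts against the rate limit.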
- class pydatatask.task.ProcessTask(name: str, manager: Callable[[], AbstractProcessManager], resource_manager: ResourceManager, job_resources: Resources, pids: MetadataRepository, template: str, window: timedelta = datetime.timedelta(seconds=60), environ: Dict[str, str] | None = None, done: MetadataRepository | None = None, stdin: BlobRepository | None = None, stdout: BlobRepository | None = None, stderr: BlobRepository | _StderrIsStdout | None = None, ready: Repository | None = None)
Bases: Task
A task that runs a script. The interpreter is specified by the shebang, or the default shell if none is present. The execution environment for the task is defined by the ProcessManager instance provided as an argument.
- Parameters:
name – The name of this task.
manager – A callable returning the process manager with control over the target execution environment.
resource_manager – A ResourceManager instance. Tasks launched will contribute to its quota and be denied if they would break the quota.
job_resources – The amount of resources an individual job should contribute to the quota. Note that this is currently not enforced target-side, so jobs may actually take up more resources than assigned.
pids – A metadata repository used to store the current live-status of processes. Will automatically be linked as “pids” with is_status, inhibits_start, inhibits_output.
template – YAML markup for the template of a script to run, either as a string or a path to a file.
environ – Additional environment variables to set on the target machine before running the task.
window – How recently a process must have been launched in order to contribute to the process rate-limiting.
done – Optional: A metadata repository in which to dump some information about the process’s lifetime and termination on completion. Linked as “done” with inhibits_start, required_for_output, is_status.
stdin – Optional: A blob repository from which to source the process’s standard input. The content will be preloaded and transferred to the target environment, so the target does not need to be authenticated to this repository. Linked as “stdin” with is_input.
stdout – Optional: A blob repository into which to dump the process’s standard output. The content will be transferred from the target environment on completion, so the target does not need to be authenticated to this repository. Linked as “stdout” with is_output.
stderr – Optional: A blob repository into which to dump the process’s standard error, or the constant pydatatask.task.STDOUT to indicate that the stream should be interleaved with stdout. If a repository is provided, the content will be transferred from the target environment on completion, so the target does not need to be authenticated to this repository. Linked as “stderr” with is_output.
ready – Optional: A repository from which to read task-ready status.
It is highly recommended to provide at least one of done, stdout, or stderr, so that at least one link is present with inhibits_start.
- property manager: AbstractProcessManager
The process manager for this task. Will raise an error if the manager comes from a session which is closed.
- property stderr: BlobRepository | None
The repository into which stderr will be dumped, or None if it will go to the null device.
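The STDOUT constant plays the same role as the standard library's subprocess.STDOUT. As an analogy only (this uses subprocess directly, not a pydatatask process manager), the following merges a child process's standard error into its standard output:

```python
import subprocess
import sys

# Child prints one line to stdout, flushes, then one line to stderr.
child_code = "import sys; print('out'); sys.stdout.flush(); print('err', file=sys.stderr)"

# stderr=subprocess.STDOUT sends the child's stderr into the same pipe as stdout,
# analogous to passing stderr=pydatatask.task.STDOUT to ProcessTask.
result = subprocess.run(
    [sys.executable, "-c", child_code],
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    text=True,
)
print(result.stdout.split())  # ['out', 'err']
```

Because the streams are merged, result.stderr is None, just as the stderr property above reports no separate repository in the interleaved case.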
- class pydatatask.task.InProcessSyncTask(name: str, done: MetadataRepository, ready: Repository | None = None, func: FunctionTaskProtocol | None = None)
Bases: Task
A task which runs in-process. Typical usage of this task might look like the following:
```python
import pydatatask

# done_repo, repo_input, and repo_output are repositories defined elsewhere.
@pydatatask.InProcessSyncTask("my_task", done_repo)
async def my_task(job: str, inp: pydatatask.MetadataRepository, out: pydatatask.MetadataRepository):
    await out.dump(job, await inp.info(job))

my_task.link("inp", repo_input, is_input=True)
my_task.link("out", repo_output, is_output=True)
```
- Parameters:
name – The name of this task.
done – A metadata repository to store some information about a job’s runtime and termination on completion.
ready – Optional: A repository from which to read task-ready status.
func – Optional: The async function to run as the task body, if you don’t want to use this task as a decorator.
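The decorator form works because the task object stores the decorated function and stands in for it. The self-contained sketch below imitates that pattern with hypothetical DictRepo and ToySyncTask classes. Passing each linked repository to the body as a keyword argument named after its link is an assumption suggested by the inp and out names in the example above, and the done repository is omitted.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Optional


class DictRepo:
    """Hypothetical in-memory stand-in for a MetadataRepository."""

    def __init__(self, data: Optional[Dict[str, object]] = None) -> None:
        self.data = dict(data or {})

    async def info(self, job: str) -> object:
        return self.data[job]

    async def dump(self, job: str, value: object) -> None:
        self.data[job] = value


class ToySyncTask:
    """Hypothetical sketch of a task object that doubles as a decorator."""

    def __init__(self, name: str, func: Optional[Callable[..., Awaitable[None]]] = None) -> None:
        self.name = name
        self.func = func
        self.links: Dict[str, DictRepo] = {}

    def __call__(self, func: Callable[..., Awaitable[None]]) -> "ToySyncTask":
        # Used as @ToySyncTask("name"): store the body and return the task itself.
        self.func = func
        return self

    def link(self, name: str, repo: DictRepo) -> None:
        self.links[name] = repo

    async def run(self, job: str) -> None:
        # Assumption: each linked repository is passed as a kwarg named after its link.
        assert self.func is not None
        await self.func(job, **self.links)


repo_input = DictRepo({"job1": {"x": 1}})
repo_output = DictRepo()


@ToySyncTask("my_task")
async def my_task(job: str, inp: DictRepo, out: DictRepo) -> None:
    await out.dump(job, await inp.info(job))


my_task.link("inp", repo_input)
my_task.link("out", repo_output)
asyncio.run(my_task.run("job1"))
print(repo_output.data)  # {'job1': {'x': 1}}
```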
- class pydatatask.task.ExecutorTask(name: str, executor: Executor, done: MetadataRepository, ready: Repository | None = None, func: Callable | None = None)
Bases: Task
A task which runs Python functions in a concurrent.futures.Executor. This has not been tested on anything but the concurrent.futures.ThreadPoolExecutor, so beware!
See InProcessSyncTask for information on how to use instances of this class as decorators for their bodies.
It is expected that the executor will perform all necessary resource quota management.
- Parameters:
name – The name of this task.
executor – The executor to run jobs in.
done – A metadata repository to store some information about a job’s runtime and termination on completion.
ready – Optional: A repository from which to read task-ready status.
func – Optional: The async function to run as the task body, if you don’t want to use this task as a decorator.
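One common way to drive a concurrent.futures executor from asyncio is loop.run_in_executor. The sketch below shows that bridge with a ThreadPoolExecutor; it is not a description of ExecutorTask's internals, and body, run_jobs, and run_all are hypothetical names. It also shows where quota management sits in this setup: max_workers on the executor bounds how many jobs run at once.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import List


def body(job: str) -> str:
    # A plain synchronous job body; it runs on a worker thread, not the event loop.
    return job.upper()


async def run_jobs(executor: ThreadPoolExecutor, jobs: List[str]) -> List[str]:
    loop = asyncio.get_running_loop()
    # Submit each job to the executor and await all of them without blocking the loop.
    futures = [loop.run_in_executor(executor, body, job) for job in jobs]
    return list(await asyncio.gather(*futures))


def run_all(jobs: List[str], max_workers: int = 2) -> List[str]:
    # The executor's max_workers caps concurrency; nothing else enforces a quota here.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return asyncio.run(run_jobs(executor, jobs))


print(run_all(["a", "b", "c"]))  # ['A', 'B', 'C']
```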
- class pydatatask.task.KubeFunctionTask(name: str, podman: Callable[[], PodManager], resources: ResourceManager, template: str | Path, logs: BlobRepository | None = None, kube_done: MetadataRepository | None = None, func_done: MetadataRepository | None = None, env: Dict[str, Any] | None = None, func: Callable | None = None)
Bases: KubeTask
A task which runs a Python function on a Kubernetes cluster. Requires a pod template which will execute a Python script calling pydatatask.main.main. This works by running python3 main.py launch [task] [job] --sync.
Sample usage:
```python
import pydatatask
from pydatatask.task import KubeFunctionTask

# podman, resman, and the repo_* repositories are assumed to be defined elsewhere.
@KubeFunctionTask(
    "my_task",
    podman,
    resman,
    '''
apiVersion: v1
kind: Pod
spec:
  containers:
    - name: leader
      image: "docker.example.com/my/image"
      command:
        - python3
        - {{argv0}}
        - launch
        - "{{task}}"
        - "{{job}}"
        - "--force"
        - "--sync"
      resources:
        requests:
          cpu: 100m
          memory: 256Mi
''',
    repo_logs,
    repo_done,
    repo_func_done,
)
async def my_task(job: str, inp: pydatatask.MetadataRepository, out: pydatatask.MetadataRepository):
    await out.dump(job, await inp.info(job))

my_task.link("inp", repo_input, is_input=True)
my_task.link("out", repo_output, is_output=True)
```
- Parameters:
name – The name of this task.
podman – A callable returning a PodManager to use to connect to the cluster.
resources – A ResourceManager instance. Tasks launched will contribute to its quota and be denied if they would break the quota.
template – YAML markup for a pod manifest template that will run pydatatask.main.main as python3 main.py launch [task] [job] --sync --force, either as a string or a path to a file.
logs – Optional: A BlobRepository to dump pod logs to on completion. Linked as “logs” with inhibits_start, required_for_output, is_status.
kube_done – Optional: A MetadataRepository in which to dump some information about the pod’s lifetime and termination on completion. Linked as “done” with inhibits_start, required_for_output, is_status.
func_done – Optional: A MetadataRepository in which to dump some information about the function’s lifetime and termination on completion. Linked as “func_done” with inhibits_start, required_for_output, is_status.
env – Optional: Additional keys to add to the template environment.
ready – Optional: A repository from which to read task-ready status.
func – Optional: The async function to run as the task body, if you don’t want to use this task as a decorator.
It is highly recommended to provide at least one of kube_done, func_done, or logs, so that at least one link is present with inhibits_start.
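The {{argv0}}, {{task}}, and {{job}} placeholders in the sample template are filled in before the pod is created. The {{...}} syntax suggests a Jinja-style engine; the sketch below substitutes placeholders with a small regular expression instead, and the values in ENV (/app/main.py, my_task, job1) are hypothetical.

```python
import re
from typing import Dict

# The command portion of the sample pod template.
TEMPLATE = """\
command:
  - python3
  - {{argv0}}
  - launch
  - "{{task}}"
  - "{{job}}"
  - "--force"
  - "--sync"
"""

# Hypothetical template environment values.
ENV = {"argv0": "/app/main.py", "task": "my_task", "job": "job1"}


def render(template: str, env: Dict[str, str]) -> str:
    """Replace each {{name}} with env[name]; a minimal stand-in for a real engine."""
    return re.sub(r"\{\{\s*(\w+)\s*\}\}", lambda m: env[m.group(1)], template)


print(render(TEMPLATE, ENV))
```

With these values, the container's command becomes python3 /app/main.py launch my_task job1 --force --sync.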