"""
A Task is a unit of execution which can act on multiple repositories.

You define a task by instantiating a Task subclass and passing it to a Pipeline object.

Tasks are related to Repositories by Links. Links are created by
``Task.link("my_link_name", my_repository, **disposition)``. The main disposition kwargs you'll want to use are
``is_input`` and ``is_output``. See `Task.link` for more information.

For a shortcut for linking the output of one task as the input of another task, see `Task.plug`.
"""

from typing import (
    TYPE_CHECKING,
    Any,
    Awaitable,
    Callable,
    Dict,
    Iterable,
    Optional,
    Protocol,
    Tuple,
    Union,
)
from abc import abstractmethod
from asyncio import Future
from concurrent.futures import FIRST_EXCEPTION, Executor, wait
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum, auto
from pathlib import Path
import asyncio
import copy
import inspect
import logging
import os
import sys
import traceback

from kubernetes_asyncio.client import ApiException, V1Pod
import aiofiles.os
import jinja2.async_utils
import jinja2.compiler
import yaml

from .pod_manager import PodManager
from .repository import (
    AggregateAndRepository,
    AggregateOrRepository,
    BlobRepository,
    BlockingRepository,
    ExecutorLiveRepo,
    LiveKubeRepository,
    MetadataRepository,
    RelatedItemRepository,
    Repository,
)
from .resource_manager import ResourceManager, Resources
from .utils import async_copyfile

if TYPE_CHECKING:
    from .proc_manager import AbstractProcessManager

l = logging.getLogger(__name__)

__all__ = (

class RepoHandlingMode(Enum):
    """
    Mode for the `build_env` function.

    Selects what `build_env` substitutes for each repository or link it finds in the environment:

    - LAZY = Build repositories and links into un-awaited `info` coroutines.
    - SMART = Build repositories and links into repository references.
    - EAGER = Build repositories and links into awaited `info` values.
    """

    LAZY = auto()
    SMART = auto()
    EAGER = auto()

async def build_env(env: Dict[str, Any], job: str, mode: RepoHandlingMode) -> Dict[str, Any]:
    """
    Given a mapping from environment key names to values, build the mapping that can be used during templating. The
    way repositories are transformed into job-related information is controlled by the `RepoHandlingMode`.

    :param env:   The source mapping. Values which are neither a `Repository` nor a `Link` are copied through
                  unchanged; a `Link` is resolved to its ``repo`` first.
    :param job:   The job identifier passed to each repository's ``info`` method.
    :param mode:  ``SMART`` stores the repository itself, ``LAZY`` stores the un-awaited ``repo.info(job)``
                  coroutine, and any other mode (``EAGER``) stores the awaited ``repo.info(job)`` value.
    :return:      A new dict with the same keys as ``env``.
    """
    result: Dict[str, Any] = {}
    for key, val in env.items():
        if isinstance(val, Repository):
            repo = val
        elif isinstance(val, Link):
            repo = val.repo
        else:
            # Plain values need no job-specific handling.
            result[key] = val
            continue

        if mode == RepoHandlingMode.SMART:
            result[key] = repo
        elif mode == RepoHandlingMode.LAZY:
            # Callers are responsible for awaiting or closing this coroutine.
            result[key] = repo.info(job)
        else:
            result[key] = await repo.info(job)
    return result

async def render_template(template, env: Dict[str, Any]):
    """
    Given a template and an environment, use jinja2 to parameterize the template with the environment.

    :param template:  A template string, or a path to a local template file.
    :param env:       A mapping from environment key names to values.
    :return:          The rendered template, as a string.
    """
    # Async mode is required: the template is rendered with render_async and ParanoidAsyncGenerator only
    # instruments name lookups when the environment is async.
    # NOTE(review): the original constructor arguments were lost in this view - confirm no other options
    # (e.g. trailing-newline handling) were being passed.
    j = jinja2.Environment(
        enable_async=True,
    )
    j.code_generator_class = ParanoidAsyncGenerator
    if await aiofiles.os.path.isfile(template):
        # `template` names an existing local file: load its contents.
        async with aiofiles.open(template, "r") as fp:
            template_str = await fp.read()
    else:
        # Otherwise treat the argument as the template markup itself.
        template_str = template
    templating = j.from_string(template_str)
    return await templating.render_async(**env)

class Task:
    """
    The Task base class.
    """

    # NOTE(review): `self.link(...)` is called by `plug` and by every subclass constructor, but its definition is
    # not visible in this view of the file - confirm it is still present on this class.

    def __init__(self, name: str, ready: Optional[Repository] = None, disabled: bool = False):
        self.name = name
        self._ready = ready
        self.links: Dict[str, Link] = {}
        self.synchronous = False
        self.metadata = True
        self.disabled = disabled

    def __repr__(self):
        return f"<{type(self).__name__} {self.name}>"

    @property
    def ready(self):
        """
        Return the repository whose job membership is used to determine whether a task instance should be launched.
        If an override is provided to the constructor, this is that, otherwise it is
        ``AND(*required_for_start, NOT(OR(*inhibits_start)))``.
        """
        if self._ready is not None:
            return self._ready
        return BlockingRepository(
            AggregateAndRepository(**self.required_for_start),
            AggregateOrRepository(**self.inhibits_start),
        )

    def plug(
        self,
        output: "Task",
        output_links: Optional[Iterable[str]] = None,
        meta: bool = True,
        translator: Optional[Repository] = None,
        translate_allow_deletes=False,
        translate_prefetch_lookup=True,
    ):
        """
        Link the output repositories from `output` as inputs to this task.

        :param output: The task to plug into this one.
        :param output_links: Optional: An iterable allowlist of links to only use.
        :param meta: Whether to transfer repository inhibit- and required_for- dispositions. Default True.
        :param translator: Optional: A repository used to transform job identifiers. If this is provided, a job
            will be said to be present in the resulting linked repository if the source repository contains the
            key ``await translator.info(job)``.
        :param translate_allow_deletes: If translator is provided, this controls whether attempts to delete from
            the translated repository will do anything. Default False.
        :param translate_prefetch_lookup: If translator is provided, this controls whether the translated
            repository will pre-fetch the list of translations on first access. Default True.
        """
        # Materialize the allowlist once: a one-shot iterable would otherwise be exhausted by the first
        # membership test and silently reject every later link.
        allowed = None if output_links is None else set(output_links)

        for name, link in output.links.items():
            link_attrs = {}
            if link.inhibits_output and meta:
                link_attrs["inhibits_start"] = True
            if link.is_output and (allowed is None or name in allowed):
                link_attrs["is_input"] = True
            if link.required_for_output and meta:
                link_attrs["required_for_start"] = True
            if not link.is_output:
                # Non-output links get the upstream task's name as a prefix.
                name = f"{output.name}_{name}"
            if link_attrs:
                repo = link.repo
                if translator is not None:
                    repo = RelatedItemRepository(
                        repo,
                        translator,
                        allow_deletes=translate_allow_deletes,
                        prefetch_lookup=translate_prefetch_lookup,
                    )
                self.link(name, repo, **link_attrs)

    @property
    def input(self):
        """
        A mapping from link name to repository for all links marked ``is_input``.
        """
        return {name: link.repo for name, link in self.links.items() if link.is_input}

    @property
    def output(self):
        """
        A mapping from link name to repository for all links marked ``is_output``.
        """
        return {name: link.repo for name, link in self.links.items() if link.is_output}

    @property
    def status(self):
        """
        A mapping from link name to repository for all links marked ``is_status``.
        """
        return {name: link.repo for name, link in self.links.items() if link.is_status}

    @property
    def inhibits_start(self):
        """
        A mapping from link name to repository for all links marked ``inhibits_start``.
        """
        return {name: link.repo for name, link in self.links.items() if link.inhibits_start}

    @property
    def required_for_start(self):
        """
        A mapping from link name to repository for all links marked ``required_for_start``.
        """
        return {name: link.repo for name, link in self.links.items() if link.required_for_start}

    @property
    def inhibits_output(self):
        """
        A mapping from link name to repository for all links marked ``inhibits_output``.
        """
        return {name: link.repo for name, link in self.links.items() if link.inhibits_output}

    @property
    def required_for_output(self):
        """
        A mapping from link name to repository for all links marked ``required_for_output``.
        """
        return {name: link.repo for name, link in self.links.items() if link.required_for_output}

    async def validate(self):
        """
        Raise an exception if for any reason the task is misconfigured. This is guaranteed to be called exactly once
        per pipeline, so it is safe to use for setup and initialization in an async context.
        """

    @abstractmethod
    async def update(self) -> bool:
        """
        Part one of the pipeline maintenance loop. Override this to perform any maintenance operations on the set
        of live tasks. Typically, this entails reaping finished processes.

        Returns True if literally anything interesting happened, or if there are any live tasks.
        """
        raise NotImplementedError

    @abstractmethod
    async def launch(self, job):
        """
        Launch a job. Override this to begin execution of the provided job. Error handling will be done for you.
        """
        raise NotImplementedError
class ParanoidAsyncGenerator(jinja2.compiler.CodeGenerator):
    """
    A class to instrument jinja2 to be more aggressive about awaiting objects and to cache their results. Probably
    don't use this directly.
    """

    def write_commons(self):
        """
        Emit the ``auto_await2`` helper into the generated template code, then the stock jinja2 commons.

        The helper returns primitives and non-awaitables untouched; an awaitable is awaited at most once and
        its result is replayed from ``seen`` on every later reference.
        """
        self.writeline("from jinja2.async_utils import _common_primitives")
        self.writeline("import inspect")
        self.writeline("seen = {}")
        self.writeline("async def auto_await2(value):")
        self.writeline("    if type(value) in _common_primitives:")
        self.writeline("        return value")
        self.writeline("    if inspect.isawaitable(value):")
        # Membership test rather than `.get(...) is None`: an awaitable that resolves to None must not be
        # awaited a second time (re-awaiting a finished coroutine raises RuntimeError).
        self.writeline("        if value not in seen:")
        self.writeline("            seen[value] = await value")
        self.writeline("        return seen[value]")
        self.writeline("    return value")
        super().write_commons()

    def visit_Name(self, node, frame):
        """
        Wrap every name *load* in ``(await auto_await2(...))`` when the environment is async.

        Names in store/param context (loop targets, ``set`` targets) are emitted unwrapped, since an ``await``
        expression is not a valid assignment target.
        """
        wrap = self.environment.is_async and node.ctx == "load"
        if wrap:
            self.write("(await auto_await2(")
        super().visit_Name(node, frame)
        if wrap:
            self.write("))")
class KubeTask(Task):
    """
    A task which runs a kubernetes pod.

    Will automatically link a `LiveKubeRepository` as "live" with
    ``inhibits_start, inhibits_output, is_status``
    """

    def __init__(
        self,
        name: str,
        podman: Callable[[], PodManager],
        resources: ResourceManager,
        template: Union[str, Path],
        logs: Optional[BlobRepository],
        done: Optional[MetadataRepository],
        window: timedelta = timedelta(minutes=1),
        timeout: Optional[timedelta] = None,
        env: Optional[Dict[str, Any]] = None,
        ready: Optional[Repository] = None,
    ):
        """
        :param name: The name of the task.
        :param podman: A callable returning a PodManager to use to connect to the cluster.
        :param resources: A ResourceManager instance. Tasks launched will contribute to its quota and be denied
                          if they would break the quota.
        :param template: YAML markup for a pod manifest template, either as a string or a path to a file.
        :param logs: Optional: A BlobRepository to dump pod logs to on completion. Linked as "logs" with
                     ``inhibits_start, required_for_output, is_status``.
        :param done: A MetadataRepository in which to dump some information about the pod's lifetime and
                     termination on completion. Linked as "done" with
                     ``inhibits_start, required_for_output, is_status``.
        :param window: Optional: How far back into the past to look in order to determine whether we have
                       recently launched too many pods too quickly.
        :param timeout: Optional: When a pod is found to have been running continuously for this amount of time,
                        it will be timed out and stopped. The method `handle_timeout` will be called in-process.
        :param env: Optional: Additional keys to add to the template environment.
        :param ready: Optional: A repository from which to read task-ready status.

        It is highly recommended to provide one or more of ``done`` or ``logs`` so that at least one link is
        present with ``inhibits_start``.
        """
        super().__init__(name, ready)

        self.template = template
        self.resources = resources
        self._podman = podman
        self.logs = logs
        self.timeout = timeout
        self.done = done
        self.env = env if env is not None else {}
        self.warned = False
        self.window = window

        self.resources.register(self._get_load)

        self.link(
            "live",
            LiveKubeRepository(self),
            is_status=True,
            inhibits_start=True,
            inhibits_output=True,
        )
        if logs:
            self.link(
                "logs",
                logs,
                is_status=True,
                inhibits_start=True,
                required_for_output=True,
            )
        if done:
            self.link(
                "done",
                done,
                is_status=True,
                inhibits_start=True,
                required_for_output=True,
            )

    @property
    def podman(self) -> PodManager:
        """
        The pod manager instance for this task. Will raise an error if the manager is provided by an unopened
        session.
        """
        return self._podman()

    async def _get_load(self):
        """
        Sum the resources requested by this task's pods, counting a launch for each pod created within
        ``self.window``. A "Forbidden" API error is ignored and whatever was accumulated so far is returned.
        """
        usage = Resources()
        cutoff = datetime.now(tz=timezone.utc) - self.window

        try:
            # NOTE(review): query arguments were lost in this view; `task=self.name` is reconstructed - confirm.
            for pod in await self.podman.query(task=self.name):
                if pod.metadata.creation_timestamp > cutoff:
                    usage += Resources(launches=1)
                for container in pod.spec.containers:
                    usage += Resources.parse(
                        container.resources.requests["cpu"], container.resources.requests["memory"], 0
                    )
        except ApiException as e:
            if e.reason != "Forbidden":
                raise

        return usage

    async def launch(self, job):
        """
        Render the pod manifest for ``job``, reserve its requested resources, and launch the pod. If the
        reservation is denied, nothing is launched and a warning is logged once per update cycle.
        """
        env_input = dict(vars(self))
        env_input.update(self.links)
        env_input.update(self.env)
        env = await build_env(env_input, job, RepoHandlingMode.LAZY)
        env["job"] = job
        env["task"] = self.name
        env["argv0"] = os.path.basename(sys.argv[0])
        try:
            manifest = yaml.safe_load(await render_template(self.template, env))
        finally:
            # LAZY mode hands out un-awaited coroutines; close whatever the template did not consume, even if
            # rendering failed, so nothing is left dangling.
            for item in env.values():
                if asyncio.iscoroutine(item):
                    item.close()

        assert manifest["kind"] == "Pod"
        spec = manifest["spec"]
        spec["restartPolicy"] = "Never"

        request = Resources(launches=1)
        for container in spec["containers"]:
            request += Resources.parse(
                container["resources"]["requests"]["cpu"],
                container["resources"]["requests"]["memory"],
                0,
            )

        limit = await self.resources.reserve(request)
        if limit is None:
            try:
                await self.podman.launch(job, self.name, manifest)
            except BaseException:
                # Includes cancellation: the reservation must be returned whenever the launch did not go through.
                await self.resources.relinquish(request)
                raise
        elif not self.warned:
            l.warning("Cannot launch %s: %s limit", self, limit)
            self.warned = True

    async def _cleanup(self, pod: V1Pod, reason: str):
        """
        Dump the pod's logs and termination metadata into the configured repositories, then delete the pod.
        """
        job = pod.metadata.labels["job"]

        if self.logs is not None:
            async with await self.logs.open(job, "w") as fp:
                try:
                    await fp.write(await self.podman.logs(pod))
                except (TimeoutError, ApiException):
                    await fp.write("<failed to fetch logs>\n")

        if self.done is not None:
            data = {
                "reason": reason,
                "start_time": pod.metadata.creation_timestamp,
                "end_time": datetime.now(tz=timezone.utc),
                "image": pod.status.container_statuses[0].image,
                "node": pod.spec.node_name,
            }
            await self.done.dump(job, data)

        await self.delete(pod)

    async def delete(self, pod: V1Pod):
        """
        Kill a pod and relinquish its resources without marking the task as complete.
        """
        request = Resources(launches=1)
        for container in pod.spec.containers:
            request += Resources.parse(container.resources.requests["cpu"], container.resources.requests["memory"], 0)
        await self.podman.delete(pod)
        await self.resources.relinquish(request)

    async def update(self):
        """
        Inspect every pod belonging to this task, cleaning up the finished and timed-out ones.

        :return: True if any pods for this task were found.
        """
        self.warned = False

        # NOTE(review): query arguments were lost in this view; `task=self.name` is reconstructed - confirm.
        pods = await self.podman.query(task=self.name)
        result = bool(pods)

        await asyncio.gather(*(self._update_one(pod) for pod in pods))
        return result

    async def _update_one(self, pod: V1Pod):
        """
        Clean up a single pod if it has finished or exceeded ``self.timeout``. Failures are logged, not raised.
        """
        try:
            uptime: timedelta = datetime.now(tz=timezone.utc) - pod.metadata.creation_timestamp
            total_min = uptime.total_seconds() // 60
            uptime_hours, uptime_min = divmod(total_min, 60)
            l.debug(
                "Pod %s is alive for %dh%dm",
                pod.metadata.name,
                uptime_hours,
                uptime_min,
            )
            if pod.status.phase in ("Succeeded", "Failed"):
                l.debug("...finished: %s", pod.status.phase)
                await self._cleanup(pod, pod.status.phase)
            elif self.timeout is not None and uptime > self.timeout:
                l.debug("...timed out")
                await self.handle_timeout(pod)
                await self._cleanup(pod, "Timeout")
        except Exception:
            l.exception("Failed to update kube task %s:%s", self.name, pod.metadata.name)

    async def handle_timeout(self, pod: V1Pod):
        """
        You may override this method in a subclass, and it will be called whenever a pod times out. You can use
        this method to e.g. scrape in-progress data out of the pod via an exec.
        """
class _StderrIsStdout: pass STDOUT = _StderrIsStdout()
class ProcessTask(Task):
    """
    A task that runs a script. The interpreter is specified by the shebang, or the default shell if none present.
    The execution environment for the task is defined by the ProcessManager instance provided as an argument.
    """

    def __init__(
        self,
        name: str,
        manager: Callable[[], "AbstractProcessManager"],
        resource_manager: "ResourceManager",
        job_resources: "Resources",
        pids: MetadataRepository,
        template: str,
        window: timedelta = timedelta(minutes=1),
        environ: Optional[Dict[str, str]] = None,
        done: Optional[MetadataRepository] = None,
        stdin: Optional[BlobRepository] = None,
        stdout: Optional[BlobRepository] = None,
        stderr: Optional[Union[BlobRepository, _StderrIsStdout]] = None,
        ready: Optional[Repository] = None,
    ):
        """
        :param name: The name of this task.
        :param manager: A callable returning the process manager with control over the target execution
            environment.
        :param resource_manager: A ResourceManager instance. Tasks launched will contribute to its quota and be
            denied if they would break the quota.
        :param job_resources: The amount of resources an individual job should contribute to the quota. Note that
            this is currently **not enforced** target-side, so jobs may actually take up more resources than
            assigned.
        :param pids: A metadata repository used to store the current live-status of processes. Will automatically
            be linked as "pids" with ``is_status, inhibits_start, inhibits_output``.
        :param template: YAML markup for the template of a script to run, either as a string or a path to a file.
        :param environ: Additional environment variables to set on the target machine before running the task.
        :param window: How recently a process must have been launched in order to contribute to the process
            rate-limiting.
        :param done: Optional: A metadata repository in which to dump some information about the process's
            lifetime and termination on completion. Linked as "done" with
            ``inhibits_start, required_for_output, is_status``.
        :param stdin: Optional: A blob repository from which to source the process' standard input. The content
            will be preloaded and transferred to the target environment, so the target does not need to be
            authenticated to this repository. Linked as "stdin" with ``is_input``.
        :param stdout: Optional: A blob repository into which to dump the process' standard output. The content
            will be transferred from the target environment on completion, so the target does not need to be
            authenticated to this repository. Linked as "stdout" with ``is_output``.
        :param stderr: Optional: A blob repository into which to dump the process' standard error, or the constant
            `pydatatask.task.STDOUT` to indicate that the stream should be interleaved with stdout. Otherwise,
            the content will be transferred from the target environment on completion, so the target does not
            need to be authenticated to this repository. Linked as "stderr" with ``is_output``.
        :param ready: Optional: A repository from which to read task-ready status.

        It is highly recommended to provide at least one of ``done``, ``stdout``, or ``stderr``, so that at least
        one link is present with ``inhibits_start``.
        """
        super().__init__(name, ready=ready)

        self.pids = pids
        self.template = template
        self.environ = environ
        self.done = done
        self.stdin = stdin
        self.stdout = stdout
        self._stderr = stderr
        # Copy before mutating so the caller's Resources object is left untouched.
        self.job_resources = copy.copy(job_resources)
        self.job_resources.launches = 1
        self.resource_manager = resource_manager
        self._manager = manager
        self.warned = False
        self.window = window

        self.resource_manager.register(self._get_load)

        self.link("pids", pids, is_status=True, inhibits_start=True, inhibits_output=True)
        if stdin is not None:
            self.link("stdin", stdin, is_input=True)
        if stdout is not None:
            self.link("stdout", stdout, is_output=True)
        if isinstance(stderr, BlobRepository):
            self.link("stderr", stderr, is_output=True)
        if done is not None:
            self.link("done", done, is_status=True, required_for_output=True, inhibits_start=True)

    @property
    def manager(self) -> "AbstractProcessManager":
        """
        The process manager for this task. Will raise an error if the manager comes from a session which is
        closed.
        """
        return self._manager()

    async def _get_load(self) -> "Resources":
        """
        Compute this task's current load: ``job_resources`` for every recorded process, with the launch count
        reduced so that only processes started within ``self.window`` count as launches.
        """
        cutoff = datetime.now(tz=timezone.utc) - self.window
        count = 0
        recent = 0
        job_map = await self.pids.info_all()
        for v in job_map.values():
            count += 1
            if v["start_time"] > cutoff:
                recent += 1

        return self.job_resources * count - Resources(launches=count - recent)

    @property
    def stderr(self) -> Optional[BlobRepository]:
        """
        The repository into which stderr will be dumped, or None if it will go to the null device.
        """
        if isinstance(self._stderr, BlobRepository):
            return self._stderr
        if self._stderr is None:
            # No stderr destination was configured: the stream is discarded, as documented.
            return None
        # STDOUT sentinel: stderr is interleaved into the stdout repository.
        return self.stdout

    @property
    def _unique_stderr(self):
        return self._stderr is not None and not isinstance(self._stderr, _StderrIsStdout)

    @property
    def basedir(self) -> Path:
        """
        The path in the target environment that will be used to store information about this task.
        """
        return self.manager.basedir / self.name

    async def update(self):
        """
        Reap every recorded process that is no longer alive in the target environment.

        :return: True if any processes were recorded as live when the update started.
        """
        self.warned = False
        job_map = await self.pids.info_all()
        pid_map = {meta["pid"]: job for job, meta in job_map.items()}
        expected_live = set(pid_map)
        try:
            live_pids = await self.manager.get_live_pids(expected_live)
        except Exception:
            l.error("Could not load live PIDs for %s", self, exc_info=True)
        else:
            died = expected_live - live_pids
            coros = []
            for pid in died:
                job = pid_map[pid]
                start_time = job_map[job]["start_time"]
                coros.append(self._reap(job, start_time))
            await asyncio.gather(*coros)
        return bool(expected_live)

    async def launch(self, job):
        """
        Reserve resources, stage the job directory, stdin and rendered script in the target environment, spawn
        the process and record its pid. On any failure the reservation, process and directory are rolled back.
        """
        limit = await self.resource_manager.reserve(self.job_resources)
        if limit is not None:
            if not self.warned:
                l.warning("Cannot launch %s: %s limit", self, limit)
                self.warned = True
            return

        pid = None
        dirmade = False

        try:
            cwd = self.basedir / job / "cwd"
            await self.manager.mkdir(cwd)  # implicitly creates basedir / task / job
            dirmade = True
            stdin = None
            if self.stdin is not None:
                stdin = self.basedir / job / "stdin"
                async with await self.stdin.open(job, "rb") as fpr, await self.manager.open(stdin, "wb") as fpw:
                    await async_copyfile(fpr, fpw)
                stdin = str(stdin)
            stdout = None if self.stdout is None else str(self.basedir / job / "stdout")
            stderr = STDOUT
            if not isinstance(self._stderr, _StderrIsStdout):
                stderr = None if self._stderr is None else str(self.basedir / job / "stderr")
            env_src = dict(self.links)
            env_src["job"] = job
            env_src["task"] = self.name
            env = await build_env(env_src, job, RepoHandlingMode.LAZY)
            exe_path = self.basedir / job / "exe"
            try:
                exe_txt = await render_template(self.template, env)
            finally:
                # LAZY mode hands out un-awaited coroutines; close the unused ones even if rendering failed.
                for item in env.values():
                    if asyncio.iscoroutine(item):
                        item.close()
            async with await self.manager.open(exe_path, "w") as fp:
                await fp.write(exe_txt)
            pid = await self.manager.spawn(
                [str(exe_path)], self.environ, str(cwd), str(self.basedir / job / "return_code"), stdin, stdout, stderr
            )
            if pid is not None:
                await self.pids.dump(job, {"pid": pid, "start_time": datetime.now(tz=timezone.utc)})
        except BaseException:
            # CLEAN UP YOUR MESS. Every step is best-effort so that one failing step cannot prevent the others;
            # failures are logged rather than silently dropped, and the original error is re-raised below.
            try:
                await self.resource_manager.relinquish(self.job_resources)
            except BaseException:
                l.warning("Could not relinquish resources for %s:%s", self, job, exc_info=True)
            try:
                if pid is not None:
                    await self.manager.kill(pid)
            except BaseException:
                l.warning("Could not kill process for %s:%s", self, job, exc_info=True)
            try:
                if dirmade:
                    await self.manager.rmtree(self.basedir / job)
            except BaseException:
                l.warning("Could not remove job directory for %s:%s", self, job, exc_info=True)
            raise

    async def _reap(self, job: str, start_time: datetime):
        """
        Collect the outputs and return code of a finished process, then remove its directory, pid record and
        resource reservation. Failures are logged, not raised.
        """
        try:
            if self.stdout is not None:
                async with await self.manager.open(self.basedir / job / "stdout", "rb") as fpr, await self.stdout.open(
                    job, "wb"
                ) as fpw:
                    await async_copyfile(fpr, fpw)
            if isinstance(self._stderr, BlobRepository):
                async with await self.manager.open(self.basedir / job / "stderr", "rb") as fpr, await self._stderr.open(
                    job, "wb"
                ) as fpw:
                    await async_copyfile(fpr, fpw)
            if self.done is not None:
                async with await self.manager.open(self.basedir / job / "return_code", "r") as fp1:
                    code = int(await fp1.read())
                await self.done.dump(
                    job, {"return_code": code, "start_time": start_time, "end_time": datetime.now(tz=timezone.utc)}
                )
            await self.manager.rmtree(self.basedir / job)
            await self.pids.delete(job)
            await self.resource_manager.relinquish(self.job_resources)
        except Exception:
            l.error("Could not reap process for %s:%s", self, job, exc_info=True)
class FunctionTaskProtocol(Protocol):
    """
    The protocol which is expected by the tasks which take a python function to execute.
    """

    def __call__(self, job: str, **kwargs) -> Awaitable[None]:
        ...
class InProcessSyncTask(Task):
    """
    A task which runs in-process. Typical usage of this task might look like the following:

    .. code:: python

        @pydatatask.InProcessSyncTask("my_task", done_repo)
        async def my_task(job: str, inp: pydatatask.MetadataRepository, out: pydatatask.MetadataRepository):
            await out.dump(job, await inp.info(job))

        my_task.link("inp", repo_input, is_input=True)
        my_task.link("out", repo_output, is_output=True)
    """

    def __init__(
        self,
        name: str,
        done: MetadataRepository,
        ready: Optional[Repository] = None,
        func: Optional[FunctionTaskProtocol] = None,
    ):
        """
        :param name: The name of this task.
        :param done: A metadata repository to store some information about a job's runtime and termination on
                     completion.
        :param ready: Optional: A repository from which to read task-ready status.
        :param func: Optional: The async function to run as the task body, if you don't want to use this task as
                     a decorator.
        """
        super().__init__(name, ready=ready)

        self.done = done
        self.func = func
        self.link("done", done, is_status=True, inhibits_start=True, required_for_output=True)
        # Filled in by validate(): maps each parameter of `func` to the value it should receive.
        self._env: Dict[str, Any] = {}

    def __call__(self, f: FunctionTaskProtocol) -> "InProcessSyncTask":
        """Decorator form: install ``f`` as the task body and return the task itself."""
        self.func = f
        return self

    async def validate(self):
        """
        Check that a function is installed and that each of its parameters is either ``job`` or the name of a
        link on this task.

        :raises ValueError: If no function has been provided.
        :raises NameError: If the function takes a parameter that cannot be satisfied.
        """
        if self.func is None:
            raise ValueError("InProcessSyncTask.func is None")

        sig = inspect.signature(self.func, follow_wrapped=True)
        for name in sig.parameters.keys():
            if name == "job":
                # Placeholder; the real job id is substituted at launch time.
                self._env[name] = None
            elif name in self.links:
                self._env[name] = self.links[name].repo
            else:
                raise NameError("%s takes parameter %s but no such argument is available" % (self.func, name))

    async def update(self):
        """Nothing to maintain: jobs run to completion inside `launch`."""
        pass

    async def launch(self, job):
        """
        Run the task function for ``job`` in this process, then record the outcome and timing in ``done`` (unless
        ``self.metadata`` is false). Exceptions raised by the function are recorded, not propagated.
        """
        start_time = datetime.now(tz=timezone.utc)
        l.debug("Launching in-process %s:%s...", self.name, job)
        args = dict(self._env)
        if "job" in args:
            args["job"] = job
        args = await build_env(args, job, RepoHandlingMode.SMART)
        try:
            await self.func(**args)
        except Exception as e:
            l.info("In-process task %s:%s failed", self.name, job, exc_info=True)
            result = {
                "result": "exception",
                "exception": repr(e),
                "traceback": traceback.format_tb(e.__traceback__),
            }
        else:
            l.debug("...success")
            result = {"result": "success"}
        result["start_time"] = start_time
        result["end_time"] = datetime.now(tz=timezone.utc)
        if self.metadata:
            await self.done.dump(job, result)
class ExecutorTask(Task):
    """
    A task which runs python functions in a :external:class:`concurrent.futures.Executor`. This has not been tested
    on anything but the :external:class:`concurrent.futures.ThreadPoolExecutor`, so beware!

    See `InProcessSyncTask` for information on how to use instances of this class as decorators for their bodies.

    It is expected that the executor will perform all necessary resource quota management.
    """

    def __init__(
        self,
        name: str,
        executor: Executor,
        done: MetadataRepository,
        ready: Optional[Repository] = None,
        func: Optional[Callable] = None,
    ):
        """
        :param name: The name of this task.
        :param executor: The executor to run jobs in.
        :param done: A metadata repository to store some information about a job's runtime and termination on
                     completion.
        :param ready: Optional: A repository from which to read task-ready status.
        :param func: Optional: The async function to run as the task body, if you don't want to use this task as
                     a decorator.
        """
        super().__init__(name, ready)

        self.executor = executor
        self.func = func
        # Two-way bookkeeping of in-flight work: future -> (job, start time) and job -> future.
        self.jobs: Dict[Future, Tuple[str, datetime]] = {}
        self.rev_jobs: Dict[str, Future] = {}
        self.live = ExecutorLiveRepo(self)
        self.done = done
        # Filled in by validate(): maps each parameter of `func` to the value it should receive.
        self._env: Dict[str, Any] = {}
        self.link("live", self.live, is_status=True, inhibits_output=True, inhibits_start=True)
        self.link(
            "done",
            self.done,
            is_status=True,
            required_for_output=True,
            inhibits_start=True,
        )

    def __call__(self, f: Callable) -> "ExecutorTask":
        """Decorator form: install ``f`` as the task body and return the task itself."""
        self.func = f
        return self

    async def update(self):
        """
        Record the outcome of every job whose future has completed.

        :return: True if any jobs were in flight when the update started.
        """
        result = bool(self.jobs)
        # Zero timeout: only collect what has already finished, never block the event loop.
        done, _ = wait(self.jobs, 0, FIRST_EXCEPTION)
        coros = []
        for finished_job in done:
            job, start_time = self.jobs.pop(finished_job)
            # noinspection PyAsyncCall
            self.rev_jobs.pop(job, None)
            coros.append(self._cleanup(finished_job, job, start_time))
        await asyncio.gather(*coros)
        return result

    async def _cleanup(self, job_future, job, start_time):
        """Dump the result (or exception) of a completed future into the ``done`` repository."""
        e = job_future.exception()
        if e is not None:
            l.info("Executor task %s:%s failed", self.name, job, exc_info=e)
            data = {
                "result": "exception",
                "exception": repr(e),
                "traceback": traceback.format_tb(e.__traceback__),
                "end_time": datetime.now(tz=timezone.utc),
            }
        else:
            l.debug("...executor task %s:%s success", self.name, job)
            # The future's result is the completion timestamp returned by _timestamped_func.
            data = {"result": "success", "end_time": job_future.result()}
        data["start_time"] = start_time
        await self.done.dump(job, data)

    async def validate(self):
        """
        Check that a function is installed and that each of its parameters is either ``job`` or the name of a
        link on this task.

        :raises ValueError: If no function has been provided.
        :raises NameError: If the function takes a parameter that cannot be satisfied.
        """
        if self.func is None:
            raise ValueError("ExecutorTask %s has func None" % self.name)

        sig = inspect.signature(self.func, follow_wrapped=True)
        for name in sig.parameters.keys():
            if name == "job":
                # Placeholder; the real job id is substituted at launch time.
                self._env[name] = None
            elif name in self.links:
                self._env[name] = self.links[name].repo
            else:
                raise NameError("%s takes parameter %s but no such argument is available" % (self.func, name))

    async def launch(self, job):
        """
        Submit the task function for ``job`` to the executor. In synchronous mode, wait for it to finish and
        record the outcome immediately; otherwise track the future so that `update` can collect it later.
        """
        l.debug("Launching %s:%s with %s...", self.name, job, self.executor)
        args = dict(self._env)
        if "job" in args:
            args["job"] = job
        args = await build_env(args, job, RepoHandlingMode.SMART)
        start_time = datetime.now(tz=timezone.utc)
        running_job = self.executor.submit(self._timestamped_func, self.func, args)
        if self.synchronous:
            while not running_job.done():
                await asyncio.sleep(0.1)
            await self._cleanup(running_job, job, start_time)
        else:
            self.jobs[running_job] = (job, start_time)
            self.rev_jobs[job] = running_job

    async def cancel(self, job):
        """
        Stop the current job from running, or do nothing if it is not running.
        """
        future = self.rev_jobs.get(job)
        if future is None:
            return
        if future.cancel():
            # Drop it from both maps so `update` never tries to collect a cancelled future. If cancel()
            # returned False the call is already executing; keep tracking it so its outcome is recorded.
            self.rev_jobs.pop(job, None)
            self.jobs.pop(future, None)

    @staticmethod
    def _timestamped_func(func, args):
        """
        Run the async ``func(**args)`` to completion on a private event loop (this executes inside the executor,
        not on the pipeline's loop) and return the completion time.
        """
        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(func(**args))
        finally:
            # Release the loop's resources even if the function raised.
            loop.close()
        return datetime.now(tz=timezone.utc)
class KubeFunctionTask(KubeTask):
    """
    A task which runs a python function on a kubernetes cluster. Requires a pod template which will execute a
    python script calling `pydatatask.main.main`. This works by running
    ``python3 {{argv0}} launch [task] [job] --sync``.

    Sample usage:

    .. code:: python

        @KubeFunctionTask(
            "my_task",
            podman,
            resman,
            '''
                apiVersion: v1
                kind: Pod
                spec:
                  containers:
                    - name: leader
                      image: ""
                      command:
                        - python3
                        - {{argv0}}
                        - launch
                        - "{{task}}"
                        - "{{job}}"
                        - "--force"
                        - "--sync"
                      resources:
                        requests:
                          cpu: 100m
                          memory: 256Mi
            ''',
            repo_logs,
            repo_done,
            repo_func_done,
        )
        async def my_task(job: str, inp: pydatatask.MetadataRepository, out: pydatatask.MetadataRepository):
            await out.dump(job, await inp.info(job))

        my_task.link("inp", repo_input, is_input=True)
        my_task.link("out", repo_output, is_output=True)
    """

    def __init__(
        self,
        name: str,
        podman: Callable[[], PodManager],
        resources: ResourceManager,
        template: Union[str, Path],
        logs: Optional[BlobRepository] = None,
        kube_done: Optional[MetadataRepository] = None,
        func_done: Optional[MetadataRepository] = None,
        env: Optional[Dict[str, Any]] = None,
        func: Optional[Callable] = None,
        ready: Optional[Repository] = None,
    ):
        """
        :param name: The name of this task.
        :param podman: A callable returning a PodManager to use to connect to the cluster.
        :param resources: A ResourceManager instance. Tasks launched will contribute to its quota and be denied
                          if they would break the quota.
        :param template: YAML markup for a pod manifest template that will run `pydatatask.main.main` as
                         ``python3 {{argv0}} launch [task] [job] --sync --force``, either as a string or a path
                         to a file.
        :param logs: Optional: A BlobRepository to dump pod logs to on completion. Linked as "logs" with
                     ``inhibits_start, required_for_output, is_status``.
        :param kube_done: Optional: A MetadataRepository in which to dump some information about the pod's
                          lifetime and termination on completion. Linked as "done" with
                          ``inhibits_start, required_for_output, is_status``.
        :param func_done: Optional: A MetadataRepository in which to dump some information about the function's
                          lifetime and termination on completion. Linked as "func_done" with
                          ``inhibits_start, required_for_output, is_status``.
        :param env: Optional: Additional keys to add to the template environment.
        :param func: Optional: The async function to run as the task body, if you don't want to use this task as
                     a decorator.
        :param ready: Optional: A repository from which to read task-ready status.

        It is highly recommended to provide at least one of ``kube_done``, ``func_done``, or ``logs``, so that at
        least one link is present with ``inhibits_start``.
        """
        super().__init__(name, podman, resources, template, logs, kube_done, env=env, ready=ready)
        self.func = func
        self.func_done = func_done
        if func_done is not None:
            self.link(
                "func_done",
                func_done,
                required_for_output=True,
                is_status=True,
                inhibits_start=True,
            )
        # Filled in by validate(): maps each parameter of `func` to the value it should receive.
        self._func_env: Dict[str, Any] = {}

    def __call__(self, f: Callable) -> "KubeFunctionTask":
        """Decorator form: install ``f`` as the task body and return the task itself."""
        self.func = f
        return self

    async def validate(self):
        """
        Check that a function is installed and that each of its parameters is either ``job`` or the name of a
        link on this task.

        :raises ValueError: If no function has been provided.
        :raises NameError: If the function takes a parameter that cannot be satisfied.
        """
        if self.func is None:
            raise ValueError("KubeFunctionTask %s has func None" % self.name)

        sig = inspect.signature(self.func, follow_wrapped=True)
        for name in sig.parameters.keys():
            if name == "job":
                # Placeholder; the real job id is substituted at launch time.
                self._func_env[name] = None
            elif name in self.links:
                self._func_env[name] = self.links[name].repo
            else:
                raise NameError("%s takes parameter %s but no such argument is available" % (self.func, name))

    async def launch(self, job):
        """
        In synchronous mode, run the function directly in this process; otherwise launch a pod as `KubeTask`
        does.
        """
        if self.synchronous:
            await self._launch_sync(job)
        else:
            await super().launch(job)

    async def _launch_sync(self, job):
        """
        Run the task function for ``job`` in this process and record the outcome and timing in ``func_done``
        when one is configured and ``self.metadata`` is true. Exceptions raised by the function are recorded,
        not propagated.
        """
        start_time = datetime.now(tz=timezone.utc)
        l.debug("Launching --sync %s:%s...", self.name, job)
        args = dict(self._func_env)
        if "job" in args:
            args["job"] = job
        args = await build_env(args, job, RepoHandlingMode.SMART)
        try:
            await self.func(**args)
        except Exception as e:
            l.info("--sync task %s:%s failed", self.name, job, exc_info=True)
            result = {
                "result": "exception",
                "exception": repr(e),
                "traceback": traceback.format_tb(e.__traceback__),
            }
        else:
            l.debug("...success")
            result = {"result": "success"}
        result["start_time"] = start_time
        result["end_time"] = datetime.now(tz=timezone.utc)
        # func_done is optional; without it there is nowhere to record the outcome.
        if self.metadata and self.func_done is not None:
            await self.func_done.dump(job, result)