pydatatask.proc_manager module

A process manager is used with a ProcessTask to specify where and how the task's processes should run. A process manager only needs to specify how to launch a process and how to manipulate the filesystem; the ProcessTask uses this interface to set up an appropriate environment for running the task and to retrieve the results.
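The following is a minimal sketch of the call sequence a ProcessTask-like caller might issue against this interface: create a scratch directory, spawn, poll for liveness, then read the exit code. It is a hypothetical illustration, not ProcessTask's actual logic. The names run_job, FakeManager and _TextReader are invented for the example. The assumption that the object returned by open(..., "r") has an awaitable read() belongs to this sketch, not to the library's documented AReadText type.

```python
import asyncio
from pathlib import Path
from typing import Dict, List, Set


async def run_job(pm, job_id: str, args: List[str], poll: float = 0.01) -> int:
    """Hypothetical caller: drive a process manager through one job."""
    workdir = pm.basedir / job_id
    await pm.mkdir(workdir)                        # private scratch directory
    rc_path = workdir / "return_code"
    pid = await pm.spawn(
        args, {}, str(workdir), str(rc_path),
        None, str(workdir / "stdout"), None,       # null stdin/stderr, captured stdout
    )
    # spawn() must not block, so the caller polls until the process is gone.
    while pid in await pm.get_live_pids({pid}):
        await asyncio.sleep(poll)
    reader = await pm.open(rc_path, "r")           # exit code stored as ASCII text
    return int((await reader.read()).strip())


class _TextReader:
    """Hypothetical minimal async reader used by the fake below."""

    def __init__(self, text: str) -> None:
        self._text = text

    async def read(self) -> str:
        return self._text


class FakeManager:
    """Hypothetical in-memory manager: each process 'lives' for two polls, exits 7."""

    def __init__(self) -> None:
        self.basedir = Path("/fake/basedir")
        self.dirs: Set[Path] = set()
        self.files: Dict[Path, str] = {}
        self.polls_left: Dict[str, int] = {}

    async def mkdir(self, path: Path) -> None:
        self.dirs.add(path)

    async def spawn(self, args, environ, cwd, return_code, stdin, stdout, stderr) -> str:
        pid = f"fake-{len(self.polls_left)}"
        self.polls_left[pid] = 2
        # A real manager writes this when the process exits; the fake writes it up front.
        self.files[Path(return_code)] = "7\n"
        return pid

    async def get_live_pids(self, hint: Set[str]) -> Set[str]:
        live = set()
        for pid in hint:
            if self.polls_left.get(pid, 0) > 0:
                self.polls_left[pid] -= 1
                live.add(pid)
        return live

    async def open(self, path: Path, mode: str) -> _TextReader:
        assert mode == "r", "this fake only supports text reads"
        return _TextReader(self.files[Path(path)])


print(asyncio.run(run_job(FakeManager(), "job-1", ["echo", "hi"])))  # 7
```

Because spawn() returns immediately, the sketch detects completion by polling get_live_pids() with the spawned identifier as the hint. The interface guarantees that get_live_pids() reports that identifier for as long as the process is alive.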

class pydatatask.proc_manager.AbstractProcessManager

Bases: object

The base class for process managers.

Processes are managed through arbitrary “process identifier” handle strings.

abstract async get_live_pids(hint: Set[str]) → Set[str]

Get a set of the live process identifiers. This may include processes which are not part of the current app/task, but MUST include all the processes in hint which are still alive.
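For a manager whose identifiers are local OS pids, one way to meet this contract is to probe each hinted pid with signal 0. This sketch assumes decimal pid strings; it is not LocalLinuxManager's actual implementation. It returns exactly the live subset of hint, which satisfies the MUST clause. Note that kill(pid, 0) also succeeds for zombies (exited but not yet reaped), so a manager that spawned the processes itself should reap them.

```python
import os
from typing import Set


async def get_live_pids(hint: Set[str]) -> Set[str]:
    """Return the identifiers in `hint` that name a live local process."""
    live: Set[str] = set()
    for ident in hint:
        try:
            pid = int(ident)
        except ValueError:
            continue                 # not a decimal pid: cannot be one of ours
        if pid <= 0:
            continue                 # 0 and negatives address process groups
        try:
            os.kill(pid, 0)          # signal 0 checks existence, delivers nothing
        except ProcessLookupError:
            continue                 # no such process
        except PermissionError:
            pass                     # exists, but belongs to another user
        live.add(ident)
    return live
```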

abstract async spawn(args: List[str], environ: Dict[str, str], cwd: str, return_code: str, stdin: str | None, stdout: str | None, stderr: str | _StderrIsStdout | None) → str

Launch a process on the target system. This function MUST NOT wait until the process has terminated before returning.

Parameters:
  • args – The command line of the process to launch. args[0] is the executable to run.

  • environ – A mapping of environment variables to add to the target process’ environment.

  • cwd – The directory to launch the process in.

  • return_code – A filepath in which to store the process exit code, as an ASCII integer.

  • stdin – A filepath from which to read the process’ stdin, or None for a null file.

  • stdout – A filepath to which to write the process’ stdout, or None for a null file.

  • stderr – A filepath to which to write the process’ stderr, None for a null file, or the constant pydatatask.task.STDOUT to interleave it into the stdout stream.

Returns:

The process identifier of the process spawned.
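The sketch below shows how the spawn parameters could map onto a local launch. It is one possible approach, not the library's implementation. It assumes a POSIX sh is available. The command runs under a small sh wrapper that writes $? to return_code once the command finishes. subprocess.Popen returns as soon as the child starts, so nothing waits on it. STDOUT here is a hypothetical local sentinel standing in for pydatatask.task.STDOUT, and the returned identifier is the wrapper's OS pid.

```python
import os
import subprocess
from typing import Dict, List, Optional

# Hypothetical stand-in for pydatatask.task.STDOUT; not the library's own object.
STDOUT = object()

# sh wrapper: $1 is the return_code path, the remaining arguments are the command.
# The exit status is written only after the command finishes.
_WRAPPER = 'rc="$1"; shift; "$@"; echo "$?" > "$rc"'


async def spawn(args: List[str], environ: Dict[str, str], cwd: str,
                return_code: str, stdin: Optional[str], stdout: Optional[str],
                stderr: object) -> str:
    """Start `args` in the background and return the wrapper's pid as a string."""
    stdin_f = open(stdin, "rb") if stdin is not None else subprocess.DEVNULL
    stdout_f = open(stdout, "wb") if stdout is not None else subprocess.DEVNULL
    if stderr is STDOUT:
        stderr_f = subprocess.STDOUT        # interleave stderr into stdout
    elif stderr is not None:
        stderr_f = open(stderr, "wb")
    else:
        stderr_f = subprocess.DEVNULL
    try:
        proc = subprocess.Popen(
            ["sh", "-c", _WRAPPER, "sh", return_code, *args],
            env={**os.environ, **environ},  # environ is added to the inherited env
            cwd=cwd,
            stdin=stdin_f,
            stdout=stdout_f,
            stderr=stderr_f,
            start_new_session=True,         # detach from our process group
        )
    finally:
        for f in (stdin_f, stdout_f, stderr_f):
            if hasattr(f, "close"):
                f.close()                   # the child holds its own descriptors
    # Popen returns once the child is started; nothing here waits for it to exit.
    return str(proc.pid)
```

Because the wrapper writes the exit code itself, a caller can detect completion either through get_live_pids() or by waiting for return_code to become non-empty.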

abstract async kill(pid: str)

Terminate the process with the given identifier. This should do nothing if the process is not currently running. Killing the process should not prevent its outputs, e.g. the return_code file, from being populated.
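This is a minimal local sketch of these semantics. It assumes the identifier is a decimal OS pid naming the workload itself, not a wrapper responsible for writing return_code. Under that assumption, signalling only the workload leaves any such wrapper free to record the exit status. A process that is already gone surfaces as ProcessLookupError, which the sketch swallows so that the call is a no-op.

```python
import os
import signal


async def kill(pid: str) -> None:
    """Ask a local process to terminate; do nothing if it is not running."""
    n = int(pid)
    if n <= 0:
        return                      # 0 and negative values address process groups
    try:
        os.kill(n, signal.SIGTERM)
    except ProcessLookupError:
        pass                        # already gone: killing is a no-op
```

One caveat of pid-based identifiers is pid reuse. Once a process has been reaped, the operating system may eventually hand its number to an unrelated process.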

abstract property basedir: Path

A path on the target system to a directory which can be freely manipulated by the app.

abstract async open(path: Path, mode: Literal['r', 'rb', 'w', 'wb']) → AReadText | AWriteText | AReadStream | AWriteStream

Open the file on the target system for reading or writing according to the given mode.
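The four modes correspond to text and byte streams for reading and writing. The sketch below is a hypothetical local version that moves blocking file I/O onto a worker thread with asyncio.to_thread (Python 3.9+). AsyncFile and open_local are invented names. The read/write/close methods are this sketch's assumption and are not the documented shape of pydatatask's AReadText, AWriteText, AReadStream or AWriteStream types.

```python
import asyncio
from pathlib import Path
from typing import Union


class AsyncFile:
    """Hypothetical async wrapper: blocking file I/O runs in a worker thread."""

    def __init__(self, f) -> None:
        self._f = f

    async def read(self, n: int = -1) -> Union[str, bytes]:
        return await asyncio.to_thread(self._f.read, n)

    async def write(self, data: Union[str, bytes]) -> int:
        return await asyncio.to_thread(self._f.write, data)

    async def close(self) -> None:
        await asyncio.to_thread(self._f.close)

    async def __aenter__(self) -> "AsyncFile":
        return self

    async def __aexit__(self, *exc) -> None:
        await self.close()


async def open_local(path: Path, mode: str) -> AsyncFile:
    if mode not in ("r", "rb", "w", "wb"):
        raise ValueError(f"unsupported mode {mode!r}")
    # 'r'/'w' yield text (str) streams; 'rb'/'wb' yield byte streams.
    return AsyncFile(await asyncio.to_thread(open, path, mode))
```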

abstract async mkdir(path: Path)

Create a directory with the given path on the target system.

This should have the semantics of mkdir -p, i.e. create any necessary parent directories and also succeed if the directory already exists.

abstract async rmtree(path: Path)

Remove the directory at the given path on the target system, along with all of its contents.
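On a local filesystem, the two directory operations map directly onto the standard library: os.makedirs(..., exist_ok=True) has mkdir -p semantics, and shutil.rmtree removes a tree. The sketch below runs both in a worker thread so the event loop is not blocked. It is an illustration, not the library's code. Note that this rmtree raises FileNotFoundError for a missing path; the interface above does not say which behaviour is expected in that case.

```python
import asyncio
import os
import shutil
from pathlib import Path


async def mkdir(path: Path) -> None:
    # `mkdir -p` semantics: create parents, tolerate an existing directory.
    await asyncio.to_thread(os.makedirs, path, exist_ok=True)


async def rmtree(path: Path) -> None:
    # Recursively delete the directory and everything beneath it.
    await asyncio.to_thread(shutil.rmtree, path)
```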

class pydatatask.proc_manager.LocalLinuxManager(app: str, local_path: Path | str = '/tmp/pydatatask')

Bases: AbstractProcessManager

A process manager that runs tasks on the local Linux machine. By default, it stores its data in a directory it creates at /tmp/pydatatask; the local_path argument overrides this location.

property basedir: Path
async get_live_pids(hint: Set[str]) → Set[str]
async spawn(args, environ, cwd, return_code, stdin, stdout, stderr)
async kill(pid: str)
async mkdir(path: Path)
async rmtree(path: Path)
async open(path, mode)
class pydatatask.proc_manager.SSHLinuxManager(app: str, ssh: Callable[[], SSHClientConnection], remote_path: Path | str = '/tmp/pydatatask')

Bases: AbstractProcessManager

A process manager that runs its processes on a remote Linux machine accessed over SSH. The SSH connection is supplied through the ssh argument, a callable that returns an asyncssh.connection.SSHClientConnection instance.

Sample usage:

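```python
import asyncssh
import pydatatask

# Placeholder values: substitute your own port and app name.
SSH_PORT = 22
APP_NAME = "myapp"

session = pydatatask.Session()

@session.resource
async def ssh():
    # known_hosts=None disables host key verification; suitable for testing only.
    async with asyncssh.connect(
        "localhost", port=SSH_PORT, username="weh", password="weh", known_hosts=None
    ) as s:
        yield s

@session.resource
async def procman():
    yield pydatatask.SSHLinuxManager(APP_NAME, ssh)

task = pydatatask.ProcessTask("mytask", procman, ...)
```

property ssh: SSHClientConnection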

The associated asyncssh.SSHClientConnection instance. Accessing this property fails if the connection is provided by a Session that has not been opened.
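Because the manager holds an asyncssh SSHClientConnection, remote filesystem and liveness operations can be expressed as shell commands issued through the connection's run() method. That method returns an SSHCompletedProcess with the command's stdout and, when check=True, raises if the exit status is nonzero. The helpers below are a hypothetical sketch of that approach, not SSHLinuxManager's implementation. They assume pids are positive decimal strings and that the remote shell provides kill -0. Paths are quoted with shlex.quote. Unlike Python's shutil.rmtree, rm -rf does not fail on a missing path.

```python
import shlex
from pathlib import PurePosixPath
from typing import Any, Iterable, List, Set


def mkdir_cmd(path: PurePosixPath) -> str:
    return "mkdir -p -- " + shlex.quote(str(path))   # `mkdir -p` semantics


def rmtree_cmd(path: PurePosixPath) -> str:
    return "rm -rf -- " + shlex.quote(str(path))     # directory and all contents


def _valid_pids(hint: Iterable[str]) -> List[str]:
    # Keep only positive ASCII-decimal pids; 0 or negatives address process groups.
    return sorted(p for p in hint if p.isascii() and p.isdigit() and int(p) > 0)


def live_pids_cmd(hint: Iterable[str]) -> str:
    # Echo each pid that `kill -0` reports as existing; `true` forces exit status 0.
    pids = " ".join(_valid_pids(hint))
    return f'for p in {pids}; do kill -0 "$p" 2>/dev/null && echo "$p"; done; true'


async def remote_live_pids(conn: Any, hint: Set[str]) -> Set[str]:
    # `conn` is assumed to be an asyncssh.SSHClientConnection; run() returns an
    # SSHCompletedProcess whose stdout (a str by default) holds the output.
    if not _valid_pids(hint):
        return set()
    result = await conn.run(live_pids_cmd(hint), check=True)
    return set(result.stdout.split())
```

As with any kill -0 check, a process owned by another user reports an error and is treated as not alive here.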

property basedir: Path
async open(path, mode)
async rmtree(path: Path)
async mkdir(path: Path)
async kill(pid: str)
async get_live_pids(hint)
async spawn(args, environ, cwd, return_code, stdin, stdout, stderr)