pydatatask.pipeline module

A pipeline is just an unordered collection of tasks. Relationships between the tasks are implicit, defined by which repositories they share.

class pydatatask.pipeline.Pipeline(tasks: Iterable[Task], session: Session, resources: Iterable[ResourceManager], priority: Callable[[str, str], int] | None = None)

Bases: object

The pipeline class.

  • tasks – The tasks which make up this pipeline.

  • session – The session to open while this pipeline is active.

  • resources – Any resource managers in use. You need to provide these so the pipeline can reset the rate-limiting at each update.

  • priority – Optional: A function which takes a task name and a job name and returns an integer priority. Larger numbers mean higher priority. No job will be scheduled until all higher-priority jobs have already been scheduled.
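
As a rough illustration of the priority parameter, the sketch below defines a function with the documented Callable[[str, str], int] shape. The task names ("analyze", "ingest") and the ranking table are hypothetical and not part of pydatatask. Ignoring the job name is a choice made for this example only.

```python
# Hypothetical ranking of task names; larger numbers mean higher priority.
TASK_RANK = {"analyze": 10, "ingest": 5}


def priority(task: str, job: str) -> int:
    """Return an integer priority for a (task name, job name) pair."""
    # The job name is ignored in this sketch; tasks missing from the
    # table fall back to the lowest priority, 0.
    return TASK_RANK.get(task, 0)
```

A function like this would be passed as the priority argument when constructing the Pipeline.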

settings(synchronous=False, metadata=True)

This method can be called to set properties of the current run.

  • synchronous – Whether jobs will be started and completed in-process, waiting for their completion before a launch phase succeeds.

  • metadata – Whether jobs will store their completion metadata.

async open()

Opens the pipeline and its associated resource session. This will be automatically called when entering an async with pipeline: block.

async close()

Closes the pipeline and its associated resource session. This will be automatically called when exiting an async with pipeline: block.

async update() -> bool

Perform one round of pipeline maintenance, running the update phase and then the launch phase. The pipeline must be opened for this function to run.


Returns: Whether there is any activity in the pipeline.
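
One way to drive a pipeline is to call update() repeatedly until it reports no activity. The helper below is a minimal sketch of that loop, not part of pydatatask. The name run_until_idle and the poll_interval and max_rounds parameters are assumptions made for illustration. It relies only on the documented behaviour that async with pipeline: opens and closes the pipeline and that update() returns a bool.

```python
import asyncio
from typing import Optional


async def run_until_idle(
    pipeline, poll_interval: float = 1.0, max_rounds: Optional[int] = None
) -> int:
    """Call pipeline.update() until it reports no activity.

    Returns the number of update rounds performed.
    """
    rounds = 0
    # Entering the block opens the pipeline; leaving it closes the pipeline.
    async with pipeline:
        while max_rounds is None or rounds < max_rounds:
            rounds += 1
            active = await pipeline.update()
            if not active:
                break
            # Pause between rounds so running jobs have time to progress.
            await asyncio.sleep(poll_interval)
    return rounds
```

With a constructed pipeline, this could be run as asyncio.run(run_until_idle(pipeline)).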

async update_only_update() -> bool

Perform one round of the update phase of pipeline maintenance. The pipeline must be opened for this function to run.


Returns: Whether there were any live jobs.

async update_only_launch() -> bool

Perform one round of the launch phase of pipeline maintenance. The pipeline must be opened for this function to run.


Returns: Whether there were any jobs launched or ready to launch.
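
The two phases can also be run separately, for example to log or inspect state between them. The following sketch calls the update phase and then the launch phase on an already opened pipeline. The helper name update_in_two_phases is hypothetical, and combining the two results with "or" is an assumption about what counts as activity, not a statement of how update() is implemented.

```python
async def update_in_two_phases(pipeline) -> bool:
    """Run the update phase, then the launch phase, on an opened pipeline."""
    # Update phase: reports whether there were any live jobs.
    live = await pipeline.update_only_update()
    # Launch phase: reports whether jobs were launched or ready to launch.
    launched = await pipeline.update_only_launch()
    # Assumption: treat either result as a sign of pipeline activity.
    return live or launched
```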

async gather_ready_jobs(task: Task) -> Set[str]

Collect all jobs that are ready to be launched for a given task.

graph() -> DiGraph

Generate the dependency graph for a pipeline. This is a directed graph containing both repositories and tasks as nodes.

dependants(node: Repository | Task, recursive: bool) -> Iterable[Repository | Task]

Iterate over the nodes (repositories and tasks) that depend on the given node of the dependency graph.

  • node – The starting point of the graph traversal.

  • recursive – Whether to return only the direct dependants (False) or also the transitive dependants (True) of node.
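
To make the difference between direct and transitive dependants concrete, here is a self-contained sketch on a toy graph. It does not use pydatatask or its DiGraph. The node names, the EDGES dictionary, and the traversal are all hypothetical stand-ins. Edges point from a node to the nodes that depend on it, alternating between repositories and tasks as in the pipeline's dependency graph.

```python
from typing import Dict, Iterator, List, Set

# Hypothetical dependency graph as an adjacency list.
EDGES: Dict[str, List[str]] = {
    "repo:input": ["task:parse"],
    "task:parse": ["repo:parsed"],
    "repo:parsed": ["task:report"],
    "task:report": ["repo:reports"],
    "repo:reports": [],
}


def dependants(node: str, recursive: bool) -> Iterator[str]:
    """Yield direct dependants of node, or all reachable ones if recursive."""
    seen: Set[str] = set()
    stack = list(reversed(EDGES.get(node, [])))
    while stack:
        current = stack.pop()
        if current in seen:
            continue
        seen.add(current)
        yield current
        if recursive:
            # Continue the traversal through this node's own dependants.
            stack.extend(reversed(EDGES.get(current, [])))
```

In this toy graph, a non-recursive call on "repo:input" yields only "task:parse", while a recursive call also yields every node downstream of it.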