pydatatask.main module

The top-level script you write using pydatatask should call pydatatask.main.main in its if __name__ == '__main__' block. This will parse sys.argv and display the administration interface for the pipeline.

The help screen should look something like this:

$ python3 main.py --help
usage: main.py [-h] {update,run,status,trace,rm,ls,cat,inject,launch,shell} ...

positional arguments:
  {update,run,status,trace,rm,ls,cat,inject,launch,shell}
    update              Keep the pipeline in motion
    run                 Run update in a loop until everything is quiet
    status              View the pipeline status
    trace               Track a job's progress through the pipeline
    rm                  Delete data from the pipeline
    ls                  List jobs in a repository
    cat                 Print data from a repository
    inject              Dump data into a repository
    launch              Manually start a task
    shell               Launch an interactive shell to interrogate the pipeline

options:
  -h, --help            show this help message and exit

pydatatask.main.main(pipeline: Pipeline, instrument: Callable[[_SubParsersAction], None] | None = None)

The pydatatask main function! Call this with the pipeline you’ve constructed to parse sys.argv and display the pipeline administration interface.

Optionally, pass a function as the instrument argument to add extra commands to the menu. As the signature shows, it is called with an argparse _SubParsersAction and returns nothing.
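
As a rough illustration of what an instrument function might look like, the sketch below registers one extra subcommand on an argparse subparsers object. The names add_commands and hello and the --name option are hypothetical, and a standalone parser stands in for the one main builds so the snippet runs without a pipeline. How main dispatches added commands is not described on this page; attaching the handler with set_defaults(func=...) is a common argparse idiom used here purely as an assumption.

```python
import argparse


def hello(name: str) -> str:
    """Hypothetical handler for the illustrative 'hello' command."""
    return f"hello, {name}"


def add_commands(subparsers: argparse._SubParsersAction) -> None:
    """Hypothetical instrument callback: adds one extra command to the menu."""
    parser = subparsers.add_parser("hello", help="Print a greeting (illustrative only)")
    parser.add_argument("--name", default="world")
    # Assumption: the handler is attached via set_defaults; the way pydatatask
    # actually invokes added commands is not documented on this page.
    parser.set_defaults(func=hello)


# Standalone stand-in for the parser that main() would construct.
demo_parser = argparse.ArgumentParser(prog="main.py")
demo_subparsers = demo_parser.add_subparsers(dest="command", required=True)
add_commands(demo_subparsers)

# Parse an explicit argument list instead of sys.argv for the demonstration.
args = demo_parser.parse_args(["hello", "--name", "pipeline"])
print(args.func(args.name))
```

In a real script, the same function would be handed over as pydatatask.main.main(pipeline, instrument=add_commands) inside the if __name__ == '__main__' block.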

async pydatatask.main.update(pipeline: Pipeline)
async pydatatask.main.cat_data(pipeline: Pipeline, data: str, job: str)
async pydatatask.main.list_data(pipeline: Pipeline, data: List[str])
async pydatatask.main.delete_data(pipeline: Pipeline, data: str, recursive: bool, job: List[str])
async pydatatask.main.inject_data(pipeline: Pipeline, data: str, job: str)
async pydatatask.main.print_status(pipeline: Pipeline, all_repos: bool)
async pydatatask.main.print_trace(pipeline: Pipeline, all_repos: bool, job: List[str])
async pydatatask.main.launch(pipeline: Pipeline, task_name: str, job: str, sync: bool, meta: bool, force: bool)
pydatatask.main.shell(pipeline: Pipeline)
async pydatatask.main.run(pipeline: Pipeline, forever: bool, launch_once: bool, timeout: float | None)