pydatatask.repository module#

Repositories are arbitrary key-value stores. They are the data part of pydatatask. You can store your data in any way you desire and as long as you can write a Repository class to describe it, it can be used to drive a pipeline.

The notion of the “value” part of the key-value store abstraction is defined very, very loosely. The repository base class doesn’t have an interface to get or store values, only to query for and delete keys. Instead, you have to know which repository subclass you’re working with, and use its interfaces. For example, MetadataRepository assumes that its values are structured objects and loads them fully into memory, and BlobRepository provides a streaming interface to a flat address space.

class pydatatask.repository.Repository[source]#

Bases: ABC

A repository is a key-value store where the keys are names of jobs. Since the values have unspecified semantics, the only operations you can do on a generic repository are query for keys.

A repository can be async-iterated to get a listing of its members.

CHARSET = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'#
CHARSET_START_END = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'#
classmethod is_valid_job_id(job: str)[source]#

Determine whether the given job identifier is valid, i.e. that it contains only valid characters (numbers and letters by default).

async filter_jobs(iterator: AsyncIterable[str]) AsyncIterable[str][source]#

Apply is_valid_job_id as a filter to an async iterator.

async contains(item)[source]#

Determine whether the given job identifier is present in this repository.

The default implementation is quite inefficient; please override this if possible.

abstract async unfiltered_iter() AsyncGenerator[str, None][source]#

The core method of Repository. Implement this to produce an iterable of every string which could potentially be a job identifier present in this repository. When the repository is iterated directly, this will be filtered by filter_jobs.

abstract async info(job) Any[source]#

Returns an arbitrary piece of data related to job. Notably, this is used during templating. This should do something meaningful even if the repository does not contain the requested job.

abstract async delete(job)[source]#

Delete the given job from the repository. This should succeed even if the job is not present in this repository.

async info_all() Dict[str, Any][source]#

Produce a mapping from every job present in the repository to its corresponding info. The default implementation is somewhat inefficient; please override it if there is a more effective way to load all info.

async validate()[source]#

Override this method to raise an exception if for any reason the repository is misconfigured. This will be automatically called by the pipeline on opening.

map(func: Callable, filt: Callable[[str], Awaitable[bool]] | None = None, allow_deletes=False) MapRepository[source]#

Generate a MapRepository based on this repository and the given parameters.

class pydatatask.repository.BlobRepository[source]#

Bases: Repository, ABC

A blob repository has values which are flat data blobs that can be streamed for reading or writing.

abstract async open(job: str, mode: Literal['r']) AReadText[source]#
abstract async open(job: str, mode: Literal['rb']) AReadStream
abstract async open(job: str, mode: Literal['w']) AWriteText
abstract async open(job: str, mode: Literal['wb']) AWriteStream

Open the given job’s value as a stream for reading or writing, in text or binary mode.

class pydatatask.repository.MetadataRepository[source]#

Bases: Repository, ABC

A metadata repository has values which are small, structured data, and loads them entirely into memory, returning the structured data from the info method.

abstract async info(job)[source]#

Retrieve the data with key job from the repository.

abstract async dump(job, data)[source]#

Insert data into the repository with key job.

class pydatatask.repository.FileRepositoryBase(basedir, extension='', case_insensitive=False)[source]#

Bases: Repository, ABC

A file repository is a local directory where each job identifier is a filename, optionally suffixed with an extension before hitting the filesystem. This is an abstract base class for other file repositories which have more to say about what is found at these filepaths.

async contains(item)[source]#
async unfiltered_iter()[source]#
async validate()[source]#
fullpath(job) Path[source]#

Construct the full local path of the file corresponding to job.

async info(job)[source]#

The templating info provided by a file repository is the full path to the corresponding file as a string.

class pydatatask.repository.FileRepository(basedir, extension='', case_insensitive=False)[source]#

Bases: FileRepositoryBase, BlobRepository

A file repository whose members are files, treated as streamable blobs.

async open(job, mode='r')[source]#
async delete(job)[source]#
class pydatatask.repository.DirectoryRepository(*args, discard_empty=False, **kwargs)[source]#

Bases: FileRepositoryBase

A file repository whose members are directories.

Parameters:

discard_empty – Whether only directories containing at least one member should be considered as “present” in the repository.

async mkdir(job)[source]#

Create an empty directory corresponding to job. Do nothing if the directory already exists.

async delete(job)[source]#
async contains(item)[source]#
async unfiltered_iter()[source]#
class pydatatask.repository.S3BucketRepository(client: Callable[[], S3Client], bucket: str, prefix: str = '', suffix: str = '', mimetype: str = 'application/octet-stream', incluster_endpoint: str | None = None)[source]#

Bases: BlobRepository

A repository where keys are paths in a S3 bucket. Provides a streaming interface to the corresponding blobs.

Parameters:
  • client – A callable returning an aiobotocore S3 client connected and authenticated to the server you wish to store things on.

  • bucket – The name of the bucket from which to load and store.

  • prefix – A prefix to put on the job name before translating it into a bucket path. If this is meant to be a directory name it should end with a slash character.

  • suffix – A suffix to put on the job name before translating it into a bucket path. If this is meant to be a file extension it should start with a dot.

  • mimetype – The MIME type to set the content when adding data.

  • incluster_endpoint – Optional: An endpoint URL to provide as the result of info() queries instead of extracting the URL from client.

property client#

The aiobotocore S3 client. This will raise an error if the client comes from a session which is not opened.

async contains(item)[source]#
async unfiltered_iter()[source]#
async validate()[source]#
object_name(job)[source]#

Return the object name for the given job.

async open(job, mode='r')[source]#
async info(job)[source]#

Return an S3BucketInfo corresponding to the given job.

async delete(job)[source]#
class pydatatask.repository.S3BucketInfo(endpoint: str, uri: str, bucket: str, prefix: str, suffix: str)[source]#

Bases: object

The data structure returned from S3BucketRepository.info().

Variables:
  • uri – The s3 URI of the current job’s resource, e.g. s3://bucket/prefix/job.ext. str(info) will also return this.

  • endpoint – The URL of the API server providing the S3 interface.

  • bucket – The name of the bucket objects are stored in.

  • prefix – How to prefix an object name such that it will fit into this repository.

  • suffix – How to suffix an object name such that it will fit into this repository.

class pydatatask.repository.MongoMetadataRepository(collection: Callable[[], AsyncIOMotorCollection], subcollection: str | None)[source]#

Bases: MetadataRepository

A metadata repository using a MongoDB collection as the backing store.

Parameters:
  • collection – A callable returning a motor async collection.

  • subcollection – Optional: the name of a subcollection within the collection in which to store data.

property collection: AsyncIOMotorCollection#

The motor async collection data will be stored in. If this is provided by an unopened session, raise an error.

async contains(item)[source]#
async delete(job)[source]#
async unfiltered_iter()[source]#
async info(job)[source]#

The info of a mongo metadata repository is the literal value stored in the repository with identifier job.

async info_all() Dict[str, Any][source]#
async dump(job, data)[source]#
class pydatatask.repository.InProcessMetadataRepository(data: Dict[str, Any] | None = None)[source]#

Bases: MetadataRepository

An incredibly simple metadata repository which stores all its values in a dict, and will let them vanish when the process terminates.

async info(job)[source]#
async dump(job, data)[source]#
async contains(item)[source]#
async delete(job)[source]#
async unfiltered_iter()[source]#
class pydatatask.repository.InProcessBlobStream(repo: InProcessBlobRepository, job: str)[source]#

Bases: object

A stream returned from an BlobRepository.open call from InProcessBlobRepository. Do not construct this manually.

async read(n: int | None = None) bytes[source]#

Read up to n bytes from the stream.

async write(data: bytes)[source]#

Write data to the stream.

async close()[source]#

Close and release the stream, syncing the data back to the repository.

class pydatatask.repository.InProcessBlobRepository(data: Dict[str, bytes] | None = None)[source]#

Bases: BlobRepository

An incredibly simple blob repository which stores all its values in a dict, and will let them vanish when the process terminates.

async info(job)[source]#

There is no templating info for an InProcessBlobRepository.

async open(job, mode='r')[source]#
async unfiltered_iter()[source]#
async contains(item)[source]#
async delete(job)[source]#
class pydatatask.repository.DockerRepository(registry: Callable[[], DockerRegistryClientAsync], domain: str, repository: str)[source]#

Bases: Repository

A docker repository is, well, an actual docker repository hosted in some registry somewhere. Keys translate to tags on this repository.

Parameters:
  • registry – A callable returning a docker_registry_client_async client object with appropriate authentication information.

  • domain – The registry domain to connect to, e.g. index.docker.io.

  • repository – The repository to store images in within the domain, e.g. myname/myrepo.

property registry: DockerRegistryClientAsync#

The docker_registry_client_async client object. If this is provided by an unopened session, raise an error.

async unfiltered_iter()[source]#
async info(job)[source]#

The info provided by a docker repository is a dict with two keys, “withdomain” and “withoutdomain”. e.g.:

{ "withdomain": "docker.example.com/myname/myrepo:job", "withoutdomain": "myname/myrepo:job" }
async delete(job)[source]#
class pydatatask.repository.LiveKubeRepository(task: KubeTask)[source]#

Bases: Repository

A repository where keys translate to job labels on running kube pods. This repository is constructed automatically by a KubeTask or subclass and is linked as the live repository. Do not construct this class manually.

async unfiltered_iter()[source]#
async contains(item)[source]#
async info(job)[source]#

Cannot template with live kube info. Implement this if you have something in mind.

async pods() List[V1Pod][source]#

A list of live pod objects corresponding to this repository.

async delete(job)[source]#

Deleting a job from this repository will delete the pod.

class pydatatask.repository.ExecutorLiveRepo(task: ExecutorTask)[source]#

Bases: Repository

A repository where keys translate to running jobs in an ExecutorTask. This repository is constructed automatically and is linked as the live repository. Do not construct this class manually.

async unfiltered_iter()[source]#
async contains(item)[source]#
async delete(job)[source]#

Deleting a job from the repository will cancel the corresponding task.

async info(job)[source]#

There is no templating info for an ExecutorLiveRepo.

class pydatatask.repository.AggregateOrRepository(**children: Repository)[source]#

Bases: Repository

A repository which is said to contain a job if any of its children also contain that job

async unfiltered_iter()[source]#
async contains(item)[source]#
async info(job)[source]#

The info provided by an aggregate Or repository is a dict mapping each child’s name to that child’s info.

async delete(job)[source]#

Deleting a job from an aggregate Or repository deletes the job from all of its children.

class pydatatask.repository.AggregateAndRepository(**children: Repository)[source]#

Bases: Repository

A repository which is said to contain a job if all its children also contain that job

async unfiltered_iter()[source]#
async contains(item)[source]#
async info(job)[source]#

The info provided by an aggregate And repository is a dict mapping each child’s name to that child’s info.

async delete(job)[source]#

Deleting a job from an aggregate And repository deletes the job from all of its children.

class pydatatask.repository.BlockingRepository(source: Repository, unless: Repository, enumerate_unless=True)[source]#

Bases: Repository

A class that is said to contain a job if source contains it and unless does not contain it

async unfiltered_iter()[source]#
async contains(item)[source]#
async info(job)[source]#
async delete(job)[source]#
class pydatatask.repository.YamlMetadataRepository[source]#

Bases: BlobRepository, MetadataRepository, ABC

A metadata repository based on a blob repository. When info is accessed, it will load the target file into memory, parse it as yaml, and return the resulting object.

This is a base class, and must be overridden to implement the blob loading portion.

async info(job)[source]#
async dump(job, data)[source]#
class pydatatask.repository.YamlMetadataFileRepository(filename, extension='.yaml', case_insensitive=False)[source]#

Bases: YamlMetadataRepository, FileRepository

A metadata repository based on a file blob repository.

class pydatatask.repository.YamlMetadataS3Repository(client, bucket, prefix, suffix='.yaml', mimetype='text/yaml')[source]#

Bases: YamlMetadataRepository, S3BucketRepository

A metadata repository based on a s3 bucket repository.

async info(job)[source]#
class pydatatask.repository.RelatedItemRepository(base_repository: Repository, translator_repository: Repository, allow_deletes=False, prefetch_lookup=True)[source]#

Bases: Repository

A repository which returns items from another repository based on following a related-item lookup.

Parameters:
  • base_repository – The repository from which to return results based on translated keys. The resulting repository will duck-type as the same type as the base.

  • translator_repository – A repository whose info() will be used to translate keys: info(job) == translated_job.

  • allow_deletes – Whether the delete operation on this repository does anything. If enabled, it will delete only from the base repository.

  • prefetch_lookup – Whether to cache the entirety of the translator repository in memory to improve performance.

async contains(item)[source]#
async delete(job)[source]#
async info(job)[source]#
async unfiltered_iter()[source]#