pydatatask.repository module#
Repositories are arbitrary key-value stores. They are the data part of pydatatask. You can store your data in any way you desire, and as long as you can write a Repository class to describe it, it can be used to drive a pipeline.
The notion of the “value” part of the key-value store abstraction is defined very loosely. The repository base class doesn’t have an interface to get or store values, only to query for and delete keys. Instead, you have to know which repository subclass you’re working with and use its interfaces. For example, MetadataRepository assumes that its values are structured objects and loads them fully into memory, while BlobRepository provides a streaming interface to a flat address space.
- class pydatatask.repository.Repository[source]#
Bases: ABC
A repository is a key-value store where the keys are names of jobs. Since the values have unspecified semantics, the only operations you can do on a generic repository are query for keys.
A repository can be async-iterated to get a listing of its members.
- CHARSET = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'#
- CHARSET_START_END = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'#
- classmethod is_valid_job_id(job: str)[source]#
Determine whether the given job identifier is valid, i.e. that it contains only valid characters (numbers and letters by default).
- async filter_jobs(iterator: AsyncIterable[str]) AsyncIterable[str] [source]#
Apply is_valid_job_id as a filter to an async iterator.
- async contains(item)[source]#
Determine whether the given job identifier is present in this repository.
The default implementation is quite inefficient; please override this if possible.
- abstract async unfiltered_iter() AsyncGenerator[str, None] [source]#
The core method of Repository. Implement this to produce an iterable of every string which could potentially be a job identifier present in this repository. When the repository is iterated directly, this will be filtered by filter_jobs.
- abstract async info(job) Any [source]#
Returns an arbitrary piece of data related to job. Notably, this is used during templating. This should do something meaningful even if the repository does not contain the requested job.
- abstract async delete(job)[source]#
Delete the given job from the repository. This should succeed even if the job is not present in this repository.
- async info_all() Dict[str, Any] [source]#
Produce a mapping from every job present in the repository to its corresponding info. The default implementation is somewhat inefficient; please override it if there is a more effective way to load all info.
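The abstract interface above can be sketched with a minimal dict-backed class. This is a stand-in illustrating the contract (unfiltered_iter, info, delete, filtered async iteration), not a subclass of the real pydatatask.repository.Repository; the validity check mirrors the default letters-and-digits CHARSET.

```python
import asyncio
import string
from typing import Any, AsyncGenerator, Dict

# Characters allowed in a job identifier, per the default CHARSET.
VALID_CHARS = set(string.ascii_letters + string.digits)

class DictRepository:
    """A toy repository: keys are job identifiers, values are arbitrary."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}

    async def unfiltered_iter(self) -> AsyncGenerator[str, None]:
        # Yield every string that could potentially be a job identifier.
        for job in list(self._data):
            yield job

    async def info(self, job: str) -> Any:
        # Do something meaningful even if the job is absent.
        return self._data.get(job)

    async def delete(self, job: str) -> None:
        # Succeed even if the job is not present.
        self._data.pop(job, None)

    def __aiter__(self):
        # Direct iteration filters out invalid identifiers.
        async def gen():
            async for job in self.unfiltered_iter():
                if job and all(c in VALID_CHARS for c in job):
                    yield job
        return gen()

async def demo() -> list:
    repo = DictRepository()
    repo._data = {"job1": {"ok": True}, "bad/id": {}}
    return [j async for j in repo]

print(asyncio.run(demo()))  # → ['job1']
```

Note how "bad/id" is excluded from iteration: direct iteration applies the validity filter, while unfiltered_iter would still yield it.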
- class pydatatask.repository.BlobRepository[source]#
Bases: Repository, ABC
A blob repository has values which are flat data blobs that can be streamed for reading or writing.
- abstract async open(job: str, mode: Literal['r']) AReadText [source]#
- abstract async open(job: str, mode: Literal['rb']) AReadStream
- abstract async open(job: str, mode: Literal['w']) AWriteText
- abstract async open(job: str, mode: Literal['wb']) AWriteStream
Open the given job’s value as a stream for reading or writing, in text or binary mode.
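The call pattern for open() can be sketched with an in-memory stand-in. The real BlobRepository returns stream objects (AReadText, AWriteStream, etc.); this toy version uses a simplified stream with assumed async write/read/close methods to keep the example self-contained.

```python
import asyncio
from typing import Dict

class ToyStream:
    """A simplified stand-in for the async stream objects open() returns."""

    def __init__(self, repo: "ToyBlobRepository", job: str, mode: str):
        self.repo, self.job, self.mode = repo, job, mode
        self._buf: list = []

    async def write(self, data: str) -> None:
        self._buf.append(data)

    async def read(self) -> str:
        return self.repo.data[self.job].decode()

    async def close(self) -> None:
        # Writes are committed to the repository on close.
        if self.mode == "w":
            self.repo.data[self.job] = "".join(self._buf).encode()

class ToyBlobRepository:
    def __init__(self) -> None:
        self.data: Dict[str, bytes] = {}

    async def open(self, job: str, mode: str = "r") -> ToyStream:
        return ToyStream(self, job, mode)

async def demo() -> str:
    repo = ToyBlobRepository()
    fp = await repo.open("job1", "w")   # open the job's value for writing
    await fp.write("hello blob")
    await fp.close()
    fp = await repo.open("job1", "r")   # reopen it for reading
    return await fp.read()

print(asyncio.run(demo()))  # → hello blob
```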
- class pydatatask.repository.MetadataRepository[source]#
Bases: Repository, ABC
A metadata repository has values which are small, structured data, and loads them entirely into memory, returning the structured data from the info method.
- class pydatatask.repository.FileRepositoryBase(basedir, extension='', case_insensitive=False)[source]#
Bases: Repository, ABC
A file repository is a local directory where each job identifier is a filename, optionally suffixed with an extension before hitting the filesystem. This is an abstract base class for other file repositories which have more to say about what is found at these filepaths.
- class pydatatask.repository.FileRepository(basedir, extension='', case_insensitive=False)[source]#
Bases: FileRepositoryBase, BlobRepository
A file repository whose members are files, treated as streamable blobs.
- class pydatatask.repository.DirectoryRepository(*args, discard_empty=False, **kwargs)[source]#
Bases: FileRepositoryBase
A file repository whose members are directories.
- Parameters:
discard_empty – Whether only directories containing at least one member should be considered as “present” in the repository.
- class pydatatask.repository.S3BucketRepository(client: Callable[[], S3Client], bucket: str, prefix: str = '', suffix: str = '', mimetype: str = 'application/octet-stream', incluster_endpoint: str | None = None)[source]#
Bases: BlobRepository
A repository where keys are paths in an S3 bucket. Provides a streaming interface to the corresponding blobs.
- Parameters:
client – A callable returning an aiobotocore S3 client connected and authenticated to the server you wish to store things on.
bucket – The name of the bucket from which to load and store.
prefix – A prefix to put on the job name before translating it into a bucket path. If this is meant to be a directory name it should end with a slash character.
suffix – A suffix to put on the job name before translating it into a bucket path. If this is meant to be a file extension it should start with a dot.
mimetype – The MIME type to set the content when adding data.
incluster_endpoint – Optional: An endpoint URL to provide as the result of info() queries instead of extracting the URL from client.
- property client#
The aiobotocore S3 client. This will raise an error if the client comes from a session which is not opened.
- async info(job)[source]#
Return an S3BucketInfo corresponding to the given job.
- class pydatatask.repository.S3BucketInfo(endpoint: str, uri: str, bucket: str, prefix: str, suffix: str)[source]#
Bases: object
The data structure returned from S3BucketRepository.info().
- Variables:
uri – The s3 URI of the current job’s resource, e.g. s3://bucket/prefix/job.ext. str(info) will also return this.
endpoint – The URL of the API server providing the S3 interface.
bucket – The name of the bucket objects are stored in.
prefix – How to prefix an object name such that it will fit into this repository.
suffix – How to suffix an object name such that it will fit into this repository.
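Per the prefix and suffix parameters documented above, the mapping from a job identifier to a bucket object key and s3:// URI is simple string composition. A sketch, with a hypothetical bucket name and affixes:

```python
def object_key(job: str, prefix: str = "", suffix: str = "") -> str:
    # The job name is wrapped with the repository's prefix and suffix
    # to form the object key within the bucket.
    return f"{prefix}{job}{suffix}"

def s3_uri(bucket: str, job: str, prefix: str = "", suffix: str = "") -> str:
    # The uri field of S3BucketInfo has this s3://bucket/prefix/job.ext shape.
    return f"s3://{bucket}/{object_key(job, prefix, suffix)}"

print(object_key("job1", prefix="outputs/", suffix=".bin"))
# → outputs/job1.bin
print(s3_uri("mybucket", "job1", prefix="outputs/", suffix=".bin"))
# → s3://mybucket/outputs/job1.bin
```

This is why a directory-like prefix should end with a slash and a file-extension-like suffix should start with a dot: nothing is inserted between the pieces.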
- class pydatatask.repository.MongoMetadataRepository(collection: Callable[[], AsyncIOMotorCollection], subcollection: str | None)[source]#
Bases: MetadataRepository
A metadata repository using a MongoDB collection as the backing store.
- Parameters:
collection – A callable returning a motor async collection.
subcollection – Optional: the name of a subcollection within the collection in which to store data.
- property collection: AsyncIOMotorCollection#
The motor async collection data will be stored in. If this is provided by an unopened session, an error will be raised.
- class pydatatask.repository.InProcessMetadataRepository(data: Dict[str, Any] | None = None)[source]#
Bases: MetadataRepository
An incredibly simple metadata repository which stores all its values in a dict, and will let them vanish when the process terminates.
- class pydatatask.repository.InProcessBlobStream(repo: InProcessBlobRepository, job: str)[source]#
Bases: object
A stream returned from a BlobRepository.open call from an InProcessBlobRepository. Do not construct this manually.
- class pydatatask.repository.InProcessBlobRepository(data: Dict[str, bytes] | None = None)[source]#
Bases: BlobRepository
An incredibly simple blob repository which stores all its values in a dict, and will let them vanish when the process terminates.
- async info(job)[source]#
There is no templating info for an InProcessBlobRepository.
- class pydatatask.repository.DockerRepository(registry: Callable[[], DockerRegistryClientAsync], domain: str, repository: str)[source]#
Bases: Repository
A docker repository is, well, an actual docker repository hosted in some registry somewhere. Keys translate to tags on this repository.
- Parameters:
registry – A callable returning a docker_registry_client_async client object with appropriate authentication information.
domain – The registry domain to connect to, e.g. index.docker.io.
repository – The repository to store images in within the domain, e.g. myname/myrepo.
- property registry: DockerRegistryClientAsync#
The docker_registry_client_async client object. If this is provided by an unopened session, an error will be raised.
- class pydatatask.repository.LiveKubeRepository(task: KubeTask)[source]#
Bases: Repository
A repository where keys translate to job labels on running kube pods. This repository is constructed automatically by a KubeTask or subclass and is linked as the live repository. Do not construct this class manually.
- class pydatatask.repository.ExecutorLiveRepo(task: ExecutorTask)[source]#
Bases: Repository
A repository where keys translate to running jobs in an ExecutorTask. This repository is constructed automatically and is linked as the live repository. Do not construct this class manually.
- async info(job)[source]#
There is no templating info for an ExecutorLiveRepo.
- class pydatatask.repository.AggregateOrRepository(**children: Repository)[source]#
Bases: Repository
A repository which is said to contain a job if any of its children contain that job.
- class pydatatask.repository.AggregateAndRepository(**children: Repository)[source]#
Bases: Repository
A repository which is said to contain a job if all of its children contain that job.
- class pydatatask.repository.BlockingRepository(source: Repository, unless: Repository, enumerate_unless=True)[source]#
Bases: Repository
A repository which is said to contain a job if source contains it and unless does not contain it.
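The membership rules of these three combinators reduce to set algebra: Or is a union over the children, And an intersection, and Blocking a difference. A sketch over plain in-memory key sets (the class and key names are illustrative only):

```python
# AggregateOrRepository: present in any child.
def aggregate_or(*children: set) -> set:
    out: set = set()
    for c in children:
        out |= c
    return out

# AggregateAndRepository: present in every child.
def aggregate_and(*children: set) -> set:
    out = set(children[0])
    for c in children[1:]:
        out &= c
    return out

# BlockingRepository: present in source but not in unless.
def blocking(source: set, unless: set) -> set:
    return source - unless

done = {"job1", "job2"}
logs = {"job2", "job3"}
print(sorted(aggregate_or(done, logs)))   # → ['job1', 'job2', 'job3']
print(sorted(aggregate_and(done, logs)))  # → ['job2']
print(sorted(blocking(done, logs)))       # → ['job1']
```

A typical use of the blocking pattern is gating a task on work that has not yet been consumed: jobs in the input repository but not yet in the output.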
- class pydatatask.repository.YamlMetadataRepository[source]#
Bases: BlobRepository, MetadataRepository, ABC
A metadata repository based on a blob repository. When info is accessed, it will load the target file into memory, parse it as yaml, and return the resulting object.
This is a base class, and must be overridden to implement the blob loading portion.
- class pydatatask.repository.YamlMetadataFileRepository(filename, extension='.yaml', case_insensitive=False)[source]#
Bases: YamlMetadataRepository, FileRepository
A metadata repository based on a file blob repository.
- class pydatatask.repository.YamlMetadataS3Repository(client, bucket, prefix, suffix='.yaml', mimetype='text/yaml')[source]#
Bases: YamlMetadataRepository, S3BucketRepository
A metadata repository based on an S3 bucket repository.
- class pydatatask.repository.RelatedItemRepository(base_repository: Repository, translator_repository: Repository, allow_deletes=False, prefetch_lookup=True)[source]#
Bases: Repository
A repository which returns items from another repository based on following a related-item lookup.
- Parameters:
base_repository – The repository from which to return results based on translated keys. The resulting repository will duck-type as the same type as the base.
translator_repository – A repository whose info() will be used to translate keys: info(job) == translated_job.
allow_deletes – Whether the delete operation on this repository does anything. If enabled, it will delete only from the base repository.
prefetch_lookup – Whether to cache the entirety of the translator repository in memory to improve performance.
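The related-item lookup can be sketched with two stand-in dict-backed repositories: the translator's info() maps a job to the key used in the base repository. These classes and keys are illustrative, not pydatatask's actual implementation.

```python
import asyncio
from typing import Any, Dict

class DictMetadataRepo:
    """A toy metadata repository backed by a dict."""

    def __init__(self, data: Dict[str, Any]):
        self.data = data

    async def info(self, job: str) -> Any:
        return self.data.get(job)

class RelatedLookup:
    """Follow translator.info(job) to find the key to use in base."""

    def __init__(self, base: DictMetadataRepo, translator: DictMetadataRepo):
        self.base, self.translator = base, translator

    async def info(self, job: str) -> Any:
        translated = await self.translator.info(job)
        return await self.base.info(translated)

async def demo() -> Any:
    # base holds data keyed by parent job; translator maps child → parent.
    base = DictMetadataRepo({"parent1": {"result": 42}})
    translator = DictMetadataRepo({"child1": "parent1"})
    return await RelatedLookup(base, translator).info("child1")

print(asyncio.run(demo()))  # → {'result': 42}
```

With prefetch_lookup enabled, the real class would load the whole translator mapping up front instead of issuing one info() call per lookup, trading memory for fewer round trips.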