pydatatask.resource_manager module

pydatatask defines the notion of resources: numerical quantities, such as CPU and memory, that can be allocated to a given job. Allocation is mediated by a ResourceManager, an object that atomically tracks increments and decrements against a quota and rejects any request that would exceed it.

Typical usage is to construct a ResourceManager and pass it to a task constructor:

quota = pydatatask.ResourceManager(pydatatask.Resources.parse(cpu='1000m', mem='1Gi'))
task = pydatatask.ProcessTask("my_task", localhost, quota, ...)

class pydatatask.resource_manager.ResourceType(value)

Bases: Enum

An enum indicating a type of resource. Presently one of CPU, MEM, or RATE.

CPU = 1
MEM = 2
RATE = 3

class pydatatask.resource_manager.Resources(cpu: Decimal = Decimal('0'), mem: Decimal = Decimal('0'), launches: int = 1)

Bases: object

A dataclass containing a quantity of resources.

Resources can be added and subtracted:

r = pydatatask.Resources.parse(1, 1, 100)
r += pydatatask.Resources.parse(2, 3, 0)
r -= pydatatask.Resources.parse(1, 1, 0)
assert r == pydatatask.Resources.parse(2, 3, 100)

cpu: Decimal = Decimal('0')
mem: Decimal = Decimal('0')
launches: int = 1

static parse(cpu: str | float | int | Decimal, mem: str | float | int | Decimal, launches: str | float | int | Decimal | None = None) -> Resources

Construct a Resources instance by parsing the given quantities of CPU, memory, and launches.

excess(limit: Resources) -> ResourceType | None

Determine if these resources are over a given limit.

Returns:

The ResourceType of the first resource that is over-limit, or None if self is under-limit.
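
The arithmetic and limit check described above can be illustrated with a small standalone sketch. It does not import pydatatask: SketchResources and SketchResourceType are hypothetical stand-ins, and both the strict "greater than" comparison and the CPU, MEM, RATE checking order are assumptions rather than documented behavior.

```python
from dataclasses import dataclass
from decimal import Decimal
from enum import Enum
from typing import Optional


class SketchResourceType(Enum):
    """Hypothetical mirror of the documented ResourceType enum."""
    CPU = 1
    MEM = 2
    RATE = 3


@dataclass
class SketchResources:
    """Hypothetical stand-in for Resources; not the library class."""
    cpu: Decimal = Decimal(0)
    mem: Decimal = Decimal(0)
    launches: int = 1

    def __add__(self, other: "SketchResources") -> "SketchResources":
        return SketchResources(self.cpu + other.cpu,
                               self.mem + other.mem,
                               self.launches + other.launches)

    def __sub__(self, other: "SketchResources") -> "SketchResources":
        return SketchResources(self.cpu - other.cpu,
                               self.mem - other.mem,
                               self.launches - other.launches)

    def excess(self, limit: "SketchResources") -> Optional[SketchResourceType]:
        # Assumption: "over a limit" means strictly greater, checked in
        # the order CPU, MEM, RATE. The real ordering is not documented here.
        if self.cpu > limit.cpu:
            return SketchResourceType.CPU
        if self.mem > limit.mem:
            return SketchResourceType.MEM
        if self.launches > limit.launches:
            return SketchResourceType.RATE
        return None


limit = SketchResources(Decimal(4), Decimal(2), 10)
used = SketchResources(Decimal(2), Decimal(3), 1)
over = used.excess(limit)  # memory is the first field over its limit
```

Returning the offending resource type instead of a plain boolean lets the caller report which quota was hit.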

class pydatatask.resource_manager.ResourceManager(quota: Resources)

Bases: object

The ResourceManager tracks quotas of resources. Direct use of this class beyond construction will only be necessary if you are writing a custom Task class.

Parameters:

quota – The resource limit that should be applied to the sum of all tasks using this manager.

register(func: Callable[[], Awaitable[Resources]])

Register an async callback that will be used to load the current resource utilization on process start. The initial reported usage will be the sum of the results of all registered callbacks.

If you are writing a Task class, you should call this in your constructor to save a reference to a method on your class which retrieves the current resource utilization by that task.
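
As a rough illustration of this registration pattern, the sketch below sums the results of several async callbacks. It is standalone and does not use pydatatask: SketchUsageRegistry and SketchTask are hypothetical names, usage is reduced to a (cpu, mem) tuple, and running the callbacks with asyncio.gather is an assumption, not a description of how the library gathers them.

```python
import asyncio
from decimal import Decimal
from typing import Awaitable, Callable, List, Tuple

# Hypothetical simplification: usage is a (cpu, mem) pair, not a Resources object.
Usage = Tuple[Decimal, Decimal]


class SketchUsageRegistry:
    """Hypothetical registry that sums usage reported by async callbacks."""

    def __init__(self) -> None:
        self._callbacks: List[Callable[[], Awaitable[Usage]]] = []

    def register(self, func: Callable[[], Awaitable[Usage]]) -> None:
        self._callbacks.append(func)

    async def initial_usage(self) -> Usage:
        # Run every registered callback and add the results field by field.
        results = await asyncio.gather(*(cb() for cb in self._callbacks))
        cpu = sum((r[0] for r in results), Decimal(0))
        mem = sum((r[1] for r in results), Decimal(0))
        return cpu, mem


class SketchTask:
    """Hypothetical task that registers its own usage method on construction."""

    def __init__(self, registry: SketchUsageRegistry, cpu: Decimal, mem: Decimal) -> None:
        self._cpu = cpu
        self._mem = mem
        registry.register(self.usage)  # bound method keeps a reference to the task

    async def usage(self) -> Usage:
        return self._cpu, self._mem


async def demo() -> Usage:
    registry = SketchUsageRegistry()
    SketchTask(registry, Decimal("0.5"), Decimal(100))
    SketchTask(registry, Decimal("1.5"), Decimal(200))
    return await registry.initial_usage()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```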

async flush()

Forget the cached amount of resources currently being used. The next call to reserve or relinquish will calculate usage anew.

async reserve(request: Resources) -> ResourceType | None

Atomically reserve the given amount of resources and return None, or do nothing and return the limiting resource type if any resource would be over-quota.

async relinquish(request: Resources)

Atomically mark the given amount of resources as available.
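
The reserve and relinquish contract can be sketched with a single lock around a check-then-update step. The example below is standalone and tracks only one quantity: SketchQuota is a hypothetical class, the string "CPU" stands in for a ResourceType value, and using asyncio.Lock for atomicity is an assumption about one reasonable approach, not a statement about pydatatask internals.

```python
import asyncio
from decimal import Decimal
from typing import Optional, Tuple


class SketchQuota:
    """Hypothetical single-resource quota tracker; not the library class."""

    def __init__(self, cpu_quota: Decimal) -> None:
        self._quota = cpu_quota
        self._used = Decimal(0)
        self._lock = asyncio.Lock()

    async def reserve(self, cpu: Decimal) -> Optional[str]:
        # Check and update under one lock so concurrent callers
        # cannot jointly overshoot the quota.
        async with self._lock:
            if self._used + cpu > self._quota:
                return "CPU"  # limiting resource; nothing was reserved
            self._used += cpu
            return None

    async def relinquish(self, cpu: Decimal) -> None:
        async with self._lock:
            self._used -= cpu


async def demo() -> Tuple[Optional[str], Optional[str], Optional[str]]:
    quota = SketchQuota(Decimal(1))
    first = await quota.reserve(Decimal("0.75"))   # fits within the quota
    second = await quota.reserve(Decimal("0.5"))   # would exceed it, so rejected
    await quota.relinquish(Decimal("0.75"))        # free the first reservation
    third = await quota.reserve(Decimal("0.5"))    # now fits
    return first, second, third


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In a custom task, a successful reserve would typically be paired with a relinquish in a finally block so that a failed launch does not leak its reservation.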

pydatatask.resource_manager.parse_quantity(quantity)

Parse a Kubernetes canonical-form quantity such as 200Mi into a decimal number. Supported suffixes:

base 1024: Ki | Mi | Gi | Ti | Pi | Ei
base 1000: n | u | m | "" | k | M | G | T | P | E

See https://github.com/kubernetes/apimachinery/blob/master/pkg/api/resource/quantity.go

Parameters:

quantity – string; a Kubernetes canonical-form quantity.

Returns:

Decimal

Raises:

ValueError – on invalid or unknown input.
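
To make the suffix table concrete, here is a minimal standalone sketch of this kind of parsing. It is not the library's implementation: sketch_parse_quantity is a hypothetical name, and rejecting non-finite values such as NaN is an added assumption.

```python
from decimal import Decimal, InvalidOperation

# Exponents applied to the base (1024 or 1000) for each supported suffix.
_BINARY = {"Ki": 1, "Mi": 2, "Gi": 3, "Ti": 4, "Pi": 5, "Ei": 6}
_DECIMAL = {"n": -3, "u": -2, "m": -1, "k": 1, "M": 2, "G": 3, "T": 4, "P": 5, "E": 6}


def sketch_parse_quantity(quantity: str) -> Decimal:
    """Hypothetical re-implementation for illustration only."""
    text = str(quantity).strip()
    if text[-2:] in _BINARY:
        # Two-character binary suffix, e.g. "200Mi".
        number, multiplier = text[:-2], Decimal(1024) ** _BINARY[text[-2:]]
    elif text[-1:] in _DECIMAL:
        # One-character decimal suffix, e.g. "1000m".
        number, multiplier = text[:-1], Decimal(1000) ** _DECIMAL[text[-1:]]
    else:
        # No suffix: the whole string is the number.
        number, multiplier = text, Decimal(1)
    try:
        value = Decimal(number)
    except InvalidOperation:
        raise ValueError(f"invalid quantity: {quantity!r}") from None
    if not value.is_finite():
        # Assumption: NaN and infinities are treated as invalid input.
        raise ValueError(f"invalid quantity: {quantity!r}")
    return value * multiplier


one_core = sketch_parse_quantity("1000m")   # 1000 milli-units, equal to 1
one_gib = sketch_parse_quantity("1Gi")      # 1024 ** 3
```

Checking the two-character binary suffixes before the one-character decimal ones ensures that "Mi" is not misread as a number ending in "M" followed by a stray "i".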