"""
pydatatask defines the notion of resources, or numerical quantities of CPU and memory which can be allocated to a given
job. This is mediated through a :class:`ResourceManager`, an object which can atomically track increments and decrements
from a quota, and reject a request if it would break the quota.
Typical usage is to construct a :class:`ResourceManager` and pass it to a task constructor:
.. code:: python
quota = pydatatask.ResourceManager(pydatatask.Resources.parse(cpu='1000m', mem='1Gi'))
task = pydatatask.ProcessTask("my_task", localhost, quota, ...)
"""
from typing import Awaitable, Callable, List, Optional, Union
from asyncio import Lock
from dataclasses import dataclass, field
from decimal import Decimal
from enum import Enum, auto
from kubernetes.utils import parse_quantity
__all__ = ("ResourceType", "Resources", "ResourceManager", "parse_quantity")
[docs]class ResourceType(Enum):
"""
An enum class indicating a type of resource. Presently can be CPU, MEM, or RATE.
"""
CPU = auto()
MEM = auto()
RATE = auto()
[docs]@dataclass
class Resources:
"""
A dataclass containing a quantity of resources.
Resources can be summed:
.. code:: python
r = pydatatask.Resources.parse(1, 1, 100)
r += pydatatask.Resources.parse(2, 3, 0)
r -= pydatatask.Resources.parse(1, 1, 0)
assert r == pydatatask.Resources.parse(2, 3, 100)
"""
cpu: Decimal = field(default=Decimal(0))
mem: Decimal = field(default=Decimal(0))
launches: int = 1
[docs] @staticmethod
def parse(
cpu: Union[str, float, int, Decimal],
mem: Union[str, float, int, Decimal],
launches: Union[str, float, int, Decimal, None] = None,
) -> "Resources":
"""
Construct a :class:`Resources` instance by parsing the given quantities of CPU, memory, and launches.
"""
if launches is None:
launches = 999999999
return Resources(cpu=parse_quantity(cpu), mem=parse_quantity(mem), launches=int(launches))
def __add__(self, other: "Resources"):
return Resources(cpu=self.cpu + other.cpu, mem=self.mem + other.mem, launches=self.launches + other.launches)
def __mul__(self, other: int):
return Resources(cpu=self.cpu * other, mem=self.mem * other, launches=self.launches * other)
def __sub__(self, other: "Resources"):
return self + other * -1
[docs] def excess(self, limit: "Resources") -> Optional[ResourceType]:
"""
Determine if these resources are over a given limit.
:return: The ResourceType of the first resource that is over-limit, or None if self is under-limit.
"""
if self.cpu > limit.cpu:
return ResourceType.CPU
elif self.mem > limit.mem:
return ResourceType.MEM
elif self.launches > limit.launches:
return ResourceType.RATE
else:
return None
[docs]class ResourceManager:
"""
The ResourceManager tracks quotas of resources. Direct use of this class beyond construction will only be necessary
if you are writing a custom Task class.
"""
def __init__(self, quota: Resources):
"""
:param quota: The resource limit that should be applied to the sum of all tasks using this manager.
"""
self.quota = quota
self._lock = Lock()
self._cached: Optional[Resources] = None
self._registered_getters: List[Callable[[], Awaitable[Resources]]] = []
[docs] def register(self, func: Callable[[], Awaitable[Resources]]):
"""
Register an async callback which will be used to load the current resource utilization on process start.
The initial reported usage will be the sum of the result of all the registered callbacks.
If you are writing a Task class, you should call this in your constructor to save a reference to a method on
your class which retrieves the current resource utilization by that task.
"""
self._registered_getters.append(func)
async def _getter(self):
result = Resources()
for getter in self._registered_getters:
result += await getter()
return result
[docs] async def flush(self):
"""
Forget the cached amount of resources currently being used. Next call to reserve or relinquish will calculate
usage anew.
"""
async with self._lock:
self._cached = None
[docs] async def reserve(self, request: Resources) -> Optional[ResourceType]:
"""
Atomically reserve the given amount of resources and return None, or do nothing and return the limiting resource
type if any resource would be over-quota.
"""
async with self._lock:
if self._cached is None:
self._cached = await self._getter()
target = self._cached + request
excess = target.excess(self.quota)
if excess is None:
self._cached = target
return None
else:
return excess
[docs] async def relinquish(self, request: Resources):
"""
Atomically mark the given amount of resources as available.
"""
async with self._lock:
if self._cached is None:
self._cached = await self._getter()
self._cached -= request