# Source code for pydatatask.utils
"""
Various utility classes and functions that are used throughout the codebase but don't belong anywhere in particular.
"""

from typing import AsyncContextManager, List, Optional, Protocol
import codecs

class AReadStream(Protocol, AsyncContextManager):
    """
    A protocol for reading data from an asynchronous stream.

    Implementations are async context managers which additionally provide :meth:`read` and :meth:`close`.
    """

    async def read(self, n: Optional[int] = None) -> bytes:
        """
        Read and return up to ``n`` bytes, or if unspecified, the rest of the stream.

        :param n: The maximum number of bytes to return, or ``None`` to read everything that remains.
        :return: The bytes that were read. The helpers in this module treat an empty result as end-of-stream.
        """
        ...

    async def close(self) -> None:
        """
        Close and release the stream.
        """
        ...
class AWriteStream(Protocol, AsyncContextManager):
    """
    A protocol for writing data to an asynchronous stream.

    Implementations are async context managers which additionally provide :meth:`write` and :meth:`close`.
    """

    async def write(self, data: bytes) -> None:
        """
        Write ``data`` to the stream.

        :param data: The bytes to write.
        """
        ...

    async def close(self) -> None:
        """
        Close and release the stream.
        """
        ...
async def async_copyfile(copyfrom: AReadStream, copyto: AWriteStream, blocksize=1024 * 1024):
    """
    Stream data from ``copyfrom`` to ``copyto``.

    The data is moved in pieces of at most ``blocksize`` bytes so that the whole stream never has to be held
    in memory at once. Neither stream is closed by this function.

    :param copyfrom: The stream to read from. An empty read is taken to mean end-of-stream.
    :param copyto: The stream to write to.
    :param blocksize: The maximum number of bytes requested from ``copyfrom`` per read.
    """
    while True:
        data = await copyfrom.read(blocksize)
        if not data:
            # An empty chunk signals that the source is exhausted.
            break
        await copyto.write(data)
class AReadText:
    """
    An async version of :external:class:`io.TextIOWrapper` which can only handle reading.

    Bytes are pulled from ``base`` in pieces of ``chunksize`` bytes and fed through an incremental decoder, so
    a multi-byte character which straddles two chunks is still decoded as a single character. Decoded text
    which has not been handed out yet is kept in ``buffer``.
    """

    def __init__(
        self,
        base: AReadStream,
        encoding: str = "utf-8",
        errors="strict",
        chunksize=4096,
    ):
        """
        :param base: The byte stream to read from.
        :param encoding: The name of the codec used to decode the bytes.
        :param errors: The error handling scheme passed to the codec's incremental decoder.
        :param chunksize: The number of bytes requested from ``base`` per read.
        """
        self.base = base
        self.decoder = codecs.lookup(encoding).incrementaldecoder(errors)
        self.buffer = ""
        self.chunksize = chunksize

    async def read(self, n: Optional[int] = None) -> str:
        """
        Read up to ``n`` chars from the string, or the rest of the stream if not provided.

        :param n: The maximum number of characters to return. ``None`` or a negative number (as with
                  :external:class:`io.TextIOWrapper`) means read until the end of the stream.
        :return: The decoded text. Fewer than ``n`` characters (possibly none) are returned only when the
                 underlying stream has ended.
        """
        if n is not None and n < 0:
            n = None

        while n is None or len(self.buffer) < n:
            data = await self.base.read(self.chunksize)
            # An empty read marks end-of-stream; ``final=True`` makes the decoder flush (or reject) any
            # incomplete trailing byte sequence.
            self.buffer += self.decoder.decode(data, final=not data)
            if not data:
                break

        if n is None:
            result, self.buffer = self.buffer, ""
        else:
            # Hand out the first n characters and keep the *remainder* buffered for the next call.
            result, self.buffer = self.buffer[:n], self.buffer[n:]
        return result

    async def close(self):
        """
        Close and release the stream.
        """
        await self.base.close()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()
class AWriteText:
    """
    An async version of ``io.TextIOWrapper`` which can only handle writing.

    Every string passed to :meth:`write` is encoded on its own and forwarded to ``base`` immediately; no text
    is buffered in this object.
    """

    def __init__(self, base: AWriteStream, encoding="utf-8", errors="strict"):
        """
        :param base: The byte stream to write to.
        :param encoding: The name of the codec used to encode text.
        :param errors: The error handling scheme passed to :external:meth:`str.encode`.
        """
        self.base = base
        self.encoding = encoding
        self.errors = errors

    async def write(self, data: str) -> None:
        """
        Write ``data`` to the stream.

        :param data: The text to encode and write.
        """
        await self.base.write(data.encode(self.encoding, self.errors))

    async def close(self):
        """
        Close and release the stream.
        """
        await self.base.close()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.close()
async def async_copyfile_str(copyfrom: AReadText, copyto: AWriteText, blocksize=1024 * 1024):
    """
    Stream text from ``copyfrom`` to ``copyto``.

    The text is moved in pieces of at most ``blocksize`` characters. Neither stream is closed by this
    function.

    :param copyfrom: The text stream to read from. An empty read is taken to mean end-of-stream.
    :param copyto: The text stream to write to.
    :param blocksize: The maximum number of characters requested from ``copyfrom`` per read.
    """
    while True:
        data = await copyfrom.read(blocksize)
        if not data:
            # An empty string signals that the source is exhausted.
            break
        await copyto.write(data)
async def roundrobin(iterables: List):
    """
    An async version of the itertools roundrobin recipe.

    roundrobin('ABC', 'D', 'EF') --> A D E B F C

    Each element of ``iterables`` must be an async *iterator*, since ``__anext__`` is called on it directly.
    One item is taken from each iterator in turn; an iterator is dropped from the rotation once it is
    exhausted, and iteration ends when none are left.

    :param iterables: The async iterators to interleave. The list itself is not modified.
    """
    # Work on a private copy so that exhausting the sources does not empty the caller's list.
    pending = list(iterables)
    i = 0
    while pending:
        i %= len(pending)
        # Only the fetch is guarded: a StopAsyncIteration thrown into this generator at the ``yield`` must
        # not be mistaken for the current source running dry.
        try:
            item = await pending[i].__anext__()
        except StopAsyncIteration:
            # The following iterator slides into slot ``i``, so the index is deliberately not advanced.
            pending.pop(i)
        else:
            yield item
            i += 1