Source code for xoa_driver.internals.core.funcs

from __future__ import annotations
import asyncio
from asyncio.events import AbstractEventLoop
import io
from typing import (
    TYPE_CHECKING,
    Any,
    AsyncGenerator,
)
if TYPE_CHECKING:
    from .transporter.handler import TransportationHandler
    from . import interfaces

from .token import Token
from .transporter import exceptions


async def establish_connection(transporter: "TransportationHandler", host: str, port: int = 22606, *, seconds_timeout: int = 5, loop: AbstractEventLoop | None = None) -> None:
    """
    Establish connection to provided host and port and assign ``<TransportationHandler>`` to it.
    """
    assert isinstance(loop, AbstractEventLoop) or loop is None, "<loop> must be an instance of AbstractEventLoop or None"
    __loop = loop if loop else asyncio.get_event_loop()
    try:
        await asyncio.wait_for(
            __loop.create_connection(lambda: transporter, host=host, port=port),
            seconds_timeout
        )
    except OSError:
        raise exceptions.XoaConnectionError(host, port) from None
    except asyncio.exceptions.TimeoutError:
        raise exceptions.XoaConnectionTimeoutError(host, port, seconds_timeout) from None


[docs] async def apply_iter(*cmd_tokens: Token[Any], return_exceptions: bool = False, token_timeout_sec: float | None = 5.0) -> AsyncGenerator[Any, None]: """ Main interface for chunking the commands which need to be send to the single tester at the same time. """ if not cmd_tokens: return conn: "interfaces.IConnection" = cmd_tokens[0].connection buffer_bytes = io.BytesIO() queue: asyncio.Queue[asyncio.Future] = asyncio.Queue() for t in cmd_tokens: (data, fut) = await t.connection.prepare_data(t.request) buffer_bytes.write(data) queue.put_nowait(fut) conn.send(buffer_bytes.getvalue()) buffer_bytes.close() del buffer_bytes while not queue.empty(): future = await queue.get() try: result_ = await asyncio.wait_for( asyncio.shield(future), token_timeout_sec if return_exceptions else None ) except Exception as e: if return_exceptions: yield e else: raise e else: yield result_ queue.task_done() await queue.join()
[docs] async def apply(*cmd_tokens: Token[Any], return_exceptions: bool = False, token_timeout_sec: float | None = 5.0) -> list[Any]: """ Main interface for chunking the commands which need to be send to one or multiple testers at the same time. """ assert len(cmd_tokens) <= 200, "Number of the commands is bigger then 200 for one aggregation, please use function <apply_iter> instead" return [ f async for f in apply_iter( *cmd_tokens, return_exceptions=return_exceptions, token_timeout_sec=token_timeout_sec ) ]