Source code for xoa_driver.internals.commands.p4e_commands

from __future__ import annotations
from dataclasses import dataclass
import typing

from xoa_driver.internals.core.builders import (
    build_get_request,
    build_set_request
)
from xoa_driver.internals.core import interfaces
from xoa_driver.internals.core.token import Token
from xoa_driver.internals.core.transporter.registry import register_command
from xoa_driver.internals.core.transporter.protocol.payload import (
    field,
    RequestBodyStruct,
    ResponseBodyStruct,
    XmpByte,
    XmpHex,
    Hex,
)


[docs] @register_command @dataclass class P4E_ASSIGN: """ Advanced mode only: Assign previously reserved PEs to a port. """ code: typing.ClassVar[int] = 675 pushed: typing.ClassVar[bool] = False _connection: 'interfaces.IConnection' _module: int _port: int class GetDataAttr(ResponseBodyStruct): mask: Hex = field(XmpHex(size=8)) """eight hex bytes, a bitmask specifying which PEs should be assigned to this port""" class SetDataAttr(RequestBodyStruct): mask: Hex = field(XmpHex(size=8)) """eight hex bytes, a bitmask specifying which PEs should be assigned to this port"""
[docs] def get(self) -> Token[GetDataAttr]: """Get the assigned PEs on the port. :return: previously reserved PEs :rtype: P4E_ASSIGN.GetDataAttr """ return Token(self._connection, build_get_request(self, module=self._module, port=self._port))
[docs] def set(self, mask: Hex) -> Token[None]: """Assign the previously reserved PEs to the port. :param mask: a bitmask specifying which PEs should be assigned to this port :type mask: str """ return Token(self._connection, build_set_request(self, module=self._module, port=self._port, mask=mask))
[docs] @register_command @dataclass class P4E_AVAILABLE: """ Simple mode only: Report the number of PEs available for allocation. """ code: typing.ClassVar[int] = 676 pushed: typing.ClassVar[bool] = False _connection: 'interfaces.IConnection' _module: int _port: int class GetDataAttr(ResponseBodyStruct): available_pe_count: int = field(XmpByte()) """byte, total number of PEs that can be allocated to the port - including the PEs already allocated to the port."""
[docs] def get(self) -> Token[GetDataAttr]: """Get the number of PEs available for allocation. :return: the number of PEs available for allocation. :rtype: P4E_AVAILABLE.GetDataAttr """ return Token(self._connection, build_get_request(self, module=self._module, port=self._port))
[docs] @register_command @dataclass class P4E_ALLOCATE: """ Simple mode only: Allocate a number of PEs to this port. """ code: typing.ClassVar[int] = 677 pushed: typing.ClassVar[bool] = False _connection: 'interfaces.IConnection' _module: int _port: int class GetDataAttr(ResponseBodyStruct): pe_count_alloc: int = field(XmpByte()) """byte, the total number of PEs to allocate to this port - including the PEs already allocated to the port.""" class SetDataAttr(RequestBodyStruct): pe_count_alloc: int = field(XmpByte()) """byte, the total number of PEs to allocate to this port - including the PEs already allocated to the port."""
[docs] def get(self) -> Token[GetDataAttr]: """Get the number of PEs allocated to this port. :return: the total number of PEs to allocate to this port - including the PEs already allocated to the port. :rtype: P4E_ALLOCATE.GetDataAttr """ return Token(self._connection, build_get_request(self, module=self._module, port=self._port))
[docs] def set(self, pe_count_alloc: int) -> Token[None]: """Allocate a number of PEs to this port. :param pe_count_alloc: the total number of PEs to allocate to this port - including the PEs already allocated to the port. :type pe_count_alloc: int """ return Token(self._connection, build_set_request(self, module=self._module, port=self._port, pe_count_alloc=pe_count_alloc))
[docs] @register_command @dataclass class P4E_ALLOCATION_INFO: """ Display information about which PEs that are available for allocation/assignment and which are currently allocated/assigned to this port. """ code: typing.ClassVar[int] = 678 pushed: typing.ClassVar[bool] = True _connection: 'interfaces.IConnection' _module: int _port: int class GetDataAttr(ResponseBodyStruct): available: Hex = field(XmpHex(size=8)) """eight hex bytes (64-bit) mask of available PEs""" allocated: Hex = field(XmpHex(size=8)) """eight hex bytes (64-bit) mask of PEs assigned to this port"""
[docs] def get(self) -> Token[GetDataAttr]: """Get PEs allocation information. :return: PEs allocation information :rtype: P4E_ALLOCATION_INFO.GetDataAttr """ return Token(self._connection, build_get_request(self, module=self._module, port=self._port))