Source code for xoa_driver.internals.commands.m4e_commands

from __future__ import annotations
from dataclasses import dataclass
import typing
import functools

from xoa_driver.internals.core.builders import (
    build_get_request,
    build_set_request
)
from xoa_driver.internals.core import interfaces
from xoa_driver.internals.core.token import Token
from xoa_driver.internals.core.transporter.registry import register_command
from xoa_driver.internals.core.transporter.protocol.payload import (
    field,
    RequestBodyStruct,
    ResponseBodyStruct,
    XmpByte,
    XmpHex,
    Hex,
)
from .enums import ResourceAllocationMode


[docs] @register_command @dataclass class M4E_MODE: """ Select resource allocation mode. """ code: typing.ClassVar[int] = 850 pushed: typing.ClassVar[bool] = False _connection: 'interfaces.IConnection' _module: int class GetDataAttr(ResponseBodyStruct): mode: ResourceAllocationMode = field(XmpByte()) """coded byte, resource allocation mode.""" class SetDataAttr(RequestBodyStruct): mode: ResourceAllocationMode = field(XmpByte()) """coded byte, resource allocation mode."""
[docs] def get(self) -> Token[GetDataAttr]: """Get the resource allocation mode. :return: resource allocation mode :rtype: M4E_MODE.GetDataAttr """ return Token(self._connection, build_get_request(self, module=self._module))
[docs] def set(self, mode: ResourceAllocationMode) -> Token[None]: """Set the resource allocation mode. :param mode: resource allocation mode :type mode: ResourceAllocationMode """ return Token(self._connection, build_set_request(self, module=self._module, mode=mode))
set_simple = functools.partialmethod(set, ResourceAllocationMode.SIMPLE) """Set resource allocation mode to Simple.""" set_advanced = functools.partialmethod(set, ResourceAllocationMode.ADVANCED) """Set resource allocation mode to Advanced."""
[docs] @register_command @dataclass class M4E_RESERVE: """ Advanced mode only: Reserve a number of PEs so they later can be assigned to specific ports. """ code: typing.ClassVar[int] = 851 pushed: typing.ClassVar[bool] = False _connection: 'interfaces.IConnection' _module: int class GetDataAttr(ResponseBodyStruct): mask: Hex = field(XmpHex(size=8)) """eight hex bytes, bitmask of PEs to reserve""" class SetDataAttr(RequestBodyStruct): mask: Hex = field(XmpHex(size=8)) """eight hex bytes, bitmask of PEs to reserve"""
[docs] def get(self) -> Token[GetDataAttr]: """Get the PEs reserved. :return: the number of PEs reserved. :rtype: M4E_RESERVE.GetDataAttr """ return Token(self._connection, build_get_request(self, module=self._module))
[docs] def set(self, mask: Hex) -> Token[None]: """Set PEs reserved. :param mask: bitmask of PEs to reserve :type mask: str """ return Token(self._connection, build_set_request(self, module=self._module, mask=mask))