Source code for xoa_driver.functions.anlt

from __future__ import annotations
from dataclasses import dataclass, field
import typing as t
from xoa_driver import enums
from xoa_driver.utils import apply
from xoa_driver.internals.hli_v1.ports.port_l23.family_l import FamilyL
from xoa_driver.internals.hli_v1.ports.port_l23.family_l1 import FamilyFreya
from xoa_driver.ports import GenericL23Port
from xoa_driver.lli import commands
from xoa_driver.internals.core import interfaces as itf
from xoa_driver.misc import Token
from .tools import (
    get_ctx,
    dictionize_autoneg_status,
    dictionize_lt_status,
    dictionize_txtap_get,
    dictionize_anlt_status,
    dictionize_lt_im_status,
    dictionize_lt_algorithm_status,
    dictionize_anlt_log_ctrl_status
)
import asyncio

PcsPmaSupported = (FamilyL, FamilyFreya)
AutoNegSupported = (FamilyL, FamilyFreya)
LinkTrainingSupported = FamilyL


@dataclass
class DoAnlt:
    port: GenericL23Port
    """port object"""
    should_do_an: bool
    """should the port do autoneg?"""
    should_do_lt: bool
    """should the port do link training?"""
    an_allow_loopback: bool
    """should the autoneg allow loopback?"""
    lt_preset0: enums.FreyaOutOfSyncPreset
    """out-of-sync tap values (preset 0): existing or standard"""
    lt_initial_modulations: dict[str, enums.LinkTrainEncoding]
    """the initial modulations of each serdes"""
    should_lt_interactive: bool
    """should perform link training manually?"""
    lt_algorithm: dict[str, enums.LinkTrainAlgorithm]
    """link training algorithm should be used?"""
    should_enable_lt_timeout: bool
    """should the port do link training with timeout enabled?"""

    _group: tuple["itf.IConnection", int, int] = field(init=False, repr=False)

    def __post_init__(self) -> None:
        self._group = get_ctx(self.port)

    # def __pp_autoneg(self, on: bool) -> Token:
    #     state = enums.FreyaAutonegMode.ENABLED if on else enums.FreyaAutonegMode.DISABLED
    #     return commands.PL1_AUTONEG_CONFIG(*self._group).set(

    #     )
    #     return commands.PP_AUTONEG(*self._group).set(
    #         state,
    #         enums.AutoNegTecAbility.DEFAULT_TECH_MODE,
    #         enums.AutoNegFECOption.DEFAULT_FEC,
    #         enums.AutoNegFECOption.DEFAULT_FEC,
    #         enums.PauseMode.NO_PAUSE,
    #     )

    def __pl1_linktrain_config(
        self,
        ooo_preset: enums.FreyaOutOfSyncPreset,
        timeout_mode: enums.TimeoutMode,
    ) -> Token:
        return commands.PL1_LINKTRAIN_CONFIG(*self._group).set(
            oos_preset=ooo_preset,
            timeout_mode=timeout_mode,
        )

    def __pl1_cfg_tmp(
        self, serdes: int, config_type: enums.Layer1ConfigType, values: list[int]
    ) -> Token:
        return commands.PL1_CFG_TMP(*self._group, serdes, config_type).set(
            values=values
        )
    
    def __pl1_anlt(
        self,
        an_mode: enums.FreyaAutonegMode,
        lt_mode: enums.FreyaLinkTrainingMode,
    ) -> Token:
        return commands.PL1_ANLT(*self._group).set(
            an_mode=an_mode,
            lt_mode=lt_mode,
        )

    def __select_modes(self) -> tuple[enums.FreyaAutonegMode, enums.FreyaLinkTrainingMode, enums.TimeoutMode]:
        if self.should_do_an == True:
            _an_mode = enums.FreyaAutonegMode.ENABLED
        else:
            _an_mode = enums.FreyaAutonegMode.DISABLED
        if self.should_do_lt == True:
            # LT interactive must always disable LT timeout
            if self.should_lt_interactive == True:
                _lt_mode = enums.FreyaLinkTrainingMode.ENABLED_INTERACTIVE
                _timeout_mode = enums.TimeoutMode.DISABLED
            # For LT auto, you can either enable LT timeout or disable LT timeout
            elif self.should_enable_lt_timeout == True:
                _lt_mode = enums.FreyaLinkTrainingMode.ENABLED_AUTO
                _timeout_mode = enums.TimeoutMode.DEFAULT
            else:
                _lt_mode = enums.FreyaLinkTrainingMode.ENABLED_AUTO
                _timeout_mode = enums.TimeoutMode.DISABLED
        else:
            _lt_mode = enums.FreyaLinkTrainingMode.DISABLED
            _timeout_mode = enums.TimeoutMode.DISABLED

        return _an_mode, _lt_mode, _timeout_mode

    def __builder__(self) -> t.Generator[Token, None, None]:
        """Defining commands sequence"""
        
        # # Set autoneg timeout
        # yield self.__pl1_linktrain_config(
        #     enums.LinkTrainingMode.DISABLED,
        #     enums.NRZPreset.NRZ_NO_PRESET,
        #     enums.TimeoutMode.DEFAULT,
        # )

        # # Set autoneg allow-loopback
        yield self.__pl1_cfg_tmp(
            0, enums.Layer1ConfigType.AN_LOOPBACK, [int(self.an_allow_loopback)]
        )

        # yield self.__pp_autoneg(self.should_do_an and not self.should_do_lt)
        # if (not self.should_do_an) or self.should_do_lt:
        # Disable autoneg
        # yield self.__pp_autoneg(False)

        # Set the link train algorithm
        for serdes_str, algorithm in self.lt_algorithm.items():
            yield self.__pl1_cfg_tmp(
                int(serdes_str), enums.Layer1ConfigType.LT_TRAINING_ALGORITHM, [algorithm.value]
            )
        # Set the link train initial modulation
        for serdes_str, im in self.lt_initial_modulations.items():
            yield self.__pl1_cfg_tmp(
                int(serdes_str), enums.Layer1ConfigType.LT_INITIAL_MODULATION, [im.value]
            )

        # Get the mode
        _an_mode, _lt_mode, _timeout_mode = self.__select_modes()

        # Set link train config
        _ooo_preset = self.lt_preset0
        yield self.__pl1_linktrain_config(
            _ooo_preset, _timeout_mode
        )

        # Start AN/LT
        yield self.__pl1_anlt(
            _an_mode, _lt_mode
        )


    async def run(self) -> None:
        """Start anlt execution"""
        await apply(*self.__builder__())


[docs] async def anlt_start( port: GenericL23Port, should_do_an: bool, should_do_lt: bool, an_allow_loopback: bool, lt_preset0: enums.FreyaOutOfSyncPreset, lt_initial_modulations: dict[str, enums.LinkTrainEncoding], should_lt_interactive: bool, lt_algorithm: dict[str, enums.LinkTrainAlgorithm], should_enable_lt_timeout: bool, ) -> None: """ .. versionchanged:: 2.5 Start ANLT on a port :param port: the port object :type port: :class:`~xoa_driver.ports.GenericL23Port` :param should_do_an: should the port do autoneg? :type should_do_an: bool :param should_do_lt: should the port do link training? :type should_do_lt: bool :param an_allow_loopback: should the autoneg allow loopback? :type an_allow_loopback: bool :param lt_preset0: out-of-sync tap values (preset 0): existing or standard :type lt_preset0: enums.FreyaOutOfSyncPreset :param lt_initial_modulations: the initial modulations of each serdes :type lt_initial_modulations: typing.Dict[str, enums.LinkTrainEncoding] :param should_lt_interactive: should perform link training manually? :type should_lt_interactive: bool :param lt_algorithm: Link training algorithm to use :type lt_algorithm: Dict[str, enums.LinkTrainAlgorithm] :param should_enable_lt_timeout: should run link training with timeout? :type should_enable_lt_timeout: bool """ anlt = DoAnlt( port, should_do_an, should_do_lt, an_allow_loopback, lt_preset0, lt_initial_modulations, should_lt_interactive, lt_algorithm, should_enable_lt_timeout, ) await anlt.run()
[docs] async def autoneg_status(port: GenericL23Port) -> dict[str, t.Any]: """ .. versionchanged:: 2.5 Get the auto-negotiation status :param port: the port object :type port: :class:`~xoa_driver.ports.GenericL23Port` :return: :rtype: typing.Dict[str, typing.Any] """ conn, mid, pid = get_ctx(port) loopback, auto_neg_info, status = await apply( commands.PL1_CFG_TMP( conn, mid, pid, 0, enums.Layer1ConfigType.AN_LOOPBACK ).get(), commands.PL1_AUTONEGINFO(conn, mid, pid, 0).get(), commands.PL1_AUTONEG_STATUS(conn, mid, pid).get(), ) return dictionize_autoneg_status(loopback, auto_neg_info, status)
LinkTrainType = t.Union[ enums.LinkTrainCoeffs, enums.LinkTrainPresets, enums.LinkTrainEncoding, enums.LinkTrainAnnounce, ] async def __lt_coeff( port: GenericL23Port, serdes: int, arg: LinkTrainType, *, cmd: enums.LinkTrainCmd, ) -> enums.LinkTrainCmdResults: conn, mid, pid = get_ctx(port) cmd_ = commands.PL1_LINKTRAIN_CMD(conn, mid, pid, serdes) await cmd_.set(cmd=cmd, arg=arg.value) for _ in range(1000): resp = await cmd_.get() status = resp.result if (resp.flags & enums.LinkTrainCmdFlags.DONE.value): return enums.LinkTrainCmdResults(status) await asyncio.sleep(0.01) return enums.LinkTrainCmdResults.UNKNOWN
[docs] async def lt_coeff_inc( port: GenericL23Port, serdes: int, emphasis: enums.LinkTrainCoeffs ) -> enums.LinkTrainCmdResults: """ .. versionadded:: 1.1 Ask the remote port to increase coeff of the specified serdes. :param port: the port object :type port: :class:`~xoa_driver.ports.GenericL23Port` :param serdes: the serdes index, starting from 0 :type serdes: int :param emphasis: the emphasis to increase :type emphasis: enums.LinkTrainCoeffs :return: :rtype: None """ return await __lt_coeff(port, serdes, emphasis, cmd=enums.LinkTrainCmd.CMD_INC)
[docs] async def lt_coeff_dec( port: GenericL23Port, serdes: int, emphasis: enums.LinkTrainCoeffs ) -> enums.LinkTrainCmdResults: """ .. versionadded:: 1.1 Ask the remote port to decrease coeff of the specified serdes. :param port: the port object :type port: :class:`~xoa_driver.ports.GenericL23Port` :param serdes: the serdes index, starting from 0 :type serdes: int :param emphasis: the emphasis to decrease :type emphasis: enums.LinkTrainCoeffs :return: :rtype: None """ return await __lt_coeff(port, serdes, emphasis, cmd=enums.LinkTrainCmd.CMD_DEC)
[docs] async def lt_coeff_no_eq( port: GenericL23Port, serdes: int, emphasis: enums.LinkTrainCoeffs ) -> enums.LinkTrainCmdResults: """ .. versionadded:: 2.0 Ask the remote port to set the coeff to NO_EQ on the specified serdes. :param port: the port object :type port: :class:`~xoa_driver.ports.GenericL23Port` :param serdes: the serdes index, starting from 0 :type serdes: int :param emphasis: the emphasis to set to NO_EQ :type emphasis: enums.LinkTrainCoeffs :return: :rtype: None """ return await __lt_coeff(port, serdes, emphasis, cmd=enums.LinkTrainCmd.CMD_NO_EQ)
[docs] async def lt_preset( port: GenericL23Port, serdes: int, preset: enums.LinkTrainPresets ) -> enums.LinkTrainCmdResults: """ .. versionadded:: 1.1 Ask the remote port to use the preset of the specified serdes. :param port: the port object :type port: :class:`~xoa_driver.ports.GenericL23Port` :param serdes: the serdes index, starting from 0 :type serdes: int :param preset: preset index to select for the serdes, 0,1,2,3,4, :type preset: enums.LinkTrainPresets :return: :rtype: None """ return await __lt_coeff(port, serdes, preset, cmd=enums.LinkTrainCmd.CMD_PRESET)
[docs] async def lt_encoding( port: GenericL23Port, serdes: int, encoding: enums.LinkTrainEncoding ) -> enums.LinkTrainCmdResults: """ .. versionadded:: 1.1 Ask the remote port to use the encoding of the specified serdes. :param port: the port object :type port: :class:`~xoa_driver.ports.GenericL23Port` :param serdes: the serdes index, starting from 0 :type serdes: int :param encoding: link training encoding :type encoding: enums.LinkTrainCoeffs :return: :rtype: None """ return await __lt_coeff(port, serdes, encoding, cmd=enums.LinkTrainCmd.CMD_ENCODING)
[docs] async def lt_trained(port: GenericL23Port, serdes: int) -> enums.LinkTrainCmdResults: """ .. versionadded:: 1.1 Tell the remote port that the current serdes is trained. :param port: the port object :type port: :class:`~xoa_driver.ports.GenericL23Port` :param serdes: the serdes index, starting from 0 :type serdes: int :return: :rtype: None """ return await __lt_coeff( port, serdes, arg=enums.LinkTrainAnnounce.TRAINED, cmd=enums.LinkTrainCmd.CMD_LOCAL_TRAINED, )
[docs] async def lt_status(port: GenericL23Port, serdes: int) -> dict[str, t.Any]: """ .. versionchanged:: 2.5 Show the link training status. :param port: the port object :type port: :class:`~xoa_driver.ports.GenericL23Port` :param serdes: the serdes index, starting from 0 :type serdes: int :return: LT status of the serdes :rtype: typing.Dict[str, typing.Any] """ conn, mid, pid = get_ctx(port) status, info, ltconf, cfg = await apply( commands.PL1_LINKTRAIN_STATUS(conn, mid, pid, serdes).get(), commands.PL1_LINKTRAININFO(conn, mid, pid, serdes, 0).get(), commands.PL1_LINKTRAIN_CONFIG(conn, mid, pid).get(), commands.PL1_CFG_TMP( conn, mid, pid, serdes, enums.Layer1ConfigType.LT_INITIAL_MODULATION ).get(), ) total_bit_count = (info.prbs_total_bits_high << 32) + info.prbs_total_bits_low total_error_bit_count = ( info.prbs_total_error_bits_high << 32 ) + info.prbs_total_error_bits_low ber = total_error_bit_count / total_bit_count if total_bit_count > 0 else float("nan") return dictionize_lt_status( status, info, ltconf, cfg, ber, total_bit_count, total_error_bit_count )
[docs] async def txtap_get(port: GenericL23Port, serdes: int) -> dict[str, int]: """ .. versionadded:: 1.1 Get the tap value of the local TX tap. :param port: the port object :type port: :class:`~xoa_driver.ports.GenericL23Port` :param serdes: the serdes index, starting from 0 :type serdes: int :return: tap values of the serdes :rtype: typing.Dict[str, int] """ conn, mid, pid = get_ctx(port) r = await commands.PP_PHYTXEQ(conn, mid, pid, serdes).get() return dictionize_txtap_get(r)
[docs] async def txtap_set( port: GenericL23Port, serdes: int, pre3: int, pre2: int, pre: int, main: int, post1: int, ) -> None: """ .. versionadded:: 1.1 Set the tap value of the local TX tap. :param port: the port object :type port: :class:`~xoa_driver.ports.GenericL23Port` :param serdes: the serdes index, starting from 0 :type serdes: int :param pre3: pre3 value :type pre3: int :param pre2: pre2 value :type pre2: int :param pre: pre value :type pre: int :param main: main value :type main: int :param post1: post1 value :type post1: int :return: :rtype: None """ conn, mid, pid = get_ctx(port) cmd_ = commands.PP_PHYTXEQ(conn, mid, pid, serdes) await cmd_.set( pre=pre, main=main, post=post1, pre2=pre2, pre3_post2=pre3, post3=0, )
[docs] async def anlt_status(port: GenericL23Port) -> dict[str, t.Any]: """ .. versionchanged:: 2.5 Get the overview of ANLT status :param port: the port object :type port: :class:`~xoa_driver.ports.GenericL23Port` :return: AN/LT status of the port :rtype: typing.Dict[str, typing.Any] """ # if not isinstance(port, LinkTrainingSupported): # raise NotSupportLinkTrainError(port) conn, mid, pid = get_ctx(port) _link_recovery, _anlt_op, _linktrain_cfg, _capabilities, _allow_loopback = await apply( commands.PL1_CFG_TMP( conn, mid, pid, 0, enums.Layer1ConfigType.AUTO_LINK_RECOVERY ).get(), commands.PL1_ANLT(conn, mid, pid).get(), commands.PL1_LINKTRAIN_CONFIG(conn, mid, pid).get(), commands.P_CAPABILITIES(conn, mid, pid).get(), commands.PL1_CFG_TMP(conn, mid, pid, 0, enums.Layer1ConfigType.AN_LOOPBACK).get(), ) return dictionize_anlt_status(_link_recovery, _anlt_op, _linktrain_cfg, _capabilities, _allow_loopback)
[docs] async def anlt_log(port: GenericL23Port) -> str: """ .. versionadded:: 1.1 Get the anlt log messages :param port: the port object :type port: :class:`~xoa_driver.ports.GenericL23Port` :return: AN/LT protocol log traces of the port :rtype: str """ conn, mid, pid = get_ctx(port) log = await commands.PL1_LOG(conn, mid, pid).get() return log.log_string
[docs] async def anlt_stop(port: GenericL23Port) -> None: """ .. versionchanged:: 2.5 Stop AN & LT :param port: the port object :type port: :class:`~xoa_driver.ports.GenericL23Port` """ conn, mid, pid = get_ctx(port) await commands.PL1_ANLT(conn, mid, pid).set( an_mode=enums.FreyaAutonegMode.DISABLED, lt_mode=enums.FreyaLinkTrainingMode.DISABLED )
[docs] async def txtap_autotune(port: GenericL23Port, serdes: int) -> None: """ .. versionadded:: 1.3 Auto tune the tap value of the local TX tap. :param port: the port object :type port: :class:`~xoa_driver.ports.GenericL23Port` :param serdes: the serdes index, starting from 0 :type serdes: int :return: :rtype: None """ conn, mid, pid = get_ctx(port) phy_autotune = commands.PP_PHYAUTOTUNE(conn, mid, pid, serdes) await phy_autotune.set(on_off=enums.OnOff.OFF) await phy_autotune.set(on_off=enums.OnOff.ON)
async def lt_im_status(port: GenericL23Port) -> dict[str, t.Any]: """ .. versionadded:: 1.3 Get LT initial modulation config :param port: the port object :type port: :class:`~xoa_driver.ports.GenericL23Port` :return: LT initial modulation configuration of the port :rtype: typing.Dict[str, typing.Any] """ # if not isinstance(port, LinkTrainingSupported): # raise NotSupportLinkTrainError(port) conn, mid, pid = get_ctx(port) capabilities = await commands.P_CAPABILITIES(conn, mid, pid).get() initial_mods = {} for i in range(0, capabilities.serdes_count): im = await commands.PL1_CFG_TMP(conn, mid, pid, i, enums.Layer1ConfigType.LT_INITIAL_MODULATION).get() initial_mods[str(i)] = enums.LinkTrainEncoding(im.values[0]).name return dictionize_lt_im_status(capabilities, initial_mods) async def lt_algorithm_status(port: GenericL23Port) -> dict[str, t.Any]: """ .. versionadded:: 1.3 Get LT initial modulation config :param port: the port object :type port: :class:`~xoa_driver.ports.GenericL23Port` :return: LT initial modulation configuration of the port :rtype: typing.Dict[str, typing.Any] """ # if not isinstance(port, LinkTrainingSupported): # raise NotSupportLinkTrainError(port) conn, mid, pid = get_ctx(port) capabilities = await commands.P_CAPABILITIES(conn, mid, pid).get() algorithms = {} for i in range(0, capabilities.serdes_count): alg = await commands.PL1_CFG_TMP(conn, mid, pid, i, enums.Layer1ConfigType.LT_TRAINING_ALGORITHM).get() algorithms[str(i)] = enums.LinkTrainAlgorithm(alg.values[0]).name return dictionize_lt_algorithm_status(capabilities, algorithms)
[docs] async def anlt_strict(port: GenericL23Port, enable: bool) -> None: """ .. versionadded:: 1.3 Should ANLT strict mode be enabled :param port: the port object :type port: :class:`~xoa_driver.ports.GenericL23Port` :param enable: should ANLT strict mode be enabled :type enable: bool :return: :rtype: None """ conn, mid, pid = get_ctx(port) capabilities = await commands.P_CAPABILITIES(conn, mid, pid).get() param = int(enable) for i in range(0, capabilities.serdes_count): await commands.PL1_CFG_TMP( conn, mid, pid, i, enums.Layer1ConfigType.ANLT_STRICT_MODE ).set(values=[param])
[docs] async def anlt_log_control(port: GenericL23Port, types: t.List[enums.AnLtLogControl]) -> None: """ .. versionadded:: 1.3 Control what should be logged for ANLT by xenaserver :param port: the port object :type port: :class:`~xoa_driver.ports.GenericL23Port` :param types: control what should be logged for ANLT by xenaserver :type types: t.List[enums.AnLtLogControl] :return: :rtype: None """ conn, mid, pid = get_ctx(port) capabilities = await commands.P_CAPABILITIES(conn, mid, pid).get() type_ = 0 for _t in types: type_ |= _t.value param = int(type_) for i in range(0, capabilities.serdes_count): await commands.PL1_CFG_TMP( conn, mid, pid, i, enums.Layer1ConfigType.ANLT_LOG_CONTROL ).set(values=[param])
async def anlt_log_control_get(port: GenericL23Port) -> dict[str, bool]: """ .. versionadded:: 2.7 Get ANLT log control config :param port: the port object :type port: :class:`~xoa_driver.ports.GenericL23Port` :return: dict of log control status :rtype: dict[str, bool] """ conn, mid, pid = get_ctx(port) resp = await commands.PL1_CFG_TMP( conn, mid, pid, 0, enums.Layer1ConfigType.ANLT_LOG_CONTROL ).get() return dictionize_anlt_log_ctrl_status(resp.values) __all__ = ( "anlt_link_recovery", "anlt_log", "anlt_start", "anlt_status", "anlt_stop", "autoneg_status", "lt_coeff_inc", "lt_coeff_dec", "lt_encoding", "lt_preset", "lt_status", "lt_trained", "txtap_get", "txtap_set", "txtap_autotune", "lt_im_status", "lt_algorithm_status", "anlt_strict", "anlt_log_control", "anlt_log_control_get", )