Source code for xoa_driver.functions.mgmt

"""
The resource management high-level function module, 
including testers, modules, ports, and streams.
"""

from __future__ import annotations
import asyncio
from typing import (
    TYPE_CHECKING,
    Any,
    Union,
    List,
    Tuple,
)
from xoa_driver import enums, ports
from xoa_driver.utils import apply
if TYPE_CHECKING:
    from xoa_driver.ports import GenericL23Port, Z800FreyaPort, Z1600EdunPort, GenericAnyPort, E100ChimeraPort
    from xoa_driver.modules import GenericAnyModule, GenericL23Module, Z800FreyaModule, Z1600EdunModule, E100ChimeraModule
    from xoa_driver.testers import L23Tester
    FreyaEdunModule = Union[Z800FreyaModule, Z1600EdunModule]
    FreyaEdunPort = Union[Z800FreyaPort, Z1600EdunPort]
    from xoa_driver.internals.commands.enums import MediaConfigurationType
from .exceptions import (
    NotSupportMedia,
    NotSupportPortSpeed,
    NotSupportMediaPortSpeed,
)
from .tools import MODULE_EOL_INFO
from itertools import chain  # type: ignore[Pylance false warning]
from datetime import datetime
import json


# region Testers
[docs] async def reserve_tester(tester: L23Tester, force: bool = True) -> None: """ Reserve a tester regardless whether it is owned by others or not. :param tester: The tester to reserve :type tester: :class:`~xoa_driver.testers.L23Tester` :param force: Should force reserve the tester :type force: boolean :return: :rtype: None """ await release_tester(tester, force) await tester.reservation.set_reserve()
[docs] async def release_tester( tester: L23Tester, should_release_modules_ports: bool = False, ) -> None: """ Free a tester. If the tester is reserved by you, release the tester. If the tester is reserved by others, relinquish the tester. The tester should have no owner afterwards. :param tester: The tester to free :type tester: :class:`~xoa_driver.testers.L23Tester` :param should_release_modules_ports: should modules and ports also be freed, defaults to False :type should_release_modules_ports: bool, optional :return: :rtype: None """ r = await tester.reservation.get() if r.operation == enums.ReservedStatus.RESERVED_BY_OTHER: await tester.reservation.set_relinquish() elif r.operation == enums.ReservedStatus.RESERVED_BY_YOU: await tester.reservation.set_release() if should_release_modules_ports: await asyncio.gather(*(release_modules(list(tester.modules), True)))
[docs] async def get_chassis_sys_uptime(tester: L23Tester) -> int: """ Get chassis system uptime in seconds :param tester: The tester to free :type tester: :class:`~xoa_driver.testers.L23Tester` :return: Chassis system uptime in seconds :rtype: int """ resp = await tester.health.uptime.get() info_js = resp.info info_dict = json.loads(info_js) result = info_dict['1']['data']['uptime_secs'] return result
# endregion # region Modules
[docs] async def obtain_modules_by_ids(tester: L23Tester, module_ids: List[str], reserve: bool = False) -> Tuple[GenericL23Module | E100ChimeraModule, ...]: """ Get the module objects of the tester specified by module index ids :param tester: The tester object :type tester: :class:`~xoa_driver.testers.L23Tester` :param module_ids: the index ids of the modules. Use "*" to get all modules. If the list is empty, return all modules of the tester :type module_ids: List[str] :param reserve: should reserve the modules, defaults to False :type reserve: bool, optional :raises NoSuchModuleError: No such a module index on the tester :return: module objects :rtype: List[:class:`~xoa_driver.modules.GenericL23Module` | :class:`~xoa_driver.modules.E100ChimeraModule`] """ if len(module_ids) == 0 or "*" in module_ids: module_list = [m for m in tuple(tester.modules)] if reserve: await reserve_modules(modules=module_list, force=reserve) return tuple(tester.modules) else: module_list = [tester.modules.obtain(int(module_id)) for module_id in module_ids] if reserve: await reserve_modules(modules=module_list, force=reserve) return tuple(tester.modules.obtain(int(module_id)) for module_id in module_ids)
[docs] async def obtain_module_by_id(tester: L23Tester, module_id: str, reserve: bool = False) -> Union[GenericL23Module, E100ChimeraModule]: """ Get the module object of the tester specified by module index id :param tester: The tester object :type tester: :class:`~xoa_driver.testers.L23Tester` :param module_id: the index id of the module. :type module_id: str :param reserve: should reserve the module, defaults to False :type reserve: bool, optional :raises NoSuchModuleError: No such a module index on the tester :return: module object :rtype: Union[:class:`~xoa_driver.modules.GenericL23Module`, :class:`~xoa_driver.modules.E100ChimeraModule`] """ if reserve: await reserve_modules([tester.modules.obtain(int(module_id))], force=reserve) return tester.modules.obtain(int(module_id))
[docs] async def obtain_module_by_port_id(tester: L23Tester, port_id: str, separator: str = "/", reserve: bool = False) -> Union[GenericL23Module, E100ChimeraModule]: """ Get the module object of the tester specified by the port index id :param tester: The tester object :type tester: :class:`~xoa_driver.testers.L23Tester` :param port_id: the index id of the port. :type port_id: str :param separator: The separator between module index and port index in port id, defaults to "/" :type separator: str, optional :param reserve: should reserve the module, defaults to False :type reserve: bool, optional :raises NoSuchModuleError: No such a module index on the tester :return: module object :rtype: Union[:class:`~xoa_driver.modules.GenericL23Module`, :class:`~xoa_driver.modules.E100ChimeraModule`] """ if separator not in port_id: raise ValueError(f"Invalid port_id format: {port_id}. Expected format 'm{separator}p'.") module_id = port_id.split(separator)[0] if reserve: await reserve_modules([tester.modules.obtain(int(module_id))], force=reserve) return tester.modules.obtain(int(module_id))
[docs] async def reserve_modules(modules: List[GenericL23Module | E100ChimeraModule], force: bool = True) -> None: """ Reserve modules regardless whether they are owned by others or not. :param modules: The modules to reserve :type modules: List[Union[:class:`~xoa_driver.modules.GenericL23Module`, :class:`~xoa_driver.modules.E100ChimeraModule`]] :param force: Should force reserve the module, defaults to True :type force: boolean :return: :rtype: None """ await release_modules(modules, force) await asyncio.gather(*(module.reservation.set_reserve() for module in modules))
[docs] async def release_modules( modules: List[GenericL23Module | E100ChimeraModule], should_release_ports: bool = False ) -> None: """ Free modules. If a module is reserved by you, release the module. If a module is reserved by others, relinquish the module. The modules should have no owner afterwards. :param module: The module to free :type module: Union[:class:`~xoa_driver.modules.GenericL23Module`, :class:`~xoa_driver.modules.E100ChimeraModule`] :param should_release_ports: should ports also be freed, defaults to False :type should_release_ports: bool, optional :return: :rtype: None """ for module in modules: r = await module.reservation.get() if r.operation == enums.ReservedStatus.RESERVED_BY_OTHER: await module.reservation.set_relinquish() elif r.operation == enums.ReservedStatus.RESERVED_BY_YOU: await module.reservation.set_release() if should_release_ports: await release_ports(list(module.ports))
[docs] def get_module_supported_configs( module: Union[GenericL23Module, E100ChimeraModule], ) -> List[Tuple[MediaConfigurationType, int, int]]: """ Get the module's supported configurations in a list. :param module: The module object :type module: Union[GenericL23Module, E100ChimeraModule] :return: List of tuple(supported media, port count, port speed) (The port speed in Mbps, e.g. 40000 for 40G) :rtype: List[Tuple[MediaConfigurationType, int, int]] """ supported_media_list = [] for media_item in module.info.media_info_list: # type: ignore for port_speed_config in media_item.supported_configs: supported_media_list.append((media_item.cage_type, port_speed_config.port_count, port_speed_config.port_speed)) return supported_media_list
[docs] async def set_module_config( module: Union[GenericL23Module, E100ChimeraModule], media: enums.MediaConfigurationType, port_count: int, port_speed: int, force: bool = True, ) -> None: """Change the module configuration to the target media, port count and port speed. :param module: the module object :type module: Union[GenericL23Module, E100ChimeraModule] :param media: the target media for the module :type media: enums.MediaConfigurationType :param port_count: the target port count :type port_count: int :param port_speed: the target port speed in Mbps, e.g. 40000 for 40G :type port_speed: int :param force: should forcibly reserve the module, defaults to True :type force: bool, optional :raises NotSupportMediaPortSpeed: the provided media, port count and port speed configuration is not supported by the module """ await set_module_configs([(module, media, port_count, port_speed)], force)
[docs] async def set_module_configs(module_configs: List[Tuple[Union[GenericL23Module, E100ChimeraModule], enums.MediaConfigurationType, int, int]], force: bool = True) -> None: """Configure multiple modules with specified media, port count and port speed. :param module_configs: List of module configuration tuples. Each tuple contains (module object, target media, target port count, target port speed in Mbps, should forcibly reserve the module) :type module_configs: List[Tuple[Union[GenericL23Module, E100ChimeraModule], enums.MediaConfigurationType, int, int, bool]] :param force: should forcibly reserve the modules, defaults to True :type force: bool, optional :raises NotSupportMediaPortSpeed: one of the provided media, port count and port speed configuration is not supported by the corresponding module. """ # reserve the modules await reserve_modules([module for (module, _, _, _) in module_configs], force) for module_config in module_configs: module, media, port_count, port_speed = module_config # get the supported media supported_media_list = get_module_supported_configs(module) # set the module media if the target media is found in supported media for item in supported_media_list: if all( ( item[0] == media, item[1] == port_count, item[2] == port_speed, ) ): portspeed_list = [port_count] + port_count * [port_speed] await module.config.media.set(media_config=media) await module.config.port_speed.set(portspeed_list=portspeed_list) return None raise NotSupportMediaPortSpeed(module) # release the modules await release_modules([module for (module, _, _, _) in module_configs], False)
[docs] async def get_module_eol_date(module: Union[GenericL23Module, E100ChimeraModule]) -> str: """ Get module's End-of-Life date :param module: The module object :type module: Union[GenericL23Module, E100ChimeraModule] :return: Module's EOL date :rtype: str """ resp = await module.serial_number.get() module_key = str(resp.serial_number)[-2:] return MODULE_EOL_INFO.get(module_key, "2999-01-01")
[docs] async def get_module_eol_days(module: Union[GenericL23Module, E100ChimeraModule]) -> int: """ Get days until module's End-of-Life date :param module: The module object :type module: Union[GenericL23Module, E100ChimeraModule] :return: days until module's End-of-Life date :rtype: int """ eol_string = await get_module_eol_date(module) date1 = datetime.now() date2 = datetime.strptime(eol_string, "%Y-%M-%d") timedelta = date2 - date1 return timedelta.days
[docs] async def get_cage_insertions(module: Union[Z800FreyaModule, Z1600EdunModule]) -> Tuple[int, ...]: """ Get module cage insertion count of each cage :param module: The Z800 Freya/Z1600 Edun module object :type module: Union[Z800FreyaModule, Z1600EdunModule] :return: Insertion count of each cage :rtype: Tuple[int, ...] """ resp = await module.health.cage_insertion.get() info_js = resp.info info_dict = json.loads(info_js) result = tuple(cage['insert_count'] for cage in info_dict['1']['data']) return result
[docs] async def get_cage_count(module: Union[Z800FreyaModule, Z1600EdunModule]) -> int: """ Get module cage count :param module: The Z800 Freya/Z1600 Edun module object :type module: Union[Z800FreyaModule, Z1600EdunModule] :return: The number of cages in the module :rtype: int """ resp = await module.health.cage_insertion.get() info_js = resp.info info_dict = json.loads(info_js) result = len(info_dict['1']['data']) return result
# endregion # region Ports
[docs] async def obtain_ports_by_ids(tester: L23Tester, port_ids: List[str], separator: str = "/", reserve: bool = False) -> tuple[Union[GenericL23Port, E100ChimeraPort], ...]: """ Get ports of the tester specified by port ids :param tester: The tester object :type tester: :class:`~xoa_driver.testers.L23Tester` :param port_ids: The port ids. The port index with format ``m/p``, m is module index, p is port index, e.g. ["1/3", "2/4"]. Use ``1/*`` to get all ports of module 1. Use ``*/1`` to get port 1 of all modules. Use ``*``, or ``*/*`` to get all ports of all modules. Use an empty list to get all ports of all modules. :type port_ids: List[str] :param separator: The separator between module index and port index in port id, defaults to `/` :type separator: str, optional :param reserve: should reserve the ports, defaults to False :type reserve: bool, optional :return: List of port objects :rtype: tuple[Union[GenericL23Port, E100ChimeraPort]] """ returned_ports = [] if len(port_ids) == 0 or f"*{separator}*" in port_ids or f"*" in port_ids: # [] or ["*/*"] or ["*"] all_ports_ = (m.ports for m in tester.modules) if reserve: await reserve_ports(list(chain.from_iterable(all_ports_)), force=reserve) return tuple(chain.from_iterable(all_ports_)) else: for port_id in port_ids: if separator not in port_id: continue mid = port_id.split(separator)[0] if mid != "*": module = tester.modules.obtain(int(mid)) pid = port_id.split(separator)[1] if pid == "*": # ["1/*"] returned_ports.extend(list(module.ports)) else: # ["1/3"] returned_ports.append(module.ports.obtain(int(pid))) else: # ["*/1"] pid = port_id.split(separator)[1] for module in tester.modules: returned_ports.append(module.ports.obtain(int(pid))) if reserve: await reserve_ports(returned_ports, force=reserve) return tuple(returned_ports)
[docs] async def obtain_port_by_id(tester: L23Tester, port_id: str, separator: str = "/", reserve: bool = False) -> Union[GenericL23Port, E100ChimeraPort]: """ Get a port of the module :param tester: The tester object :type tester: :class:`~xoa_driver.testers.L23Tester` :param port_id: The port index with format "m/p", m is module index, p is port index, e.g. "1/3". Wildcard "*" is not allowed. :type port_id: str :param separator: The separator between module index and port index in port id, defaults to "/" :type separator: str, optional :param reserve: should reserve the port, defaults to False :type reserve: bool, optional :raises NoSuchPortError: No port found with the index :return: The port object :rtype: Union[GenericL23Port, E100ChimeraPort] """ if "*" in port_id: raise ValueError("Wildcard '*' is not allowed in port_id for obtain_port_by_id function.") if separator not in port_id: raise ValueError(f"Invalid port_id format: {port_id}. Expected format 'm{separator}p'.") port_obj = (await obtain_ports_by_ids(tester, [port_id], separator=separator, reserve=reserve))[0] return port_obj
[docs] async def reserve_ports(ports: list[Union[GenericL23Port, E100ChimeraPort]], force: bool = True, reset: bool = False) -> None: """ Reserve a port regardless whether it is owned by others or not. :param ports: The ports to reserve :type ports: list[Union[GenericL23Port, E100ChimeraPort]] :param force: Should force reserve the ports, defaults to True :type force: boolean, optional :param reset: Should reset the ports after reserving, defaults to False :type reset: boolean, optional :return: :rtype: None """ for port in ports: r = await port.reservation.get() if force and r.status == enums.ReservedStatus.RESERVED_BY_OTHER: await apply( port.reservation.set_relinquish(), port.reservation.set_reserve(), ) elif r.status == enums.ReservedStatus.RELEASED: await port.reservation.set_reserve() if reset: await port.reset.set()
[docs] async def release_ports(ports: List[Union[GenericL23Port, E100ChimeraPort]]) -> None: """ Free a port. If the port is reserved by you, release the port. If the port is reserved by others, relinquish the port. The port should have no owner afterwards. :param port: The port to free :type port: Union[GenericL23Port, E100ChimeraPort] :return: :rtype: None """ for port in ports: r = await port.reservation.get() if r.status == enums.ReservedStatus.RESERVED_BY_OTHER: await port.reservation.set_relinquish() elif r.status == enums.ReservedStatus.RESERVED_BY_YOU: await port.reservation.set_release()
[docs] async def reset_ports(ports: List[Union[GenericL23Port, E100ChimeraPort]]) -> None: """ Reset a list of ports. :param ports: The ports to reset :type ports: List[Union[GenericL23Port, E100ChimeraPort]] :return: :rtype: None """ await asyncio.gather(*(port.reset.set() for port in ports))
# endregion # region Streams
[docs] async def remove_streams(port: GenericL23Port) -> None: """ Remove all streams on a port witout resetting the port. :param port: The port object :type port: GenericL23Port """ await port.streams.server_sync() await asyncio.gather(*(s.delete() for s in port.streams))
# endregion __all__ = ( "reserve_tester", "release_tester", "get_chassis_sys_uptime", "obtain_modules_by_ids", "obtain_module_by_id", "obtain_module_by_port_id", "reserve_modules", "release_modules", "get_module_supported_configs", "set_module_configs", "set_module_config", "get_module_eol_date", "get_module_eol_days", "get_cage_insertions", "get_cage_count", "obtain_ports_by_ids", "obtain_port_by_id", "reserve_ports", "release_ports", "reset_ports", "remove_streams", )