from__future__importannotationsimportasyncioimporttypingastfromxoa_driverimportenums,testersfromxoa_driver.utilsimportapplyfromxoa_driver.internals.hli_v2.ports.port_l23.family_limportFamilyLfromxoa_driver.internals.hli_v2.ports.port_l23.family_l1importFamilyL1fromxoa_driver.portsimportGenericAnyPortfromxoa_driver.modulesimportGenericAnyModule,GenericL23Module,ModuleChimera,Z800FreyaModulefromxoa_driver.testersimportGenericAnyTester,L23Testerfrom.exceptionsimport(NotSupportMedia,NotSupportPortSpeed,)from.toolsimportMODULE_EOL_INFOfromitertoolsimportchain# type: ignore[Pylance false warning]fromdatetimeimportdatetimeimportjsonPcsPmaSupported=(FamilyL,FamilyL1)AutoNegSupported=(FamilyL,FamilyL1)LinkTrainingSupported=FamilyL# region Testers
[docs]asyncdefreserve_tester(tester:GenericAnyTester,force:bool=True)->None:""" .. versionadded:: 1.1 Reserve a tester regardless whether it is owned by others or not. :param tester: The tester to reserve :type tester: :class:`~xoa_driver.testers.GenericAnyTester` :param force: Should force reserve the tester :type force: boolean :return: :rtype: None """r=awaittester.reservation.get()ifforceandr.operation==enums.ReservedStatus.RESERVED_BY_OTHER:awaittester.reservation.set_relinquish()awaitasyncio.gather(*(free_module(m,True)formintester.modules))awaittester.reservation.set_reserve()elifr.operation==enums.ReservedStatus.RELEASED:awaittester.reservation.set_reserve()
[docs]asyncdeffree_tester(tester:GenericAnyTester,should_free_modules_ports:bool=False,)->None:""" .. versionadded:: 1.1 Free a tester. If the tester is reserved by you, release the tester. If the tester is reserved by others, relinquish the tester. The tester should have no owner afterwards. :param tester: The tester to free :type tester: :class:`~xoa_driver.testers.GenericAnyTester` :param should_free_modules_ports: should modules and ports also be freed, defaults to False :type should_free_modules_ports: bool, optional :return: :rtype: None """r=awaittester.reservation.get()ifr.operation==enums.ReservedStatus.RESERVED_BY_OTHER:awaittester.reservation.set_relinquish()elifr.operation==enums.ReservedStatus.RESERVED_BY_YOU:awaittester.reservation.set_release()ifshould_free_modules_ports:awaitasyncio.gather(*(free_module(m,True)formintester.modules))
[docs]asyncdefget_chassis_sys_uptime_sec(tester:L23Tester)->int:""" .. versionadded:: 2.7.2 Get chassis system uptime in seconds :param tester: The tester to free :type tester: :class:`~xoa_driver.testers.L23Tester` :return: Chassis system uptime in seconds :rtype: int """resp=awaittester.health.uptime.get()info_js=resp.infoinfo_dict=json.loads(info_js)result=info_dict['1']['data']['uptime_secs']returnresult
# endregion# region Modules
[docs]defget_module(tester:GenericAnyTester,module_id:int)->GenericAnyModule:""" .. versionadded:: 1.1 Get a module object of the tester. :param tester: The tester object :type tester: :class:`~xoa_driver.testers.GenericAnyTester` :param module_id: the index id of the module :type module_id: int :raises NoSuchModuleError: No such a module index on the tester :return: module object :rtype: :class:`~xoa_driver.modules.GenericAnyModule` """returntester.modules.obtain(module_id)
[docs]defget_modules(tester:GenericAnyTester)->tuple[GenericAnyModule,...]:""" .. versionadded:: 1.1 Get all modules of the tester :param tester: The tester object :type tester: :class:`~xoa_driver.testers.GenericAnyTester` :return: List of module objects :rtype: tuple[GenericAnyModule] """returntuple(tester.modules)
[docs]asyncdefreserve_module(module:GenericAnyModule,force:bool=True)->None:""" .. versionadded:: 1.1 Reserve a module regardless whether it is owned by others or not. :param module: The module to reserve :type module: :class:`~xoa_driver.modules.GenericAnyModule` :param force: Should force reserve the module, defaults to True :type force: boolean :return: :rtype: None """r=awaitmodule.reservation.get()ifforceandr.operation==enums.ReservedStatus.RESERVED_BY_OTHER:awaitfree_module(module,True)awaitmodule.reservation.set_reserve()elifr.operation==enums.ReservedStatus.RELEASED:awaitmodule.reservation.set_reserve()
[docs]asyncdeffree_module(module:GenericAnyModule,should_free_ports:bool=False)->None:""" .. versionadded:: 1.2 Free a module. If the module is reserved by you, release the module. If the module is reserved by others, relinquish the module. The module should have no owner afterwards. :param module: The module to free :type module: :class:`~xoa_driver.modules.GenericAnyModule` :param should_free_ports: should ports also be freed, defaults to False :type should_free_ports: bool, optional :return: :rtype: None """r=awaitmodule.reservation.get()ifr.operation==enums.ReservedStatus.RESERVED_BY_OTHER:awaitmodule.reservation.set_relinquish()elifr.operation==enums.ReservedStatus.RESERVED_BY_YOU:awaitmodule.reservation.set_release()ifshould_free_ports:awaitfree_ports(*module.ports)
[docs]defget_module_supported_media(module:GenericL23Module|ModuleChimera,)->list[dict[str,t.Any]]:""" .. versionadded:: 1.3 Get a list of supported media, port speed and count of the module. :param module: The module object :type module: GenericAnyModule :return: List of supported media, port speed and count :rtype: list[dict[str, t.Any]] """supported_media_list=[]item={}formedia_iteminmodule.info.media_info_list:# type: ignoreforsub_iteminmedia_item.available_speeds:item=dict()item["media"]=media_item.cage_typeitem["port_count"]=sub_item.port_countitem["port_speed"]=sub_item.port_speedsupported_media_list.append(item)returnsupported_media_list
[docs]asyncdefset_module_media_config(module:t.Union[GenericL23Module,ModuleChimera],media:enums.MediaConfigurationType,force:bool=True,)->None:""" .. versionadded:: 1.3 Set module's media configuration. :param module: The module object :type module: GenericAnyModule :param media: Target media for the module :type media: enums.MediaConfigurationType :param force: Should reserve the module by force, defaults to True :type force: bool, optional :raises NotSupportMedia: The module does not support this media type :return: :rtype: """# reserve the module firstawaitreserve_module(module,force)# get the supported mediasupported_media_list=get_module_supported_media(module)# set the module media if the target media is found in supported mediaforiteminsupported_media_list:ifitem["media"]==media:awaitmodule.media.set(media_config=media)returnNone# raise exception is the target media is not found in the supported mediaraiseNotSupportMedia(module)
[docs]asyncdefset_module_port_config(module:t.Union[GenericL23Module,ModuleChimera],port_count:int,port_speed:int,force:bool=True,)->None:""" .. versionadded:: 1.3 Set module's port-speed configuration :param module: The module object :type module: t.Union[GenericL23Module, ModuleChimera] :param port_count: The port count :type port_count: int :param port_speed: The port speed in Mbps, e.g. 40000 for 40G :type port_speed: int :param force: Should reserve the module by force, defaults to True :type force: bool, optional :raises NotSupportPortSpeed: The module does not support the port-speed configuration under its current media configuration :return: :rtype: """# reserve the module firstawaitfree_module(module,True)awaitreserve_module(module,force)# get the supported media by the modulesupported_media_list=get_module_supported_media(module)# get the current media of the modulereply=awaitmodule.media.get()current_media=reply.media_config# set the module port speed if we can find the port-speed in the corresponding mediaforiteminsupported_media_list:ifall((item["media"]==enums.MediaConfigurationType(current_media),item["port_count"]==port_count,item["port_speed"]==port_speed,)):portspeed_list=[port_count]+port_count*[port_speed]awaitmodule.cfp.config.set(portspeed_list=portspeed_list)awaitfree_module(module,False)returnNoneraiseNotSupportPortSpeed(module)
[docs]asyncdefget_module_eol_date(module:GenericAnyModule)->str:""" .. versionadded:: 1.3 Get module's End-of-Life date :param module: The module object :type module: GenericAnyModule :return: Module's EOL date :rtype: str """resp=awaitmodule.serial_number.get()module_key=str(resp.serial_number)[-2:]returnMODULE_EOL_INFO.get(module_key,"2999-01-01")
[docs]asyncdefget_module_eol_days(module:GenericAnyModule)->int:""" .. versionadded:: 1.3 Get days until module's End-of-Life date :param module: The module object :type module: GenericAnyModule :return: days until module's End-of-Life date :rtype: int """eol_string=awaitget_module_eol_date(module)date1=datetime.now()date2=datetime.strptime(eol_string,"%Y-%M-%d")timedelta=date2-date1returntimedelta.days
[docs]asyncdefget_module_cage_insertion_count(module:Z800FreyaModule,cage_index:int)->int:""" .. versionadded:: 2.7.2 Get module cage insertion count :param module: The Z800 Freya module object :type module: Z800FreyaModule :param cage_index: The cage index :type module: int :return: Insertion count of the cage :rtype: int """resp=awaitmodule.health.cage_insertion.get()info_js=resp.infoinfo_dict=json.loads(info_js)if0<=cage_index<len(info_dict['1']['data']):result=info_dict['1']['data'][cage_index]['insert_count']elifcage_index<0:result=-1else:result=-1returnresult
# endregion# region Ports
[docs]defget_all_ports(tester:GenericAnyTester)->tuple[GenericAnyPort,...]:""" .. versionadded:: 1.1 Get all ports of the tester :param tester: The tester object :type tester: :class:`~xoa_driver.testers.GenericAnyTester` :return: List of port objects :rtype: tuple[GenericAnyPort] """all_ports_=(m.portsforminget_modules(tester))returntuple(chain.from_iterable(all_ports_))
[docs]defget_ports(tester:GenericAnyTester,module_id:int)->tuple[GenericAnyPort,...]:""" .. versionadded:: 1.1 Get all ports of the module :param tester: The tester object :type tester: :class:`~xoa_driver.testers.GenericAnyTester` :param module_id: The module index :type module_id: int :return: List of port objects :rtype: tuple[GenericAnyPort] """module=get_module(tester,module_id)returntuple(module.ports)
[docs]defget_port(tester:GenericAnyTester,module_id:int,port_id:int)->GenericAnyPort:""" .. versionadded:: 1.1 Get a port of the module :param tester: The tester object :type tester: :class:`~xoa_driver.testers.GenericAnyTester` :param module_id: The module index :type module_id: int :param port_id: The port index :type port_id: int :raises NoSuchPortError: No port found with the index :return: The port object :rtype: GenericAnyPort """module=get_module(tester,module_id)returnmodule.ports.obtain(port_id)
[docs]asyncdefreserve_port(port:GenericAnyPort,force:bool=True)->None:""" .. versionadded:: 1.1 Reserve a port regardless whether it is owned by others or not. :param port: The port to reserve :type port: GenericAnyPort :param force: Should force reserve the port :type force: boolean :return: :rtype: None """r=awaitport.reservation.get()ifforceandr.status==enums.ReservedStatus.RESERVED_BY_OTHER:awaitapply(port.reservation.set_relinquish(),port.reservation.set_reserve(),)elifr.status==enums.ReservedStatus.RELEASED:awaitport.reservation.set_reserve()
[docs]asyncdefreset_port(port:GenericAnyPort)->None:""" .. versionadded:: 1.1 Reserve and reset a port :param port: The port to reset :type port: GenericAnyPort :return: :rtype: None """awaitreserve_port(port,False)awaitport.reset.set()
[docs]asyncdeffree_port(port:GenericAnyPort)->None:""" .. versionadded:: 1.1 Free a port. If the port is reserved by you, release the port. If the port is reserved by others, relinquish the port. The port should have no owner afterwards. :param port: The port to free :type port: GenericAnyPort :return: :rtype: None """r=awaitport.reservation.get()ifr.status==enums.ReservedStatus.RESERVED_BY_OTHER:awaitport.reservation.set_relinquish()elifr.status==enums.ReservedStatus.RESERVED_BY_YOU:awaitport.reservation.set_release()
[docs]asyncdeffree_ports(*ports:GenericAnyPort)->None:""" .. versionadded:: 1.1 Free all ports on a module. :param port: The port to free :type port: GenericAnyPort """awaitasyncio.gather(*(free_port(port=p)forpinports))
# endregion# region Streams
[docs]asyncdefremove_streams(port:GenericAnyPort)->None:""" .. versionadded:: 2.1 Remove all streams on a port. :param module: The port object :type module: GenericAnyPort """awaitport.streams.server_sync()awaitasyncio.gather(*(s.delete()forsinport.streams))