from __future__ import annotations
from dataclasses import dataclass, field
import re
import typing as t
import inspect
import ipaddress
from enum import Enum
from xoa_driver.internals import commands
from xoa_driver import enums
from xoa_driver.testers import GenericAnyTester
from xoa_driver.modules import GenericAnyModule
from xoa_driver.ports import GenericAnyPort
from xoa_driver.misc import ArpChunk, NdpChunk
from xoa_driver.utils import apply_iter
from xoa_driver.internals.core.token import Token
from xoa_driver.internals.core.transporter.protocol.payload import Hex
from xoa_driver.internals.core.transporter.protocol.struct_request import Request
from xoa_driver.internals.core.transporter.protocol._constants import CommandType
from xoa_driver.internals.core.transporter._typings import (XoaCommandType, ICmdOnlyGet, ICmdOnlySet)
class ICmdOnlyGett(ICmdOnlyGet, t.Protocol):
    """Structural type for a get-only command class that also exposes ``__name__``.

    ``build_get_requestt`` reads ``cls.__name__`` to fill the request's
    class name, so the attribute is declared here for type checkers.
    """

    __name__: t.ClassVar[str]
class ICmdOnlySett(ICmdOnlySet, t.Protocol):
    """Structural type for a set-only command class that also exposes ``__name__``.

    ``build_set_requestt`` reads ``cls.__name__`` to fill the request's
    class name, so the attribute is declared here for type checkers.
    """

    __name__: t.ClassVar[str]
def build_set_requestt(cls: ICmdOnlySett, **kwargs) -> Request:
    """Build a ``COMMAND_VALUE`` request for the command class ``cls``.

    The keys ``indices``, ``module`` and ``port`` are taken out of ``kwargs``
    and used for addressing; everything that remains is passed to
    ``cls.SetDataAttr`` to form the request values.

    :param cls: command class providing ``code``, ``__name__`` and ``SetDataAttr``
    :param kwargs: addressing keys plus the command's own parameters
    :return: the assembled request
    :rtype: Request
    """
    square_indices = kwargs.pop("indices", [])
    module_index = kwargs.pop("module", None)
    port_index = kwargs.pop("port", None)
    # Whatever is left after removing the addressing keys is the payload.
    payload = cls.SetDataAttr(**kwargs)
    return Request(
        class_name=cls.__name__,
        cmd_type=CommandType.COMMAND_VALUE,
        cmd_code=cls.code,
        module_index=module_index,
        port_index=port_index,
        indices=square_indices,
        values=payload,
    )
def build_get_requestt(cls: ICmdOnlyGett, **kwargs) -> Request:
    """Build a ``COMMAND_QUERY`` request for the command class ``cls``.

    Only ``indices``, ``module`` and ``port`` are read from ``kwargs``; any
    other keyword is ignored because a query carries no values.

    :param cls: command class providing ``code`` and ``__name__``
    :param kwargs: addressing keys (other keys are ignored)
    :return: the assembled request
    :rtype: Request
    """
    square_indices = kwargs.pop("indices", [])
    module_index = kwargs.pop("module", None)
    port_index = kwargs.pop("port", None)
    return Request(
        class_name=cls.__name__,
        cmd_type=CommandType.COMMAND_QUERY,
        cmd_code=cls.code,
        module_index=module_index,
        port_index=port_index,
        indices=square_indices,
        values=None,
    )
# Regex fragments describing one CLI line:
#   [<module>[/<port>]] <COMMAND_NAME> [[idx(, idx(, idx))]] <params...>
module = port = r"\d+"
index = r"((0x|0X)?[A-Fa-f\d]+)"
zero_or_more_space = r"\s*"
# Optional "module" or "module/port" prefix.
module_port_group = rf"((?P<module>{module})(/(?P<port>{port}))?{zero_or_more_space})?"
command_name_group = rf"(?P<command_name>[A-Z_a-z0-9]+{zero_or_more_space})"
# Optional bracketed list of one to three indices (decimal or 0x-prefixed hex).
indices_group = (
    rf"(?P<indices>(\[{index}"
    rf"(,?{zero_or_more_space}{index})?"
    rf"(,?{zero_or_more_space}{index})?"
    rf"\]{zero_or_more_space})?)"
)
# Everything after the name/indices is handed to the parameter parser.
params_group = r"(?P<params>.*)"
command_pattern = re.compile(
    "".join((module_port_group, command_name_group, indices_group, params_group))
)
class Unpassed:
    """Marker type for arguments that were not supplied by the caller.

    ``None`` already has a meaning of its own for those parameters, so an
    instance of this empty class is used to express "not passed".
    """
@dataclass
class Body:
    """Parsed form of one CLI command line, convertible into a ``Request``."""

    # Command name as written in the source text.
    command_name: str = ""
    # Command class looked up for ``command_name``.
    cmd_class: XoaCommandType = XoaCommandType  # type: ignore
    # Whether the line is a query or carries values.
    type: CommandType = CommandType.COMMAND_STATUS
    # Addressing parsed from the line (may be overridden in ``as_request``).
    module: t.Optional[int] = None
    port: t.Optional[int] = None
    square_indices: list[int] = field(default_factory=list)
    # Keyword arguments for the command's value payload.
    values: dict = field(default_factory=dict)

    def as_request(
        self,
        module_num: int | None | Unpassed = Unpassed(),
        port_num: int | None | Unpassed = Unpassed(),
        indices_num: list[int] | Unpassed = Unpassed(),
    ) -> Request:
        """Build the ``Request`` for this command.

        Any argument that is actually passed (including ``None``) replaces
        the corresponding attribute stored on this object before the request
        is built; arguments left at their ``Unpassed`` default change nothing.

        :param module_num: module index override
        :param port_num: port index override
        :param indices_num: square-bracket indices override
        :return: a query request when ``type`` is ``COMMAND_QUERY``, otherwise a set request
        :rtype: Request
        """
        overrides = (
            ("square_indices", indices_num),
            ("module", module_num),
            ("port", port_num),
        )
        for attribute, candidate in overrides:
            if not isinstance(candidate, Unpassed):
                setattr(self, attribute, candidate)
        request_kwargs = dict(
            indices=self.square_indices,
            module=self.module,
            port=self.port,
            **self.values,
        )
        if self.type == CommandType.COMMAND_QUERY:
            return build_get_requestt(self.cmd_class, **request_kwargs)  # type: ignore
        return build_set_requestt(self.cmd_class, **request_kwargs)  # type: ignore
class CLIConverter:
    """Convert XOA CLI command text into :class:`Body` objects.

    A line is split by ``command_pattern`` into module/port, command name,
    square-bracket indices and a parameter string. The parameters are then
    bound, in order, to the signature of the command class' ``set`` method.
    All methods are class methods; the class holds no state.
    """

    # Textual form of the magic number that may lead the C_DOWN parameters.
    _MAGIC_TEXT = "-1480937026"
    # Number of hex characters per table entry for the RX table commands.
    _RX_TABLE_CHUNK_SIZES = {"P_ARPRXTABLE": 26, "P_NDPRXTABLE": 50}

    @classmethod
    def _read_indices(cls, indices_str: str) -> list[int]:
        """Parse a bracketed index list such as ``"[0, 0x1F]"``.

        Elements may be separated by commas and/or whitespace, matching what
        ``command_pattern`` accepts. Elements containing ``0x``/``0X`` are
        read as hexadecimal, all others as decimal.

        :param indices_str: the matched indices text (may be empty)
        :return: the indices in order of appearance
        :raises ValueError: if an element is not a valid number
        """
        result = []
        inner = indices_str.strip().strip("[").strip("]")
        for token in inner.replace(",", " ").split():
            base = 16 if ("0x" in token or "0X" in token) else 10
            try:
                result.append(int(token, base))
            except ValueError as err:
                raise ValueError(f"Invalid indices str {indices_str}!") from err
        return result

    @classmethod
    def _special_read(cls, class_name: str, or_params: list[str]) -> list[str]:
        """Pre-process the raw parameter list of commands with a special layout.

        * ``C_DOWN``: a leading magic number is dropped (it is re-added by
          :meth:`_special_add`). A missing magic number is tolerated.
        * ``P_ARPRXTABLE`` / ``P_NDPRXTABLE``: the first parameter is one hex
          blob; it is cut into fixed-size entries of 26 / 50 hex characters.

        The input list is never modified.

        :param class_name: name of the command class
        :param or_params: parameters as split from the command line
        :return: the parameters to bind
        """
        if class_name == "C_DOWN":
            if or_params and or_params[0] == cls._MAGIC_TEXT:
                return or_params[1:]
            return or_params
        chunk_size = cls._RX_TABLE_CHUNK_SIZES.get(class_name)
        if chunk_size is None:
            return or_params
        if not or_params:
            return []
        blob = or_params[0].replace("0x", "").replace("0X", "")
        return [blob[i: i + chunk_size] for i in range(0, len(blob), chunk_size)]

    @classmethod
    def _special_add(cls, class_name: str, dic: dict) -> dict:
        """Add the fixed keyword argument some commands need besides their CLI parameters.

        :param class_name: name of the command class
        :param dic: keyword arguments bound so far (updated in place)
        :return: ``dic``
        """
        if class_name in ("C_DOWN", "C_FILEFINISH", "M_UPGRADE"):
            dic["magic"] = int(cls._MAGIC_TEXT)
        elif class_name == "M_FPGAREIMAGE":
            dic["key_code"] = 42
        elif class_name == "PP_EYEMEASURE":
            dic["dummy"] = []
        elif class_name == "PP_PHYTXEQ":
            dic["mode"] = 4
        return dic

    @classmethod
    def _read_response_values(cls, or_params: list[str], cmd_class: t.Type) -> dict:
        """Bind CLI parameters to the keyword arguments of ``cmd_class.set``.

        Parameters are matched positionally against the signature of ``set``
        (without ``self``). If one of the annotations is a list type, that
        parameter absorbs a variable number of CLI values, see
        :meth:`_bind_has_list`.

        :param or_params: parameters as split from the command line
        :param cmd_class: command class whose ``set`` signature is inspected
        :return: keyword arguments for the command's set data
        :raises ValueError: if a parameter cannot be converted
        """
        class_name = cmd_class.__name__
        signature = inspect.signature(getattr(cmd_class, "set"))
        params = cls._special_read(class_name, or_params)
        func_sigs: list[inspect.Parameter] = []
        list_index = -1
        for parameter in signature.parameters.values():
            if parameter.name == "self":
                continue
            if "list[" in str(parameter.annotation).lower():
                # Position within func_sigs, independent of where "self" sits.
                list_index = len(func_sigs)
            func_sigs.append(parameter)
        if list_index == -1:
            dic = {
                f.name: cls._bind_one_param(class_name, p, f.annotation, f.name)
                for p, f in zip(params, func_sigs)
            }
        else:
            # Reduce e.g. "typing.List[Hex]" or "list[int]" to the element type name.
            element_sig = (
                str(func_sigs[list_index].annotation)
                .replace("typing.", "")
                .replace("list[", "")
                .replace("List[", "")
                .replace("]", "")
            )
            dic = cls._bind_has_list(class_name, params, func_sigs, list_index, element_sig)
        return cls._special_add(class_name, dic)

    @classmethod
    def _bind_has_list(cls, class_name: str, params: list[str], func_sigs: list[inspect.Parameter], list_index: int, element_sig: str,) -> dict[str, t.Any]:
        """Bind parameters for a signature that contains one list parameter.

        Three layouts are handled: the list is the only parameter, scalar
        parameters followed by the list, or the list followed by scalar
        parameters. If none applies (for example too few CLI values) an empty
        dict is returned.

        NOTE(review): when scalars exist on both sides of the list, the
        "scalars followed by list" branch is taken and the trailing scalars
        are not split off - confirm no command has that shape.

        :param class_name: name of the command class
        :param params: CLI values to bind
        :param func_sigs: parameters of ``set`` without ``self``
        :param list_index: position of the list parameter in ``func_sigs``
        :param element_sig: type name of the list elements
        :return: bound keyword arguments
        """
        dic: dict[str, t.Any] = {}
        name = func_sigs[list_index].name
        fore_sigs = func_sigs[:list_index]
        fore_params = params[:list_index]
        back_sigs = func_sigs[list_index + 1:]
        # Guard the slice: params[-0:] would be the whole list.
        back_params = params[-len(back_sigs):] if back_sigs else []

        if len(func_sigs) == 1:  # only one param and it is the list
            dic[name] = [cls._bind_one_param(class_name, p, element_sig) for p in params]
        elif fore_sigs and len(fore_params) == len(fore_sigs):  # others followed by a list
            list_params = params[list_index:]
            dic.update({f.name: cls._bind_one_param(class_name, p, f.annotation, f.name) for p, f in zip(fore_params, fore_sigs)})
            dic[name] = [cls._bind_one_param(class_name, b, element_sig) for b in list_params]
        elif back_sigs and len(back_params) == len(back_sigs):  # a list followed by others
            list_params = params[: -len(back_sigs)]
            dic.update({f.name: cls._bind_one_param(class_name, p, f.annotation, f.name) for p, f in zip(back_params, back_sigs)})
            dic[name] = [cls._bind_one_param(class_name, b, element_sig) for b in list_params]
        return dic

    @classmethod
    def _basic_cast(
        cls, string_param: str, type_name: str
    ) -> str | int | float | Hex | None:
        """Convert ``string_param`` when ``type_name`` is ``str``, ``int``, ``float`` or ``Hex``.

        Strings lose surrounding quote characters; hex values lose their
        ``0x`` prefix and are left-padded to an even number of digits.

        :return: the converted value, or ``None`` if the type is not basic or
            the conversion raised ``ValueError``
        """
        basics = {"str": str, "int": int, "float": float, "Hex": Hex}
        basic_type = basics.get(type_name, None)
        if basic_type is None:
            return None
        try:
            if basic_type is str:
                return str(string_param).strip('"').strip("'")
            if basic_type is Hex:
                digits = string_param.replace("0x", "").replace("0X", "")
                if len(digits) % 2:
                    digits = "0" + digits
                return Hex(digits)
            return basic_type(string_param)
        except ValueError:
            return None

    @classmethod
    def _ip_cast(
        cls,
        string_param: str,
        type_name: str,
    ) -> ipaddress.IPv4Address | ipaddress.IPv6Address | None:
        """Convert ``string_param`` to an IPv4/IPv6 address when ``type_name`` asks for one.

        Three notations are tried in order: the textual address, a decimal
        integer (``"3232235788"`` -> ``192.168.1.12``) and a hexadecimal
        integer (``"0x1000...0"`` -> ``1000::``).

        :return: the address, or ``None`` if ``type_name`` is not an IP type
            or no notation fits
        """
        ips = {"ipaddress.IPv4Address": ipaddress.IPv4Address, "IPv4Address": ipaddress.IPv4Address,
               "ipaddress.IPv6Address": ipaddress.IPv6Address, "IPv6Address": ipaddress.IPv6Address}
        ips_cast = ips.get(type_name, None)
        if ips_cast is None:
            return None
        notations = (
            lambda text: text,
            int,
            lambda text: int(text, 16),
        )
        for to_address_input in notations:
            try:
                return ips_cast(to_address_input(string_param))
            except (ValueError, TypeError):
                # AddressValueError is a ValueError; try the next notation.
                continue
        return None

    @classmethod
    def _enum_cast(cls, string_param: str, type_name: str) -> Enum | None:
        """Convert ``string_param`` to a member of ``enums.<type_name>``.

        The value is tried first as the member's integer value, then as the
        upper-cased member name.

        :return: the member, or ``None`` if ``enums`` has no such attribute or
            neither lookup succeeds
        """
        enum_cast = getattr(enums, type_name, None)
        if enum_cast is None:
            return None
        # Broad on purpose: the attribute found on ``enums`` is not guaranteed
        # to be an Enum class, so the failure type is not known.
        try:
            return enum_cast(int(string_param))
        except Exception:
            pass
        try:
            return enum_cast[string_param.upper()]
        except Exception:
            pass
        return None

    @classmethod
    def _special_cast(cls, class_name: str, string_param: str, type_name: str, param_name: str) -> t.Any:
        """Handle values whose CLI spelling differs from the generic conversions.

        Dispatch is on ``type_name`` first, then on ``class_name``. Covered
        are CLI keywords that differ from the enum member names, the
        ``ProtocolOption`` offset of 256, the ARP/NDP RX table entries and a
        few command-specific keywords.

        :return: the converted value, or ``None`` if no special rule applies
        """
        if type_name == "ProtocolOption":
            enum_cast = getattr(enums, type_name, None)
            if enum_cast is not None:
                try:
                    return enum_cast(256 + int(string_param))
                except Exception:
                    return None
        elif type_name == "SMAInputFunction":
            enum_cast = getattr(enums, type_name, None)
            if enum_cast is not None and string_param == "NOTUSED":
                return enum_cast["NOT_USED"]
        elif type_name == "SourceType":
            enum_cast = getattr(enums, type_name, None)
            if enum_cast is not None:
                return {"TXIFG": enum_cast["TX_IFG"], "TXLEN": enum_cast["TX_LEN"],
                        "RXIFG": enum_cast["RX_IFG"], "RXLEN": enum_cast["RX_LEN"],
                        "RXLAT": enum_cast["RX_LATENCY"], "RXJIT": enum_cast["RX_JITTER"]}.get(string_param, None)
        elif type_name == "LinkTrainingMode":
            enum_cast = getattr(enums, type_name, None)
            if enum_cast is not None and string_param == "AUTO":
                return enum_cast["START_AFTER_AUTONEG"]
        elif type_name == "TimeoutMode":
            enum_cast = getattr(enums, type_name, None)
            if enum_cast is not None and string_param == "DEFAULT_TIMEOUT":
                return enum_cast["DEFAULT"]
        elif class_name == "P_MULTICASTHDR":
            enum_cast = getattr(enums, type_name, None)
            if enum_cast is not None:
                # A name that is still unknown raises KeyError to the caller.
                return enum_cast[string_param.upper().replace("DEI_", "")]
        elif class_name == "P_CHECKSUM":
            return {"ON": 14, "OFF": 0}.get(string_param, None)
        elif class_name == "P_ARPRXTABLE":
            # Entry layout in hex characters: 8 address, 4 prefix, 2 patched flag, rest MAC.
            ipv4_address = ipaddress.IPv4Address(bytes.fromhex(string_param[0:8]))
            prefix = int.from_bytes(bytes.fromhex(string_param[8:12]), "big")
            patched_mac = enums.OnOff(int.from_bytes(bytes.fromhex(string_param[12:14]), "big"))
            mac_address = Hex(string_param[14:])
            return ArpChunk(ipv4_address, prefix, patched_mac, mac_address)
        elif class_name == "P_NDPRXTABLE":
            # Entry layout in hex characters: 32 address, 4 prefix, 2 patched flag, rest MAC.
            ipv6_address = ipaddress.IPv6Address(bytes.fromhex(string_param[0:32]))
            prefix = int.from_bytes(bytes.fromhex(string_param[32:36]), "big")
            patched_mac = enums.OnOff(int.from_bytes(bytes.fromhex(string_param[36:38]), "big"))
            mac_address = Hex(string_param[38:])
            return NdpChunk(ipv6_address, prefix, patched_mac, mac_address)
        elif class_name == "PL1_LINKTRAIN_CMD":
            keyword = string_param.upper()
            keyword_tables = (
                ("PRE1", "MAIN", "POST", "PRE2", "PRE3"),
                ("PRESET_1", "PRESET_2", "PRESET_3", "PRESET_4", "PRESET_5"),
                ("PAM2", "PAM4", "PAM4_WITH_PRECODING"),
            )
            # The value is the keyword's position inside its own table.
            for table in keyword_tables:
                if keyword in table:
                    return table.index(keyword)
        elif class_name == "PP_LINKFLAP_PARAMS" and param_name == "repetition":
            if string_param.upper() == "INFINITE":
                return 0
            return int(string_param)
        return None

    @classmethod
    def _bind_one_param(
        cls, class_name: str, string_param: str, type_name: str, param_name: str = ""
    ) -> t.Any:
        """Convert one CLI value to the type named by ``type_name``.

        The basic, IP, enum and special conversions are tried in that order;
        the first one returning something other than ``None`` wins.

        :raises ValueError: if no conversion produces a value
        """
        for generic_cast in (cls._basic_cast, cls._ip_cast, cls._enum_cast):
            converted = generic_cast(string_param, type_name)
            if converted is not None:
                return converted
        converted = cls._special_cast(class_name, string_param, type_name, param_name)
        if converted is not None:
            return converted
        raise ValueError(f"Cannot bind param '{string_param}' to type '{type_name}'!")

    @classmethod
    def _parse_param_string(cls, param_string: str) -> list[str]:
        """Split a parameter string into values, honouring quotes and escapes.

        Values are separated by whitespace. Text inside single or double
        quotes is kept together (without the quotes), and a backslash inside
        quotes makes the next character literal.

        :raises ValueError: if a quoted string is not closed
        """
        (
            OUTSIDE,
            INSIDE_DOUBLE_QUOTE,
            INSIDE_SINGLE_QUOTE,
            ESCAPED_DOUBLE_QUOTE,
            ESCAPED_SINGLE_QUOTE,
        ) = range(5)
        state = OUTSIDE
        string_list = []
        buf = []
        for c in param_string:
            if state == OUTSIDE:
                if c in (" ", "\t", "\r", "\n", "\f"):
                    string = "".join(buf).strip()
                    if string:
                        string_list.append(string)
                    if buf:
                        buf = []
                elif c == '"':
                    state = INSIDE_DOUBLE_QUOTE
                elif c == "'":
                    state = INSIDE_SINGLE_QUOTE
                else:
                    buf.append(c)
            elif state == INSIDE_DOUBLE_QUOTE:
                if c == "\\":
                    state = ESCAPED_DOUBLE_QUOTE
                elif c == '"':
                    string_list.append("".join(buf))
                    buf = []
                    state = OUTSIDE
                else:
                    buf.append(c)
            elif state == INSIDE_SINGLE_QUOTE:
                if c == "\\":
                    state = ESCAPED_SINGLE_QUOTE
                elif c == "'":
                    string_list.append("".join(buf))
                    buf = []
                    state = OUTSIDE
                else:
                    buf.append(c)
            elif state == ESCAPED_DOUBLE_QUOTE:
                buf.append(c)
                state = INSIDE_DOUBLE_QUOTE
            elif state == ESCAPED_SINGLE_QUOTE:
                buf.append(c)
                state = INSIDE_SINGLE_QUOTE
        if state != OUTSIDE:
            # Report the quote that was opened, also when input ends right after a backslash.
            s = '"' if state in (INSIDE_DOUBLE_QUOTE, ESCAPED_DOUBLE_QUOTE) else "'"
            raise ValueError(f"String not complete: ({s}{''.join(buf)}).")
        if buf:
            string_list.append("".join(buf).strip())
        return string_list

    @classmethod
    def _read_params(
        cls, param_string: str, cmd_class: t.Type
    ) -> tuple[CommandType, dict]:
        """Classify the parameter string as a query (a lone ``?``) or as values.

        :return: the command type and the bound keyword arguments (empty for a query)
        """
        params = cls._parse_param_string(param_string)
        if params == ["?"]:
            return CommandType.COMMAND_QUERY, {}
        return CommandType.COMMAND_VALUE, cls._read_response_values(params, cmd_class)

    @classmethod
    def _read_command_name(cls, command_name: str) -> t.Type:
        """Look up the command class named ``command_name`` (case-insensitive) in ``commands``.

        :raises AttributeError: if ``commands`` has no such attribute
        """
        return getattr(commands, command_name.upper())

    @classmethod
    def _read_int(cls, n: str | int | None) -> int | None:
        """Convert ``n`` to ``int``; strings containing ``0x``/``0X`` are read as hexadecimal.

        :return: the integer, or ``None`` if ``n`` is ``None`` or not convertible
        """
        if n is None:
            return None
        try:
            if isinstance(n, str) and ("0x" in n or "0X" in n):
                return int(n.strip(), 16)
            return int(n)
        except (TypeError, ValueError):
            return None

    @classmethod
    def convert_each_command(cls, command: str) -> Body | None:
        """Parse one CLI line into a :class:`Body`.

        :param command: a single command line without comments
        :return: the parsed command, or ``None`` if the line does not match ``command_pattern``
        :raises AttributeError: if the command name is unknown to ``commands``
        :raises ValueError: if indices or parameters cannot be converted
        """
        result = command_pattern.search(command)
        if not result:
            return None
        command_name = result.group("command_name").strip() or ""
        cmd_class = cls._read_command_name(command_name)
        module_index = cls._read_int(result.group("module"))
        port_index = cls._read_int(result.group("port"))
        indices = cls._read_indices(result.group("indices"))
        body_type, values = cls._read_params(result.group("params") or "", cmd_class)
        return Body(command_name, cmd_class, body_type, module_index, port_index, indices, values)

    @classmethod
    def _strip_comment(cls, line: str, comment_start: tuple[str, ...]) -> str:
        """Cut ``line`` at the first comment marker found outside quotes.

        Markers may be longer than one character (``"//"``). Quote handling
        mirrors :meth:`_parse_param_string`, so a marker inside a quoted
        value does not truncate the line.

        :param line: one line of command text
        :param comment_start: comment markers; empty markers are ignored
        :return: the text before the comment, or ``line`` if there is none
        """
        markers = tuple(marker for marker in comment_start if marker)
        if not markers:
            return line
        open_quote = ""
        escaped = False
        for position, char in enumerate(line):
            if open_quote:
                if escaped:
                    escaped = False
                elif char == "\\":
                    escaped = True
                elif char == open_quote:
                    open_quote = ""
            elif line.startswith(markers, position):
                return line[:position]
            elif char in ('"', "'"):
                open_quote = char
        return line

    @classmethod
    def _convert_lines(
        cls, lines: t.Iterable[str], comment_start: tuple[str, ...]
    ) -> t.Generator[Body, None, None]:
        """Yield a :class:`Body` for every non-empty, non-comment line in ``lines``."""
        for line in lines:
            buf = cls._strip_comment(line.strip(), comment_start)
            if not buf:
                continue
            body = cls.convert_each_command(buf)
            if body is not None:
                yield body

    @classmethod
    def read_commands_from_file(
        cls, path: str, comment_start: tuple[str, ...] = (";", "#", "//")
    ) -> t.Generator[Body, None, None]:
        """Lazily parse the CLI commands stored in a text file.

        The file is opened when iteration starts and closed when the
        generator finishes or is closed.

        :param path: path of the file to read
        :param comment_start: markers that start a comment, defaults to (";", "#", "//")
        :return: generator of parsed commands
        """
        with open(path, "r") as f:
            yield from cls._convert_lines(f, comment_start)

    @classmethod
    def read_commands_from_string(
        cls, long_str: str, comment_start: tuple[str, ...] = (";", "#", "//")
    ) -> t.Generator[Body, None, None]:
        """Lazily parse the CLI commands contained in a multi-line string.

        :param long_str: command text, one command per line
        :param comment_start: markers that start a comment, defaults to (";", "#", "//")
        :return: generator of parsed commands
        """
        yield from cls._convert_lines(long_str.split("\n"), comment_start)
# Module-level aliases so both readers can be used without naming CLIConverter.
read_commands_from_file = CLIConverter.read_commands_from_file
read_commands_from_string = CLIConverter.read_commands_from_string
def upload_config_from(obj: GenericAnyTester | GenericAnyModule | GenericAnyPort, long_str: str,
                       is_file: bool, mode: str, comment_start: tuple[str, ...] = (";", "#", "//")) -> t.Generator[Token, None, None]:
    """Yield one ``Token`` per CLI command that belongs to ``mode``.

    Commands are read from a file or a string and kept only if their name
    starts with ``mode``. The comparison also accepts the upper-cased name,
    because command names are looked up case-insensitively when parsed; a
    lower-case command would otherwise be parsed and then silently dropped.

    The module/port written in the text is replaced by the identity of
    ``obj``: for mode ``"M"`` the module index comes from ``obj.module_id``,
    for mode ``"P"`` module and port come from ``obj.kind``. Any other mode
    keeps what was parsed from the line.

    :param obj: tester, module or port whose connection the tokens are bound to
    :param long_str: the command text, or the path of the file when ``is_file`` is true
    :param is_file: whether ``long_str`` is a file path
    :param mode: command-name prefix to keep (``"C"``, ``"M"`` or ``"P"``)
    :param comment_start: markers that start a comment, defaults to (";", "#", "//")
    :return: generator of tokens ready to be sent
    """
    read_commands = read_commands_from_file if is_file else read_commands_from_string
    for command in read_commands(long_str, comment_start):
        name = command.command_name
        # Superset of the former case-sensitive test, so existing callers are unaffected.
        if not (name.startswith(mode) or name.upper().startswith(mode)):
            continue
        if mode == "M":
            request = command.as_request(module_num=getattr(obj, "module_id", None))
        elif mode == "P":
            kind = getattr(obj, "kind", None)
            request = command.as_request(
                module_num=getattr(kind, "module_id", None),
                port_num=getattr(kind, "port_id", None),
            )
        else:
            request = command.as_request()
        yield Token(obj._conn, request)
async def _helper(obj: GenericAnyTester | GenericAnyModule | GenericAnyPort, long_str: str, is_file: bool, mode: str, comment_start: tuple[str, ...]) -> None:
    """Send every token produced by ``upload_config_from`` and discard the results.

    The tokens are passed to ``apply_iter`` with ``return_exceptions=True``;
    whatever it yields is consumed and ignored here.
    """
    tokens = upload_config_from(obj, long_str, is_file, mode, comment_start)
    async for _ in apply_iter(*tokens, return_exceptions=True):
        pass
# [docs] (Sphinx source-link artifact)
async def tester_config_from_string(tester: GenericAnyTester, long_str: str, comment_start: tuple[str, ...] = (";", "#", "//")) -> None:
    """Apply tester-level CLI commands given as text.

    The text is handed to ``_helper`` with mode ``"C"``, i.e. it is meant to
    contain commands with the ``C_`` prefix.

    :param tester: the tester object
    :type tester: GenericAnyTester
    :param long_str: the string containing the CLI commands
    :type long_str: str
    :param comment_start: symbol used to start a comment, defaults to (";", "#", "//")
    :type comment_start: tuple[str, ...], optional
    """
    await _helper(
        obj=tester,
        long_str=long_str,
        is_file=False,
        mode="C",
        comment_start=comment_start,
    )
# [docs] (Sphinx source-link artifact)
async def tester_config_from_file(tester: GenericAnyTester, path: str, comment_start: tuple[str, ...] = (";", "#", "//")) -> None:
    """Apply tester-level CLI commands stored in a configuration file.

    The path is handed to ``_helper`` with mode ``"C"``, i.e. the file is
    meant to contain commands with the ``C_`` prefix.

    :param tester: the tester object
    :type tester: GenericAnyTester
    :param path: the path to the configuration file
    :type path: str
    :param comment_start: symbol used to start a comment, defaults to (";", "#", "//")
    :type comment_start: tuple[str, ...], optional
    """
    await _helper(
        obj=tester,
        long_str=path,
        is_file=True,
        mode="C",
        comment_start=comment_start,
    )
# [docs] (Sphinx source-link artifact)
async def module_config_from_string(module: GenericAnyModule, long_str: str, comment_start: tuple[str, ...] = (";", "#", "//")) -> None:
    """Apply module-level CLI commands given as text.

    The module must be reserved by the caller; this is checked with an
    ``assert`` before anything is sent. The text is then handed to
    ``_helper`` with mode ``"M"``, i.e. it is meant to contain commands with
    the ``M_`` prefix.

    :param module: the module object
    :type module: GenericAnyModule
    :param long_str: the string containing the CLI commands
    :type long_str: str
    :param comment_start: symbol used to start a comment, defaults to (";", "#", "//")
    :type comment_start: tuple[str, ...], optional
    :raises AssertionError: if the module is not reserved by the caller
    """
    reserved = module.is_reserved_by_me()
    assert reserved, f"Please reserve Module {module.module_id} first!"
    await _helper(
        obj=module,
        long_str=long_str,
        is_file=False,
        mode="M",
        comment_start=comment_start,
    )
# [docs] (Sphinx source-link artifact)
async def module_config_from_file(module: GenericAnyModule, path: str, comment_start: tuple[str, ...] = (";", "#", "//")) -> None:
    """Apply module-level CLI commands stored in a configuration file.

    The module must be reserved by the caller; this is checked with an
    ``assert`` before anything is sent. The path is then handed to
    ``_helper`` with mode ``"M"``, i.e. the file is meant to contain commands
    with the ``M_`` prefix.

    :param module: the module object
    :type module: GenericAnyModule
    :param path: the path to the configuration file
    :type path: str
    :param comment_start: symbol used to start a comment, defaults to (";", "#", "//")
    :type comment_start: tuple[str, ...], optional
    :raises AssertionError: if the module is not reserved by the caller
    """
    reserved = module.is_reserved_by_me()
    assert reserved, f"Please reserve Module {module.module_id} first!"
    await _helper(
        obj=module,
        long_str=path,
        is_file=True,
        mode="M",
        comment_start=comment_start,
    )
# [docs] (Sphinx source-link artifact)
async def port_config_from_string(port: GenericAnyPort, long_str: str, comment_start: tuple[str, ...] = (";", "#", "//")) -> None:
    """Apply port-level CLI commands given as text.

    The port must be reserved by the caller; this is checked with an
    ``assert`` before anything is sent. The text is then handed to
    ``_helper`` with mode ``"P"``, i.e. it is meant to contain commands with
    the ``P_`` prefix.

    :param port: the port object
    :type port: GenericAnyPort
    :param long_str: the string containing the CLI commands
    :type long_str: str
    :param comment_start: symbol used to start a comment, defaults to (";", "#", "//")
    :type comment_start: tuple[str, ...], optional
    :raises AssertionError: if the port is not reserved by the caller
    """
    reserved = port.is_reserved_by_me()
    assert reserved, f"Please reserve Port {port.kind.module_id}/{port.kind.port_id} first!"
    await _helper(
        obj=port,
        long_str=long_str,
        is_file=False,
        mode="P",
        comment_start=comment_start,
    )
# [docs] (Sphinx source-link artifact)
async def port_config_from_file(port: GenericAnyPort, path: str, comment_start: tuple[str, ...] = (";", "#", "//")) -> None:
    """Apply port-level CLI commands stored in a port configuration file (.xpc file).

    The port must be reserved by the caller; this is checked with an
    ``assert`` before anything is sent. The path is then handed to
    ``_helper`` with mode ``"P"``.

    :param port: the port object
    :type port: GenericAnyPort
    :param path: the path to the configuration file
    :type path: str
    :param comment_start: symbol used to start a comment, defaults to (";", "#", "//")
    :type comment_start: tuple[str, ...], optional
    :raises AssertionError: if the port is not reserved by the caller
    """
    reserved = port.is_reserved_by_me()
    assert reserved, f"Please reserve Port {port.kind.module_id}/{port.kind.port_id} first!"
    await _helper(
        obj=port,
        long_str=path,
        is_file=True,
        mode="P",
        comment_start=comment_start,
    )
# Public API of this module: the two command readers and the six
# tester/module/port configuration coroutines.
__all__ = (
"read_commands_from_file",
"read_commands_from_string",
"tester_config_from_string",
"module_config_from_string",
"port_config_from_string",
"tester_config_from_file",
"module_config_from_file",
"port_config_from_file",
)