from __future__ import annotations
from dataclasses import dataclass
import ipaddress
import typing
import functools
from xoa_driver.internals.core.builders import (
build_get_request,
build_set_request
)
from xoa_driver.internals.core import interfaces
from xoa_driver.internals.core.token import Token
from xoa_driver.internals.core.transporter.registry import register_command
from xoa_driver.internals.core.transporter.protocol.payload import (
field,
RequestBodyStruct,
ResponseBodyStruct,
XmpByte,
XmpHex,
XmpInt,
XmpIPv4Address,
XmpIPv6Address,
XmpLong,
XmpMacAddress,
XmpSequence,
XmpStr,
Hex,
)
from .enums import (
OnOff,
OnOffWithSuppress,
ProtocolOption,
ModifierAction,
LengthType,
PayloadType,
PFCMode,
StreamOption,
)
[docs]
@register_command
@dataclass
class PS_INDICES:
"""
The full list of which streams are defined for a port. These are the sub-index
values that are used for the parameters defining the traffic patterns
transmitted for the port. Setting the value of this command creates a new
empty stream for each value that is not already in use, and deletes each stream
that is not mentioned in the list. The same can be accomplished one-stream-at-a-
time using the :class:`PS_CREATE` and :class:`PS_DELETE` commands.
"""
code: typing.ClassVar[int] = 150
pushed: typing.ClassVar[bool] = True
_connection: 'interfaces.IConnection'
_module: int
_port: int
[docs]
class GetDataAttr(ResponseBodyStruct):
stream_indices: typing.List[int] = field(XmpSequence(types_chunk=[XmpInt()]))
"""list of integers, the sub-indices of streams on the port."""
[docs]
class SetDataAttr(RequestBodyStruct):
stream_indices: typing.List[int] = field(XmpSequence(types_chunk=[XmpInt()]))
"""list of integers, the sub-indices of streams on the port."""
[docs]
def get(self) -> Token[GetDataAttr]:
"""Get the full list of which streams are defined for a port.
:return: the sub-indices of streams on the port
:rtype: PS_INDICES.GetDataAttr
"""
return Token(self._connection, build_get_request(self, module=self._module, port=self._port))
[docs]
def set(self, stream_indices: typing.List[int]) -> Token[None]:
"""Creates a new empty stream for each value that is not already in use, and deletes each stream that is not mentioned in the list.
:param stream_indices: the sub-indices of streams on the port
:type stream_indices: typing.List[int]
"""
return Token(self._connection, build_set_request(self, module=self._module, port=self._port, stream_indices=stream_indices))
[docs]
@register_command
@dataclass
class PS_CREATE:
"""
Creates an empty stream definition with the specified sub-index value.
"""
code: typing.ClassVar[int] = 151
pushed: typing.ClassVar[bool] = False
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
[docs]
class SetDataAttr(RequestBodyStruct):
pass
[docs]
def set(self) -> Token[None]:
"""Creates an empty stream definition with the specified sub-index value.
"""
return Token(self._connection, build_set_request(self, module=self._module, port=self._port, indices=[self._stream_xindex]))
[docs]
@register_command
@dataclass
class PS_DELETE:
"""
Deletes the stream definition with the specified sub-index value.
"""
code: typing.ClassVar[int] = 152
pushed: typing.ClassVar[bool] = False
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
[docs]
class SetDataAttr(RequestBodyStruct):
pass
[docs]
def set(self) -> Token[None]:
"""Deletes the stream definition with the specified sub-index value.
"""
return Token(self._connection, build_set_request(self, module=self._module, port=self._port, indices=[self._stream_xindex]))
[docs]
@register_command
@dataclass
class PS_ENABLE:
"""
This property determines if a stream contributes outgoing packets for a port.
The value can be toggled between ON and SUPPRESS while traffic is enabled at the
port level. Streams in the OFF state cannot be set to any other value while
traffic is enabled. The sum of the rates of all enabled or suppressed streams
must not exceed the effective port rate.
"""
code: typing.ClassVar[int] = 153
pushed: typing.ClassVar[bool] = True
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
[docs]
class GetDataAttr(ResponseBodyStruct):
state: OnOffWithSuppress = field(XmpByte())
"""coded byte, specifying a stream state."""
[docs]
class SetDataAttr(RequestBodyStruct):
state: OnOffWithSuppress = field(XmpByte())
"""coded byte, specifying a stream state."""
[docs]
def get(self) -> Token[GetDataAttr]:
"""Get the stream status.
:return: status of the stream
:rtype: PS_ENABLE.GetDataAttr
"""
return Token(self._connection, build_get_request(self, module=self._module, port=self._port, indices=[self._stream_xindex]))
[docs]
def set(self, state: OnOffWithSuppress) -> Token[None]:
"""Set the stream status. The value can be toggled between ON and SUPPRESS while traffic is enabled at the
port level. Streams in the OFF state cannot be set to any other value while
traffic is enabled. The sum of the rates of all enabled or suppressed streams
must not exceed the effective port rate.
:param state: a stream state
:type state: OnOffWithSuppress
"""
return Token(self._connection, build_set_request(self, module=self._module, port=self._port, indices=[self._stream_xindex], state=state))
set_off = functools.partialmethod(set, OnOffWithSuppress.OFF)
"""Set the stream status to OFF.
"""
set_on = functools.partialmethod(set, OnOffWithSuppress.ON)
"""Set the stream status to ON.
"""
set_suppress = functools.partialmethod(set, OnOffWithSuppress.SUPPRESS)
"""Set the stream status to SUPPRESS.
"""
[docs]
@register_command
@dataclass
class PS_PACKETLIMIT:
"""
Based on different port transmission mode, the meaning of this API is different.
When Port TX Mode is set to NORMAL, STRICT UNIFORM or BURST: The number of
packets that will be transmitted when traffic is started on a port. A value of 0
or -1 makes the stream transmit continuously. When Port TX Mode is set to
SEQUENTIAL: The number of sequential packets sent before switching to the next
stream. The minimum value is 1. The port will transmit continuously until the
user stops the traffic.
"""
code: typing.ClassVar[int] = 154
pushed: typing.ClassVar[bool] = True
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
[docs]
class GetDataAttr(ResponseBodyStruct):
packet_count: int = field(XmpInt())
"""integer, the number of packets that the port will send. When Port TX Mode is set to NORMAL, STRICT UNIFORM or BURST: 0 or -1 (disable packet limitation).
When Port TX Mode is set to SEQUENTIAL: 1 or larger (minimum value since the port transmits at least 1 packet per stream per round).
"""
[docs]
class SetDataAttr(RequestBodyStruct):
packet_count: int = field(XmpInt())
"""integer, the number of packets that the port will send. When Port TX Mode is set to NORMAL, STRICT UNIFORM or BURST: 0 or -1 (disable packet limitation).
When Port TX Mode is set to SEQUENTIAL: 1 or larger (minimum value since the port transmits at least 1 packet per stream per round).
"""
[docs]
def get(self) -> Token[GetDataAttr]:
"""If Port TX Mode is NORMAL, STRICT UNIFORM or BURST: get the number of packets that will be transmitted when traffic is started on a port.
If Port TX Mode is SEQUENTIAL: get the number of sequential packets sent before switching to the next stream.
:return: the number of packets
:rtype: PS_PACKETLIMIT.GetDataAttr
"""
return Token(self._connection, build_get_request(self, module=self._module, port=self._port, indices=[self._stream_xindex]))
[docs]
def set(self, packet_count: int) -> Token[None]:
"""If Port TX Mode is NORMAL, STRICT UNIFORM or BURST: set the number of packets that will be transmitted when traffic is started on a port,
0 or -1 (disable packet limitation).
If Port TX Mode is SEQUENTIAL: set the number of sequential packets sent before switching to the next stream,
1 or larger (minimum value since the port transmits at least 1 packet per stream per round).
:param packet_count: the number of packets
:type packet_count: int
"""
return Token(self._connection, build_set_request(self, module=self._module, port=self._port, indices=[self._stream_xindex], packet_count=packet_count))
[docs]
@register_command
@dataclass
class PS_TPLDID:
"""
The identifier of the test payloads inserted into packets transmitted for a
stream. A value of -1 disables test payloads for the stream. Test payloads are
inserted at the end of each packet, and contains time-stamp and sequence-number
information. This allows the receiving port to provide error-checking and
latency measurements, in addition to the basic counts and rate measurements
provided for all traffic. The test payload identifier furthermore allows the
receiving port to distinguish multiple different streams, which may originate
from multiple different chassis. Since test payloads are an inter-port and
inter-chassis mechanism, the test payload identifier assignments should be
planned globally across all the chassis and ports of the testbed.
"""
code: typing.ClassVar[int] = 157
pushed: typing.ClassVar[bool] = True
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
[docs]
class GetDataAttr(ResponseBodyStruct):
test_payload_identifier: int = field(XmpInt())
"""integer, the test payload identifier value. -1 = disable test payloads."""
[docs]
class SetDataAttr(RequestBodyStruct):
test_payload_identifier: int = field(XmpInt())
"""integer, the test payload identifier value. -1 = disable test payloads."""
[docs]
def get(self) -> Token[GetDataAttr]:
"""Get the identifier of the test payloads inserted into packets transmitted for a stream.
:return: the test payload identifier value. -1 = disable test payloads.
:rtype: PS_TPLDID.GetDataAttr
"""
return Token(self._connection, build_get_request(self, module=self._module, port=self._port, indices=[self._stream_xindex]))
[docs]
def set(self, test_payload_identifier: int) -> Token[None]:
"""Set the identifier of the test payloads inserted into packets transmitted for a stream.
A value of -1 disables test payloads for the stream. Test payloads are
inserted at the end of each packet, and contains time-stamp and sequence-number
information.
:param test_payload_identifier: the test payload identifier value. -1 = disable test payloads
:type test_payload_identifier: int
"""
return Token(
self._connection,
build_set_request(self, module=self._module, port=self._port, indices=[self._stream_xindex], test_payload_identifier=test_payload_identifier)
)
[docs]
@register_command
@dataclass
class PS_INSERTFCS:
"""
Whether a valid frame checksum is added to the packets of a stream.
"""
code: typing.ClassVar[int] = 158
pushed: typing.ClassVar[bool] = True
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
[docs]
class GetDataAttr(ResponseBodyStruct):
on_off: OnOff = field(XmpByte())
"""coded byte, whether frame checksums are inserted."""
[docs]
class SetDataAttr(RequestBodyStruct):
on_off: OnOff = field(XmpByte())
"""coded byte, whether frame checksums are inserted."""
[docs]
def get(self) -> Token[GetDataAttr]:
"""Get whether a valid frame checksum is added to the packets of a stream.
:return: whether frame checksums are inserted
:rtype: PS_INSERTFCS.GetDataAttr
"""
return Token(self._connection, build_get_request(self, module=self._module, port=self._port, indices=[self._stream_xindex]))
[docs]
def set(self, on_off: OnOff) -> Token[None]:
"""Set whether a valid frame checksum is added to the packets of a stream.
:param on_off: whether frame checksums are inserted
:type on_off: OnOff
"""
return Token(self._connection, build_set_request(self, module=self._module, port=self._port, indices=[self._stream_xindex], on_off=on_off))
set_off = functools.partialmethod(set, OnOff.OFF)
"""Disable a valid frame checksum to be added to the packets of a stream.
"""
set_on = functools.partialmethod(set, OnOff.ON)
"""Enable a valid frame checksum to be added to the packets of a stream.
"""
[docs]
@register_command
@dataclass
class PS_AUTOADJUST:
"""
.. versionadded:: v2.0
Executing PS_AUTOADJUST will adjust the packet length distribution (:class:`PS_PACKETLENGTH`) of the stream:
(1) Set the type of packet length distribution (:class:`PS_PACKETLENGTH` ``<length_type>``) to ``FIXED``.
(2) Set the lower limit on the packet length (:class:`PS_PACKETLENGTH` ``<min_val>``) to exactly fit the specified protocol headers,
TPLD and FCS (but never set to less than 64).
(3) Set the payload type of packets transmitted for the stream (:class:`PS_PAYLOAD` ``<payload_type>``) to ``PATTERN``.
(4) If necessary, also set the maximum number of header content bytes (`P_MAXHEADERLENGTH <p_maxheaderlength_label>` ``<max_header_length>``)
that can be freely specified for each generated stream of the port to a higher value, if needed to accommodate the header size of the stream
(implicitly given by the `PS_PACKETHEADER` command).
(5) If the needed maximum header length (`P_MAXHEADERLENGTH <p_maxheaderlength_label>` ``<max_header_length>``)
is not possible with the actual number of active streams for the port, the command will fail with :`<BADVALUE>`.
"""
code: typing.ClassVar[int] = 159
pushed: typing.ClassVar[bool] = False
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
[docs]
class SetDataAttr(RequestBodyStruct):
pass
[docs]
def set(self) -> Token[None]:
"""Adjust the packet length distribution of a stream.
"""
return Token(self._connection, build_set_request(self, module=self._module, port=self._port, indices=[self._stream_xindex]))
[docs]
@register_command
@dataclass
class PS_ARPREQUEST:
"""
Generates an outgoing ARP request on the test port. The packet header for the
stream must contain an IP protocol segment, and the destination IP address is
used in the ARP request. If there is a gateway IP address specified for the port
and it is on a different subnet than the destination IP address in the packet
header, then the gateway IP address is used instead. The framing of the ARP
request matches the packet header, including any VLAN protocol segments. This
command does not generate an immediate result, but waits until an ARP
reply is received on the test port. If no reply is received within 500
milliseconds, it returns.
"""
code: typing.ClassVar[int] = 161
pushed: typing.ClassVar[bool] = False
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
[docs]
class GetDataAttr(ResponseBodyStruct):
mac_address: Hex = field(XmpMacAddress())
"""six hex bytes, specifying the six bytes of the MAC address."""
[docs]
def get(self) -> Token[GetDataAttr]:
"""Generates an outgoing ARP request on the test port. The packet header for the
stream must contain an IP protocol segment, and the destination IP address is
used in the ARP request. If there is a gateway IP address specified for the port
and it is on a different subnet than the destination IP address in the packet
header, then the gateway IP address is used instead. The framing of the ARP
request matches the packet header, including any VLAN protocol segments. This
command does not generate an immediate result, but waits until an ARP
reply is received on the test port. If no reply is received within 500
milliseconds, it returns.
:return: the MAC address of the peer port
:rtype: PS_ARPREQUEST.GetDataAttr
"""
return Token(self._connection, build_get_request(self, module=self._module, port=self._port, indices=[self._stream_xindex]))
[docs]
@register_command
@dataclass
class PS_PINGREQUEST:
"""
Generates an outgoing ping request using the ICMP protocol on the test port. The
packet header for the stream must contain an IP protocol segment, with valid
source and destination IP addresses. The framing of the ping request matches the
packet header, including any VLAN protocol segments, and the destination MAC
address must also be valid, possibly containing a value obtained with
PS_ARPREQUEST. This command does not generate an immediate result, but
waits until a ping reply is received on the test port.
"""
code: typing.ClassVar[int] = 162
pushed: typing.ClassVar[bool] = False
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
[docs]
class GetDataAttr(ResponseBodyStruct):
delay: int = field(XmpInt())
"""integer, the number of milliseconds for the ping reply to arrive."""
time_to_live: int = field(XmpByte())
"""byte, the time-to-live value in the ping reply packet."""
[docs]
def get(self) -> Token[GetDataAttr]:
"""Generates an outgoing ping request using the ICMP protocol on the test port. The
packet header for the stream must contain an IP protocol segment, with valid
source and destination IP addresses. The framing of the ping request matches the
packet header, including any VLAN protocol segments, and the destination MAC
address must also be valid, possibly containing a value obtained with
PS_ARPREQUEST. This command does not generate an immediate result, but
waits until a ping reply is received on the test port.
:return: the number of milliseconds for the ping reply to arrive, and the time-to-live value in the ping reply packet.
:rtype: PS_PINGREQUEST.GetDataAttr
"""
return Token(self._connection, build_get_request(self, module=self._module, port=self._port, indices=[self._stream_xindex]))
[docs]
@register_command
@dataclass
class PS_MODIFIERRANGE:
"""
Range specification for a packet modifier for a stream header, specifying which
values the modifier should take on. This applies only to incrementing and
decrementing modifiers; random modifiers always produce every possible bit
pattern. The range is specified as three values: mix, step, and max, where max
must be equal to min plus a multiple of step. Note that when "decrement" is
specified in PS_MODIFIER as the action, the value sequence will begin with the
max value instead of the min value and decrement from there: {max, max-1, max-2,
...., min, max, max-1...}.
"""
code: typing.ClassVar[int] = 168
pushed: typing.ClassVar[bool] = True
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
_modifier_xindex: int
[docs]
class GetDataAttr(ResponseBodyStruct):
min_val: int = field(XmpInt())
"""integer, the minimum modifier value."""
step: int = field(XmpInt())
"""integer, the increment between modifier values."""
max_val: int = field(XmpInt())
"""integer, the maximum modifier value."""
[docs]
class SetDataAttr(RequestBodyStruct):
min_val: int = field(XmpInt())
"""integer, the minimum modifier value."""
step: int = field(XmpInt())
"""integer, the increment between modifier values."""
max_val: int = field(XmpInt())
"""integer, the maximum modifier value."""
[docs]
def get(self) -> Token[GetDataAttr]:
"""Get the range specification for a packet modifier for a stream header, specifying which values the modifier should take on.
:return: the minimum modifier value, the increment between modifier values, the maximum modifier value.
:rtype: PS_MODIFIERRANGE.GetDataAttr
"""
return Token(self._connection, build_get_request(self, module=self._module, port=self._port, indices=[self._stream_xindex, self._modifier_xindex]))
[docs]
def set(self, min_val: int, step: int, max_val: int) -> Token[None]:
"""Set the range specification for a packet modifier for a stream header, specifying which
values the modifier should take on. This applies only to incrementing and
decrementing modifiers; random modifiers always produce every possible bit
pattern. The range is specified as three values: mix, step, and max, where max
must be equal to min plus a multiple of step. Note that when "decrement" is
specified in PS_MODIFIER as the action, the value sequence will begin with the
max value instead of the min value and decrement from there: {max, max-1, max-2,
...., min, max, max-1...}.
:param min_val: the minimum modifier value
:type min_val: int
:param step: the increment between modifier values
:type step: int
:param max_val: the maximum modifier value
:type max_val: int
"""
return Token(
self._connection,
build_set_request(self, module=self._module, port=self._port, indices=[self._stream_xindex, self._modifier_xindex], min_val=min_val, step=step, max_val=max_val)
)
[docs]
@register_command
@dataclass
class PS_RATEFRACTION:
"""
The rate of the traffic transmitted for a stream expressed in millionths of the
effective rate for the port. The bandwidth consumption includes the inter-frame
gap and is independent of the length of the packets generated for the stream.
The sum of the bandwidth consumption for all the enabled streams must not exceed
the effective rate for the port. Setting this command also instructs the
Manager to attempt to keep the rate-percentage unchanged in case it has to cap
stream rates. Get value is only valid if the rate was last set using this
command.
"""
code: typing.ClassVar[int] = 169
pushed: typing.ClassVar[bool] = True
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
[docs]
class GetDataAttr(ResponseBodyStruct):
stream_rate_ppm: int = field(XmpInt())
"""integer, stream rate expressed as a ppm value between 0 and 1,000,000."""
[docs]
class SetDataAttr(RequestBodyStruct):
stream_rate_ppm: int = field(XmpInt())
"""integer, stream rate expressed as a ppm value between 0 and 1,000,000."""
[docs]
def get(self) -> Token[GetDataAttr]:
"""Get the rate of the traffic transmitted for a stream expressed in millionths of the
effective rate for the port. Get value is only valid if the rate was last set using this
command.
:return: stream rate expressed as a ppm value between 0 and 1,000,000
:rtype: PS_RATEFRACTION.GetDataAttr
"""
return Token(self._connection, build_get_request(self, module=self._module, port=self._port, indices=[self._stream_xindex]))
[docs]
def set(self, stream_rate_ppm: int) -> Token[None]:
"""Set the rate of the traffic transmitted for a stream expressed in millionths of the
effective rate for the port. The bandwidth consumption includes the inter-frame
gap and is independent of the length of the packets generated for the stream.
The sum of the bandwidth consumption for all the enabled streams must not exceed
the effective rate for the port. Setting this command also instructs the
Manager to attempt to keep the rate-percentage unchanged in case it has to cap
stream rates.
:param stream_rate_ppm: stream rate expressed as a ppm value between 0 and 1,000,000.
:type stream_rate_ppm: int
"""
return Token(self._connection, build_set_request(self, module=self._module, port=self._port, indices=[self._stream_xindex], stream_rate_ppm=stream_rate_ppm))
[docs]
@register_command
@dataclass
class PS_RATEPPS:
"""
The rate of the traffic transmitted for a stream expressed in packets per
second. The bandwidth consumption is heavily dependent on the length of the
packets generated for the stream, and also on the inter-frame gap for the port.
The sum of the bandwidth consumption for all the enabled streams must not exceed
the effective rate for the port. Setting this command also instructs the
Manager to attempt to keep the packets-per-second unchanged in case it has to
cap stream rates. Get value is only valid if the rate was the last set using
this command.
"""
code: typing.ClassVar[int] = 170
pushed: typing.ClassVar[bool] = False
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
[docs]
class GetDataAttr(ResponseBodyStruct):
stream_rate_pps: int = field(XmpInt())
"""integer, stream rate expressed in packets per second."""
[docs]
class SetDataAttr(RequestBodyStruct):
stream_rate_pps: int = field(XmpInt())
"""integer, stream rate expressed in packets per second."""
[docs]
def get(self) -> Token[GetDataAttr]:
"""Get The rate of the traffic transmitted for a stream expressed in packets per
second. Get value is only valid if the rate was the last set using
this command.
:return: stream rate expressed in packets per second
:rtype: PS_RATEPPS.GetDataAttr
"""
return Token(self._connection, build_get_request(self, module=self._module, port=self._port, indices=[self._stream_xindex]))
[docs]
def set(self, stream_rate_pps: int) -> Token[None]:
"""Set The rate of the traffic transmitted for a stream expressed in packets per
second. The bandwidth consumption is heavily dependent on the length of the
packets generated for the stream, and also on the inter-frame gap for the port.
The sum of the bandwidth consumption for all the enabled streams must not exceed
the effective rate for the port. Setting this command also instructs the
Manager to attempt to keep the packets-per-second unchanged in case it has to
cap stream rates.
:param stream_rate_pps: stream rate expressed in packets per second
:type stream_rate_pps: int
"""
return Token(self._connection, build_set_request(self, module=self._module, port=self._port, indices=[self._stream_xindex], stream_rate_pps=stream_rate_pps))
[docs]
@register_command
@dataclass
class PS_RATEL2BPS:
"""
The rate of the traffic transmitted for a stream, expressed in units of bits-
per-second at layer-2, thus including the Ethernet header but excluding the
inter-frame gap. The bandwidth consumption is somewhat dependent on the length
of the packets generated for the stream, and also on the inter-frame gap for the
port. The sum of the bandwidth consumption for all the enabled streams must not
exceed the effective rate for the port. Setting this command also instructs
the Manager to attempt to keep the layer-2 bps rate unchanged in case it has to
cap stream rates. Get value is only valid if the rate was the last set using
this command.
"""
code: typing.ClassVar[int] = 171
pushed: typing.ClassVar[bool] = False
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
[docs]
class GetDataAttr(ResponseBodyStruct):
l2_bps: int = field(XmpLong())
"""long integer, stream rate expressed in bits per second."""
[docs]
class SetDataAttr(RequestBodyStruct):
l2_bps: int = field(XmpLong())
"""long integer, stream rate expressed in bits per second."""
[docs]
def get(self) -> Token[GetDataAttr]:
"""Get the rate of the traffic transmitted for a stream, expressed in units of bits-
per-second at layer-2, thus including the Ethernet header but excluding the
inter-frame gap. Get value is only valid if the rate was the last set using
this command.
:return: stream rate expressed in bits per second
:rtype: PS_RATEL2BPS.GetDataAttr
"""
return Token(self._connection, build_get_request(self, module=self._module, port=self._port, indices=[self._stream_xindex]))
[docs]
def set(self, l2_bps: int) -> Token[None]:
"""Set the rate of the traffic transmitted for a stream, expressed in units of bits-
per-second at layer-2, thus including the Ethernet header but excluding the
inter-frame gap. The bandwidth consumption is somewhat dependent on the length
of the packets generated for the stream, and also on the inter-frame gap for the
port. The sum of the bandwidth consumption for all the enabled streams must not
exceed the effective rate for the port. Setting this command also instructs
the Manager to attempt to keep the layer-2 bps rate unchanged in case it has to
cap stream rates.
:param l2_bps: stream rate expressed in bits per second
:type l2_bps: int
"""
return Token(self._connection, build_set_request(self, module=self._module, port=self._port, indices=[self._stream_xindex], l2_bps=l2_bps))
[docs]
@register_command
@dataclass
class PS_BURST:
"""
The burstiness of the traffic transmitted for a stream, expressed in terms of
the number of packets in each burst, and how densely they are packed together.
The burstiness does not affect the bandwidth consumed by the stream, only the
spacing between the packets. A density value of 100 means that the packets are
packed tightly together, only spaced by the minimum inter-frame gap. A value of
0 means even, non-bursty, spacing. The exact spacing achieved depends on the
other enabled streams of the port.
"""
code: typing.ClassVar[int] = 174
pushed: typing.ClassVar[bool] = True
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
[docs]
class GetDataAttr(ResponseBodyStruct):
size: int = field(XmpInt())
"""integer, the number of packets lumped together in a burst."""
density: int = field(XmpInt())
"""integer, the percentage of the available spacing that is inserted between bursts."""
[docs]
class SetDataAttr(RequestBodyStruct):
size: int = field(XmpInt())
"""integer, the number of packets lumped together in a burst."""
density: int = field(XmpInt())
"""integer, the percentage of the available spacing that is inserted between bursts."""
[docs]
def get(self) -> Token[GetDataAttr]:
"""Get the burstiness of the traffic transmitted for a stream, expressed in terms of
the number of packets in each burst, and how densely they are packed together.
:return: the number of packets lumped together in a burst, and the percentage of the available spacing that is inserted between bursts
:rtype: ~PS_BURST.GetDataAttr
"""
return Token(self._connection, build_get_request(self, module=self._module, port=self._port, indices=[self._stream_xindex]))
[docs]
def set(self, size: int, density: int) -> Token[None]:
"""Set the burstiness of the traffic transmitted for a stream, expressed in terms of
the number of packets in each burst, and how densely they are packed together.
The burstiness does not affect the bandwidth consumed by the stream, only the
spacing between the packets. A density value of 100 means that the packets are
packed tightly together, only spaced by the minimum inter-frame gap. A value of
0 means even, non-bursty, spacing. The exact spacing achieved depends on the
other enabled streams of the port.
:param size: the number of packets lumped together in a burst
:type size: int
:param density: the percentage of the available spacing that is inserted between bursts
:type density: int
"""
return Token(self._connection, build_set_request(self, module=self._module, port=self._port, indices=[self._stream_xindex], size=size, density=density))
[docs]
@register_command
@dataclass
class PS_MODIFIERCOUNT:
"""
The number of standard 16-bit modifiers active on the packet header of a stream.
Each modifier is specified using PS_MODIFIER.
"""
code: typing.ClassVar[int] = 177
pushed: typing.ClassVar[bool] = True
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
[docs]
class GetDataAttr(ResponseBodyStruct):
modifier_count: int = field(XmpInt())
"""integer, the number of modifiers for the stream."""
[docs]
class SetDataAttr(RequestBodyStruct):
modifier_count: int = field(XmpInt())
"""integer, the number of modifiers for the stream."""
[docs]
def get(self) -> Token[GetDataAttr]:
"""Get the number of standard 16-bit modifiers active on the packet header of a stream.
Each modifier is specified using PS_MODIFIER.
:return: the number of modifiers for the stream
:rtype: PS_MODIFIERCOUNT.GetDataAttr
"""
return Token(self._connection, build_get_request(self, module=self._module, port=self._port, indices=[self._stream_xindex]))
[docs]
def set(self, modifier_count: int) -> Token[None]:
"""Set the number of standard 16-bit modifiers active on the packet header of a stream.
Each modifier is specified using PS_MODIFIER.
:param modifier_count: the number of modifiers for the stream
:type modifier_count: int
"""
return Token(self._connection, build_set_request(self, module=self._module, port=self._port, indices=[self._stream_xindex], modifier_count=modifier_count))
[docs]
@register_command
@dataclass
class PS_MODIFIER:
"""
A packet modifier for a stream header. The headers of each packet transmitted
for the stream will be varied according to the modifier specification. This
command requires two sub-indices, one for the stream and one for the modifier.
A modifier is positioned at a fixed place in the header, selects a number of
consecutive bits starting from that position, and applies an action to those
bits in each packet. Packets can be repeated so that a certain number of
identical packets are transmitted before applying the next modification.
"""
code: typing.ClassVar[int] = 178
pushed: typing.ClassVar[bool] = True
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
_modifier_xindex: int
[docs]
class GetDataAttr(ResponseBodyStruct):
position: int = field(XmpInt())
"""integer, the byte position from the start of the packet."""
mask: Hex = field(XmpHex(size=4))
"""four hex bytes, the mask specifying which bits to affect."""
action: ModifierAction = field(XmpInt())
"""coded integer, which action to perform on the affected bits."""
repetition: int = field(XmpInt())
"""integer, how many times to repeat on each packet."""
[docs]
class SetDataAttr(RequestBodyStruct):
position: int = field(XmpInt())
"""integer, the byte position from the start of the packet."""
mask: Hex = field(XmpHex(size=4))
"""four hex bytes, the mask specifying which bits to affect."""
action: ModifierAction = field(XmpInt())
"""coded integer, which action to perform on the affected bits."""
repetition: int = field(XmpInt())
"""integer, how many times to repeat on each packet."""
[docs]
def get(self) -> Token[GetDataAttr]:
"""Get a packet modifier for a stream header. The headers of each packet transmitted
for the stream will be varied according to the modifier specification. This
command requires two sub-indices, one for the stream and one for the modifier.
A modifier is positioned at a fixed place in the header, selects a number of
consecutive bits starting from that position, and applies an action to those
bits in each packet. Packets can be repeated so that a certain number of
identical packets are transmitted before applying the next modification.
:return: the byte position from the start of the packet, the mask specifying which bits to affect, which action to perform on the affected bits,
and how many times to repeat on each packet
:rtype: PS_MODIFIER.GetDataAttr
"""
return Token(self._connection, build_get_request(self, module=self._module, port=self._port, indices=[self._stream_xindex, self._modifier_xindex]))
[docs]
def set(self, position: int, mask: Hex, action: ModifierAction, repetition: int) -> Token[None]:
"""Set a packet modifier for a stream header. The headers of each packet transmitted
for the stream will be varied according to the modifier specification. This
command requires two sub-indices, one for the stream and one for the modifier.
A modifier is positioned at a fixed place in the header, selects a number of
consecutive bits starting from that position, and applies an action to those
bits in each packet. Packets can be repeated so that a certain number of
identical packets are transmitted before applying the next modification.
:param position: the byte position from the start of the packet
:type position: int
:param mask: the mask specifying which bits to affect
:type mask: str
:param action: which action to perform on the affected bits
:type action: ModifierAction
:param repetition: how many times to repeat on each packet
:type repetition: int
"""
return Token(
self._connection,
build_set_request(
self,
module=self._module,
port=self._port,
indices=[self._stream_xindex, self._modifier_xindex],
position=position,
mask=mask,
action=action,
repetition=repetition
)
)
set_inc = functools.partialmethod(set, action=ModifierAction.INC)
"""Set a packet modifier action to incrementing.
"""
set_dec = functools.partialmethod(set, action=ModifierAction.DEC)
"""Set a packet modifier action to decrementing.
"""
set_random = functools.partialmethod(set, action=ModifierAction.RANDOM)
"""Set a packet modifier action to random.
"""
[docs]
@register_command
@dataclass
class PS_PACKETLENGTH:
"""
The length distribution of the packets transmitted for a stream. The length of
the packets transmitted for a stream can be varied from packet to packet,
according to a choice of distributions within a specified min...max range. The
length of each packet is reflected in the size of the payload portion of the
packet, whereas the header has constant length. Length variation complements,
and is independent of, the content variation produced by header modifiers.
"""
code: typing.ClassVar[int] = 179
pushed: typing.ClassVar[bool] = True
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
[docs]
class GetDataAttr(ResponseBodyStruct):
length_type: LengthType = field(XmpInt())
"""coded integer, the kind of distribution of packet length."""
min_val: int = field(XmpInt())
"""integer, lower limit on the packet length."""
max_val: int = field(XmpInt())
"""integer, upper limit on the packet length."""
[docs]
class SetDataAttr(RequestBodyStruct):
length_type: LengthType = field(XmpInt())
"""coded integer, the kind of distribution of packet length."""
min_val: int = field(XmpInt())
"""integer, lower limit on the packet length."""
max_val: int = field(XmpInt())
"""integer, upper limit on the packet length."""
[docs]
def get(self) -> Token[GetDataAttr]:
"""Get the length distribution of the packets transmitted for a stream. The length of
the packets transmitted for a stream can be varied from packet to packet,
according to a choice of distributions within a specified min..max range. The
length of each packet is reflected in the size of the payload portion of the
packet, whereas the header has constant length. Length variation complements,
and is independent of, the content variation produced by header modifiers.
:return: the kind of distribution of packet length, lower limit on the packet length, and upper limit on the packet length.
:rtype: PS_PACKETLENGTH.GetDataAttr
"""
return Token(self._connection, build_get_request(self, module=self._module, port=self._port, indices=[self._stream_xindex]))
[docs]
def set(self, length_type: LengthType, min_val: int, max_val: int) -> Token[None]:
"""Set the length distribution of the packets transmitted for a stream. The length of
the packets transmitted for a stream can be varied from packet to packet,
according to a choice of distributions within a specified min..max range. The
length of each packet is reflected in the size of the payload portion of the
packet, whereas the header has constant length. Length variation complements,
and is independent of, the content variation produced by header modifiers.
:param length_type: the kind of distribution of packet length
:type length_type: LengthType
:param min_val: lower limit on the packet length
:type min_val: int
:param max_val: upper limit on the packet length
:type max_val: int
"""
return Token(
self._connection,
build_set_request(
self,
module=self._module,
port=self._port,
indices=[self._stream_xindex],
length_type=length_type,
min_val=min_val,
max_val=max_val
)
)
set_fixed = functools.partialmethod(set, LengthType.FIXED)
"""Set the packet length distribution to Fixed.
"""
set_incrementing = functools.partialmethod(set, LengthType.INCREMENTING)
"""Set the packet length distribution to Incrementing. Length per packet: {min, min+1, min+2,
...., max-2, max-1, max...}.
"""
set_butterfly = functools.partialmethod(set, LengthType.BUTTERFLY)
"""Set the packet length distribution to Butterfly. Length per packet: {min, max, min+1, max-1, min+2, max-2,
...}.
"""
set_random = functools.partialmethod(set, LengthType.RANDOM)
"""Set the packet length distribution to Random.
"""
set_mix = functools.partialmethod(set, LengthType.MIX)
"""Set the packet length distribution to Mix.
"""
[docs]
@register_command
@dataclass
class PS_PAYLOAD:
"""
The payload content of the packets transmitted for a stream. The payload portion
of a packet starts after the header and continues up until the test payload or
the frame checksum. The payload may vary in length and is filled with either an
incrementing sequence of byte values or a repeated multi-byte pattern. Length
variation complements and is independent of the content variation produced by
header modifiers.
"""
code: typing.ClassVar[int] = 180
pushed: typing.ClassVar[bool] = True
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
[docs]
class GetDataAttr(ResponseBodyStruct):
payload_type: PayloadType = field(XmpByte())
"""coded byte, the kind of payload content."""
hex_data: Hex = field(XmpHex())
"""list of hex bytes, a pattern of bytes to be repeated. The maximum length of the pattern is 18 bytes. Only used if the type is set to PATTERN."""
[docs]
class SetDataAttr(RequestBodyStruct):
payload_type: PayloadType = field(XmpByte())
"""coded byte, the kind of payload content."""
hex_data: Hex = field(XmpHex())
"""list of hex bytes, a pattern of bytes to be repeated. The maximum length of the pattern is 18 bytes. Only used if the type is set to PATTERN."""
[docs]
def get(self) -> Token[GetDataAttr]:
"""Get the payload content of the packets transmitted for a stream. The payload portion
of a packet starts after the header and continues up until the test payload or
the frame checksum. The payload may vary in length and is filled with either an
incrementing sequence of byte values or a repeated multi-byte pattern. Length
variation complements and is independent of the content variation produced by
header modifiers.
:return: the kind of payload content, and a pattern of bytes to be repeated.
:rtype: PS_PAYLOAD.GetDataAttr
"""
return Token(self._connection, build_get_request(self, module=self._module, port=self._port, indices=[self._stream_xindex]))
[docs]
def set(self, payload_type: PayloadType, hex_data: Hex) -> Token[None]:
"""Set the payload content of the packets transmitted for a stream. The payload portion
of a packet starts after the header and continues up until the test payload or
the frame checksum. The payload may vary in length and is filled with either an
incrementing sequence of byte values or a repeated multi-byte pattern. Length
variation complements and is independent of the content variation produced by
header modifiers.
:param payload_type: the kind of payload content
:type payload_type: PayloadType
:param hex_data: a pattern of bytes to be repeated. The maximum length of the pattern is 18 bytes. Only used if the type is set to PATTERN.
:type hex_data: Hex
"""
return Token(self._connection, build_set_request(self, module=self._module, port=self._port, indices=[self._stream_xindex], payload_type=payload_type, hex_data=hex_data))
set_pattern = functools.partialmethod(set, PayloadType.PATTERN)
"""Set payload type to the custom pattern.
"""
set_inc_byte = functools.partialmethod(set, PayloadType.INC8)
"""Set payload type to Incrementing 0xFF (8-bit mode).
"""
set_prbs = functools.partialmethod(set, PayloadType.PRBS)
"""Set payload type to PRBS.
"""
set_random = functools.partialmethod(set, PayloadType.RANDOM)
"""Set payload type to Random.
"""
set_dec_byte = functools.partialmethod(set, PayloadType.DEC8)
"""Set payload type to Decrementing 0xFF (8-bit mode).
"""
set_inc_word = functools.partialmethod(set, PayloadType.INC16)
"""Set payload type to Incrementing 0xFFFF (16-bit mode).
"""
set_dec_word = functools.partialmethod(set, PayloadType.DEC16)
"""Set payload type to Decrementing 0xFFFF (16-bit mode).
"""
[docs]
@register_command
@dataclass
class PS_IPV4GATEWAY:
"""
An IPv4 gateway configuration specified for a stream.
"""
code: typing.ClassVar[int] = 181
pushed: typing.ClassVar[bool] = True
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
[docs]
class GetDataAttr(ResponseBodyStruct):
gateway: ipaddress.IPv4Address = field(XmpIPv4Address())
"""address, the IPv4 gateway address of the stream."""
[docs]
class SetDataAttr(RequestBodyStruct):
gateway: ipaddress.IPv4Address = field(XmpIPv4Address())
"""address, IPv4 gateway address of the stream."""
[docs]
def get(self) -> Token[GetDataAttr]:
"""Get the IPv4 gateway address of a stream.
:return: the IPv4 gateway address of the stream
:rtype: PS_IPV4GATEWAY.GetDataAttr
"""
return Token(self._connection, build_get_request(self, module=self._module, port=self._port, indices=[self._stream_xindex]))
[docs]
def set(self, gateway: ipaddress.IPv4Address) -> Token[None]:
"""Set the IPv4 gateway address of a stream.
:param gateway: the IPv4 gateway address of the stream
:type gateway: Union[str, ipaddress.IPv4Address, int]
"""
return Token(self._connection, build_set_request(self, module=self._module, port=self._port, indices=[self._stream_xindex], gateway=gateway))
[docs]
@register_command
@dataclass
class PS_IPV6GATEWAY:
"""
An IPv6 gateway configuration specified for a stream.
"""
code: typing.ClassVar[int] = 182
pushed: typing.ClassVar[bool] = True
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
[docs]
class GetDataAttr(ResponseBodyStruct):
gateway: ipaddress.IPv6Address = field(XmpIPv6Address())
"""address, the IPv6 gateway address of the stream."""
[docs]
class SetDataAttr(RequestBodyStruct):
gateway: ipaddress.IPv6Address = field(XmpIPv6Address())
"""address, IPv6 gateway address of the stream."""
[docs]
def get(self) -> Token[GetDataAttr]:
"""Get the IPv6 gateway address of a stream.
:return: the IPv6 gateway address of the stream
:rtype: PS_IPV6GATEWAY.GetDataAttr
"""
return Token(self._connection, build_get_request(self, module=self._module, port=self._port, indices=[self._stream_xindex]))
[docs]
def set(self, gateway: ipaddress.IPv6Address) -> Token[None]:
"""Set the IPv6 gateway address of a stream.
:param gateway: the IPv6 gateway address of the stream
:type gateway: Union[str, ipaddress.IPv6Address, int]
"""
return Token(self._connection, build_set_request(self, module=self._module, port=self._port, indices=[self._stream_xindex], gateway=gateway))
[docs]
@register_command
@dataclass
class PS_BURSTGAP:
"""
When the port is in in Burst TX mode, this command defines the gap between packets in a burst
(inter-packet gap) and the gap after a burst defined in one stream stops until a
burst defined in the next stream starts (inter-burst gap).
"""
code: typing.ClassVar[int] = 183
pushed: typing.ClassVar[bool] = True
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
[docs]
class GetDataAttr(ResponseBodyStruct):
inter_packet_gap: int = field(XmpInt())
"""integer, Burst Inter Packet Gap (in bytes)."""
inter_burst_gap: int = field(XmpInt())
"""integer, Inter Burst Gap (in bytes)."""
[docs]
class SetDataAttr(RequestBodyStruct):
inter_packet_gap: int = field(XmpInt())
"""integer, Burst Inter Packet Gap (in bytes)."""
inter_burst_gap: int = field(XmpInt())
"""integer, Inter Burst Gap (in bytes)."""
[docs]
def get(self) -> Token[GetDataAttr]:
"""Get the gap between packets in a burst (inter-packet gap) and the gap after a burst defined in one stream stops until a
burst defined in the next stream starts (inter-burst gap).
:return: the gap between packets in a burst
:rtype: PS_BURSTGAP.GetDataAttr
"""
return Token(self._connection, build_get_request(self, module=self._module, port=self._port, indices=[self._stream_xindex]))
[docs]
def set(self, inter_packet_gap: int, inter_burst_gap: int) -> Token[None]:
"""Set the gap between packets in a burst (inter-packet gap) and the gap after a burst defined in one stream stops until a
burst defined in the next stream starts (inter-burst gap).
:param inter_packet_gap: Burst Inter Packet Gap (in bytes).
:type inter_packet_gap: int
:param inter_burst_gap: Inter Burst Gap (in bytes).
:type inter_burst_gap: int
"""
return Token(
self._connection,
build_set_request(self, module=self._module, port=self._port, indices=[self._stream_xindex], inter_packet_gap=inter_packet_gap, inter_burst_gap=inter_burst_gap)
)
[docs]
@register_command
@dataclass
class PS_INJECTFCSERR:
"""
Force a frame checksum error in one of the packets currently being transmitted
from a stream. This can aid in analyzing the error-detection functionality of
the system under test. Traffic must be on for the port, and the stream must be
enabled.
"""
code: typing.ClassVar[int] = 185
pushed: typing.ClassVar[bool] = False
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
[docs]
class SetDataAttr(RequestBodyStruct):
pass
[docs]
def set(self) -> Token[None]:
"""Set a frame checksum error in one of the packets currently being transmitted
from a stream. This can aid in analyzing the error-detection functionality of
the system under test. Traffic must be on for the port, and the stream must be
enabled.
"""
return Token(self._connection, build_set_request(self, module=self._module, port=self._port, indices=[self._stream_xindex]))
[docs]
@register_command
@dataclass
class PS_INJECTSEQERR:
"""
Force a sequence error by skipping a test payload sequence number in one of the
packets currently being transmitted from a stream. This can aid in analyzing the
error-detection functionality of the system under test. Traffic must be on for
the port, and the stream must be enabled and include test payloads.
"""
code: typing.ClassVar[int] = 186
pushed: typing.ClassVar[bool] = False
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
[docs]
class SetDataAttr(RequestBodyStruct):
pass
[docs]
def set(self) -> Token[None]:
"""Set a sequence error by skipping a test payload sequence number in one of the
packets currently being transmitted from a stream. This can aid in analyzing the
error-detection functionality of the system under test. Traffic must be on for
the port, and the stream must be enabled and include test payloads.
"""
return Token(self._connection, build_set_request(self, module=self._module, port=self._port, indices=[self._stream_xindex]))
[docs]
@register_command
@dataclass
class PS_INJECTMISERR:
"""
Force a misorder error by swapping the test payload sequence numbers in two of
the packets currently being transmitted from a stream. This can aid in analyzing
the error-detection functionality of the system under test. Traffic must be on
for the port, and the stream must be enabled and include test payloads.
"""
code: typing.ClassVar[int] = 187
pushed: typing.ClassVar[bool] = False
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
[docs]
class SetDataAttr(RequestBodyStruct):
pass
[docs]
def set(self) -> Token[None]:
"""Set a misorder error by swapping the test payload sequence numbers in two of
the packets currently being transmitted from a stream. This can aid in analyzing
the error-detection functionality of the system under test. Traffic must be on
for the port, and the stream must be enabled and include test payloads.
"""
return Token(self._connection, build_set_request(self, module=self._module, port=self._port, indices=[self._stream_xindex]))
[docs]
@register_command
@dataclass
class PS_INJECTPLDERR:
"""
Force a payload integrity error in one of the packets currently being
transmitted from a stream. Payload integrity validation is only available for
incrementing payloads, and the error is created by changing a byte from the
incrementing sequence. The packet will have a correct frame checksum, but the
receiving Xena chassis will detect the invalid payload based on information in
the test payload. Traffic must be on for the port, and the stream must be
enabled and include test payloads.
"""
code: typing.ClassVar[int] = 188
pushed: typing.ClassVar[bool] = False
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
[docs]
class SetDataAttr(RequestBodyStruct):
pass
[docs]
def set(self) -> Token[None]:
"""Set a payload integrity error in one of the packets currently being
transmitted from a stream. Payload integrity validation is only available for
incrementing payloads, and the error is created by changing a byte from the
incrementing sequence. The packet will have a correct frame checksum, but the
receiving Xena chassis will detect the invalid payload based on information in
the test payload. Traffic must be on for the port, and the stream must be
enabled and include test payloads.
"""
return Token(self._connection, build_set_request(self, module=self._module, port=self._port, indices=[self._stream_xindex]))
[docs]
@register_command
@dataclass
class PS_INJECTTPLDERR:
"""
Force a test payload error in one of the packets currently being transmitted
from a stream. This means that the test payload will not be recognized at the
receiving port, so it will be counted as a no-test-payload packet, and there
will be a lost packet for the stream. Traffic must be on for the port, and the
stream must be enabled and include test payloads.
"""
code: typing.ClassVar[int] = 189
pushed: typing.ClassVar[bool] = False
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
[docs]
class SetDataAttr(RequestBodyStruct):
pass
[docs]
def set(self) -> Token[None]:
"""Set a test payload error in one of the packets currently being transmitted
from a stream. This means that the test payload will not be recognized at the
receiving port, so it will be counted as a no-test-payload packet, and there
will be a lost packet for the stream. Traffic must be on for the port, and the
stream must be enabled and include test payloads.
"""
return Token(self._connection, build_set_request(self, module=self._module, port=self._port, indices=[self._stream_xindex]))
[docs]
@register_command
@dataclass
class PS_MODIFIEREXT:
"""
An extended packet modifier for a stream header. The headers of each packet
transmitted for the stream will be varied according to the modifier
specification. The modifier acts on 32 bits and takes up the space for two
16-bit modifiers to do this. This command requires two sub-indices, one for
the stream and one for the modifier. A modifier is positioned at a fixed place
in the header, selects a number of consecutive bits starting from that position,
and applies an action to those bits in each packet. Packets can be repeated so
that a certain number of identical packets are transmitted before applying the
next modification.
"""
code: typing.ClassVar[int] = 190
pushed: typing.ClassVar[bool] = True
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
_modifier_xindex: int
[docs]
class GetDataAttr(ResponseBodyStruct):
position: int = field(XmpInt())
"""integer, the byte position from the start of the packet. Cannot be < 1!"""
mask: Hex = field(XmpHex(size=4))
"""four hex bytes, the mask specifying which bits to affect."""
action: ModifierAction = field(XmpInt())
"""coded integer, which action to perform on the affected bits."""
repetition: int = field(XmpInt())
"""integer, how many times to repeat on each packet. Note: For now the only value supported is 1."""
[docs]
class SetDataAttr(RequestBodyStruct):
position: int = field(XmpInt())
"""integer, the byte position from the start of the packet. Cannot be < 1!"""
mask: Hex = field(XmpHex(size=4))
"""four hex bytes, the mask specifying which bits to affect."""
action: ModifierAction = field(XmpInt())
"""coded integer, which action to perform on the affected bits."""
repetition: int = field(XmpInt())
"""integer, how many times to repeat on each packet. Note: For now the only value supported is 1."""
[docs]
def get(self) -> Token[GetDataAttr]:
"""Get an extended packet modifier for a stream header. The headers of each packet
transmitted for the stream will be varied according to the modifier
specification. The modifier acts on 24 bits and takes up the space for two
16-bit modifiers to do this. This command requires two sub-indices, one for
the stream and one for the modifier. A modifier is positioned at a fixed place
in the header, selects a number of consecutive bits starting from that position,
and applies an action to those bits in each packet. Packets can be repeated so
that a certain number of identical packets are transmitted before applying the
next modification.
:return: the byte position from the start of the packet. Cannot be < 1!,
the mask specifying which bits to affect, which action to perform on the affected bits,
and how many times to repeat on each packet. Note: For now the only value supported is 1.
:rtype: PS_MODIFIEREXT.GetDataAttr
"""
return Token(self._connection, build_get_request(self, module=self._module, port=self._port, indices=[self._stream_xindex, self._modifier_xindex]))
[docs]
def set(self, position: int, mask: Hex, action: ModifierAction, repetition: int) -> Token[None]:
"""Set an extended packet modifier for a stream header. The headers of each packet
transmitted for the stream will be varied according to the modifier
specification. The modifier acts on 24 bits and takes up the space for two
16-bit modifiers to do this. This command requires two sub-indices, one for
the stream and one for the modifier. A modifier is positioned at a fixed place
in the header, selects a number of consecutive bits starting from that position,
and applies an action to those bits in each packet. Packets can be repeated so
that a certain number of identical packets are transmitted before applying the
next modification.
:param position: the byte position from the start of the packet. Cannot be < 1!
:type position: int
:param mask: the mask specifying which bits to affect
:type mask: str
:param action: which action to perform on the affected bits,
:type action: ModifierAction
:param repetition: how many times to repeat on each packet. Note: For now the only value supported is 1.
:type repetition: int
"""
return Token(
self._connection,
build_set_request(
self,
module=self._module,
port=self._port,
indices=[self._stream_xindex, self._modifier_xindex],
position=position,
mask=mask,
action=action,
repetition=repetition
)
)
set_inc = functools.partialmethod(set, action=ModifierAction.INC)
"""Set modifier action to Incrementing.
"""
set_dec = functools.partialmethod(set, action=ModifierAction.DEC)
"""Set modifier action to Decrementing.
"""
set_random = functools.partialmethod(set, action=ModifierAction.RANDOM)
"""Set modifier action to Random.
"""
[docs]
@register_command
@dataclass
class PS_MODIFIEREXTCOUNT:
"""
The number of extended 24-bit modifiers active on the packet header of a stream.
Each modifier is specified using PS_MODIFIEREXT.
"""
code: typing.ClassVar[int] = 191
pushed: typing.ClassVar[bool] = True
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
[docs]
class GetDataAttr(ResponseBodyStruct):
ext_modifier_count: int = field(XmpInt())
"""integer, the number of extended 24-bit modifiers for the stream."""
[docs]
class SetDataAttr(RequestBodyStruct):
ext_modifier_count: int = field(XmpInt())
"""integer, the number of extended 24-bit modifiers for the stream."""
[docs]
def get(self) -> Token[GetDataAttr]:
"""Get the number of extended 24-bit modifiers active on the packet header of a stream.
Each modifier is specified using PS_MODIFIEREXT.
:return: the number of extended 24-bit modifiers for the stream
:rtype: PS_MODIFIEREXTCOUNT.GetDataAttr
"""
return Token(self._connection, build_get_request(self, module=self._module, port=self._port, indices=[self._stream_xindex]))
[docs]
def set(self, ext_modifier_count: int) -> Token[None]:
"""Set the number of extended 24-bit modifiers active on the packet header of a stream.
Each modifier is specified using PS_MODIFIEREXT.
:param ext_modifier_count: the number of extended 24-bit modifiers for the stream
:type ext_modifier_count: int
"""
return Token(self._connection, build_set_request(self, module=self._module, port=self._port, indices=[self._stream_xindex], ext_modifier_count=ext_modifier_count))
[docs]
@register_command
@dataclass
class PS_CDFOFFSET:
"""
This command is part of the Custom Data Field (CDF) feature. The CDF offset
for the stream is the location in the stream data packets where the various CDF
data will be inserted. All fields for a given stream uses the same offset
value. The default value is zero (0) which means that the CDF data will be
inserted at the very start of the packet, thus overwriting the packet protocol
headers. If you want the CDF data to start immediately after the end of the
packet protocol headers you will have to set the CDF field offset manually. The
feature requires that the P_PAYLOADMODE command on the parent port has been
set to CDF. This enables the feature for all streams on this port.
"""
code: typing.ClassVar[int] = 195
pushed: typing.ClassVar[bool] = True
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
[docs]
class GetDataAttr(ResponseBodyStruct):
offset: int = field(XmpInt())
"""integer, the location where the CDF data will be inserted."""
[docs]
class SetDataAttr(RequestBodyStruct):
offset: int = field(XmpInt())
"""integer, the location where the CDF data will be inserted."""
[docs]
def get(self) -> Token[GetDataAttr]:
"""Get the CDF offset for the stream. This command is part of the Custom Data Field (CDF) feature. The CDF offset
for the stream is the location in the stream data packets where the various CDF
data will be inserted. All fields for a given stream uses the same offset
value. The default value is zero (0) which means that the CDF data will be
inserted at the very start of the packet, thus overwriting the packet protocol
headers. If you want the CDF data to start immediately after the end of the
packet protocol headers you will have to set the CDF field offset manually. The
feature requires that the P_PAYLOADMODE command on the parent port has been
set to CDF. This enables the feature for all streams on this port.
:return: the location where the CDF data will be inserted
:rtype: PS_CDFOFFSET.GetDataAttr
"""
return Token(self._connection, build_get_request(self, module=self._module, port=self._port, indices=[self._stream_xindex]))
[docs]
def set(self, offset: int) -> Token[None]:
"""Set the CDF offset for the stream. This command is part of the Custom Data Field (CDF) feature. The CDF offset
for the stream is the location in the stream data packets where the various CDF
data will be inserted. All fields for a given stream uses the same offset
value. The default value is zero (0) which means that the CDF data will be
inserted at the very start of the packet, thus overwriting the packet protocol
headers. If you want the CDF data to start immediately after the end of the
packet protocol headers you will have to set the CDF field offset manually. The
feature requires that the P_PAYLOADMODE command on the parent port has been
set to CDF. This enables the feature for all streams on this port.
:param offset: the location where the CDF data will be inserted
:type offset: int
"""
return Token(self._connection, build_set_request(self, module=self._module, port=self._port, indices=[self._stream_xindex], offset=offset))
[docs]
@register_command
@dataclass
class PS_CDFCOUNT:
"""
This command is part of the Custom Data Field (CDF) feature. It controls the
number of custom data fields available for each stream. You can set a different number
of fields for each stream. Changing the field count value to a larger value will
leave all existing fields intact. Changing the field count value to a smaller
value will remove all existing fields with an index larger than or equal to the
new count. The feature requires that the P_PAYLOADMODE command on the parent
port has been set to CDF. This enables the feature for all streams on this port.
"""
code: typing.ClassVar[int] = 196
pushed: typing.ClassVar[bool] = True
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
[docs]
class GetDataAttr(ResponseBodyStruct):
cdf_count: int = field(XmpInt())
"""integer, the number of CDF data fields to allocate for the stream."""
[docs]
class SetDataAttr(RequestBodyStruct):
cdf_count: int = field(XmpInt())
"""integer, the number of CDF data fields to allocate for the stream."""
[docs]
def get(self) -> Token[GetDataAttr]:
"""Get the number of custom data fields available for each stream. You can set a different number
of fields for each stream. Changing the field count value to a larger value will
leave all existing fields intact. Changing the field count value to a smaller
value will remove all existing fields with an index larger than or equal to the
new count. The feature requires that the P_PAYLOADMODE command on the parent
port has been set to CDF. This enables the feature for all streams on this port.
:return: the number of CDF data fields to allocate for the stream
:rtype: PS_CDFCOUNT.GetDataAttr
"""
return Token(self._connection, build_get_request(self, module=self._module, port=self._port, indices=[self._stream_xindex]))
[docs]
def set(self, cdf_count: int) -> Token[None]:
"""Set the number of custom data fields available for each stream. You can set a different number
of fields for each stream. Changing the field count value to a larger value will
leave all existing fields intact. Changing the field count value to a smaller
value will remove all existing fields with an index larger than or equal to the
new count. The feature requires that the P_PAYLOADMODE command on the parent
port has been set to CDF. This enables the feature for all streams on this port.
:param cdf_count: the number of CDF data fields to allocate for the stream
:type cdf_count: int
"""
return Token(self._connection, build_set_request(self, module=self._module, port=self._port, indices=[self._stream_xindex], cdf_count=cdf_count))
[docs]
@register_command
@dataclass
class PS_CDFDATA:
"""
This command is part of the Custom Data Field (CDF) feature. It controls the
actual field data for a single field. It is possible to define fields with
different data lengths for each stream. If the length of a data field exceeds
(packet length - CDF offset) defined for the stream the field data will be
truncated when transmitted. The feature requires that the P_PAYLOADMODE
command on the parent port has been set to CDF. This enables the feature for
all streams on this port.
"""
code: typing.ClassVar[int] = 197
pushed: typing.ClassVar[bool] = True
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
_custom_data_field_xindex: int
[docs]
class GetDataAttr(ResponseBodyStruct):
hex_data: Hex = field(XmpHex())
"""list of hex bytes, the actual field data for a single field."""
[docs]
class SetDataAttr(RequestBodyStruct):
hex_data: Hex = field(XmpHex())
"""list of hex bytes, a pattern of bytes to be used."""
[docs]
def get(self) -> Token[GetDataAttr]:
"""Get the actual field data for a single field. It is possible to define fields with
different data lengths for each stream. If the length of a data field exceeds
(packet length - CDF offset) defined for the stream the field data will be
truncated when transmitted. The feature requires that the P_PAYLOADMODE
command on the parent port has been set to CDF. This enables the feature for
all streams on this port.
:return: the actual field data for a single field
:rtype: PS_CDFDATA.GetDataAttr
"""
return Token(self._connection, build_get_request(self, module=self._module, port=self._port, indices=[self._stream_xindex, self._custom_data_field_xindex]))
[docs]
def set(self, hex_data: Hex) -> Token[None]:
"""Set the actual field data for a single field. It is possible to define fields with
different data lengths for each stream. If the length of a data field exceeds
(packet length - CDF offset) defined for the stream the field data will be
truncated when transmitted. The feature requires that the P_PAYLOADMODE
command on the parent port has been set to CDF. This enables the feature for
all streams on this port.
:param hex_data: a pattern of bytes to be used
:type hex_data: Hex
"""
return Token(
self._connection,
build_set_request(
self,
module=self._module,
port=self._port,
indices=[self._stream_xindex, self._custom_data_field_xindex],
hex_data=hex_data
)
)
[docs]
@register_command
@dataclass
class PS_EXTPAYLOAD:
"""
This command controls the extended payload feature. The PS_PAYLOAD command
described above only allow the user to specify an 18-byte pattern (when
PS_PAYLOAD is set to PATTERN). The PS_EXTPAYLOAD command allow the definition
of a much larger (up to MTU) payload buffer for each stream. The extended
payload will be inserted immediately after the end of the protocol segment area.
The feature requires the P_PAYLOADMODE command on the parent port being set to
EXTPL. This enables the feature for all streams on this port.
"""
code: typing.ClassVar[int] = 199
pushed: typing.ClassVar[bool] = True
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
[docs]
class GetDataAttr(ResponseBodyStruct):
hex_data: Hex = field(XmpHex())
"""list of hex bytes, a pattern of bytes to be repeated."""
[docs]
class SetDataAttr(RequestBodyStruct):
hex_data: Hex = field(XmpHex())
"""list of hex bytes, a pattern of bytes to be repeated."""
[docs]
def get(self) -> Token[GetDataAttr]:
"""Get the extended payload in bytes of a stream.
:return: the extended payload in bytes of a stream
:rtype: PS_EXTPAYLOAD.GetDataAttr
"""
return Token(self._connection, build_get_request(self, module=self._module, port=self._port, indices=[self._stream_xindex]))
[docs]
def set(self, hex_data: Hex) -> Token[None]:
"""Set the extended payload in bytes of a stream. The PS_EXTPAYLOAD command allow the definition
of a much larger (up to MTU) payload buffer for each stream. The extended
payload will be inserted immediately after the end of the protocol segment area.
The feature requires the P_PAYLOADMODE command on the parent port being set to
EXTPL. This enables the feature for all streams on this port.
:param hex_data: the extended payload in bytes of a stream
:type hex_data: Hex
"""
return Token(self._connection, build_set_request(self, module=self._module, port=self._port, indices=[self._stream_xindex], hex_data=hex_data))
[docs]
@register_command
@dataclass
class PS_PFCPRIORITY:
"""
Set and get the Priority Flow Control (PFC) Cos value of a stream.
"""
code: typing.ClassVar[int] = 219
pushed: typing.ClassVar[bool] = True
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
[docs]
class GetDataAttr(ResponseBodyStruct):
cos: PFCMode = field(XmpByte())
"""coded byte, the Priority Flow Control (PFC) Cos value of a stream."""
[docs]
class SetDataAttr(RequestBodyStruct):
cos: PFCMode = field(XmpByte())
"""coded byte, the Priority Flow Control (PFC) Cos value of a stream."""
[docs]
def get(self) -> Token[GetDataAttr]:
"""Get the the Priority Flow Control (PFC) Cos value of a stream.
:return: the Priority Flow Control (PFC) Cos value of a stream.
:rtype: PS_PFCPRIORITY.GetDataAttr
"""
return Token(self._connection, build_get_request(self, module=self._module, port=self._port, indices=[self._stream_xindex]))
[docs]
def set(self, cos: PFCMode) -> Token[None]:
"""Set the Priority Flow Control (PFC) Cos value of a stream.
:param cos: the Priority Flow Control (PFC) Cos value of a stream.
:type cos: PFCMode
"""
return Token(self._connection, build_set_request(self, module=self._module, port=self._port, indices=[self._stream_xindex], cos=cos))
[docs]
@register_command
@dataclass
class PS_OPTIONS:
"""
Define the set of active option flags for the stream. The set form sets the flags listed in <options>, and clears the flags not listed. To clear all flags, simply omit <options> in the command.
"""
code: typing.ClassVar[int] = 220
pushed: typing.ClassVar[bool] = False
_connection: 'interfaces.IConnection'
_module: int
_port: int
_stream_xindex: int
[docs]
class GetDataAttr(ResponseBodyStruct):
options: typing.List[StreamOption] = field(XmpSequence(types_chunk=[XmpByte()]))
"""coded byte, This flag affects the INC8/DEC8/INC16/DEC16 payload types (refer to the PS_PAYLOAD command): With the flag set, the first payload byte/word after the header will be 0 (INC8/INC16) or -1 (DEC8/DEC16). With the flag unset, the default is used: The first payload byte/word of the payload will be equal to <length of header> (INC8/INC16), or -<length of header> - 1 (DEC8/DEC16)."""
[docs]
class SetDataAttr(RequestBodyStruct):
options: typing.List[StreamOption] = field(XmpSequence(types_chunk=[XmpByte()]))
"""coded byte, This flag affects the INC8/DEC8/INC16/DEC16 payload types (refer to the PS_PAYLOAD command): With the flag set, the first payload byte/word after the header will be 0 (INC8/INC16) or -1 (DEC8/DEC16). With the flag unset, the default is used: The first payload byte/word of the payload will be equal to <length of header> (INC8/INC16), or -<length of header> - 1 (DEC8/DEC16)."""
[docs]
def get(self) -> Token[GetDataAttr]:
"""Define the set of active “option flags” for the stream. The “set” form sets the flags listed in <options>, and clears the flags not listed. To clear all flags, simply omit <options> in the command.
:return: the option flags
:rtype: PS_OPTION.GetDataAttr
"""
return Token(self._connection, build_get_request(self, module=self._module, port=self._port, indices=[self._stream_xindex]))
[docs]
def set(self, options: typing.List[StreamOption]) -> Token[None]:
"""Define the set of active “option flags” for the stream. The “set” form sets the flags listed in <options>, and clears the flags not listed. To clear all flags, simply omit <options> in the command.
:param options: the option flags
:type options: StreamOption
"""
return Token(self._connection, build_set_request(self, module=self._module, port=self._port, indices=[self._stream_xindex], options=options))