2.2. Quick Start
The XOA Driver offers more than just object-oriented APIs and functions for executing test scripts. It also provides a seamless integration with XOA CLI commands and port configuration files from XenaManager, enabling you to effortlessly work with them.
2.2.1. Basic Example
The simple code example demonstrates some basics of using HL-API and HL-FUNC:
Connect to a tester
Reserve a port as TX and another one as RX
Configure TX port
Configure a stream on the TX port
Start traffic on the TX port
Wait for 10 seconds
Collect statistics on the TX port
Collect statistics on the RX port
Release the ports
Disconnect from the chassis
We will first walk you through the topics above step by step. At the end, you will see the whole example. If you want to try it out, you can simply copy and paste it into your environment and run it. Remember to change the IP address to that of your tester.
This is boilerplate.
import asyncio
from xoa_driver import testers
from xoa_driver import modules
from xoa_driver import ports
from xoa_driver import enums
from xoa_driver import utils
from xoa_driver.hlfuncs import mgmt, headers
from xoa_driver.misc import Hex
import ipaddress
import logging
#---------------------------
# GLOBAL PARAMS
#---------------------------
CHASSIS_IP = "demo.xenanetworks.com"
USERNAME = "quick_start"
PORT1 = "0/0"
PORT2 = "0/1"
async def my_awesome_func(chassis: str, username: str, port_str1: str, port_str2: str):
    """Quick-start test body; the steps that follow fill it in one by one.

    :param chassis: host name or IP address of the tester
    :param username: user name for the tester connection
    :param port_str1: TX port as "<module index>/<port index>", e.g. "0/0"
    :param port_str2: RX port, same format as port_str1
    """
    ...


async def main():
    """Run my_awesome_func with the settings from GLOBAL PARAMS.

    A KeyboardInterrupt raised while the example runs is logged and
    swallowed, so this coroutine returns normally instead of propagating it.
    """
    try:
        await my_awesome_func(
            chassis=CHASSIS_IP,
            username=USERNAME,
            port_str1=PORT1,
            port_str2=PORT2,
        )
    except KeyboardInterrupt:
        logging.info("Interrupted by user")


if __name__ == "__main__":
    asyncio.run(main())
Establishing a connection to a tester is simple.
# Establish connection to a Valkyrie tester using Python context manager
# The connection will be automatically terminated when it is out of the block
async with testers.L23Tester(host=chassis, username=username, password="xena", port=22606, enable_logging=False) as tester:
Access a module on the tester. The method obtain() is for accessing a test resource that cannot be deleted, such as a module or a port. You can read more about this method in Module Manager and Port Manager.
module_obj1 = tester.modules.obtain(_mid1)
module_obj2 = tester.modules.obtain(_mid2)
After that, the driver knows you are using the desired module, and then you can access ports on the module. Let’s use two ports, one as TX, the other RX.
# Get the port on module as TX port
tx_port = module_obj1.ports.obtain(_pid1)
# Get the port on module as RX port
rx_port = module_obj2.ports.obtain(_pid2)
# Forcibly reserve the ports and reset
await mgmt.reserve_port(tx_port, reset=True)
await mgmt.reserve_port(rx_port, reset=True)
await asyncio.sleep(2)
Now we have two ports ready to configure. Let’s first configure the TX port.
#################################################
# TX Port Configuration #
#################################################
# Simple batch configure the TX port
await utils.apply(
tx_port.comment.set(comment="this is tx port"),
tx_port.interframe_gap.set(min_byte_count=20),
tx_port.loop_back.set(mode=enums.LoopbackMode.NONE),
tx_port.tx_config.packet_limit.set(packet_count_limit=1_000_000),
tx_port.tx_config.enable.set(on_off=enums.OnOff.ON),
tx_port.net_config.mac_address.set(mac_address=Hex("BBBBBBBBBBBB")),
tx_port.net_config.ipv4.address.set(
ipv4_address=ipaddress.IPv4Address("10.10.10.10"),
subnet_mask=ipaddress.IPv4Address("255.255.255.0"),
gateway=ipaddress.IPv4Address("10.10.10.1"),
wild=ipaddress.IPv4Address("0.0.0.0")),
# for more port configuration, please go to https://docs.xenanetworks.com/projects/xoa-python-api
)
await utils.apply() lets us group several commands bound for the same port into a larger “command”. This is called Sequential Grouping.
Note
After reset, the attributes of the port are set to default values. You can use the get() method to query the default values.
You don’t necessarily need to set all the attributes. The default values are usually good enough for most tests. Just pick the ones you need to set.
Now we can create a stream on the TX port.
#################################################
# Stream Configuration #
#################################################
# Create a stream on the port
# Stream index is automatically assigned
my_stream = await tx_port.streams.create()
stream_index = my_stream.idx
logging.info(f"TX stream index: {stream_index}")
eth = headers.Ethernet()
eth.dst_mac = "aaaa.aaaa.aaaa"
eth.src_mac = "bbbb.bbbb.bbbb"
eth.ethertype = headers.EtherType.VLAN
vlan = headers.VLAN()
vlan.id = 100
vlan.type = headers.EtherType.IPv4
ip = headers.IPV4()
ip.proto = headers.IPProtocol.NONE
ip.src = "10.10.10.10"
ip.dst = "11.11.11.11"
ip.total_length = 1000 - int(len(str(eth))/2) - int(len(str(vlan))/2) - 4
# Simple batch configure the stream on the TX port
await utils.apply(
my_stream.tpld_id.set(test_payload_identifier=0),
my_stream.enable.set_on(),
my_stream.comment.set(comment="this is a stream"),
my_stream.payload.content.set(payload_type=enums.PayloadType.PATTERN, hex_data=Hex("DEAD")),
my_stream.rate.pps.set(stream_rate_pps=100_000),
my_stream.packet.length.set(length_type=enums.LengthType.FIXED, min_val=1000, max_val=1000),
my_stream.packet.header.protocol.set(segments=[
enums.ProtocolOption.ETHERNET,
enums.ProtocolOption.VLAN,
enums.ProtocolOption.IP]),
my_stream.packet.header.data.set(hex_data=Hex(str(eth)+str(vlan)+str(ip)))
# for more stream configuration, please go to https://docs.xenanetworks.com/projects/xoa-python-api
)
Then, we want to clear the statistics counters of both TX and RX ports before starting the traffic. This is to make sure the counters are not polluted by any previous test. We can use Parallel Grouping to group commands bound for different ports into a larger “command”.
#################################################
# Traffic Control #
#################################################
# Batch clear statistics on TX and RX ports
await asyncio.gather(
tx_port.statistics.tx.clear.set(),
tx_port.statistics.rx.clear.set(),
rx_port.statistics.tx.clear.set(),
rx_port.statistics.rx.clear.set()
)
Now, let’s start the traffic on the TX port for roughly 10 seconds and then stop it. It is “roughly” because we use sleep() to control the duration. It may feel accurate to you, but for a Xena tester that can generate 800 Gbps of traffic with time measurements in the nanosecond range, sleep() is far from accurate for controlling time. If your test requires high-accuracy time control, don’t use software to control time. Instead, limit the port’s TX time so that the traffic duration is accurate down to the microsecond range.
# Start traffic on the TX port
await tx_port.traffic.state.set_start()
# Test duration 10 seconds
await asyncio.sleep(10)
# Stop traffic on the TX port
await tx_port.traffic.state.set_stop()
# Wait 2 seconds for the counters to finish
await asyncio.sleep(2)
After the traffic is stopped, we query statistic counters.
#################################################
# Statistics #
#################################################
# Query TX statistics
tx_total, tx_stream = await utils.apply(
# port level statistics
tx_port.statistics.tx.total.get(),
# stream level statistics
# let the resource manager tell you the stream index so you don't have to remember it
tx_port.statistics.tx.obtain_from_stream(my_stream).get()
)
logging.info(f"Total TX byte count since cleared: {tx_total.byte_count_since_cleared}")
logging.info(f"Total TX packet count since cleared: {tx_total.packet_count_since_cleared}")
logging.info(f"Stream {my_stream.idx} TX byte count since cleared: {tx_stream.byte_count_since_cleared}")
logging.info(f"Stream {my_stream.idx} TX packet count since cleared: {tx_stream.packet_count_since_cleared}")
# if you have forgot what TPLD ID assigned to a stream, you can query it
resp = await my_stream.tpld_id.get()
tpld_id = resp.test_payload_identifier
received_tplds = await rx_port.statistics.rx.obtain_available_tplds()
for i in received_tplds:
logging.info(f"RX TPLD index: {i}")
# then access the RX stat object
rx_stats_obj = rx_port.statistics.rx.access_tpld(tpld_id)
# then query each stats of a TPLD ID
rx_total, rx_traffic, rx_latency, rx_jitter, rx_error = await utils.apply(
# port level statistics
rx_port.statistics.rx.total.get(),
# tpld level traffic stats
rx_stats_obj.traffic.get(),
# tpld level latency stats
rx_stats_obj.latency.get(),
# tpld level jitter stats
rx_stats_obj.jitter.get(),
# tpld level error stats
rx_stats_obj.errors.get()
)
logging.info(f"Total RX byte count since cleared: {rx_total.byte_count_since_cleared}")
logging.info(f"Total RX packet count since cleared: {rx_total.packet_count_since_cleared}")
logging.info(f"TPLD {tpld_id} RX byte count since cleared: {rx_traffic.byte_count_since_cleared}")
logging.info(f"TPLD {tpld_id} RX packet count since cleared: {rx_traffic.packet_count_since_cleared}")
logging.info(f"TPLD {tpld_id} RX min latency: {rx_latency.min_val}")
logging.info(f"TPLD {tpld_id} RX max latency: {rx_latency.max_val}")
logging.info(f"TPLD {tpld_id} RX avg latency: {rx_latency.avg_val}")
logging.info(f"TPLD {tpld_id} RX min jitter: {rx_jitter.min_val}")
logging.info(f"TPLD {tpld_id} RX max jitter: {rx_jitter.max_val}")
logging.info(f"TPLD {tpld_id} RX avg jitter: {rx_jitter.avg_val}")
logging.info(f"TPLD {tpld_id} RX Lost Packets: {rx_error.non_incre_seq_event_count}")
logging.info(f"TPLD {tpld_id} RX Misordered: {rx_error.swapped_seq_misorder_event_count}")
logging.info(f"TPLD {tpld_id} RX Payload Errors: {rx_error.non_incre_payload_packet_count}")
At last, release the ports as a good practice, so your team members know the ports are free to use.
from xoa_driver import modules
from xoa_driver import ports
from xoa_driver import enums
from xoa_driver import utils
from xoa_driver.hlfuncs import mgmt, headers
from xoa_driver.misc import Hex
import ipaddress
import logging
#---------------------------
# GLOBAL PARAMS
#---------------------------
CHASSIS_IP = "demo.xenanetworks.com"
USERNAME = "quick_start"
PORT1 = "0/0"
PORT2 = "0/1"
async def my_awesome_func(chassis: str, username: str, port_str1: str, port_str2: str):
    """Run the quick-start traffic test from a TX port to an RX port.

    Steps: connect to the tester, reserve and reset both ports, configure the
    TX port and one stream on it, send traffic for about 10 seconds, log the
    TX and RX statistics, and release the ports. Once both ports are
    reserved, they are released even if a later step raises.

    :param chassis: host name or IP address of the tester
    :param username: user name for the tester connection
    :param port_str1: TX port as "<module index>/<port index>", e.g. "0/0"
    :param port_str2: RX port, same format as port_str1
    :return: None; returns before reserving anything if either module is a
        Chimera module
    """
    # configure basic logger
    logging.basicConfig(
        format="%(asctime)s %(message)s",
        level=logging.DEBUG,
        handlers=[
            logging.FileHandler(filename="test.log", mode="a"),
            logging.StreamHandler(),
        ],
    )

    # Establish connection to a Valkyrie tester using Python context manager
    # The connection will be automatically terminated when it is out of the block
    async with testers.L23Tester(host=chassis, username=username, password="xena", port=22606, enable_logging=False) as tester:
        logging.info("===================================")
        logging.info(f"{'Connect to chassis:':<20}{chassis}")
        logging.info(f"{'Username:':<20}{username}")

        # Port strings have the form "<module index>/<port index>"
        tx_ids = port_str1.split("/")
        rx_ids = port_str2.split("/")
        _mid1, _pid1 = int(tx_ids[0]), int(tx_ids[1])
        _mid2, _pid2 = int(rx_ids[0]), int(rx_ids[1])

        # Access modules on the tester
        module_obj1 = tester.modules.obtain(_mid1)
        module_obj2 = tester.modules.obtain(_mid2)

        # Commands used in this example are not supported by Chimera modules
        if isinstance(module_obj1, modules.E100ChimeraModule):
            return None
        if isinstance(module_obj2, modules.E100ChimeraModule):
            return None

        # Get the port on module as TX port
        tx_port = module_obj1.ports.obtain(_pid1)
        # Get the port on module as RX port
        rx_port = module_obj2.ports.obtain(_pid2)

        # Forcibly reserve the ports and reset
        await mgmt.reserve_port(tx_port, reset=True)
        await mgmt.reserve_port(rx_port, reset=True)

        # From here on both ports are reserved by us; the finally clause makes
        # sure they are released even when a later step fails.
        try:
            await asyncio.sleep(2)

            #################################################
            #           TX Port Configuration               #
            #################################################
            # Simple batch configure the TX port
            await utils.apply(
                tx_port.comment.set(comment="this is tx port"),
                tx_port.interframe_gap.set(min_byte_count=20),
                tx_port.loop_back.set(mode=enums.LoopbackMode.NONE),
                tx_port.tx_config.packet_limit.set(packet_count_limit=1_000_000),
                tx_port.tx_config.enable.set(on_off=enums.OnOff.ON),
                tx_port.net_config.mac_address.set(mac_address=Hex("BBBBBBBBBBBB")),
                tx_port.net_config.ipv4.address.set(
                    ipv4_address=ipaddress.IPv4Address("10.10.10.10"),
                    subnet_mask=ipaddress.IPv4Address("255.255.255.0"),
                    gateway=ipaddress.IPv4Address("10.10.10.1"),
                    wild=ipaddress.IPv4Address("0.0.0.0")),
                # for more port configuration, please go to https://docs.xenanetworks.com/projects/xoa-python-api
            )

            #################################################
            #           Stream Configuration                #
            #################################################
            # Create a stream on the port
            # Stream index is automatically assigned
            my_stream = await tx_port.streams.create()
            logging.info(f"TX stream index: {my_stream.idx}")

            # One size, in bytes, drives both the IP total length and the
            # stream's fixed packet length so the two cannot drift apart.
            frame_size = 1000

            eth = headers.Ethernet()
            eth.dst_mac = "aaaa.aaaa.aaaa"
            eth.src_mac = "bbbb.bbbb.bbbb"
            eth.ethertype = headers.EtherType.VLAN

            vlan = headers.VLAN()
            vlan.id = 100
            vlan.type = headers.EtherType.IPv4

            ip = headers.IPV4()
            ip.proto = headers.IPProtocol.NONE
            ip.src = "10.10.10.10"
            ip.dst = "11.11.11.11"
            # str() of a header is used as hex text below (two characters per
            # byte), so len // 2 is the header size in bytes. The trailing 4 is
            # presumably the Ethernet FCS -- TODO confirm.
            ip.total_length = frame_size - len(str(eth)) // 2 - len(str(vlan)) // 2 - 4

            # Simple batch configure the stream on the TX port
            await utils.apply(
                my_stream.tpld_id.set(test_payload_identifier=0),
                my_stream.enable.set_on(),
                my_stream.comment.set(comment="this is a stream"),
                my_stream.payload.content.set(payload_type=enums.PayloadType.PATTERN, hex_data=Hex("DEAD")),
                my_stream.rate.pps.set(stream_rate_pps=100_000),
                my_stream.packet.length.set(length_type=enums.LengthType.FIXED, min_val=frame_size, max_val=frame_size),
                my_stream.packet.header.protocol.set(segments=[
                    enums.ProtocolOption.ETHERNET,
                    enums.ProtocolOption.VLAN,
                    enums.ProtocolOption.IP]),
                my_stream.packet.header.data.set(hex_data=Hex(str(eth)+str(vlan)+str(ip)))
                # for more stream configuration, please go to https://docs.xenanetworks.com/projects/xoa-python-api
            )

            #################################################
            #               Traffic Control                 #
            #################################################
            # Batch clear statistics on TX and RX ports
            await asyncio.gather(
                tx_port.statistics.tx.clear.set(),
                tx_port.statistics.rx.clear.set(),
                rx_port.statistics.tx.clear.set(),
                rx_port.statistics.rx.clear.set()
            )

            # Start traffic on the TX port
            await tx_port.traffic.state.set_start()
            # Test duration 10 seconds
            await asyncio.sleep(10)
            # Stop traffic on the TX port
            await tx_port.traffic.state.set_stop()
            # Wait 2 seconds for the counters to finish
            await asyncio.sleep(2)

            #################################################
            #                  Statistics                   #
            #################################################
            # Query TX statistics
            tx_total, tx_stream = await utils.apply(
                # port level statistics
                tx_port.statistics.tx.total.get(),
                # stream level statistics
                # let the resource manager tell you the stream index so you don't have to remember it
                tx_port.statistics.tx.obtain_from_stream(my_stream).get()
            )
            logging.info(f"Total TX byte count since cleared: {tx_total.byte_count_since_cleared}")
            logging.info(f"Total TX packet count since cleared: {tx_total.packet_count_since_cleared}")
            logging.info(f"Stream {my_stream.idx} TX byte count since cleared: {tx_stream.byte_count_since_cleared}")
            logging.info(f"Stream {my_stream.idx} TX packet count since cleared: {tx_stream.packet_count_since_cleared}")

            # If you have forgotten which TPLD ID is assigned to a stream, you can query it
            resp = await my_stream.tpld_id.get()
            tpld_id = resp.test_payload_identifier

            received_tplds = await rx_port.statistics.rx.obtain_available_tplds()
            for i in received_tplds:
                logging.info(f"RX TPLD index: {i}")

            # then access the RX stat object
            rx_stats_obj = rx_port.statistics.rx.access_tpld(tpld_id)

            # then query each stats of a TPLD ID
            rx_total, rx_traffic, rx_latency, rx_jitter, rx_error = await utils.apply(
                # port level statistics
                rx_port.statistics.rx.total.get(),
                # tpld level traffic stats
                rx_stats_obj.traffic.get(),
                # tpld level latency stats
                rx_stats_obj.latency.get(),
                # tpld level jitter stats
                rx_stats_obj.jitter.get(),
                # tpld level error stats
                rx_stats_obj.errors.get()
            )
            logging.info(f"Total RX byte count since cleared: {rx_total.byte_count_since_cleared}")
            logging.info(f"Total RX packet count since cleared: {rx_total.packet_count_since_cleared}")
            logging.info(f"TPLD {tpld_id} RX byte count since cleared: {rx_traffic.byte_count_since_cleared}")
            logging.info(f"TPLD {tpld_id} RX packet count since cleared: {rx_traffic.packet_count_since_cleared}")
            logging.info(f"TPLD {tpld_id} RX min latency: {rx_latency.min_val}")
            logging.info(f"TPLD {tpld_id} RX max latency: {rx_latency.max_val}")
            logging.info(f"TPLD {tpld_id} RX avg latency: {rx_latency.avg_val}")
            logging.info(f"TPLD {tpld_id} RX min jitter: {rx_jitter.min_val}")
            logging.info(f"TPLD {tpld_id} RX max jitter: {rx_jitter.max_val}")
            logging.info(f"TPLD {tpld_id} RX avg jitter: {rx_jitter.avg_val}")
            logging.info(f"TPLD {tpld_id} RX Lost Packets: {rx_error.non_incre_seq_event_count}")
            logging.info(f"TPLD {tpld_id} RX Misordered: {rx_error.swapped_seq_misorder_event_count}")
            logging.info(f"TPLD {tpld_id} RX Payload Errors: {rx_error.non_incre_payload_packet_count}")
        finally:
            #################################################
            #                  Release                      #
            #################################################
            # Release the ports
            await mgmt.release_ports(tx_port, rx_port)
The entire example is here.
################################################################
#
# QUICK START
#
# What this script example does:
# 1. Connect to a tester
# 2. Reserve a port as TX and another one as RX
# 3. Configure TX port
# 4. Configure a stream on the TX port
# 5. Start traffic on the TX port
# 6. Wait for 10 seconds
# 7. Collect statistics on the TX port
# 8. Collect statistics on the RX port
# 9. Release the ports
# 10. Disconnect from the chassis
#
################################################################
import asyncio
from xoa_driver import testers
from xoa_driver import modules
from xoa_driver import ports
from xoa_driver import enums
from xoa_driver import utils
from xoa_driver.hlfuncs import mgmt, headers
from xoa_driver.misc import Hex
import ipaddress
import logging
#---------------------------
# GLOBAL PARAMS
#---------------------------
CHASSIS_IP = "demo.xenanetworks.com"
USERNAME = "quick_start"
PORT1 = "0/0"
PORT2 = "0/1"
async def my_awesome_func(chassis: str, username: str, port_str1: str, port_str2: str):
    """Run the quick-start traffic test from a TX port to an RX port.

    Steps: connect to the tester, reserve and reset both ports, configure the
    TX port and one stream on it, send traffic for about 10 seconds, log the
    TX and RX statistics, and release the ports. Once both ports are
    reserved, they are released even if a later step raises.

    :param chassis: host name or IP address of the tester
    :param username: user name for the tester connection
    :param port_str1: TX port as "<module index>/<port index>", e.g. "0/0"
    :param port_str2: RX port, same format as port_str1
    :return: None; returns before reserving anything if either module is a
        Chimera module
    """
    # configure basic logger
    logging.basicConfig(
        format="%(asctime)s %(message)s",
        level=logging.DEBUG,
        handlers=[
            logging.FileHandler(filename="test.log", mode="a"),
            logging.StreamHandler(),
        ],
    )

    # Establish connection to a Valkyrie tester using Python context manager
    # The connection will be automatically terminated when it is out of the block
    async with testers.L23Tester(host=chassis, username=username, password="xena", port=22606, enable_logging=False) as tester:
        logging.info("===================================")
        logging.info(f"{'Connect to chassis:':<20}{chassis}")
        logging.info(f"{'Username:':<20}{username}")

        # Port strings have the form "<module index>/<port index>"
        tx_ids = port_str1.split("/")
        rx_ids = port_str2.split("/")
        _mid1, _pid1 = int(tx_ids[0]), int(tx_ids[1])
        _mid2, _pid2 = int(rx_ids[0]), int(rx_ids[1])

        # Access modules on the tester
        module_obj1 = tester.modules.obtain(_mid1)
        module_obj2 = tester.modules.obtain(_mid2)

        # Commands used in this example are not supported by Chimera modules
        if isinstance(module_obj1, modules.E100ChimeraModule):
            return None
        if isinstance(module_obj2, modules.E100ChimeraModule):
            return None

        # Get the port on module as TX port
        tx_port = module_obj1.ports.obtain(_pid1)
        # Get the port on module as RX port
        rx_port = module_obj2.ports.obtain(_pid2)

        # Forcibly reserve the ports and reset
        await mgmt.reserve_port(tx_port, reset=True)
        await mgmt.reserve_port(rx_port, reset=True)

        # From here on both ports are reserved by us; the finally clause makes
        # sure they are released even when a later step fails.
        try:
            await asyncio.sleep(2)

            #################################################
            #           TX Port Configuration               #
            #################################################
            # Simple batch configure the TX port
            await utils.apply(
                tx_port.comment.set(comment="this is tx port"),
                tx_port.interframe_gap.set(min_byte_count=20),
                tx_port.loop_back.set(mode=enums.LoopbackMode.NONE),
                tx_port.tx_config.packet_limit.set(packet_count_limit=1_000_000),
                tx_port.tx_config.enable.set(on_off=enums.OnOff.ON),
                tx_port.net_config.mac_address.set(mac_address=Hex("BBBBBBBBBBBB")),
                tx_port.net_config.ipv4.address.set(
                    ipv4_address=ipaddress.IPv4Address("10.10.10.10"),
                    subnet_mask=ipaddress.IPv4Address("255.255.255.0"),
                    gateway=ipaddress.IPv4Address("10.10.10.1"),
                    wild=ipaddress.IPv4Address("0.0.0.0")),
                # for more port configuration, please go to https://docs.xenanetworks.com/projects/xoa-python-api
            )

            #################################################
            #           Stream Configuration                #
            #################################################
            # Create a stream on the port
            # Stream index is automatically assigned
            my_stream = await tx_port.streams.create()
            logging.info(f"TX stream index: {my_stream.idx}")

            # One size, in bytes, drives both the IP total length and the
            # stream's fixed packet length so the two cannot drift apart.
            frame_size = 1000

            eth = headers.Ethernet()
            eth.dst_mac = "aaaa.aaaa.aaaa"
            eth.src_mac = "bbbb.bbbb.bbbb"
            eth.ethertype = headers.EtherType.VLAN

            vlan = headers.VLAN()
            vlan.id = 100
            vlan.type = headers.EtherType.IPv4

            ip = headers.IPV4()
            ip.proto = headers.IPProtocol.NONE
            ip.src = "10.10.10.10"
            ip.dst = "11.11.11.11"
            # str() of a header is used as hex text below (two characters per
            # byte), so len // 2 is the header size in bytes. The trailing 4 is
            # presumably the Ethernet FCS -- TODO confirm.
            ip.total_length = frame_size - len(str(eth)) // 2 - len(str(vlan)) // 2 - 4

            # Simple batch configure the stream on the TX port
            await utils.apply(
                my_stream.tpld_id.set(test_payload_identifier=0),
                my_stream.enable.set_on(),
                my_stream.comment.set(comment="this is a stream"),
                my_stream.payload.content.set(payload_type=enums.PayloadType.PATTERN, hex_data=Hex("DEAD")),
                my_stream.rate.pps.set(stream_rate_pps=100_000),
                my_stream.packet.length.set(length_type=enums.LengthType.FIXED, min_val=frame_size, max_val=frame_size),
                my_stream.packet.header.protocol.set(segments=[
                    enums.ProtocolOption.ETHERNET,
                    enums.ProtocolOption.VLAN,
                    enums.ProtocolOption.IP]),
                my_stream.packet.header.data.set(hex_data=Hex(str(eth)+str(vlan)+str(ip)))
                # for more stream configuration, please go to https://docs.xenanetworks.com/projects/xoa-python-api
            )

            #################################################
            #               Traffic Control                 #
            #################################################
            # Batch clear statistics on TX and RX ports
            await asyncio.gather(
                tx_port.statistics.tx.clear.set(),
                tx_port.statistics.rx.clear.set(),
                rx_port.statistics.tx.clear.set(),
                rx_port.statistics.rx.clear.set()
            )

            # Start traffic on the TX port
            await tx_port.traffic.state.set_start()
            # Test duration 10 seconds
            await asyncio.sleep(10)
            # Stop traffic on the TX port
            await tx_port.traffic.state.set_stop()
            # Wait 2 seconds for the counters to finish
            await asyncio.sleep(2)

            #################################################
            #                  Statistics                   #
            #################################################
            # Query TX statistics
            tx_total, tx_stream = await utils.apply(
                # port level statistics
                tx_port.statistics.tx.total.get(),
                # stream level statistics
                # let the resource manager tell you the stream index so you don't have to remember it
                tx_port.statistics.tx.obtain_from_stream(my_stream).get()
            )
            logging.info(f"Total TX byte count since cleared: {tx_total.byte_count_since_cleared}")
            logging.info(f"Total TX packet count since cleared: {tx_total.packet_count_since_cleared}")
            logging.info(f"Stream {my_stream.idx} TX byte count since cleared: {tx_stream.byte_count_since_cleared}")
            logging.info(f"Stream {my_stream.idx} TX packet count since cleared: {tx_stream.packet_count_since_cleared}")

            # If you have forgotten which TPLD ID is assigned to a stream, you can query it
            resp = await my_stream.tpld_id.get()
            tpld_id = resp.test_payload_identifier

            received_tplds = await rx_port.statistics.rx.obtain_available_tplds()
            for i in received_tplds:
                logging.info(f"RX TPLD index: {i}")

            # then access the RX stat object
            rx_stats_obj = rx_port.statistics.rx.access_tpld(tpld_id)

            # then query each stats of a TPLD ID
            rx_total, rx_traffic, rx_latency, rx_jitter, rx_error = await utils.apply(
                # port level statistics
                rx_port.statistics.rx.total.get(),
                # tpld level traffic stats
                rx_stats_obj.traffic.get(),
                # tpld level latency stats
                rx_stats_obj.latency.get(),
                # tpld level jitter stats
                rx_stats_obj.jitter.get(),
                # tpld level error stats
                rx_stats_obj.errors.get()
            )
            logging.info(f"Total RX byte count since cleared: {rx_total.byte_count_since_cleared}")
            logging.info(f"Total RX packet count since cleared: {rx_total.packet_count_since_cleared}")
            logging.info(f"TPLD {tpld_id} RX byte count since cleared: {rx_traffic.byte_count_since_cleared}")
            logging.info(f"TPLD {tpld_id} RX packet count since cleared: {rx_traffic.packet_count_since_cleared}")
            logging.info(f"TPLD {tpld_id} RX min latency: {rx_latency.min_val}")
            logging.info(f"TPLD {tpld_id} RX max latency: {rx_latency.max_val}")
            logging.info(f"TPLD {tpld_id} RX avg latency: {rx_latency.avg_val}")
            logging.info(f"TPLD {tpld_id} RX min jitter: {rx_jitter.min_val}")
            logging.info(f"TPLD {tpld_id} RX max jitter: {rx_jitter.max_val}")
            logging.info(f"TPLD {tpld_id} RX avg jitter: {rx_jitter.avg_val}")
            logging.info(f"TPLD {tpld_id} RX Lost Packets: {rx_error.non_incre_seq_event_count}")
            logging.info(f"TPLD {tpld_id} RX Misordered: {rx_error.swapped_seq_misorder_event_count}")
            logging.info(f"TPLD {tpld_id} RX Payload Errors: {rx_error.non_incre_payload_packet_count}")
        finally:
            #################################################
            #                  Release                      #
            #################################################
            # Release the ports
            await mgmt.release_ports(tx_port, rx_port)
async def main():
    """Run my_awesome_func with the settings from GLOBAL PARAMS.

    A KeyboardInterrupt raised while the example runs is logged and
    swallowed, so this coroutine returns normally instead of propagating it.
    """
    try:
        await my_awesome_func(
            chassis=CHASSIS_IP,
            username=USERNAME,
            port_str1=PORT1,
            port_str2=PORT2,
        )
    except KeyboardInterrupt:
        # The original set an asyncio.Event here that nothing ever awaited;
        # logging keeps the interrupt visible instead of dropping it silently.
        logging.info("Interrupted by user")


if __name__ == "__main__":
    asyncio.run(main())
2.2.2. Integrate with CLI and XenaManager
This simple code example demonstrates the following uses of XOA Driver:
Establish connection to a Xena tester.
Reserve a port.
Port configuration from .xpc file
Port configuration from CLI commands
Module configuration from file
Module configuration from CLI commands
Chassis configuration from file
Chassis configuration from CLI commands
We will first walk you through the topics above step by step. At the end, you will see the whole example. If you want to try it out, you can simply copy and paste it into your environment and run it. Remember to change the IP address to your tester’s.
This is boilerplate.
import asyncio
from contextlib import suppress
from xoa_driver import (
testers,
modules,
ports,
enums,
utils,
exceptions
)
from xoa_driver.hlfuncs import (
mgmt,
cli
)
import logging
#---------------------------
# GLOBAL PARAMS
#---------------------------
CHASSIS_IP = "10.20.1.166"
USERNAME = "xoa"
PORT = "8/0"
#---------------------------
# cli_integration
#---------------------------
async def cli_integration(chassis: str, username: str, port_str: str):
    """CLI integration example body; the steps that follow fill it in.

    :param chassis: host name or IP address of the tester
    :param username: user name for the tester connection
    :param port_str: port as "<module index>/<port index>", e.g. "8/0"
    """
    ...


async def main():
    """Run cli_integration with the settings from GLOBAL PARAMS.

    A KeyboardInterrupt raised while the example runs is logged and
    swallowed, so this coroutine returns normally instead of propagating it.
    """
    try:
        await cli_integration(
            chassis=CHASSIS_IP,
            username=USERNAME,
            port_str=PORT,
        )
    except KeyboardInterrupt:
        logging.info("Interrupted by user")


if __name__ == "__main__":
    asyncio.run(main())
You can use Save Port Configuration in XenaManager to download port configuration files, which contain CLI commands inside. To “upload” the port configuration file generated by XenaManager, simply do:
# In XenaManager, port configurations are saved into files with extension **.xpc** in the same command format as used by [XOA CLI](https://docs.xenanetworks.com/projects/xoa-cli/). This makes it very easy to go back and forth between a XenaManager environment and a XOA CLI environment. For example, exporting a port configuration from ValkyrieCLIManager generates a configuration file in a simple text format that can be edited using a text editing tool such as Microsoft Notepad. It can then be imported back into XenaManager.
await cli.port_config_from_file(port=port_obj, path="port_config.xpc")
# Alternatively, you can also configure port with CLI commands
In addition to setting the port configuration from an .xpc file, you can also send CLI commands using XOA Driver.
await cli.port_config_from_string(
port=port_obj,
long_str="""
P_RESET
P_COMMENT \"This is a comment\"
P_MACADDRESS 0xAAAAAABBBB99
P_IPADDRESS 1.1.1.1 0.0.0.0 0.0.0.0 0.0.0.0
""")
# Release the port
You can set module or chassis configuration in the same way, either from a file or from command strings.
#########################################
# Use Tester Config File #
#########################################
# Reserve the tester
await mgmt.reserve_tester(tester=tester)
# Configure module with .xtc2 file's module config part
await cli.tester_config_from_file(tester=tester, path="tester_config.txt")
# Alternatively, you can also configure tester with CLI commands
await cli.tester_config_from_string(
tester=tester,
long_str="""
C_COMMENT \"This is a comment\"
""")
# Release the tester
await mgmt.release_tester(tester)
#########################################
# Use Module Config File #
#########################################
# access module on the tester
_mid = int(port_str.split("/")[0])
module_obj = tester.modules.obtain(_mid)
await mgmt.reserve_module(module=module_obj)
# Configure module with .xtc2 file's module config part
await cli.module_config_from_file(module=module_obj, path="module_config.xtc2")
# Alternatively, you can also configure module with CLI commands
await cli.module_config_from_string(
module=module_obj,
long_str="""
M_MEDIA QSFP28
M_CFPCONFIGEXT 8 25000 25000 25000 25000 25000 25000 25000 25000
M_COMMENT \"This is a comment\"
""")
# Release the module
await mgmt.release_module(module_obj)
The entire example is here.
################################################################
#
# CLI-XOA PYTHON INTEGRATION
#
# The XOA Python API seamlessly integrates with the XOA CLI,
# enabling users to work with CLI commands effortlessly within
# their Python scripts.
#
# The simple code example demonstrates how to use XOA Python API :
#
# * Establish connection to a Valkyrie tester.
# * Reserve a port.
# * Port configuration from `.xpc` file
# * Port configuration from CLI commands
# * Module configuration from file
# * Module configuration from CLI commands
# * Chassis configuration from file
# * Chassis configuration from CLI commands
#
################################################################
import asyncio
from contextlib import suppress
from xoa_driver import (
testers,
modules,
ports,
enums,
utils,
exceptions
)
from xoa_driver.hlfuncs import (
mgmt,
cli
)
import logging
#---------------------------
# GLOBAL PARAMS
#---------------------------
CHASSIS_IP = "10.20.1.166"
USERNAME = "xoa"
PORT = "8/0"
#---------------------------
# cli_integration
#---------------------------
async def cli_integration(chassis: str, username: str, port_str: str):
    """Configure a tester, a module and a port from config files and CLI strings.

    For each of the three resources in turn, this reserves the resource,
    applies a configuration file and then an inline CLI command string, and
    releases the resource. Each release sits in a ``finally`` clause, so a
    failing configuration step does not leave the resource reserved.

    :param chassis: host name or IP address of the tester
    :param username: user name for the tester connection
    :param port_str: port as "<module index>/<port index>", e.g. "8/0"
    :return: None; returns after the module step, without touching the port,
        if the module is a Chimera module
    """
    # configure basic logger
    logging.basicConfig(
        format="%(asctime)s %(message)s",
        level=logging.DEBUG,
        handlers=[
            logging.FileHandler(filename="test.log", mode="a"),
            logging.StreamHandler(),
        ],
    )

    # create tester instance and establish connection
    async with testers.L23Tester(host=chassis, username=username, password="xena", port=22606, enable_logging=False) as tester:
        logging.info("===================================")
        logging.info(f"{'Connect to chassis:':<20}{chassis}")
        logging.info(f"{'Username:':<20}{username}")

        #########################################
        #         Use Tester Config File        #
        #########################################
        # Reserve the tester
        await mgmt.reserve_tester(tester=tester)
        try:
            # Configure the tester from a configuration file
            await cli.tester_config_from_file(tester=tester, path="tester_config.txt")

            # Alternatively, you can also configure tester with CLI commands
            # (the command text stays at column 0 so the string sent is unchanged)
            await cli.tester_config_from_string(
                tester=tester,
                long_str="""
C_COMMENT \"This is a comment\"
""")
        finally:
            # Release the tester
            await mgmt.release_tester(tester)

        #########################################
        #         Use Module Config File        #
        #########################################
        # access module on the tester
        _mid = int(port_str.split("/")[0])
        module_obj = tester.modules.obtain(_mid)
        await mgmt.reserve_module(module=module_obj)
        try:
            # Configure module with .xtc2 file's module config part
            await cli.module_config_from_file(module=module_obj, path="module_config.xtc2")

            # Alternatively, you can also configure module with CLI commands
            await cli.module_config_from_string(
                module=module_obj,
                long_str="""
M_MEDIA QSFP28
M_CFPCONFIGEXT 8 25000 25000 25000 25000 25000 25000 25000 25000
M_COMMENT \"This is a comment\"
""")
        finally:
            # Release the module
            await mgmt.release_module(module_obj)

        # The port step below is skipped for Chimera modules
        if isinstance(module_obj, modules.ModuleChimera):
            return None

        #######################################
        #         Use Port Config File        #
        #######################################
        # access the port named in port_str on the module
        _pid = int(port_str.split("/")[1])
        port_obj = module_obj.ports.obtain(_pid)
        await mgmt.reserve_port(port=port_obj)
        try:
            # Configure port with .xpc file generated by XenaManager.
            # XenaManager saves port configurations in .xpc files using the same
            # command format as XOA CLI (https://docs.xenanetworks.com/projects/xoa-cli/),
            # so an exported file is plain text that can be edited and then
            # applied here or imported back into XenaManager.
            await cli.port_config_from_file(port=port_obj, path="port_config.xpc")

            # Alternatively, you can also configure port with CLI commands
            await cli.port_config_from_string(
                port=port_obj,
                long_str="""
P_RESET
P_COMMENT \"This is a comment\"
P_MACADDRESS 0xAAAAAABBBB99
P_IPADDRESS 1.1.1.1 0.0.0.0 0.0.0.0 0.0.0.0
""")
        finally:
            # Release the port
            await mgmt.release_port(port_obj)
async def main():
    """Run cli_integration with the settings from GLOBAL PARAMS.

    A KeyboardInterrupt raised while the example runs is logged and
    swallowed, so this coroutine returns normally instead of propagating it.
    """
    try:
        await cli_integration(
            chassis=CHASSIS_IP,
            username=USERNAME,
            port_str=PORT,
        )
    except KeyboardInterrupt:
        # The original set an asyncio.Event here that nothing ever awaited;
        # logging keeps the interrupt visible instead of dropping it silently.
        logging.info("Interrupted by user")


if __name__ == "__main__":
    asyncio.run(main())