2.2. Quick Start
The XOA Python API offers more than just object-oriented APIs and functions for executing test scripts. It also provides a seamless integration with CLI commands and port configuration files from ValkyrieManager. , enabling you to effortlessly work with them.
Note
Integration with CLI commands and ValkyrieManager is supported by version >= 2.1.1.
2.2.1. Scripting with XOA Python API
The simple code example demonstrates some basics of using HL-API and :term: HL-FUNC:
Establish connection to a Valkyrie tester.
Reserve a port.
Create a stream on the port.
Configure the stream.
Start traffic.
Collect statistics.
Stop traffic
We will first walk you through step-by-step covering the topics above. At the end, you will see the whole example. If you want to try it out, you can simply copy and paste it into your environment and run. Remember to change the IP address to your tester’s.
This is boilerplate.
import asyncio
from xoa_driver import testers
from xoa_driver import modules
from xoa_driver import ports
from xoa_driver import enums
from xoa_driver import utils
from xoa_driver.hlfuncs import mgmt
async def my_awesome_func():
def main():
try:
loop = asyncio.get_event_loop()
loop.create_task(my_awesome_func())
loop.run_forever()
except KeyboardInterrupt:
pass
if __name__ == "__main__":
main()
To establish a connection to a tester is simple.
# Establish connection to a Valkyrie tester 10.10.10.10 with username JonDoe.
async with testers.L23Tester("10.10.10.10", "xoa") as tester:
Access module index 0 on the tester. The method obtain()
is for accessing a test resource that cannot be deleted, such as a module or a port. You can read more about this method in Module Manager and Port Manager.
# Access module index 0 on the tester
my_module = tester.modules.obtain(0)
You need to check the type of the test module afterwards, so the driver can allow you to access the methods and attributes of module.
if isinstance(my_module, modules.ModuleChimera):
return None # commands which used in this example are not supported by Chimera Module
After that, the driver knows you are using the desired module, and then you can access ports on the module. Let’s use two ports, one as TX, the other RX.
# Get the port 0 on module 0 as TX port
my_tx_port = my_module.ports.obtain(0)
# Get the port 1 on module 0 as RX port
my_rx_port = my_module.ports.obtain(1)
# Reserve the TX port and reset it.
await mgmt.reserve_port(my_tx_port)
await mgmt.reset_port(my_tx_port)
# Reserve the RX port and reset it.
await mgmt.reserve_port(my_rx_port)
await mgmt.reset_port(my_rx_port)
Now we have two ports ready to configure. Let’s start creating a stream on the TX port.
# Create a stream on the TX port
my_stream = await my_tx_port.streams.create()
my_tpld_value = 0
# Prepare stream header protocol
header_protocol = [enums.ProtocolOption.ETHERNET, enums.ProtocolOption.IP]
# Simple batch configure the stream on the TX port
await utils.apply(
my_stream.tpld_id.set(my_tpld_value), # Create the TPLD index of stream
my_stream.packet.length.set(length_type=enums.LengthType.FIXED, min_val=1000, max_val=1000), # Configure the packet size to fixed 1000 bytes
my_stream.packet.header.protocol.set(header_protocol), # Configure the packet type
my_stream.enable.set_on(), # Enable streams
my_stream.rate.fraction.set(1000000) # Configure the stream rate 100% (1,000,000 ppm)
)
The await utils.apply()
lets us group several commands bound for the same port into a larger “command”. This is called Sequential Grouping.
Then, we want to clear the statistics counters of both TX and RX ports. We can use Parallel Grouping to group commands bound for different ports into a larger “command”.
# Batch clear statistics on TX and RX ports
await asyncio.gather(
my_tx_port.statistics.tx.clear.set(),
my_tx_port.statistics.rx.clear.set(),
my_rx_port.statistics.tx.clear.set(),
my_rx_port.statistics.rx.clear.set()
)
Now, let’s start the traffic on the TX port for roughly 10 seconds and stop. It is “roughly” because we use sleep()
to control the duration. It may feel accurate to you but for a Valkyrie tester that can generate 800Gbps traffic with time measurement to nanosecond range, sleep()
is far from accurate in terms of time controlling. If your test requires high-accuracy time control, don’t use software to control time. Instead, limit the port’s TX time so that you can have down to microsecond-range traffic duration.
# Start traffic on the TX port
await my_tx_port.traffic.state.set_start()
# Test duration 10 seconds
await asyncio.sleep(10)
# Stop traffic on the TX port
await my_tx_port.traffic.state.set_stop()
After the traffic is stopped, we query statistic counters. You can also query counter as the traffic is running to get live statistics.
# Wait 2 seconds for the counters to finish
await asyncio.sleep(2)
# Query TX statistics
tx_total, tx_stream = await utils.apply(
my_tx_port.statistics.tx.total.get(),
# let the resource manager tell you the stream index so you don't have to remember it
my_tx_port.statistics.tx.obtain_from_stream(my_stream).get()
)
print(f"Total TX byte count since cleared: {tx_total.byte_count_since_cleared}")
print(f"Total TX packet count since cleared: {tx_total.packet_count_since_cleared}")
print(f"Stream 0 TX byte count since cleared: {tx_stream.byte_count_since_cleared}")
print(f"Stream 0 TX packet count since cleared: {tx_stream.packet_count_since_cleared}")
# if you have forgot what TPLD ID assigned to a stream, you can query it
tpld_obj = await my_stream.tpld_id.get()
# then access the RX stat object
rx_stats_obj = my_rx_port.statistics.rx.access_tpld(tpld_obj.test_payload_identifier)
# then query each stats of a TPLD ID
rx_total, rx_traffic, rx_latency, rx_jitter, rx_error = await utils.apply(
my_rx_port.statistics.rx.total.get(),
rx_stats_obj.traffic.get(),
rx_stats_obj.latency.get(),
rx_stats_obj.jitter.get(),
rx_stats_obj.errors.get()
)
print(f"Total RX byte count since cleared: {rx_total.byte_count_since_cleared}")
print(f"Total RX packet count since cleared: {rx_total.packet_count_since_cleared}")
print(f"Stream 0 RX byte count since cleared: {rx_traffic.byte_count_since_cleared}")
print(f"Stream 0 RX packet count since cleared: {rx_traffic.packet_count_since_cleared}")
print(f"Stream 0 RX min latency: {rx_latency.min_val}")
print(f"Stream 0 RX max latency: {rx_latency.max_val}")
print(f"Stream 0 RX avg latency: {rx_latency.avg_val}")
print(f"Stream 0 RX min jitter: {rx_jitter.min_val}")
print(f"Stream 0 RX max jitter: {rx_jitter.max_val}")
print(f"Stream 0 RX avg jitter: {rx_jitter.avg_val}")
print(f"Stream 0 RX number of non-incrementing-sequence-number events: {rx_error.non_incre_seq_event_count}")
print(f"Stream 0 RX number of swapped-sequence-number misorder events: {rx_error.swapped_seq_misorder_event_count}")
print(f"Stream 0 RX number of packets with non-incrementing payload content: {rx_error.non_incre_payload_packet_count}")
At last, release the ports (It is absolutely OK if you don’t release them.)
# Release the ports
await asyncio.gather(
my_tx_port.reservation.set_release(),
my_rx_port.reservation.set_release()
)
The entire example is here.
import asyncio
from xoa_driver import testers
from xoa_driver import modules
from xoa_driver import ports
from xoa_driver import enums
from xoa_driver import utils
from xoa_driver.hlfuncs import mgmt
async def my_awesome_func():
# Establish connection to a Valkyrie tester 10.10.10.10 with username JonDoe.
async with testers.L23Tester("10.10.10.10", "xoa") as tester:
# Access module index 0 on the tester
my_module = tester.modules.obtain(0)
if isinstance(my_module, modules.ModuleChimera):
return None # commands which used in this example are not supported by Chimera Module
# Get the port 0 on module 0 as TX port
my_tx_port = my_module.ports.obtain(0)
# Get the port 1 on module 0 as RX port
my_rx_port = my_module.ports.obtain(1)
# Reserve the TX port and reset it.
await mgmt.reserve_port(my_tx_port)
await mgmt.reset_port(my_tx_port)
# Reserve the RX port and reset it.
await mgmt.reserve_port(my_rx_port)
await mgmt.reset_port(my_rx_port)
# Create a stream on the TX port
my_stream = await my_tx_port.streams.create()
my_tpld_value = 0
# Prepare stream header protocol
header_protocol = [enums.ProtocolOption.ETHERNET, enums.ProtocolOption.IP]
# Simple batch configure the stream on the TX port
await utils.apply(
my_stream.tpld_id.set(my_tpld_value), # Create the TPLD index of stream
my_stream.packet.length.set(length_type=enums.LengthType.FIXED, min_val=1000, max_val=1000), # Configure the packet size to fixed 1000 bytes
my_stream.packet.header.protocol.set(header_protocol), # Configure the packet type
my_stream.enable.set_on(), # Enable streams
my_stream.rate.fraction.set(1000000) # Configure the stream rate 100% (1,000,000 ppm)
)
# Batch clear statistics on TX and RX ports
await asyncio.gather(
my_tx_port.statistics.tx.clear.set(),
my_tx_port.statistics.rx.clear.set(),
my_rx_port.statistics.tx.clear.set(),
my_rx_port.statistics.rx.clear.set()
)
# Start traffic on the TX port
await my_tx_port.traffic.state.set_start()
# Test duration 10 seconds
await asyncio.sleep(10)
# Stop traffic on the TX port
await my_tx_port.traffic.state.set_stop()
# Wait 2 seconds for the counters to finish
await asyncio.sleep(2)
# Query TX statistics
tx_total, tx_stream = await utils.apply(
my_tx_port.statistics.tx.total.get(),
# let the resource manager tell you the stream index so you don't have to remember it
my_tx_port.statistics.tx.obtain_from_stream(my_stream).get()
)
print(f"Total TX byte count since cleared: {tx_total.byte_count_since_cleared}")
print(f"Total TX packet count since cleared: {tx_total.packet_count_since_cleared}")
print(f"Stream 0 TX byte count since cleared: {tx_stream.byte_count_since_cleared}")
print(f"Stream 0 TX packet count since cleared: {tx_stream.packet_count_since_cleared}")
# if you have forgot what TPLD ID assigned to a stream, you can query it
tpld_obj = await my_stream.tpld_id.get()
# then access the RX stat object
rx_stats_obj = my_rx_port.statistics.rx.access_tpld(tpld_obj.test_payload_identifier)
# then query each stats of a TPLD ID
rx_total, rx_traffic, rx_latency, rx_jitter, rx_error = await utils.apply(
my_rx_port.statistics.rx.total.get(),
rx_stats_obj.traffic.get(),
rx_stats_obj.latency.get(),
rx_stats_obj.jitter.get(),
rx_stats_obj.errors.get()
)
print(f"Total RX byte count since cleared: {rx_total.byte_count_since_cleared}")
print(f"Total RX packet count since cleared: {rx_total.packet_count_since_cleared}")
print(f"Stream 0 RX byte count since cleared: {rx_traffic.byte_count_since_cleared}")
print(f"Stream 0 RX packet count since cleared: {rx_traffic.packet_count_since_cleared}")
print(f"Stream 0 RX min latency: {rx_latency.min_val}")
print(f"Stream 0 RX max latency: {rx_latency.max_val}")
print(f"Stream 0 RX avg latency: {rx_latency.avg_val}")
print(f"Stream 0 RX min jitter: {rx_jitter.min_val}")
print(f"Stream 0 RX max jitter: {rx_jitter.max_val}")
print(f"Stream 0 RX avg jitter: {rx_jitter.avg_val}")
print(f"Stream 0 RX number of non-incrementing-sequence-number events: {rx_error.non_incre_seq_event_count}")
print(f"Stream 0 RX number of swapped-sequence-number misorder events: {rx_error.swapped_seq_misorder_event_count}")
print(f"Stream 0 RX number of packets with non-incrementing payload content: {rx_error.non_incre_payload_packet_count}")
# Release the ports
await asyncio.gather(
my_tx_port.reservation.set_release(),
my_rx_port.reservation.set_release()
)
def main():
try:
loop = asyncio.get_event_loop()
loop.create_task(my_awesome_func())
loop.run_forever()
except KeyboardInterrupt:
pass
if __name__ == "__main__":
main()
2.2.2. Integrate with CLI and ValkyrieManager
The simple code example demonstrates how to use XOA Python API :
Establish connection to a Valkyrie tester.
Reserve a port.
Port configuration from .xpc file
Port configuration from CLI commands
Module configuration from file
Module configuration from CLI commands
Chassis configuration from file
Chassis configuration from CLI commands
We will first walk you through step-by-step covering the topics above. At the end, you will see the whole example. If you want to try it out, you can simply copy and paste it into your environment and run. Remember to change the IP address to your tester’s.
This is boilerplate.
import asyncio
from xoa_driver import testers
from xoa_driver import modules
from xoa_driver import ports
from xoa_driver import enums
from xoa_driver import utils
from xoa_driver.hlfuncs import mgmt, cli
async def my_awesome_func(stop_event: asyncio.Event):
async def main():
stop_event = asyncio.Event()
try:
await my_awesome_func(stop_event)
except KeyboardInterrupt:
stop_event.set()
if __name__ == "__main__":
asyncio.run(main())
To establish a connection to a tester is simple.
# create tester instance and establish connection
async with testers.L23Tester("10.10.10.10", "xoa") as tester:
Access module index 0 on the tester. The method obtain()
is for accessing a test resource that cannot be deleted, such as a module or a port. You can read more about this method in Module Manager and Port Manager.
# access module 0 on the tester
module = tester.modules.obtain(0)
You need to check the type of the test module afterwards, so the driver can allow you to access the methods and attributes of module.
if isinstance(module, modules.ModuleChimera):
return None
After that, the driver knows you are using the desired module, and then you can access ports on the module.
You can use Save Port Configuration in ValkyrieManager to download port configuration files, which contain CLI commands inside. To “upload” the port configuration file generated by ValkyrieManager, simply do:
# access port 0 on the module
port = module.ports.obtain(0)
# reserve the port
await mgmt.reserve_port(port=port)
# configure port with .xpc file generated by ValkyrieManager
await cli.port_config_from_file(port=port, path="port_config.xpc")
In addition to set port configuration from an xpc file, you can also send CLI commands using XOA Python API.
# configure port with CLI commands
await cli.port_config_from_string(
port=port,
long_str="""
P_RESET
P_COMMENT \"this is a comment\"
P_MACADDRESS 0xAAAAAABBBB99
P_IPADDRESS 1.1.1.1 0.0.0.0 0.0.0.0 0.0.0.0
""")
You can set module or chassis configuration in the same way, either from a file or from command strings.
# reserve the module
await mgmt.reserve_module(module=module)
# configure module by file
await cli.module_config_from_file(module=module, path="module_config.txt")
# configure module with CLI commands
await cli.module_config_from_string(
module=module,
long_str="""
M_COMMENT \"this is a comment\"
M_MEDIA QSFP_DD_NRZ
M_CFPCONFIGEXT 2 100000 100000
""")
# reserve the tester
await mgmt.reserve_tester(tester=tester)
# configure module by file
await cli.tester_config_from_file(tester=tester, path="tester_config.txt")
# configure module with CLI commands
await cli.tester_config_from_string(
tester=tester,
long_str="""
C_COMMENT \"this is a comment\"
C_NAME \"this is a name\"
""")
The entire example is here.
import asyncio
from xoa_driver import testers
from xoa_driver import modules
from xoa_driver import ports
from xoa_driver import enums
from xoa_driver import utils
from xoa_driver.hlfuncs import mgmt, cli
async def my_awesome_func(stop_event: asyncio.Event):
# create tester instance and establish connection
async with testers.L23Tester("10.10.10.10", "xoa") as tester:
# access module 0 on the tester
module = tester.modules.obtain(0)
if isinstance(module, modules.ModuleChimera):
return None
# access port 0 on the module
port = module.ports.obtain(0)
# reserve the port
await mgmt.reserve_port(port=port)
# configure port with .xpc file generated by ValkyrieManager
await cli.port_config_from_file(port=port, path="port_config.xpc")
# configure port with CLI commands
await cli.port_config_from_string(
port=port,
long_str="""
P_RESET
P_COMMENT \"this is a comment\"
P_MACADDRESS 0xAAAAAABBBB99
P_IPADDRESS 1.1.1.1 0.0.0.0 0.0.0.0 0.0.0.0
""")
# reserve the module
await mgmt.reserve_module(module=module)
# configure module by file
await cli.module_config_from_file(module=module, path="module_config.txt")
# configure module with CLI commands
await cli.module_config_from_string(
module=module,
long_str="""
M_COMMENT \"this is a comment\"
M_MEDIA QSFP_DD_NRZ
M_CFPCONFIGEXT 2 100000 100000
""")
# reserve the tester
await mgmt.reserve_tester(tester=tester)
# configure module by file
await cli.tester_config_from_file(tester=tester, path="tester_config.txt")
# configure module with CLI commands
await cli.tester_config_from_string(
tester=tester,
long_str="""
C_COMMENT \"this is a comment\"
C_NAME \"this is a name\"
""")
async def main():
stop_event = asyncio.Event()
try:
await my_awesome_func(stop_event)
except KeyboardInterrupt:
stop_event.set()
if __name__ == "__main__":
asyncio.run(main())