Quick Start#

The XOA Python API offers more than just object-oriented APIs and functions for executing test scripts. It also provides a seamless integration with CLI commands and port configuration files from ValkyrieManager. , enabling you to effortlessly work with them.

Note

Integration with CLI commands and ValkyrieManager is supported by version >= 2.1.1.

Scripting with XOA Python API#

The simple code example demonstrates some basics of using HL-API and :term: HL-FUNC:

  • Establish connection to a Valkyrie tester.

  • Reserve a port.

  • Create a stream on the port.

  • Configure the stream.

  • Start traffic.

  • Collect statistics.

  • Stop traffic

We will first walk you through step-by-step covering the topics above. At the end, you will see the whole example. If you want to try it out, you can simply copy and paste it into your environment and run. Remember to change the IP address to your tester’s.

This is boilerplate.

import asyncio

from xoa_driver import testers
from xoa_driver import modules
from xoa_driver import ports
from xoa_driver import enums
from xoa_driver import utils
from xoa_driver.hlfuncs import mgmt

async def my_awesome_func():
def main():
    try:
        loop = asyncio.get_event_loop()
        loop.create_task(my_awesome_func())
        loop.run_forever()
    except KeyboardInterrupt:
        pass

if __name__ == "__main__":
    main()

To establish a connection to a tester is simple.

    # Establish connection to a Valkyrie tester 10.10.10.10 with username JonDoe.
    async with testers.L23Tester("10.10.10.10", "xoa") as tester:

Access module index 0 on the tester. The method obtain() is for accessing a test resource that cannot be deleted, such as a module or a port. You can read more about this method in Module Manager and Port Manager.

        # Access module index 0 on the tester
        my_module = tester.modules.obtain(0)

You need to check the type of the test module afterwards, so the driver can allow you to access the methods and attributes of module.

        if isinstance(my_module, modules.ModuleChimera):
            return None # commands which used in this example are not supported by Chimera Module

After that, the driver knows you are using the desired module, and then you can access ports on the module. Let’s use two ports, one as TX, the other RX.

        # Get the port 0 on module 0 as TX port
        my_tx_port = my_module.ports.obtain(0)
        # Get the port 1 on module 0 as RX port
        my_rx_port = my_module.ports.obtain(1)

        # Reserve the TX port and reset it.
        await mgmt.reserve_port(my_tx_port)
        await mgmt.reset_port(my_tx_port)

        # Reserve the RX port and reset it.
        await mgmt.reserve_port(my_rx_port)
        await mgmt.reset_port(my_rx_port)

Now we have two ports ready to configure. Let’s start creating a stream on the TX port.

        # Create a stream on the TX port
        my_stream = await my_tx_port.streams.create()
        my_tpld_value = 0

        # Prepare stream header protocol
        header_protocol = [enums.ProtocolOption.ETHERNET, enums.ProtocolOption.IP]

        # Simple batch configure the stream on the TX port
        await utils.apply(
            my_stream.tpld_id.set(my_tpld_value), # Create the TPLD index of stream
            my_stream.packet.length.set(length_type=enums.LengthType.FIXED, min_val=1000, max_val=1000), # Configure the packet size to fixed 1000 bytes
            my_stream.packet.header.protocol.set(header_protocol), # Configure the packet type
            my_stream.enable.set_on(), # Enable streams
            my_stream.rate.fraction.set(1000000) # Configure the stream rate 100% (1,000,000 ppm)
        )

The await utils.apply() lets us group several commands bound for the same port into a larger “command”. This is called Sequential Grouping.

Then, we want to clear the statistics counters of both TX and RX ports. We can use Parallel Grouping to group commands bound for different ports into a larger “command”.

        # Batch clear statistics on TX and RX ports
        await asyncio.gather(
            my_tx_port.statistics.tx.clear.set(),
            my_tx_port.statistics.rx.clear.set(),
            my_rx_port.statistics.tx.clear.set(),
            my_rx_port.statistics.rx.clear.set()
        )

Now, let’s start the traffic on the TX port for roughly 10 seconds and stop. It is “roughly” because we use sleep() to control the duration. It may feel accurate to you but for a Valkyrie tester that can generate 800Gbps traffic with time measurement to nanosecond range, sleep() is far from accurate in terms of time controlling. If your test requires high-accuracy time control, don’t use software to control time. Instead, limit the port’s TX time so that you can have down to microsecond-range traffic duration.

        # Start traffic on the TX port
        await my_tx_port.traffic.state.set_start()

        # Test duration 10 seconds
        await asyncio.sleep(10)

        # Stop traffic on the TX port
        await my_tx_port.traffic.state.set_stop()

After the traffic is stopped, we query statistic counters. You can also query counter as the traffic is running to get live statistics.

        # Wait 2 seconds for the counters to finish
        await asyncio.sleep(2)

        # Query TX statistics
        tx_total, tx_stream = await utils.apply(
            my_tx_port.statistics.tx.total.get(),

            # let the resource manager tell you the stream index so you don't have to remember it
            my_tx_port.statistics.tx.obtain_from_stream(my_stream).get()
        )
        print(f"Total TX byte count since cleared: {tx_total.byte_count_since_cleared}")
        print(f"Total TX packet count since cleared: {tx_total.packet_count_since_cleared}")
        print(f"Stream 0 TX byte count since cleared: {tx_stream.byte_count_since_cleared}")
        print(f"Stream 0 TX packet count since cleared: {tx_stream.packet_count_since_cleared}")

        # if you have forgot what TPLD ID assigned to a stream, you can query it 
        tpld_obj = await my_stream.tpld_id.get()
        # then access the RX stat object
        rx_stats_obj = my_rx_port.statistics.rx.access_tpld(tpld_obj.test_payload_identifier)
        # then query each stats of a TPLD ID
        rx_total, rx_traffic, rx_latency, rx_jitter, rx_error = await utils.apply(
            my_rx_port.statistics.rx.total.get(),
            rx_stats_obj.traffic.get(),
            rx_stats_obj.latency.get(),
            rx_stats_obj.jitter.get(),
            rx_stats_obj.errors.get()
        )

        print(f"Total RX byte count since cleared: {rx_total.byte_count_since_cleared}")
        print(f"Total RX packet count since cleared: {rx_total.packet_count_since_cleared}")
        print(f"Stream 0 RX byte count since cleared: {rx_traffic.byte_count_since_cleared}")
        print(f"Stream 0 RX packet count since cleared: {rx_traffic.packet_count_since_cleared}")
        print(f"Stream 0 RX min latency: {rx_latency.min_val}")
        print(f"Stream 0 RX max latency: {rx_latency.max_val}")
        print(f"Stream 0 RX avg latency: {rx_latency.avg_val}")
        print(f"Stream 0 RX min jitter: {rx_jitter.min_val}")
        print(f"Stream 0 RX max jitter: {rx_jitter.max_val}")
        print(f"Stream 0 RX avg jitter: {rx_jitter.avg_val}")
        print(f"Stream 0 RX number of non-incrementing-sequence-number events: {rx_error.non_incre_seq_event_count}")
        print(f"Stream 0 RX number of swapped-sequence-number misorder events: {rx_error.swapped_seq_misorder_event_count}")
        print(f"Stream 0 RX number of packets with non-incrementing payload content: {rx_error.non_incre_payload_packet_count}")

At last, release the ports (It is absolutely OK if you don’t release them.)

        # Release the ports
        await asyncio.gather(
            my_tx_port.reservation.set_release(),
            my_rx_port.reservation.set_release()
        )

The entire example is here.

Quick start for some basic.#
import asyncio

from xoa_driver import testers
from xoa_driver import modules
from xoa_driver import ports
from xoa_driver import enums
from xoa_driver import utils
from xoa_driver.hlfuncs import mgmt

async def my_awesome_func():
    
    # Establish connection to a Valkyrie tester 10.10.10.10 with username JonDoe.
    async with testers.L23Tester("10.10.10.10", "xoa") as tester:
        
        # Access module index 0 on the tester
        my_module = tester.modules.obtain(0)

        if isinstance(my_module, modules.ModuleChimera):
            return None # commands which used in this example are not supported by Chimera Module

        # Get the port 0 on module 0 as TX port
        my_tx_port = my_module.ports.obtain(0)
        # Get the port 1 on module 0 as RX port
        my_rx_port = my_module.ports.obtain(1)

        # Reserve the TX port and reset it.
        await mgmt.reserve_port(my_tx_port)
        await mgmt.reset_port(my_tx_port)

        # Reserve the RX port and reset it.
        await mgmt.reserve_port(my_rx_port)
        await mgmt.reset_port(my_rx_port)

        # Create a stream on the TX port
        my_stream = await my_tx_port.streams.create()
        my_tpld_value = 0

        # Prepare stream header protocol
        header_protocol = [enums.ProtocolOption.ETHERNET, enums.ProtocolOption.IP]

        # Simple batch configure the stream on the TX port
        await utils.apply(
            my_stream.tpld_id.set(my_tpld_value), # Create the TPLD index of stream
            my_stream.packet.length.set(length_type=enums.LengthType.FIXED, min_val=1000, max_val=1000), # Configure the packet size to fixed 1000 bytes
            my_stream.packet.header.protocol.set(header_protocol), # Configure the packet type
            my_stream.enable.set_on(), # Enable streams
            my_stream.rate.fraction.set(1000000) # Configure the stream rate 100% (1,000,000 ppm)
        )

        # Batch clear statistics on TX and RX ports
        await asyncio.gather(
            my_tx_port.statistics.tx.clear.set(),
            my_tx_port.statistics.rx.clear.set(),
            my_rx_port.statistics.tx.clear.set(),
            my_rx_port.statistics.rx.clear.set()
        )

        # Start traffic on the TX port
        await my_tx_port.traffic.state.set_start()

        # Test duration 10 seconds
        await asyncio.sleep(10)

        # Stop traffic on the TX port
        await my_tx_port.traffic.state.set_stop()

        # Wait 2 seconds for the counters to finish
        await asyncio.sleep(2)

        # Query TX statistics
        tx_total, tx_stream = await utils.apply(
            my_tx_port.statistics.tx.total.get(),

            # let the resource manager tell you the stream index so you don't have to remember it
            my_tx_port.statistics.tx.obtain_from_stream(my_stream).get()
        )
        print(f"Total TX byte count since cleared: {tx_total.byte_count_since_cleared}")
        print(f"Total TX packet count since cleared: {tx_total.packet_count_since_cleared}")
        print(f"Stream 0 TX byte count since cleared: {tx_stream.byte_count_since_cleared}")
        print(f"Stream 0 TX packet count since cleared: {tx_stream.packet_count_since_cleared}")

        # if you have forgot what TPLD ID assigned to a stream, you can query it 
        tpld_obj = await my_stream.tpld_id.get()
        # then access the RX stat object
        rx_stats_obj = my_rx_port.statistics.rx.access_tpld(tpld_obj.test_payload_identifier)
        # then query each stats of a TPLD ID
        rx_total, rx_traffic, rx_latency, rx_jitter, rx_error = await utils.apply(
            my_rx_port.statistics.rx.total.get(),
            rx_stats_obj.traffic.get(),
            rx_stats_obj.latency.get(),
            rx_stats_obj.jitter.get(),
            rx_stats_obj.errors.get()
        )

        print(f"Total RX byte count since cleared: {rx_total.byte_count_since_cleared}")
        print(f"Total RX packet count since cleared: {rx_total.packet_count_since_cleared}")
        print(f"Stream 0 RX byte count since cleared: {rx_traffic.byte_count_since_cleared}")
        print(f"Stream 0 RX packet count since cleared: {rx_traffic.packet_count_since_cleared}")
        print(f"Stream 0 RX min latency: {rx_latency.min_val}")
        print(f"Stream 0 RX max latency: {rx_latency.max_val}")
        print(f"Stream 0 RX avg latency: {rx_latency.avg_val}")
        print(f"Stream 0 RX min jitter: {rx_jitter.min_val}")
        print(f"Stream 0 RX max jitter: {rx_jitter.max_val}")
        print(f"Stream 0 RX avg jitter: {rx_jitter.avg_val}")
        print(f"Stream 0 RX number of non-incrementing-sequence-number events: {rx_error.non_incre_seq_event_count}")
        print(f"Stream 0 RX number of swapped-sequence-number misorder events: {rx_error.swapped_seq_misorder_event_count}")
        print(f"Stream 0 RX number of packets with non-incrementing payload content: {rx_error.non_incre_payload_packet_count}")

        # Release the ports
        await asyncio.gather(
            my_tx_port.reservation.set_release(),
            my_rx_port.reservation.set_release()
        )

def main():
    try:
        loop = asyncio.get_event_loop()
        loop.create_task(my_awesome_func())
        loop.run_forever()
    except KeyboardInterrupt:
        pass

if __name__ == "__main__":
    main()

Integrate with CLI and ValkyrieManager#

The simple code example demonstrates how to use XOA Python API :

  • Establish connection to a Valkyrie tester.

  • Reserve a port.

  • Port configuration from .xpc file

  • Port configuration from CLI commands

  • Module configuration from file

  • Module configuration from CLI commands

  • Chassis configuration from file

  • Chassis configuration from CLI commands

We will first walk you through step-by-step covering the topics above. At the end, you will see the whole example. If you want to try it out, you can simply copy and paste it into your environment and run. Remember to change the IP address to your tester’s.

This is boilerplate.

import asyncio

from xoa_driver import testers
from xoa_driver import modules
from xoa_driver import ports
from xoa_driver import enums
from xoa_driver import utils
from xoa_driver.hlfuncs import mgmt, cli

async def my_awesome_func(stop_event: asyncio.Event):
async def main():
    stop_event = asyncio.Event()
    try:
        await my_awesome_func(stop_event)
    except KeyboardInterrupt:
        stop_event.set()


if __name__ == "__main__":
    asyncio.run(main())

To establish a connection to a tester is simple.

    # create tester instance and establish connection
    async with testers.L23Tester("10.10.10.10", "xoa") as tester:

Access module index 0 on the tester. The method obtain() is for accessing a test resource that cannot be deleted, such as a module or a port. You can read more about this method in Module Manager and Port Manager.

        # access module 0 on the tester
        module = tester.modules.obtain(0)

You need to check the type of the test module afterwards, so the driver can allow you to access the methods and attributes of module.

        if isinstance(module, modules.ModuleChimera):
            return None

After that, the driver knows you are using the desired module, and then you can access ports on the module.

You can use Save Port Configuration in ValkyrieManager to download port configuration files, which contain CLI commands inside. To “upload” the port configuration file generated by ValkyrieManager, simply do:

        # access port 0 on the module
        port = module.ports.obtain(0) 

        # reserve the port
        await mgmt.reserve_port(port=port)

        # configure port with .xpc file generated by ValkyrieManager
        await cli.port_config_from_file(port=port, path="port_config.xpc")

In addition to set port configuration from an xpc file, you can also send CLI commands using XOA Python API.

        # configure port with CLI commands
        await cli.port_config_from_string(
            port=port,
            long_str="""
            P_RESET
            P_COMMENT \"this is a comment\"
            P_MACADDRESS  0xAAAAAABBBB99
            P_IPADDRESS  1.1.1.1 0.0.0.0 0.0.0.0 0.0.0.0
            """)

You can set module or chassis configuration in the same way, either from a file or from command strings.

        # reserve the module
        await mgmt.reserve_module(module=module)

        # configure module by file
        await cli.module_config_from_file(module=module, path="module_config.txt")

        # configure module with CLI commands
        await cli.module_config_from_string(
            module=module,
            long_str="""
            M_COMMENT \"this is a comment\"
            M_MEDIA  QSFP_DD_NRZ
            M_CFPCONFIGEXT  2 100000 100000
            """)
        
        # reserve the tester
        await mgmt.reserve_tester(tester=tester)

        # configure module by file
        await cli.tester_config_from_file(tester=tester, path="tester_config.txt")

        # configure module with CLI commands
        await cli.tester_config_from_string(
            tester=tester,
            long_str="""
            C_COMMENT \"this is a comment\"
            C_NAME \"this is a name\"
            """)

The entire example is here.

Configuration By CLI#
import asyncio

from xoa_driver import testers
from xoa_driver import modules
from xoa_driver import ports
from xoa_driver import enums
from xoa_driver import utils
from xoa_driver.hlfuncs import mgmt, cli

async def my_awesome_func(stop_event: asyncio.Event):
    # create tester instance and establish connection
    async with testers.L23Tester("10.10.10.10", "xoa") as tester:

        # access module 0 on the tester
        module = tester.modules.obtain(0)
        if isinstance(module, modules.ModuleChimera):
            return None
        
        # access port 0 on the module
        port = module.ports.obtain(0) 

        # reserve the port
        await mgmt.reserve_port(port=port)

        # configure port with .xpc file generated by ValkyrieManager
        await cli.port_config_from_file(port=port, path="port_config.xpc")

        # configure port with CLI commands
        await cli.port_config_from_string(
            port=port,
            long_str="""
            P_RESET
            P_COMMENT \"this is a comment\"
            P_MACADDRESS  0xAAAAAABBBB99
            P_IPADDRESS  1.1.1.1 0.0.0.0 0.0.0.0 0.0.0.0
            """)
        
        # reserve the module
        await mgmt.reserve_module(module=module)

        # configure module by file
        await cli.module_config_from_file(module=module, path="module_config.txt")

        # configure module with CLI commands
        await cli.module_config_from_string(
            module=module,
            long_str="""
            M_COMMENT \"this is a comment\"
            M_MEDIA  QSFP_DD_NRZ
            M_CFPCONFIGEXT  2 100000 100000
            """)
        
        # reserve the tester
        await mgmt.reserve_tester(tester=tester)

        # configure module by file
        await cli.tester_config_from_file(tester=tester, path="tester_config.txt")

        # configure module with CLI commands
        await cli.tester_config_from_string(
            tester=tester,
            long_str="""
            C_COMMENT \"this is a comment\"
            C_NAME \"this is a name\"
            """)

async def main():
    stop_event = asyncio.Event()
    try:
        await my_awesome_func(stop_event)
    except KeyboardInterrupt:
        stop_event.set()


if __name__ == "__main__":
    asyncio.run(main())