Data IO System

XOA Core lets you subscribe to messages generated by test suites and subsystems (ResourcesManager, ExecutorManager), so you can collect test statistics and results, monitor test progress, and receive errors and warnings.

Message Subscription

XOA Core provides two types of messages that you can subscribe to:

  • Subsystem Type

  • Test Execution Type

Subsystem Type Message

Subscribe to subsystem-type messages to receive messages exchanged between your code and the testers.

Subscribe to messages from the tester resource management subsystem
async for msg in my_controller.listen_changes(<subsystem-type>):
    # process the message here

There are two subsystem types that you can subscribe to:

Subsystem types
types.PIPE_EXECUTOR
types.PIPE_RESOURCES
  • types.PIPE_EXECUTOR messages carry information about the test executor.

  • types.PIPE_RESOURCES messages carry information about the test resources, such as port reservation status, link sync status, and tester connection status.
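
Because each async for loop occupies its coroutine until the stream ends, listening to both subsystem channels at once usually means running one task per channel. The sketch below illustrates that pattern only. FakeController, drain and listen_to_both are hypothetical names, and the two channel strings are placeholders for types.PIPE_EXECUTOR and types.PIPE_RESOURCES, whose real values are not shown here.

```python
from __future__ import annotations

import asyncio
from typing import AsyncIterator

# Placeholder channel names (assumption); real code would pass
# types.PIPE_EXECUTOR and types.PIPE_RESOURCES instead.
PIPE_EXECUTOR = "executor"
PIPE_RESOURCES = "resources"


class FakeController:
    """Hypothetical stand-in that only mimics the shape of listen_changes()."""

    async def listen_changes(self, name: str) -> AsyncIterator[str]:
        for i in range(3):
            await asyncio.sleep(0)  # hand control back to the event loop
            yield f"{name}-message-{i}"


async def drain(ctrl: FakeController, channel: str, sink: list[str]) -> None:
    # This loop does not return until the channel's stream is exhausted.
    async for msg in ctrl.listen_changes(channel):
        sink.append(msg)


async def listen_to_both() -> list[str]:
    ctrl = FakeController()
    received: list[str] = []
    # One coroutine per channel, so neither subscription blocks the other.
    await asyncio.gather(
        drain(ctrl, PIPE_EXECUTOR, received),
        drain(ctrl, PIPE_RESOURCES, received),
    )
    return received
```

With a real controller the streams do not end on their own, so the same structure would normally be combined with long-lived tasks rather than a single gather call that is expected to finish.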

Test Execution Type

A test execution can generate different types of messages, such as test statistics, test progress, test state, errors, and warnings. To subscribe to specific types only, use the _filter argument.

Subscribe to statistics of test execution
async for msg in my_controller.listen_changes(execution_id, _filter={<filter_type>}):
    # do whatever you want to the message

The _filter argument is a set of filter types. The first argument of listen_changes is mandatory and identifies the subsystem or the test suite execution. The available filter types are shown below:

Available filter types
class EMsgType(Enum):
    STATE = "STATE"
    DATA = "DATA"
    STATISTICS = "STATISTICS"
    PROGRESS = "PROGRESS"
    WARNING = "WARNING"
    ERROR = "ERROR"

Note

The _filter argument is optional. If it is not provided, all message types from this test suite execution are returned.
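
The filtering behavior described above can be pictured as a set-membership test in which a missing filter lets everything through. The sketch below is an illustration of that rule, not the library's implementation. The Message class and the keep function are hypothetical, and EMsgType is copied locally so the block runs without xoa_core installed.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum


class EMsgType(Enum):
    STATE = "STATE"
    DATA = "DATA"
    STATISTICS = "STATISTICS"
    PROGRESS = "PROGRESS"
    WARNING = "WARNING"
    ERROR = "ERROR"


@dataclass
class Message:
    """Hypothetical message shape; the real message class may differ."""

    type: EMsgType
    payload: str


def keep(msg: Message, _filter: set[EMsgType] | None = None) -> bool:
    # No filter given: every message type passes.
    # Filter given: only messages whose type is in the set pass.
    return _filter is None or msg.type in _filter
```

For example, keep(msg, {EMsgType.WARNING, EMsgType.ERROR}) would accept only warnings and errors, while keep(msg) accepts every message.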

The example below demonstrates how to subscribe to test resource notifications and test results.

from __future__ import annotations
from xoa_core import (
    controller,
    types,
)
import asyncio
import json
from pathlib import Path

PROJECT_PATH = Path(__file__).parent
XOA_CONFIG = PROJECT_PATH / "xoa_2544_config.json"
PLUGINS_PATH = PROJECT_PATH / "test_suites"


async def subscribe(ctrl: "controller.MainController", channel_name: str, fltr: set["types.EMsgType"] | None = None) -> None:
    async for msg in ctrl.listen_changes(channel_name, _filter=fltr):
        print(msg)


async def main() -> None:
    # Define your tester login credentials
    my_tester_credential = types.Credentials(
        product=types.EProductType.VALKYRIE,
        host="10.20.30.40"
    )

    # Create a default instance of the controller class.
    ctrl = await controller.MainController()

    # Register the plugins folder.
    ctrl.register_lib(str(PLUGINS_PATH))

    # Add the tester credentials to the controller. If already added, the call is ignored.
    # To add several testers, iterate over the list and add them one by one.
    await ctrl.add_tester(my_tester_credential)

    # Subscribe to test resource notifications.
    asyncio.create_task(subscribe(ctrl, channel_name=types.PIPE_RESOURCES))

    # Load your XOA 2544 config and run.
    with open(XOA_CONFIG, "r") as f:

        # Get rfc2544 test suite information from the core's registration
        info = ctrl.get_test_suite_info("RFC-2544")
        if not info:
            print("Test suite RFC-2544 is not recognized.")
            return None

        # The test suite name "RFC-2544" comes from a call to ctrl.get_available_test_suites()
        test_exec_id = ctrl.start_test_suite("RFC-2544", json.load(f))

        # The example here only shows a print of test result data.
        asyncio.create_task(
            subscribe(ctrl, channel_name=test_exec_id, fltr={types.EMsgType.STATISTICS})
        )

    # Keep the script alive: the test execution and the subscriptions are
    # non-blocking and run asynchronously, so without this wait the script would exit immediately.
    await asyncio.Event().wait()


if __name__ == "__main__":
    asyncio.run(main())
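
The example above starts its subscriptions with asyncio.create_task and then waits forever. The asyncio documentation recommends keeping a reference to every created task, since the event loop holds only weak references to them. A minimal sketch of that pattern with a clean shutdown follows. fake_stream is a hypothetical stand-in for ctrl.listen_changes(), and run_for is an illustrative helper, not part of XOA Core.

```python
from __future__ import annotations

import asyncio
from typing import AsyncIterator


async def fake_stream(name: str) -> AsyncIterator[str]:
    """Hypothetical endless stream standing in for ctrl.listen_changes()."""
    n = 0
    while True:
        await asyncio.sleep(0.01)
        yield f"{name}:{n}"
        n += 1


async def subscribe(name: str, sink: list[str]) -> None:
    async for msg in fake_stream(name):
        sink.append(msg)


async def run_for(seconds: float) -> list[str]:
    sink: list[str] = []
    # Hold strong references so the tasks cannot be garbage collected mid-run.
    tasks = {
        asyncio.create_task(subscribe(name, sink))
        for name in ("resources", "statistics")
    }
    try:
        await asyncio.sleep(seconds)  # stands in for "until the test is done"
    finally:
        for task in tasks:
            task.cancel()
        # return_exceptions=True collects each task's CancelledError instead of raising it.
        await asyncio.gather(*tasks, return_exceptions=True)
    return sink
```

Calling asyncio.run(run_for(0.1)) collects messages from both stand-in streams for about a tenth of a second and then cancels the subscriptions, instead of leaving the script blocked on an event that is never set.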