Quick Start
Scripting with XOA Chimera Core
The code example below demonstrates how to use XOA Chimera Core to automate, from a script, the tasks you would otherwise perform in the ValkyrieManager GUI:
Connect to a Valkyrie chassis, and select a port of a Chimera module
Configure Chimera port
Configure flow’s basic filter
Configure flow’s extended filter
Configure impairment - Drop
Configure impairment - Misordering
Configure impairment - Latency & Jitter
Configure impairment - Duplication
Configure impairment - Corruption
Configure bandwidth control - Policer
Configure bandwidth control - Shaper
Flow statistics
Port statistics
import asyncio
from ipaddress import IPv4Address, IPv6Address
from chimera_core.controller import MainController
from chimera_core.types import distributions, enums, dataset
# Host address of the Valkyrie chassis (used as Credentials.host)
CHASSIS_IP: str = "87.61.110.118"
# User name passed to use_tester() when opening the tester session
USERNAME: str = "chimera-core"
# Index of the module, passed as module_id
MODULE_IDX: int = 2
# Index of the port, passed as port_id
PORT_IDX: int = 0
# Index into port.flows selecting the flow to configure
FLOW_IDX: int = 1
async def my_awesome_func(stop_event: asyncio.Event) -> None:
    """Walk through the Chimera Core API on one port and one flow.

    The coroutine connects to the chassis at ``CHASSIS_IP``, reserves module
    ``MODULE_IDX`` and port ``PORT_IDX``, and then exercises, in order: port
    configuration, the basic and extended shadow filters of flow
    ``FLOW_IDX``, the impairments (drop, misordering, latency & jitter,
    duplication, corruption), the policer and shaper, and finally the flow
    and port statistics.

    This is an API showcase rather than a realistic scenario: alternative
    settings are applied back to back to the same object so that every call
    appears once. In particular the local name ``dist`` is rebound by every
    distribution example, so only the last one in each section is handed to
    ``set_distribution()``.

    Args:
        stop_event: Event supplied by the caller. It is not read or set
            anywhere in this function.
    """
    # create credential object
    credentials = dataset.Credentials(
        product=dataset.EProductType.VALKYRIE,
        host=CHASSIS_IP)

    #----------------------------------------------
    # 1. Connect to Valkyrie chassis and select port
    # ---------------------------------------------
    # region Connect to Valkyrie chassis and select port

    # create chimera core controller object
    controller = await MainController()

    # add chimera emulator into the chassis inventory and get the ID
    tester_id = await controller.add_tester(credentials=credentials)

    # create tester object
    tester = await controller.use_tester(tester_id, username=USERNAME, reserve=False, debug=False)

    # create module object (not reserved yet)
    module = await tester.use_module(module_id=MODULE_IDX, reserve=False)

    # free the module in case it is reserved by others, then reserve it
    await module.free(should_free_sub_resources=True)
    await module.reserve()

    # create port object (reserve=False, so it is not reserved by this call)
    port = await tester.use_port(module_id=MODULE_IDX, port_id=PORT_IDX, reserve=False)

    # reserve the port
    await port.reserve()

    # reset port
    await port.reset()

    # endregion

    #----------------------------------------------
    # 2. Configure Chimera port
    # ---------------------------------------------
    # region Configure Chimera port

    # Read the port configuration, modify it locally, then write it back once.
    # Each setting is shown with its alternative directly after it
    # (presumably the later call/assignment is the one that takes effect).
    port_config = await port.config.get()
    port_config.comment = "My Chimera Port"

    port_config.set_fcs_error_mode_discard()
    port_config.set_fcs_error_mode_pass()

    port_config.set_link_flap(enable=enums.OnOff.ON, duration=100, period=1000, repetition=0)
    port_config.set_link_flap_off()

    port_config.set_pma_error_pulse(enable=enums.OnOff.ON, duration=100, period=1000, repetition=0, coeff=100, exp=-4)
    port_config.set_pma_error_pulse_off()

    port_config.set_impairment_off()
    port_config.set_impairment_on()

    port_config.tpld_mode = enums.TPLDMode.NORMAL
    port_config.tpld_mode = enums.TPLDMode.MICRO

    await port.config.set(port_config)

    # endregion

    #----------------------------------------------
    # 3. Configure flow's basic filter on a port
    # ---------------------------------------------
    # region Flow configuration + basic filter on a port

    # Configure flow properties
    flow = port.flows[FLOW_IDX]
    flow_config = await flow.get()
    flow_config.comment = "Flow description"
    await flow.set(config=flow_config)

    # Initialize shadow filter on the flow
    shadow_filter = flow.shadow_filter
    await shadow_filter.init()
    await shadow_filter.clear()

    # Configure shadow filter to BASIC mode
    basic_filter = await shadow_filter.use_basic_mode()
    basic_filter_config = await basic_filter.get()

    # Write the configuration back unmodified, then enable and apply.
    # The subfilters configured below are sent by the second set/enable/apply
    # at the end of this region.
    await basic_filter.set(basic_filter_config)
    await shadow_filter.enable()
    await shadow_filter.apply()

    #------------------
    # Ethernet subfilter
    #------------------
    # Use and configure basic-mode shadow filter's Ethernet subfilter
    ethernet_subfilter = basic_filter_config.layer_2.use_ethernet()
    ethernet_subfilter.exclude()
    ethernet_subfilter.include()
    ethernet_subfilter.src_addr.on(value=dataset.Hex("AAAAAAAAAAAA"), mask=dataset.Hex("FFFFFFFFFFFF"))
    ethernet_subfilter.dest_addr.on(value=dataset.Hex("BBBBBBBBBBBB"), mask=dataset.Hex("FFFFFFFFFFFF"))

    #------------------
    # Layer 2+ subfilter
    #------------------
    # Not use basic-mode shadow filter's Layer 2+ subfilter
    layer_2_plus_subfilter = basic_filter_config.layer_2_plus.use_none()

    # Use and configure basic-mode shadow filter's Layer 2+ subfilter (one VLAN tag)
    layer_2_plus_subfilter = basic_filter_config.layer_2_plus.use_1_vlan_tag()
    layer_2_plus_subfilter.off()
    layer_2_plus_subfilter.exclude()
    layer_2_plus_subfilter.include()
    layer_2_plus_subfilter.tag_inner.on(value=1234, mask=dataset.Hex("FFF"))
    layer_2_plus_subfilter.pcp_inner.on(value=3, mask=dataset.Hex("7"))

    # Use and configure basic-mode shadow filter's Layer 2+ subfilter (two VLAN tags)
    layer_2_plus_subfilter = basic_filter_config.layer_2_plus.use_2_vlan_tags()
    layer_2_plus_subfilter.off()
    layer_2_plus_subfilter.exclude()
    layer_2_plus_subfilter.include()
    layer_2_plus_subfilter.tag_inner.on(value=1234, mask=dataset.Hex("FFF"))
    layer_2_plus_subfilter.pcp_inner.on(value=3, mask=dataset.Hex("7"))
    layer_2_plus_subfilter.tag_outer.on(value=2345, mask=dataset.Hex("FFF"))
    layer_2_plus_subfilter.pcp_outer.on(value=0, mask=dataset.Hex("7"))

    # Use and configure basic-mode shadow filter's Layer 2+ subfilter (MPLS)
    layer_2_plus_subfilter = basic_filter_config.layer_2_plus.use_mpls()
    layer_2_plus_subfilter.off()
    layer_2_plus_subfilter.exclude()
    layer_2_plus_subfilter.include()
    layer_2_plus_subfilter.label.on(value=1000, mask=dataset.Hex("FFFFF"))
    layer_2_plus_subfilter.toc.on(value=0, mask=dataset.Hex("7"))

    #------------------
    # Layer 3 subfilter
    #------------------
    # Not use basic-mode shadow filter's Layer 3 subfilter
    layer_3_subfilter = basic_filter_config.layer_3.use_none()

    # Use and configure basic-mode shadow filter's Layer 3 subfilter (IPv4)
    layer_3_subfilter = basic_filter_config.layer_3.use_ipv4()
    layer_3_subfilter.off()
    layer_3_subfilter.exclude()
    layer_3_subfilter.include()
    layer_3_subfilter.src_addr.on(value=IPv4Address("10.0.0.2"), mask=dataset.Hex("FFFFFFFF"))
    layer_3_subfilter.dest_addr.on(value=IPv4Address("11.0.0.2"), mask=dataset.Hex("FFFFFFFF"))
    layer_3_subfilter.dscp.on(value=0, mask=dataset.Hex("FC"))

    # Use and configure basic-mode shadow filter's Layer 3 subfilter (IPv6)
    layer_3_subfilter = basic_filter_config.layer_3.use_ipv6()
    layer_3_subfilter.exclude()
    layer_3_subfilter.include()
    layer_3_subfilter.src_addr.on(value=IPv6Address("2001::2"), mask=dataset.Hex("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"))
    layer_3_subfilter.dest_addr.on(value=IPv6Address("2002::2"), mask=dataset.Hex("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"))
    layer_3_subfilter.tc.on(value=0, mask=dataset.Hex("FC"))

    #------------------
    # Layer 4 subfilter
    #------------------
    # Not use basic-mode shadow filter's Layer 4 subfilter
    layer_4_subfilter = basic_filter_config.layer_4.use_none()

    # Use and configure basic-mode shadow filter's Layer 4 subfilter (TCP)
    layer_4_subfilter = basic_filter_config.layer_4.use_tcp()
    layer_4_subfilter.off()
    layer_4_subfilter.exclude()
    layer_4_subfilter.include()
    layer_4_subfilter.src_port.on(value=1234, mask=dataset.Hex("FFFF"))
    layer_4_subfilter.dest_port.on(value=80, mask=dataset.Hex("FFFF"))

    # Use and configure basic-mode shadow filter's Layer 4 subfilter (UDP)
    layer_4_subfilter = basic_filter_config.layer_4.use_udp()
    layer_4_subfilter.off()
    layer_4_subfilter.exclude()
    layer_4_subfilter.include()
    layer_4_subfilter.src_port.on(value=1234, mask=dataset.Hex("FFFF"))
    layer_4_subfilter.dest_port.on(value=80, mask=dataset.Hex("FFFF"))

    #------------------
    # Layer Xena subfilter
    #------------------
    # Not use basic-mode shadow filter's Layer Xena subfilter
    layer_xena_subfilter = basic_filter_config.layer_xena.use_none()

    # Use and configure basic-mode shadow filter's TPLD subfilter
    layer_xena_subfilter = basic_filter_config.layer_xena.use_tpld()
    layer_xena_subfilter.exclude()
    layer_xena_subfilter.include()

    # Entries 0-15 of the TPLD subfilter: each one is switched on with a
    # TPLD ID and immediately off again, to show both calls.
    tpld_ids = (2, 4, 6, 8, 10, 20, 40, 60, 80, 100, 102, 104, 106, 108, 110, 200)
    for entry_idx, tpld_id in enumerate(tpld_ids):
        layer_xena_subfilter[entry_idx].on(tpld_id=tpld_id)
        layer_xena_subfilter[entry_idx].off()

    #------------------
    # Layer Any subfilter
    #------------------
    # Not use basic-mode shadow filter's Layer Any subfilter
    layer_any_subfilter = basic_filter_config.layer_any.use_none()

    # Use and configure basic-mode shadow filter's Layer Any subfilter (Any Field)
    layer_any_subfilter = basic_filter_config.layer_any.use_any_field()
    layer_any_subfilter.off()
    layer_any_subfilter.exclude()
    layer_any_subfilter.include()
    layer_any_subfilter.on(position=0, value=dataset.Hex("112233445566"), mask=dataset.Hex("112233445566"))

    # Enable and apply the basic filter settings
    await basic_filter.set(basic_filter_config)
    await shadow_filter.enable()
    await shadow_filter.apply()

    # endregion

    #----------------------------------------------
    # 4. Configure flow's extended filter on a port
    # ---------------------------------------------
    # region Flow configuration + extended filter on a port

    # Configure flow properties
    flow = port.flows[FLOW_IDX]
    flow_config = await flow.get()
    flow_config.comment = "Flow description"
    await flow.set(config=flow_config)

    # Initialize shadow filter on the flow
    shadow_filter = flow.shadow_filter
    await shadow_filter.init()
    await shadow_filter.clear()

    # Configure shadow filter to EXTENDED mode
    extended_filter = await shadow_filter.use_extended_mode()
    extended_filter_config = await extended_filter.get()

    # Describe the header stack to match as a sequence of protocol segments,
    # each with its own value and mask.
    # NOTE: "ProtocolSegement" is the spelling used by the library.
    ethernet = dataset.ProtocolSegement(
        protocol_type=dataset.ProtocolOption.ETHERNET,
        value='00001111',
        mask='11110000',
    )
    ipv4_1 = dataset.ProtocolSegement(
        protocol_type=dataset.ProtocolOption.IP,
        value='00001111',
        mask='11110000',
    )
    ipv4_2 = dataset.ProtocolSegement(
        protocol_type=dataset.ProtocolOption.IP,
        value='00001111',
        mask='11110000',
    )
    extended_filter_config.protocol_segments = (ethernet, ipv4_1, ipv4_2)

    # Enable and apply the extended filter settings
    await extended_filter.set(extended_filter_config)
    await shadow_filter.enable()
    await shadow_filter.apply()

    # endregion

    #----------------------------------------------
    # 5. Configure impairment - Drop
    # ---------------------------------------------
    # region Configure impairment - Drop

    # Every example below rebinds `dist`; only the last one (Custom) is
    # passed to set_distribution() at the end of this region.

    # Fixed Burst distribution for impairment Drop
    dist = distributions.drop.FixedBurst(burst_size=5)
    dist.repeat(period=5)
    dist.one_shot()

    # Random Burst distribution for impairment Drop
    dist = distributions.drop.RandomBurst(minimum=1, maximum=10, probability=10_000)
    dist.repeat_pattern(duration=1, period=1)
    dist.continuous()

    # Fixed Rate distribution for impairment Drop
    dist = distributions.drop.FixedRate(probability=10_000)
    dist.repeat_pattern(duration=1, period=1)
    dist.continuous()

    # Bit Error Rate distribution for impairment Drop
    dist = distributions.drop.BitErrorRate(coefficient=1, exponent=1)
    dist.repeat_pattern(duration=1, period=1)
    dist.continuous()

    # Random Rate distribution for impairment Drop
    dist = distributions.drop.RandomRate(probability=10_000)
    dist.repeat_pattern(duration=1, period=1)
    dist.continuous()

    # Gilbert Elliot distribution for impairment Drop
    dist = distributions.drop.GilbertElliot(good_state_impair_prob=0, good_state_trans_prob=0, bad_state_impair_prob=0, bad_state_trans_prob=0)
    dist.repeat_pattern(duration=1, period=1)
    dist.continuous()

    # Uniform distribution for impairment Drop
    dist = distributions.drop.Uniform(minimum=1, maximum=1)
    dist.repeat_pattern(duration=1, period=1)
    dist.continuous()

    # Gaussian distribution for impairment Drop
    dist = distributions.drop.Gaussian(mean=1, sd=1)
    dist.repeat_pattern(duration=1, period=1)
    dist.continuous()

    # Poisson distribution for impairment Drop
    # NOTE: the keyword is spelled "lamda" in the library.
    dist = distributions.drop.Poisson(lamda=9)
    dist.repeat_pattern(duration=1, period=1)
    dist.continuous()

    # Gamma distribution for impairment Drop
    dist = distributions.drop.Gamma(shape=1, scale=1)
    dist.repeat_pattern(duration=1, period=1)
    dist.continuous()

    # Custom distribution for impairment Drop
    # (512 data points alternating 0 and 1, registered on the port first)
    data_x=[0, 1] * 256
    custom_distribution = await port.custom_distributions.add(
        linear=False,
        entry_count = len(data_x),
        data_x=data_x,
        comment="Example Custom Distribution"
    )
    dist = distributions.drop.Custom(custom_distribution=custom_distribution)
    dist.repeat_pattern(duration=1, period=1)
    dist.continuous()

    # Set distribution, start impairment Drop, then stop it again
    drop_config = await flow.drop.get()
    drop_config.set_distribution(dist)
    await flow.drop.start(drop_config)
    await flow.drop.stop(drop_config)

    # endregion

    #----------------------------------------------
    # 6. Configure impairment - Misordering
    # ---------------------------------------------
    # region Configure impairment - Misordering

    # `dist` is rebound below; the Fixed Rate one is what gets applied.

    # Fixed Burst distribution for impairment Misordering
    dist = distributions.misordering.FixedBurst(burst_size=1)
    dist.repeat(period=5)
    dist.one_shot()

    # Fixed Rate distribution for impairment Misordering
    dist = distributions.misordering.FixedRate(probability=10_000)
    dist.repeat_pattern(duration=1, period=1)
    dist.continuous()

    # Set depth and distribution, start impairment Misordering, then stop it again
    misordering_config = await flow.misordering.get()
    misordering_config.depth = 1
    misordering_config.set_distribution(dist)
    await flow.misordering.start(misordering_config)
    await flow.misordering.stop(misordering_config)

    # endregion

    #----------------------------------------------
    # 7. Configure impairment - Latency & Jitter
    # ---------------------------------------------
    # region Configure impairment - Latency & Jitter

    # `dist` is rebound by every example; only the last one (Custom) is
    # passed to set_distribution() at the end of this region.

    # Constant Delay distribution for impairment Latency & Jitter
    dist = distributions.latency_jitter.ConstantDelay(delay=100)

    # Accumulate Burst distribution for impairment Latency & Jitter
    dist = distributions.latency_jitter.AccumulateBurst(burst_delay=1300)
    dist.repeat(period=1)
    dist.one_shot()

    # Step distribution for impairment Latency & Jitter
    dist = distributions.latency_jitter.Step(min=1300, max=77000)
    dist.continuous()

    # Uniform distribution for impairment Latency & Jitter
    dist = distributions.latency_jitter.Uniform(minimum=1, maximum=1)
    dist.continuous()

    # Gaussian distribution for impairment Latency & Jitter
    dist = distributions.latency_jitter.Gaussian(mean=1, sd=1)
    dist.continuous()

    # Poisson distribution for impairment Latency & Jitter
    dist = distributions.latency_jitter.Poisson(lamda=9)
    dist.continuous()

    # Gamma distribution for impairment Latency & Jitter
    dist = distributions.latency_jitter.Gamma(shape=1, scale=1)
    dist.continuous()

    # Custom distribution for impairment Latency & Jitter
    data_x=[0, 1] * 256
    custom_distribution = await port.custom_distributions.add(
        linear=False,
        entry_count = len(data_x),
        data_x=data_x,
        comment="Example Custom Distribution"
    )
    dist = distributions.latency_jitter.Custom(custom_distribution=custom_distribution)
    dist.continuous()

    # Set distribution, start impairment Latency & Jitter, then stop it again
    latency_jitter_config = await flow.latency_jitter.get()
    latency_jitter_config.set_distribution(dist)
    await flow.latency_jitter.start(latency_jitter_config)
    await flow.latency_jitter.stop(latency_jitter_config)

    # endregion

    #----------------------------------------------
    # 8. Configure impairment - Duplication
    # ---------------------------------------------
    # region Configure impairment - Duplication

    # `dist` is rebound by every example; only the last one (Custom) is
    # passed to set_distribution() at the end of this region.

    # Fixed Burst distribution for impairment Duplication
    dist = distributions.duplication.FixedBurst(burst_size=5)
    dist.repeat(period=5)
    dist.one_shot()

    # Random Burst distribution for impairment Duplication
    dist = distributions.duplication.RandomBurst(minimum=1, maximum=10, probability=10_000)
    dist.repeat_pattern(duration=1, period=1)
    dist.continuous()

    # Fixed Rate distribution for impairment Duplication
    dist = distributions.duplication.FixedRate(probability=10_000)
    dist.repeat_pattern(duration=1, period=1)
    dist.continuous()

    # Bit Error Rate distribution for impairment Duplication
    dist = distributions.duplication.BitErrorRate(coefficient=1, exponent=1)
    dist.repeat_pattern(duration=1, period=1)
    dist.continuous()

    # Random Rate distribution for impairment Duplication
    dist = distributions.duplication.RandomRate(probability=10_000)
    dist.repeat_pattern(duration=1, period=1)
    dist.continuous()

    # Gilbert Elliot distribution for impairment Duplication
    dist = distributions.duplication.GilbertElliot(good_state_impair_prob=0, good_state_trans_prob=0, bad_state_impair_prob=0, bad_state_trans_prob=0)
    dist.repeat_pattern(duration=1, period=1)
    dist.continuous()

    # Uniform distribution for impairment Duplication
    dist = distributions.duplication.Uniform(minimum=1, maximum=1)
    dist.repeat_pattern(duration=1, period=1)
    dist.continuous()

    # Gaussian distribution for impairment Duplication
    dist = distributions.duplication.Gaussian(mean=1, sd=1)
    dist.repeat_pattern(duration=1, period=1)
    dist.continuous()

    # Poisson distribution for impairment Duplication
    dist = distributions.duplication.Poisson(lamda=9)
    dist.repeat_pattern(duration=1, period=1)
    dist.continuous()

    # Gamma distribution for impairment Duplication
    dist = distributions.duplication.Gamma(shape=1, scale=1)
    dist.repeat_pattern(duration=1, period=1)
    dist.continuous()

    # Custom distribution for impairment Duplication
    data_x=[0, 1] * 256
    custom_distribution = await port.custom_distributions.add(
        linear=False,
        entry_count = len(data_x),
        data_x=data_x,
        comment="Example Custom Distribution"
    )
    dist = distributions.duplication.Custom(custom_distribution=custom_distribution)
    dist.repeat_pattern(duration=1, period=1)
    dist.continuous()

    # Set distribution, start impairment Duplication, then stop it again
    duplication_config = await flow.duplication.get()
    duplication_config.set_distribution(dist)
    await flow.duplication.start(duplication_config)
    await flow.duplication.stop(duplication_config)

    # endregion

    #----------------------------------------------
    # 9. Configure impairment - Corruption
    # ---------------------------------------------
    # region Configure impairment - Corruption

    # `dist` is rebound by every example; only the last one (Custom) is
    # passed to set_distribution() at the end of this region.

    # Fixed Burst distribution for impairment Corruption
    dist = distributions.corruption.FixedBurst(burst_size=5)
    dist.repeat(period=5)
    dist.one_shot()

    # Random Burst distribution for impairment Corruption
    dist = distributions.corruption.RandomBurst(minimum=1, maximum=10, probability=10_000)
    dist.repeat_pattern(duration=1, period=1)
    dist.continuous()

    # Fixed Rate distribution for impairment Corruption
    dist = distributions.corruption.FixedRate(probability=10_000)
    dist.repeat_pattern(duration=1, period=1)
    dist.continuous()

    # Bit Error Rate distribution for impairment Corruption
    dist = distributions.corruption.BitErrorRate(coefficient=1, exponent=1)
    dist.repeat_pattern(duration=1, period=1)
    dist.continuous()

    # Random Rate distribution for impairment Corruption
    dist = distributions.corruption.RandomRate(probability=10_000)
    dist.repeat_pattern(duration=1, period=1)
    dist.continuous()

    # Gilbert Elliot distribution for impairment Corruption
    dist = distributions.corruption.GilbertElliot(good_state_impair_prob=0, good_state_trans_prob=0, bad_state_impair_prob=0, bad_state_trans_prob=0)
    dist.repeat_pattern(duration=1, period=1)
    dist.continuous()

    # Uniform distribution for impairment Corruption
    dist = distributions.corruption.Uniform(minimum=1, maximum=1)
    dist.repeat_pattern(duration=1, period=1)
    dist.continuous()

    # Gaussian distribution for impairment Corruption
    dist = distributions.corruption.Gaussian(mean=1, sd=1)
    dist.repeat_pattern(duration=1, period=1)
    dist.continuous()

    # Poisson distribution for impairment Corruption
    dist = distributions.corruption.Poisson(lamda=9)
    dist.repeat_pattern(duration=1, period=1)
    dist.continuous()

    # Gamma distribution for impairment Corruption
    dist = distributions.corruption.Gamma(shape=1, scale=1)
    dist.repeat_pattern(duration=1, period=1)
    dist.continuous()

    # Custom distribution for impairment Corruption
    data_x=[0, 1] * 256
    custom_distribution = await port.custom_distributions.add(
        linear=False,
        entry_count = len(data_x),
        data_x=data_x,
        comment="Example Custom Distribution"
    )
    dist = distributions.corruption.Custom(custom_distribution=custom_distribution)
    dist.repeat_pattern(duration=1, period=1)
    dist.continuous()

    # Set corruption type and distribution, start impairment Corruption, then stop it again
    corruption_config = await flow.corruption.get()
    # All available corruption types are listed; the last assignment (BER)
    # is the value left on the config object.
    corruption_config.corruption_type = enums.CorruptionType.ETH
    corruption_config.corruption_type = enums.CorruptionType.IP
    corruption_config.corruption_type = enums.CorruptionType.TCP
    corruption_config.corruption_type = enums.CorruptionType.UDP
    corruption_config.corruption_type = enums.CorruptionType.BER
    corruption_config.set_distribution(dist)
    await flow.corruption.start(corruption_config)
    await flow.corruption.stop(corruption_config)

    # endregion

    #----------------------------------------------
    # 10. Configure bandwidth control - Policer
    # ---------------------------------------------
    # region Configure bandwidth control - Policer

    # Set and start bandwidth control Policer
    policer_config = await flow.policer.get()

    # Three ways of choosing the control mode are shown for L1 and L2:
    # set_control_mode(), the set_control_on_l1/l2() helpers, and the
    # `mode` attribute.
    policer_config.set_control_mode(mode=enums.PolicerMode.L1)
    policer_config.set_control_on_l1()
    policer_config.set_control_mode(mode=enums.PolicerMode.L2)
    policer_config.set_control_on_l2()
    policer_config.mode = enums.PolicerMode.L1
    policer_config.mode = enums.PolicerMode.L2

    policer_config.cir = 10_000
    policer_config.cbs = 1_000

    # Two ways of switching on/off: set_on_off() and the set_on()/set_off() helpers
    policer_config.set_on_off(on_off=enums.OnOff.ON)
    policer_config.set_on()
    policer_config.set_on_off(on_off=enums.OnOff.OFF)
    policer_config.set_off()

    await flow.policer.start(policer_config)
    await flow.policer.stop(policer_config)

    # endregion

    #----------------------------------------------
    # 11. Configure bandwidth control - Shaper
    # ---------------------------------------------
    # region Configure bandwidth control - Shaper

    # Set and start bandwidth control Shaper
    # (the shaper takes its mode values from enums.PolicerMode as well)
    shaper_config = await flow.shaper.get()

    shaper_config.set_control_mode(mode=enums.PolicerMode.L1)
    shaper_config.set_control_on_l1()
    shaper_config.set_control_mode(mode=enums.PolicerMode.L2)
    shaper_config.set_control_on_l2()
    shaper_config.mode = enums.PolicerMode.L1
    shaper_config.mode = enums.PolicerMode.L2

    shaper_config.cir = 10_000
    shaper_config.cbs = 1_000
    shaper_config.buffer_size = 1_000

    shaper_config.set_on_off(on_off=enums.OnOff.ON)
    shaper_config.set_on()
    shaper_config.set_on_off(on_off=enums.OnOff.OFF)
    shaper_config.set_off()

    await flow.shaper.start(shaper_config)
    await flow.shaper.stop(shaper_config)

    # endregion

    #----------------------------------------------
    # 12. Flow statistics
    # ---------------------------------------------
    # region Flow Statistics

    # The bare attribute expressions below only list the fields available on
    # each statistics object; their values are discarded. Assign or print
    # them in a real script.
    rx_total = await flow.statistics.rx.total.get()
    rx_total.byte_count
    rx_total.packet_count
    rx_total.l2_bps
    rx_total.pps

    tx_total = await flow.statistics.tx.total.get()
    tx_total.byte_count
    tx_total.packet_count
    tx_total.l2_bps
    tx_total.pps

    flow_drop_total = await flow.statistics.total.dropped.get()
    flow_drop_total.pkt_drop_count_total
    flow_drop_total.pkt_drop_count_programmed
    flow_drop_total.pkt_drop_count_bandwidth
    flow_drop_total.pkt_drop_count_other
    flow_drop_total.pkt_drop_ratio_total
    flow_drop_total.pkt_drop_ratio_programmed
    flow_drop_total.pkt_drop_ratio_bandwidth
    flow_drop_total.pkt_drop_ratio_other

    flow_corrupted_total = await flow.statistics.total.corrupted.get()
    flow_corrupted_total.fcs_corrupted_pkt_count
    flow_corrupted_total.fcs_corrupted_pkt_ratio
    flow_corrupted_total.ip_corrupted_pkt_count
    flow_corrupted_total.ip_corrupted_pkt_ratio
    flow_corrupted_total.tcp_corrupted_pkt_count
    flow_corrupted_total.tcp_corrupted_pkt_ratio
    flow_corrupted_total.total_corrupted_pkt_count
    flow_corrupted_total.total_corrupted_pkt_ratio
    flow_corrupted_total.udp_corrupted_pkt_count
    flow_corrupted_total.udp_corrupted_pkt_ratio

    flow_delayed_total = await flow.statistics.total.delayed.get()
    flow_delayed_total.pkt_count
    flow_delayed_total.ratio

    flow_jittered_total = await flow.statistics.total.jittered.get()
    flow_jittered_total.pkt_count
    flow_jittered_total.ratio

    flow_duplicated_total = await flow.statistics.total.duplicated.get()
    flow_duplicated_total.pkt_count
    flow_duplicated_total.ratio

    flow_misordered_total = await flow.statistics.total.misordered.get()
    flow_misordered_total.pkt_count
    flow_misordered_total.ratio

    # Clear the TX, RX, and overall flow statistics
    await flow.statistics.tx.clear.set()
    await flow.statistics.rx.clear.set()
    await flow.statistics.clear.set()

    # endregion

    #----------------------------------------------
    # 13. Port statistics
    # ---------------------------------------------
    # region Port Statistics

    # As above, the bare attribute expressions only list the available fields.
    port_drop = await port.config.statistics.dropped.get()
    port_drop.pkt_drop_count_total
    port_drop.pkt_drop_count_programmed
    port_drop.pkt_drop_count_bandwidth
    port_drop.pkt_drop_count_other
    port_drop.pkt_drop_ratio_total
    port_drop.pkt_drop_ratio_programmed
    port_drop.pkt_drop_ratio_bandwidth
    port_drop.pkt_drop_ratio_other

    port_corrupted = await port.config.statistics.corrupted.get()
    port_corrupted.fcs_corrupted_pkt_count
    port_corrupted.fcs_corrupted_pkt_ratio
    port_corrupted.ip_corrupted_pkt_count
    port_corrupted.ip_corrupted_pkt_ratio
    port_corrupted.tcp_corrupted_pkt_count
    port_corrupted.tcp_corrupted_pkt_ratio
    port_corrupted.total_corrupted_pkt_count
    port_corrupted.total_corrupted_pkt_ratio
    port_corrupted.udp_corrupted_pkt_count
    port_corrupted.udp_corrupted_pkt_ratio

    port_delayed = await port.config.statistics.delayed.get()
    port_delayed.pkt_count
    port_delayed.ratio

    port_jittered = await port.config.statistics.jittered.get()
    port_jittered.pkt_count
    port_jittered.ratio

    port_duplicated = await port.config.statistics.duplicated.get()
    port_duplicated.pkt_count
    port_duplicated.ratio

    port_misordered = await port.config.statistics.misordered.get()
    port_misordered.pkt_count
    port_misordered.ratio

    # Clear the port statistics
    await port.config.statistics.clear.set()

    # endregion
async def main() -> None:
    """Create the stop event and run the example coroutine to completion."""
    stop_event = asyncio.Event()
    await my_awesome_func(stop_event=stop_event)
# Run the example only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())