1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
|
# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2024 University of New Hampshire
"""Rx/Tx queue start and stop functionality suite.
This suite tests the ability of the poll mode driver to start and
stop either the Rx or Tx queue (depending on the port) during runtime,
and verify that packets are not received when one is disabled.
Given a paired port topology, the Rx queue will be disabled on port 0,
and the Tx queue will be disabled on port 1.
"""
from scapy.layers.inet import IP
from scapy.layers.l2 import Ether
from scapy.packet import Raw
from api.capabilities import (
LinkTopology,
NicCapability,
requires_link_topology,
requires_nic_capability,
)
from api.packet import send_packet_and_capture
from api.test import verify
from api.testpmd import TestPmd
from api.testpmd.config import SimpleForwardingModes
from framework.test_suite import TestSuite, func_test
@requires_link_topology(LinkTopology.TWO_LINKS)
@requires_nic_capability(NicCapability.RUNTIME_RX_QUEUE_SETUP)
@requires_nic_capability(NicCapability.RUNTIME_TX_QUEUE_SETUP)
class TestQueueStartStop(TestSuite):
"""DPDK Queue start/stop test suite.
Ensures Rx/Tx queue on a port can be disabled and enabled.
Verifies packets are not received when either queue is disabled.
The suite contains four test cases, two Rx queue start/stop and
two Tx queue start/stop, which each disable the corresponding
queue and verify that packets are not received/forwarded. There
are two cases that verify deferred start mode produces the expected
behavior in both the Rx and Tx queue.
"""
def _send_packet_and_verify(self, should_receive: bool = True) -> None:
"""Generate a packet, send to the DUT, and verify it is forwarded back.
Args:
should_receive: Indicate whether the packet should be received.
"""
packet = Ether() / IP() / Raw(load="xxxxx")
received = send_packet_and_capture(packet)
contains_packet = any(
packet.haslayer(Raw) and b"xxxxx" in packet.load for packet in received
)
verify(
should_receive == contains_packet,
f"Packet was {'dropped' if should_receive else 'received'}",
)
@func_test
def rx_queue_start_stop(self) -> None:
"""Rx queue start stop test.
Steps:
* Launch testpmd, stop Rx queue 0 on port 0.
* Stop testpmd, start Rx queue 0 on port 0, start testpmd.
Verify:
* Send a packet on port 0 after Rx queue is stopped and the packet is not received.
* Send a packet on port 0 after Rx queue is started and the packet is received.
"""
with TestPmd() as testpmd:
testpmd.set_forward_mode(SimpleForwardingModes.mac)
testpmd.stop_port_queue(0, 0, True)
testpmd.start()
self._send_packet_and_verify(should_receive=False)
testpmd.stop()
testpmd.start_port_queue(0, 0, True)
testpmd.start()
self._send_packet_and_verify(should_receive=True)
@func_test
def tx_queue_start_stop(self) -> None:
"""Tx queue start stop test.
Steps:
* Launch testpmd, stop Tx queue 0 on port 0.
* Stop testpmd, start Tx queue 0 on port 0, start testpmd.
Verify:
* Send a packet on port 0 after Tx queue is stopped and the packet is not received.
* Send a packet on port 0 after Tx queue is started and the packet is received.
"""
with TestPmd() as testpmd:
testpmd.set_forward_mode(SimpleForwardingModes.mac)
testpmd.stop_port_queue(1, 0, False)
testpmd.start()
self._send_packet_and_verify(should_receive=False)
testpmd.stop()
testpmd.start_port_queue(1, 0, False)
testpmd.start()
self._send_packet_and_verify(should_receive=True)
@func_test
def rx_queue_deferred_start(self) -> None:
"""Rx queue deferred start stop test.
Steps:
* Stop all ports, enable deferred start mode on port 0 Rx queue 0, start all ports.
* Launch testpmd, send a packet.
* Stop testpmd, start port 0 Rx queue 0.
* Start testpmd, send a packet.
Verify:
* Send a packet on port 0 after deferred start is set and the packet is not received.
* Send a packet on port 0 after Rx queue 0 is started and the packet is received.
"""
with TestPmd() as testpmd:
testpmd.set_forward_mode(SimpleForwardingModes.mac)
testpmd.stop_all_ports()
testpmd.set_queue_deferred_start(0, 0, True, True)
testpmd.start_all_ports()
testpmd.start()
self._send_packet_and_verify(should_receive=False)
testpmd.stop()
testpmd.start_port_queue(0, 0, True)
testpmd.start()
self._send_packet_and_verify(should_receive=True)
@func_test
def tx_queue_deferred_start(self) -> None:
"""Tx queue start stop test.
Steps:
* Stop all ports, enable deferred start mode on port 1 Tx queue 0, start all ports.
* Launch testpmd, send a packet.
* Stop testpmd, start port 1 Tx queue 0.
* Start testpmd, send a packet.
Verify:
* Send a packet on port 1 after deferred start is set and the packet is not received.
* Send a packet on port 1 after Tx queue 0 is started and the packet is received.
"""
with TestPmd() as testpmd:
testpmd.set_forward_mode(SimpleForwardingModes.mac)
testpmd.stop_all_ports()
testpmd.set_queue_deferred_start(1, 0, False, True)
testpmd.start_all_ports()
testpmd.start()
self._send_packet_and_verify(should_receive=False)
testpmd.stop()
testpmd.start_port_queue(1, 0, False)
testpmd.start()
self._send_packet_and_verify(should_receive=True)
|