File: packet.py

package info (click to toggle)
dpdk 25.11-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 127,892 kB
  • sloc: ansic: 2,358,479; python: 16,426; sh: 4,474; makefile: 1,713; awk: 70
file content (352 lines) | stat: -rw-r--r-- 12,336 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2025 Arm Limited

"""Packet utilities for test suites.

The module provides helpers for:
    * Packet sending and verification,
    * Packet adjustments and modification.

Example:
    .. code:: python

        from scapy.layers.inet import IP
        from scapy.layers.l2 import Ether
        from api.packet import send_packet_and_capture, get_expected_packet, match_all_packets

        pkt = Ether()/IP()/b"payload"
        received = send_packet_and_capture(pkt)
        expected = get_expected_packet(pkt)
        match_all_packets([expected], received)
"""

from collections import Counter
from typing import cast

from scapy.layers.inet import IP
from scapy.layers.l2 import Ether
from scapy.packet import Packet, Padding, raw

from api.test import fail, log_debug
from framework.context import get_ctx
from framework.exception import InternalError
from framework.testbed_model.traffic_generator.capturing_traffic_generator import (
    PacketFilteringConfig,
)
from framework.testbed_model.traffic_generator.performance_traffic_generator import (
    PerformanceTrafficStats,
)
from framework.utils import get_packet_summaries


def send_packet_and_capture(
    packet: Packet,
    filter_config: PacketFilteringConfig = PacketFilteringConfig(),
    duration: float = 1,
) -> list[Packet]:
    """Send and receive `packet` using the associated TG.

    Send `packet` through the appropriate interface and receive on the appropriate interface.
    Modify the packet with l3/l2 addresses corresponding to the testbed and desired traffic.

    Args:
        packet: The packet to send.
        filter_config: The filter to use when capturing packets.
        duration: Capture traffic for this amount of time after sending `packet`.

    Returns:
        A list of received packets.
    """
    return send_packets_and_capture(
        [packet],
        filter_config,
        duration,
    )


def send_packets_and_capture(
    packets: list[Packet],
    filter_config: PacketFilteringConfig = PacketFilteringConfig(),
    duration: float = 1,
) -> list[Packet]:
    """Send and receive `packets` using the associated TG.

    Send `packets` through the appropriate interface and receive on the appropriate interface.
    Modify the packets with l3/l2 addresses corresponding to the testbed and desired traffic.

    Args:
        packets: The packets to send.
        filter_config: The filter to use when capturing packets.
        duration: Capture traffic for this amount of time after sending `packet`.

    Returns:
        A list of received packets.
    """
    from framework.context import get_ctx
    from framework.testbed_model.traffic_generator.capturing_traffic_generator import (
        CapturingTrafficGenerator,
    )

    assert isinstance(
        get_ctx().func_tg, CapturingTrafficGenerator
    ), "Cannot capture with a non-capturing traffic generator"
    tg: CapturingTrafficGenerator = cast(CapturingTrafficGenerator, get_ctx().func_tg)
    # TODO: implement @requires for types of traffic generator
    packets = adjust_addresses(packets)
    return tg.send_packets_and_capture(
        packets,
        get_ctx().topology.tg_port_egress,
        get_ctx().topology.tg_port_ingress,
        filter_config,
        duration,
    )


def send_packets(
    packets: list[Packet],
) -> None:
    """Send packets using the traffic generator and do not capture received traffic.

    Args:
        packets: Packets to send.
    """
    packets = adjust_addresses(packets)
    tg = get_ctx().func_tg
    if tg:
        tg.send_packets(packets, get_ctx().topology.tg_port_egress)


def get_expected_packets(
    packets: list[Packet],
    sent_from_tg: bool = False,
) -> list[Packet]:
    """Inject the proper L2/L3 addresses into `packets`.

    Inject the L2/L3 addresses expected at the receiving end of the traffic generator.

    Args:
        packets: The packets to modify.
        sent_from_tg: If :data:`True` packet was sent from the TG.

    Returns:
        `packets` with injected L2/L3 addresses.
    """
    return adjust_addresses(packets, not sent_from_tg)


def get_expected_packet(
    packet: Packet,
    sent_from_tg: bool = False,
) -> Packet:
    """Inject the proper L2/L3 addresses into `packet`.

    Inject the L2/L3 addresses expected at the receiving end of the traffic generator.

    Args:
        packet: The packet to modify.
        sent_from_tg: If :data:`True` packet was sent from the TG.

    Returns:
        `packet` with injected L2/L3 addresses.
    """
    return get_expected_packets([packet], sent_from_tg)[0]


def adjust_addresses(packets: list[Packet], expected: bool = False) -> list[Packet]:
    """L2 and L3 address additions in both directions.

    Copies of `packets` will be made, modified and returned in this method.

    Only missing addresses are added to packets, existing addresses will not be overridden. If
    any packet in `packets` has multiple IP layers (using GRE, for example) only the inner-most
    IP layer will have its addresses adjusted.

    Assumptions:
        Two links between SUT and TG, one link is TG -> SUT, the other SUT -> TG.

    Args:
        packets: The packets to modify.
        expected: If :data:`True`, the direction is SUT -> TG,
            otherwise the direction is TG -> SUT.

    Returns:
        A list containing copies of all packets in `packets` after modification.

    Raises:
        InternalError: If no tests are running.
    """
    from framework.test_suite import TestSuite

    if get_ctx().local.current_test_suite is None:
        raise InternalError("No current test suite, tests aren't running?")
    current_test_suite: TestSuite = cast(TestSuite, get_ctx().local.current_test_suite)
    return current_test_suite._adjust_addresses(packets, expected)


def match_all_packets(
    expected_packets: list[Packet],
    received_packets: list[Packet],
    verify: bool = True,
) -> bool:
    """Matches all the expected packets against the received ones.

    Matching is performed by counting down the occurrences in a dictionary which keys are the
    raw packet bytes. No deep packet comparison is performed. All the unexpected packets (noise)
    are automatically ignored.

    Args:
        expected_packets: The packets we are expecting to receive.
        received_packets: All the packets that were received.
        verify: If :data:`True`, and there are missing packets an exception will be raised.

    Raises:
        TestCaseVerifyError: if and not all the `expected_packets` were found in
            `received_packets`.

    Returns:
        :data:`True` If there are no missing packets.
    """
    expected_packets_counters = Counter(map(raw, expected_packets))
    received_packets_counters = Counter(map(raw, received_packets))
    # The number of expected packets is subtracted by the number of received packets, ignoring
    # any unexpected packets and capping at zero.
    missing_packets_counters = expected_packets_counters - received_packets_counters
    missing_packets_count = missing_packets_counters.total()
    log_debug(
        f"match_all_packets: expected {len(expected_packets)}, "
        f"received {len(received_packets)}, missing {missing_packets_count}"
    )

    if missing_packets_count != 0:
        if verify:
            fail(
                f"Not all packets were received, expected {len(expected_packets)} "
                f"but {missing_packets_count} were missing."
            )
        return False

    return True


def verify_packets(expected_packet: Packet, received_packets: list[Packet]) -> None:
    """Verify that `expected_packet` has been received.

    Go through `received_packets` and check that `expected_packet` is among them.
    If not, raise an exception and log the last 10 commands
    executed on both the SUT and TG.

    Args:
        expected_packet: The packet we're expecting to receive.
        received_packets: The packets where we're looking for `expected_packet`.

    Raises:
        TestCaseVerifyError: `expected_packet` is not among `received_packets`.
    """
    for received_packet in received_packets:
        if _compare_packets(expected_packet, received_packet):
            break
    else:
        log_debug(
            f"The expected packet {expected_packet.summary()} "
            f"not found among received {get_packet_summaries(received_packets)}"
        )
        fail("An expected packet not found among received packets.")


def _compare_packets(expected_packet: Packet, received_packet: Packet) -> bool:
    log_debug(f"Comparing packets: \n{expected_packet.summary()}\n{received_packet.summary()}")

    l3 = IP in expected_packet.layers()
    log_debug("Found l3 layer")

    received_payload = received_packet
    expected_payload = expected_packet
    while received_payload and expected_payload:
        log_debug("Comparing payloads:")
        log_debug(f"Received: {received_payload}")
        log_debug(f"Expected: {expected_payload}")
        if type(received_payload) is type(expected_payload):
            log_debug("The layers are the same.")
            if type(received_payload) is Ether:
                if not _verify_l2_frame(received_payload, l3):
                    return False
            elif type(received_payload) is IP:
                assert type(expected_payload) is IP
                if not _verify_l3_packet(received_payload, expected_payload):
                    return False
        else:
            # Different layers => different packets
            return False
        received_payload = received_payload.payload
        expected_payload = expected_payload.payload

    if expected_payload:
        log_debug(f"The expected packet did not contain {expected_payload}.")
        return False
    if received_payload and received_payload.__class__ != Padding:
        log_debug("The received payload had extra layers which were not padding.")
        return False
    return True


def _verify_l2_frame(received_packet: Ether, contains_l3: bool) -> bool:
    """Verify the L2 frame of `received_packet`.

    Args:
        received_packet: The received L2 frame to verify.
        contains_l3: If :data:`True`, the packet contains an L3 layer.
    """
    log_debug("Looking at the Ether layer.")
    log_debug(
        f"Comparing received dst mac '{received_packet.dst}' "
        f"with expected '{get_ctx().topology.tg_port_ingress.mac_address}'."
    )
    if received_packet.dst != get_ctx().topology.tg_port_ingress.mac_address:
        return False

    expected_src_mac = get_ctx().topology.tg_port_egress.mac_address
    if contains_l3:
        expected_src_mac = get_ctx().topology.sut_port_egress.mac_address
    log_debug(
        f"Comparing received src mac '{received_packet.src}' "
        f"with expected '{expected_src_mac}'."
    )
    if received_packet.src != expected_src_mac:
        return False

    return True


def _verify_l3_packet(received_packet: IP, expected_packet: IP) -> bool:
    log_debug("Looking at the IP layer.")
    if received_packet.src != expected_packet.src or received_packet.dst != expected_packet.dst:
        return False
    return True


def assess_performance_by_packet(
    packet: Packet, duration: float, send_mpps: int | None = None
) -> PerformanceTrafficStats:
    """Send a given packet for a given duration and assess basic performance statistics.

    Send `packet` and assess NIC performance for a given duration, corresponding to the test
    suite's given topology.

    Args:
        packet: The packet to send.
        duration: Performance test duration (in seconds).
        send_mpps: The millions packets per second send rate.

    Returns:
        Performance statistics of the generated test.
    """
    from framework.testbed_model.traffic_generator.performance_traffic_generator import (
        PerformanceTrafficGenerator,
    )

    assert isinstance(
        get_ctx().perf_tg, PerformanceTrafficGenerator
    ), "Cannot send performance traffic with non-performance traffic generator"
    tg: PerformanceTrafficGenerator = cast(PerformanceTrafficGenerator, get_ctx().perf_tg)
    # TODO: implement @requires for types of traffic generator
    return tg.calculate_traffic_and_stats(packet, duration, send_mpps)