File: test_map_batch_ops.py

package info (click to toggle)
bpfcc 0.31.0%2Bds-7
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 27,208 kB
  • sloc: ansic: 758,058; python: 40,671; cpp: 25,637; sh: 780; makefile: 279
file content (146 lines) | stat: -rwxr-xr-x 4,663 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#!/usr/bin/env python3
#
# USAGE: test_map_batch_ops.py
#
# Copyright (c) Emilien Gobillot
# Licensed under the Apache License, Version 2.0 (the "License")

from __future__ import print_function
from unittest import main, skipUnless, TestCase
from utils import kernel_version_ge
from bcc import BPF

import os
import ctypes as ct


@skipUnless(kernel_version_ge(5, 6), "requires kernel >= 5.6")
class TestMapBatch(TestCase):
    """Exercise the batch operations exposed on a BPF hash map.

    Every test starts from a map built by ``fill_hashmap``, i.e. one that
    holds the identity mapping ``i -> i`` for ``i`` in ``range(MAPSIZE)``.
    """

    # Capacity of the hash map, and the number of entries inserted into it.
    MAPSIZE = 1024
    # Default number of entries touched by the "subset" tests.
    SUBSET_SIZE = 32

    @staticmethod
    def _key_of(item):
        """Return the integer key of a ``(key, value)`` map item."""
        return item[0].value

    def fill_hashmap(self):
        """Create a hash map of MAPSIZE entries and fill it with i -> i.

        Returns:
            The BPF table object, holding keys 0 .. MAPSIZE - 1, each
            mapped to a value equal to its key.
        """
        b = BPF(text=b"""BPF_HASH(map, int, int, %d);""" % self.MAPSIZE)
        hmap = b[b"map"]
        for i in range(self.MAPSIZE):
            hmap[ct.c_int(i)] = ct.c_int(i)
        return hmap

    def prepare_keys_subset(self, hmap, count=None):
        """Build a ctypes array holding the ``count`` smallest keys of hmap.

        Args:
            hmap: the map to read the keys from (via a batch lookup).
            count: number of keys wanted; a falsy value (None or 0) selects
                SUBSET_SIZE.

        Returns:
            An array of ``hmap.Key`` of length ``count``, filled in
            ascending key order.
        """
        if not count:
            count = self.SUBSET_SIZE
        keys = (hmap.Key * count)()
        # batch lookups come back in no particular order: sort before
        # picking the head so that the subset is deterministic
        lowest = sorted(hmap.items_lookup_batch(), key=self._key_of)[:count]
        for i, (k, _) in enumerate(lowest):
            keys[i] = k.value

        return keys

    def prepare_values_subset(self, hmap, count=None):
        """Build a ctypes array of squared values for the smallest keys.

        Args:
            hmap: the map to read the current values from (via a batch
                lookup).
            count: number of values wanted; a falsy value (None or 0)
                selects SUBSET_SIZE.

        Returns:
            An array of ``hmap.Leaf`` of length ``count`` whose i-th slot is
            the square of the value stored under the i-th smallest key.
        """
        if not count:
            count = self.SUBSET_SIZE
        values = (hmap.Leaf * count)()
        lowest = sorted(hmap.items_lookup_batch(), key=self._key_of)[:count]
        for i, (_, v) in enumerate(lowest):
            values[i] = v.value * v.value
        return values

    def check_hashmap_values(self, it):
        """Assert that ``it`` yields exactly the pairs (0, 0), (1, 1), ...

        Args:
            it: iterable of ``(key, value)`` items, in any order.

        Returns:
            The number of items that were checked.
        """
        entries = sorted(it, key=self._key_of)
        for expected, (k, v) in enumerate(entries):
            self.assertEqual(k.value, expected)
            self.assertEqual(v.value, expected)
        return len(entries)

    def test_lookup_and_delete_batch_all_keys(self):
        """A lookup-and-delete batch returns every entry and empties the map."""
        # fill the hashmap
        hmap = self.fill_hashmap()

        # check values and count them
        count = self.check_hashmap_values(hmap.items_lookup_and_delete_batch())
        self.assertEqual(count, self.MAPSIZE)

        # and check the delete has worked, i.e. the map is now empty
        count = sum(1 for _ in hmap.items())
        self.assertEqual(count, 0)

    def test_lookup_batch_all_keys(self):
        """A lookup batch returns every entry of the map."""
        # fill the hashmap
        hmap = self.fill_hashmap()

        # check values and count them
        count = self.check_hashmap_values(hmap.items_lookup_batch())
        self.assertEqual(count, self.MAPSIZE)

    def test_delete_batch_all_keys(self):
        """A delete batch without keys removes every entry of the map."""
        # fill the hashmap
        hmap = self.fill_hashmap()
        hmap.items_delete_batch()

        # check the delete has worked, i.e. the map is now empty
        count = sum(1 for _ in hmap.items())
        self.assertEqual(count, 0)

    def test_delete_batch_subset(self):
        """A delete batch with keys removes only the requested entries."""
        # fill the hashmap
        hmap = self.fill_hashmap()
        keys = self.prepare_keys_subset(hmap)

        hmap.items_delete_batch(keys)
        # check the delete has worked, i.e. only the subset has been
        # removed and every other entry is still in the map
        count = sum(1 for _ in hmap.items())
        self.assertEqual(count, self.MAPSIZE - self.SUBSET_SIZE)

    def test_update_batch_all_keys(self):
        """An update batch can overwrite the value of every entry."""
        hmap = self.fill_hashmap()

        # preparing keys and new values arrays
        keys = (hmap.Key * self.MAPSIZE)()
        new_values = (hmap.Leaf * self.MAPSIZE)()
        for i in range(self.MAPSIZE):
            keys[i] = ct.c_int(i)
            new_values[i] = ct.c_int(-1)
        hmap.items_update_batch(keys, new_values)

        # check the update has worked, i.e. the sum of values is -MAPSIZE
        count = sum(v.value for v in hmap.values())
        self.assertEqual(count, -self.MAPSIZE)

    def test_update_batch_subset(self):
        """An update batch on a subset leaves the other entries untouched."""
        # fill the hashmap
        hmap = self.fill_hashmap()
        keys = self.prepare_keys_subset(hmap, count=self.SUBSET_SIZE)
        new_values = self.prepare_values_subset(hmap, count=self.SUBSET_SIZE)

        hmap.items_update_batch(keys, new_values)

        # check all the values in the map
        # the first self.SUBSET_SIZE keys follow this rule: value = key * key
        # the remaining keys follow this rule: value = key
        entries = sorted(hmap.items_lookup_batch(), key=self._key_of)
        for position, (k, v) in enumerate(entries):
            if position < self.SUBSET_SIZE:
                expected = k.value * k.value
            else:
                expected = k.value
            self.assertEqual(v.value, expected)

        # the whole updated subset must have been seen by the loop above
        self.assertGreaterEqual(len(entries), self.SUBSET_SIZE)


# Only run the tests through unittest's main() when this file is executed as
# a script; importing the module has no such side effect.
if __name__ == "__main__":
    main()