1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
|
#!/usr/bin/env python3
#
# USAGE: test_map_batch_ops.py
#
# Copyright (c) Emilien Gobillot
# Licensed under the Apache License, Version 2.0 (the "License")
from __future__ import print_function
from unittest import main, skipUnless, TestCase
from utils import kernel_version_ge
from bcc import BPF
import os
import ctypes as ct
@skipUnless(kernel_version_ge(5, 6), "requires kernel >= 5.6")
class TestMapBatch(TestCase):
MAPSIZE = 1024
SUBSET_SIZE = 32
def fill_hashmap(self):
b = BPF(text=b"""BPF_HASH(map, int, int, %d);""" % self.MAPSIZE)
hmap = b[b"map"]
for i in range(0, self.MAPSIZE):
hmap[ct.c_int(i)] = ct.c_int(i)
return hmap
def prepare_keys_subset(self, hmap, count=None):
if not count:
count = self.SUBSET_SIZE
keys = (hmap.Key * count)()
i = 0
for k, _ in sorted(hmap.items_lookup_batch(), key=lambda k:k[0].value):
if i < count:
keys[i] = k.value
i += 1
else:
break
return keys
def prepare_values_subset(self, hmap, count=None):
if not count:
count = self.SUBSET_SIZE
values = (hmap.Leaf * count)()
i = 0
for _, v in sorted(hmap.items_lookup_batch(), key=lambda k:k[0].value):
if i < count:
values[i] = v.value * v.value
i += 1
else:
break
return values
def check_hashmap_values(self, it):
i = 0
for k, v in sorted(it, key=lambda kv:kv[0].value):
self.assertEqual(k.value, i)
self.assertEqual(v.value, i)
i += 1
return i
def test_lookup_and_delete_batch_all_keys(self):
# fill the hashmap
hmap = self.fill_hashmap()
# check values and count them
count = self.check_hashmap_values(hmap.items_lookup_and_delete_batch())
self.assertEqual(count, self.MAPSIZE)
# and check the delete has worked, i.e map is now empty
count = sum(1 for _ in hmap.items())
self.assertEqual(count, 0)
def test_lookup_batch_all_keys(self):
# fill the hashmap
hmap = self.fill_hashmap()
# check values and count them
count = self.check_hashmap_values(hmap.items_lookup_batch())
self.assertEqual(count, self.MAPSIZE)
def test_delete_batch_all_keys(self):
# Delete all key/value in the map
# fill the hashmap
hmap = self.fill_hashmap()
hmap.items_delete_batch()
# check the delete has worked, i.e map is now empty
count = sum(1 for _ in hmap.items())
self.assertEqual(count, 0)
def test_delete_batch_subset(self):
# Delete only a subset of key/value in the map
# fill the hashmap
hmap = self.fill_hashmap()
keys = self.prepare_keys_subset(hmap)
hmap.items_delete_batch(keys)
# check the delete has worked, i.e map is now empty
count = sum(1 for _ in hmap.items())
self.assertEqual(count, self.MAPSIZE - self.SUBSET_SIZE)
def test_update_batch_all_keys(self):
hmap = self.fill_hashmap()
# preparing keys and new values arrays
keys = (hmap.Key * self.MAPSIZE)()
new_values = (hmap.Leaf * self.MAPSIZE)()
for i in range(self.MAPSIZE):
keys[i] = ct.c_int(i)
new_values[i] = ct.c_int(-1)
hmap.items_update_batch(keys, new_values)
# check the update has worked, i.e sum of values is -NUM_KEYS
count = sum(v.value for v in hmap.values())
self.assertEqual(count, -1*self.MAPSIZE)
def test_update_batch_subset(self):
# fill the hashmap
hmap = self.fill_hashmap()
keys = self.prepare_keys_subset(hmap, count=self.SUBSET_SIZE)
new_values = self.prepare_values_subset(hmap, count=self.SUBSET_SIZE)
hmap.items_update_batch(keys, new_values)
# check all the values in the map
# the first self.SUBSET_SIZE keys follow this rule value = keys * keys
# the remaning keys follow this rule : value = keys
i = 0
for k, v in sorted(hmap.items_lookup_batch(),
key=lambda kv:kv[0].value):
if i < self.SUBSET_SIZE:
# values are the square of the keys
self.assertEqual(v.value, k.value * k.value)
i += 1
else:
# values = keys
self.assertEqual(v.value, k.value)
self.assertEqual(i, self.SUBSET_SIZE)
if __name__ == "__main__":
main()
|