File: test_wiset.py

package info (click to toggle)
pyroute2 0.8.1-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,704 kB
  • sloc: python: 50,245; makefile: 280; javascript: 183; ansic: 81; sh: 44; awk: 17
file content (362 lines) | stat: -rw-r--r-- 11,020 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
from time import sleep

from pr2test.marks import require_root
from utils import require_kernel

from pyroute2.netlink.exceptions import IPSetError
from pyroute2.wiset import (
    IPStats,
    WiSet,
    get_ipset_socket,
    load_all_ipsets,
    load_ipset,
)
from pyroute2.wiset import test_ipset_exist as ipset_exists

# Module-level pytest marks: pytest applies them to every test in this file.
# NOTE(review): require_root() presumably skips the tests when they are not
# run as root -- confirm in pr2test.marks.
pytestmark = [require_root()]


def test_create_one_ipset(ipset_name, wiset_sock):
    """Create one ipset, destroy it, and check its visibility at each step."""
    with WiSet(name=ipset_name, sock=wiset_sock) as ipset:
        ipset.create()

        # Listing taken while the set exists; it is inspected after destroy().
        listed_before = load_all_ipsets(sock=wiset_sock)
        assert ipset_exists(ipset_name, sock=wiset_sock)
        ipset.destroy()

        assert not ipset_exists(ipset_name, sock=wiset_sock)
        assert ipset_name in listed_before
        listed_after = load_all_ipsets(sock=wiset_sock)
        assert ipset_name not in listed_after


def test_create_ipset_twice(ipset_name, wiset_sock):
    """Check re-creating an existing ipset with and without ``exclusive``.

    An exclusive create of an already existing set is expected to raise
    ``IPSetError``; a non-exclusive create is expected to succeed.
    """
    with WiSet(name=ipset_name, sock=wiset_sock) as myset:
        myset.create()

        try:
            myset.create(exclusive=True)
        except IPSetError:
            pass
        else:
            # Raised explicitly instead of ``assert False`` so the check is
            # not stripped under ``python -O`` and carries a message.
            raise AssertionError(
                "exclusive create of an existing ipset must raise IPSetError"
            )

        # Same set again, this time without the exclusive flag.
        myset.create(exclusive=False)
        myset.destroy()
        assert ipset_name not in load_all_ipsets(sock=wiset_sock)


def test_check_ipset_stats(ipset_name, wiset_sock):
    """Check packets/bytes of an entry with counters enabled, then disabled."""

    def _verify_counters(ipset, res=None, counters=False):
        # (Re)create the set with the requested counters flag and add one
        # entry carrying explicit packets/bytes values.
        ipset.counters = counters
        ipset.create()
        ipset.add("8.8.8.8", packets=res, bytes=res)
        ipset.update_content()
        snapshot = ipset.content
        ipset.flush()

        seeded = snapshot["8.8.8.8"]
        assert seeded.packets == res
        assert seeded.bytes == res

        # Add the same entry again without explicit values: the expected
        # value becomes 0, or stays None when ``res`` was None.
        ipset.add("8.8.8.8")
        ipset.update_content()
        snapshot = ipset.content
        expected = None if res is None else 0
        readded = snapshot["8.8.8.8"]
        assert readded.packets == expected
        assert readded.bytes == expected
        ipset.destroy()

    with WiSet(name=ipset_name, sock=wiset_sock) as ipset:
        _verify_counters(ipset, res=10, counters=True)
        _verify_counters(ipset)


def test_ipset_with_comment(ipset_name, wiset_sock):
    """An entry added with a comment must report that comment back."""
    address = "8.8.8.8"
    text = "test comment"

    with WiSet(name=ipset_name, sock=wiset_sock, comment=True) as ipset:
        ipset.create()
        ipset.add(address, comment=text)
        entries = ipset.content

    stored = entries[address]
    assert stored.comment == text


def test_ipset_with_skbinfo(ipset_name, wiset_sock):
    """Check how skbmark values passed to add() are reported in the content."""
    # (address, skbmark given to add(), skbmark expected in the content)
    cases = [
        ("192.168.1.1", (0xC8, 0xC8), "0xc8/0xc8"),
        ("192.168.1.2", (0xC9, 0xFFFFFFFF), "0xc9"),
        ("192.168.1.3", "0xca/0xca", "0xca/0xca"),
        ("192.168.1.4", "0xCB", "0xcb"),
    ]

    with WiSet(name=ipset_name, sock=wiset_sock, skbinfo=True) as ipset:
        ipset.create()
        for address, mark, _ in cases:
            ipset.add(address, skbmark=mark)
        entries = ipset.content
        ipset.destroy()

    for address, _, expected in cases:
        assert entries[address].skbmark == expected


def test_list_on_large_set(ipset_name, wiset_sock):
    """Fill a set with 30000 addresses and check both listing paths.

    The entry count is read once from the ``WiSet`` object itself and once
    from a fresh ``load_ipset(..., content=True)``; both must equal the
    number of inserted addresses.
    """
    set_size = 30000
    base_ip = "10.10.%d.%d"

    with WiSet(name=ipset_name, sock=wiset_sock) as myset:
        myset.create()
        for i in range(set_size):
            # divmod yields integer octets (i // 255, i % 255); plain ``/``
            # would hand a float to ``%d`` and rely on implicit truncation.
            myset.add(base_ip % divmod(i, 255))
        stats_len = len(myset.content)
        stats_len2 = len(
            load_ipset(ipset_name, content=True, sock=wiset_sock).content
        )

    assert stats_len == set_size
    assert stats_len2 == set_size


def test_remove_entry(ipset_name, wiset_sock):
    """A deleted entry must be absent from the refreshed content."""
    address = "1.1.1.1"

    with WiSet(name=ipset_name, sock=wiset_sock, counters=True) as ipset:
        ipset.create()
        ipset.add(address)
        assert address in ipset.content

        ipset.delete(address)
        # Refresh the content before checking that the entry is gone.
        ipset.update_content()
        assert address not in ipset.content


def test_flush(ipset_name, wiset_sock):
    """Entries test positive once added and negative after flush()."""
    addresses = ["1.2.3.4", "1.1.1.1", "7.7.7.7"]

    with WiSet(name=ipset_name, sock=wiset_sock) as ipset:
        ipset.create()
        for address in addresses:
            ipset.add(address)
            assert ipset.test(address)

        ipset.flush()
        # None of the previously added addresses may still match.
        assert not any(ipset.test(address) for address in addresses)


def test_list_in(ipset_name, wiset_sock):
    """test_list() must report the stored addresses and only those."""
    present = ["1.2.3.4", "1.1.1.1", "7.7.7.7"]
    absent = ["4.4.4.4", "5.5.5.5", "6.6.6.6"]

    with WiSet(name=ipset_name, sock=wiset_sock) as ipset:
        ipset.create()
        ipset.replace_entries(present)
        matched = ipset.test_list(present + absent)

        assert all(address in matched for address in present)
        assert not any(address in matched for address in absent)


def test_timeout(ipset_name, wiset_sock):
    """Check entry expiry, the set-level timeout and a per-entry timeout=0."""
    address = "1.2.3.4"
    timeout = 2

    with WiSet(name=ipset_name, sock=wiset_sock, timeout=timeout) as ipset:
        ipset.create()
        ipset.add(address)

        # Wait one second longer than the timeout given to the set.
        sleep(timeout + 1)
        ipset.update_content()
        assert address not in ipset.content

        reloaded = load_ipset(ipset_name, sock=wiset_sock)
        assert timeout == reloaded.timeout

        # An entry added with timeout=0 must report a timeout of 0.
        ipset.add(address, timeout=0)
        ipset.update_content()
        entry = ipset.content[address]
        assert entry.timeout == 0


def test_basic_attribute_reads(ipset_name, wiset_sock):
    """Check that ``comment`` and ``counters`` flags are read back correctly.

    The set is created once with both flags enabled and once with both
    disabled, then reloaded with ``load_ipset`` and compared. When no shared
    socket is supplied (``wiset_sock is None``) the test opens its own
    netlink socket on the ``WiSet`` object and closes it afterwards.
    """
    for value in (True, False):
        myset = WiSet(
            name=ipset_name, sock=wiset_sock, counters=value, comment=value
        )
        owns_socket = wiset_sock is None
        if owns_socket:
            myset.open_netlink()
        try:
            myset.create()
            from_netlink = load_ipset(ipset_name, sock=wiset_sock)
            assert value == from_netlink.comment
            assert value == from_netlink.counters
            myset.destroy()
        finally:
            # Close the socket opened above even when create() or an
            # assertion fails, so a failing run does not leak it.
            if owns_socket:
                myset.close_netlink()


def test_replace_content(ipset_name, wiset_sock):
    """replace_entries() must swap the whole content, for lists and sets."""
    first = ["1.2.3.4", "2.3.4.5", "6.7.8.9"]
    second = ["1.1.1.1", "2.2.2.2", "3.3.3.3"]

    def _check_replace(before, after):
        ipset = WiSet(name=ipset_name, sock=wiset_sock)
        ipset.create()

        # Initial fill: every entry of ``before`` must be listed.
        ipset.insert_list(before)
        ipset.update_content()
        for item in before:
            assert item in ipset.content

        # After the replacement only the entries of ``after`` may remain.
        ipset.replace_entries(after)
        ipset.update_content()
        for gone, added in zip(before, after):
            assert gone not in ipset.content
            assert added in ipset.content
        ipset.destroy()

    _check_replace(first, second)
    _check_replace(set(first), set(second))


def test_replace_content_with_comment(ipset_name, wiset_sock):
    """replace_entries() must also carry per-entry comments."""
    first = [
        dict(entry="1.2.3.4", comment='foo'),
        dict(entry="2.3.4.5", comment='foo'),
        dict(entry="6.7.8.9", comment='bar'),
    ]
    second = [
        dict(entry="1.1.1.1", comment='foo'),
        dict(entry="2.2.2.2", comment='bar'),
        dict(entry="3.3.3.3", comment='foo'),
    ]

    def _check_replace(before, after):
        ipset = WiSet(name=ipset_name, sock=wiset_sock, comment=True)
        ipset.create()

        # Initial fill: entries and their comments must be readable.
        ipset.insert_list(before)
        ipset.update_content()
        for item in before:
            key = item['entry']
            assert key in ipset.content
            assert item['comment'] == ipset.content[key].comment

        # Replacement: old entries vanish, new ones show up with comments.
        ipset.replace_entries(after)
        ipset.update_content()
        for item in before:
            assert item['entry'] not in ipset.content
        for item in after:
            key = item['entry']
            assert key in ipset.content
            assert item['comment'] == ipset.content[key].comment

    _check_replace(first, second)


def test_hash_net_ipset(ipset_name, wiset_sock):
    """Networks inserted into a hash:net set must appear in its content."""
    networks = ["192.168.1.0/24", "192.168.2.0/23", "10.0.0.0/8"]

    with WiSet(
        name=ipset_name, attr_type="hash:net", sock=wiset_sock
    ) as ipset:
        ipset.create()
        ipset.insert_list(networks)
        assert all(network in ipset.content for network in networks)


def test_two_dimensions_ipset(ipset_name, wiset_sock):
    """"net,iface" pairs inserted into a hash:net,iface set must be listed."""
    pairs = ["192.168.1.0/24,eth0", "192.168.2.0/23,eth1", "10.0.0.0/8,tun0"]

    with WiSet(
        name=ipset_name, attr_type="hash:net,iface", sock=wiset_sock
    ) as ipset:
        ipset.create()
        ipset.insert_list(pairs)
        assert all(pair in ipset.content for pair in pairs)


def test_stats_consistency(ipset_name, wiset_sock):
    """Test several ways to fill the statistics of one IPSet"""
    addresses = ["1.2.3.4", "1.2.3.5", "1.2.3.6"]

    ipset = WiSet(name=ipset_name, sock=wiset_sock)
    ipset.create()
    ipset.insert_list(addresses)

    # First path: load every ipset with content, then pick ours by name.
    all_sets = load_all_ipsets(sock=wiset_sock, content=True)
    from_bulk = all_sets[ipset_name]
    assert all(address in from_bulk.content for address in addresses)

    # Second path: load only this ipset, with content.
    from_single = load_ipset(ipset_name, sock=wiset_sock, content=True)
    assert all(address in from_single.content for address in addresses)


def test_hashnet_with_comment(ipset_name, wiset_sock):
    """A reloaded hash:net set keeps its comment flag and stores comments."""
    text = "abcdef"
    address = "192.168.1.1"

    ipset = WiSet(
        name=ipset_name, attr_type="hash:net", comment=True, sock=wiset_sock
    )
    ipset.create()

    # Load the set again by name; the socket is inherited only when a
    # shared one was supplied to the test.
    ipset = load_ipset(
        ipset_name, sock=wiset_sock, inherit_sock=wiset_sock is not None
    )
    assert ipset.comment

    ipset.add(address, comment=text)
    ipset.update_content()

    # The content key carries an explicit /32 suffix for the single address.
    stored = ipset.content["192.168.1.1/32"]
    assert stored.comment == text


def test_add_ipstats(ipset_name, wiset_sock):
    """An entry added from an IPStats tuple must be read back identically."""
    network = "198.51.100.0/24"
    expected = IPStats(
        packets=10,
        bytes=1000,
        comment="hello world",
        skbmark="0x10/0x10",
        timeout=None,
    )

    ipset = WiSet(
        name=ipset_name,
        attr_type="hash:net",
        comment=True,
        skbinfo=True,
        counters=True,
        sock=wiset_sock,
    )
    ipset.create()
    # The IPStats fields are passed to add() as keyword arguments.
    options = expected._asdict()
    ipset.add(network, **options)

    assert network in ipset.content
    assert expected == ipset.content[network]


def test_revision(ipset_name, wiset_sock):
    """A created hash:net set must report a revision of at least 6."""
    ipset = WiSet(name=ipset_name, attr_type="hash:net", sock=wiset_sock)
    ipset.create()

    loaded = load_ipset(ipset_name, sock=wiset_sock)
    assert loaded.revision >= 6


def test_force_attr_revision(ipset_name, wiset_sock):
    """Create and reload a hash:net set through a socket forcing revision 2.

    ``wiset_sock`` stays in the signature so the test keeps its fixture
    parametrization, but the set is created and reloaded through the
    dedicated socket built with ``attr_revision=2``; otherwise the forced
    revision would never be exercised.
    """
    sock = get_ipset_socket(attr_revision=2)
    try:
        myset = WiSet(name=ipset_name, attr_type="hash:net", sock=sock)
        myset.create()
        assert load_ipset(ipset_name, sock=sock).revision >= 2
    finally:
        # Close the dedicated socket even when create() or the assert fails.
        sock.close()


def test_physdev(ipset_name):
    """The physdev flag given to add() must be reported back per entry."""
    # (entry, physdev flag) pairs, added in this order.
    cases = [
        ("192.168.0.0/24,eth0", False),
        ("192.168.1.0/24,eth0", True),
    ]

    ipset = WiSet(name=ipset_name, attr_type="hash:net,iface")
    ipset.create()
    for entry, flag in cases:
        ipset.add(entry, physdev=flag)

    listed = ipset.content

    for entry, flag in cases:
        assert listed[entry].physdev is flag


def test_wildcard_entries(ipset_name):
    """The wildcard flag given to add() must be reported back per entry."""
    # Kernel version requirement is checked before touching any ipset.
    require_kernel(5, 5)

    # (entry, wildcard flag) pairs, added in this order.
    cases = [
        ("192.168.0.0/24,eth", True),
        ("192.168.1.0/24,wlan0", False),
    ]

    ipset = WiSet(name=ipset_name, attr_type="hash:net,iface")
    ipset.create()
    for entry, flag in cases:
        ipset.add(entry, wildcard=flag)

    listed = ipset.content

    for entry, flag in cases:
        assert listed[entry].wildcard is flag


def test_invalid_load_ipset():
    """Loading an ipset name that does not exist must yield None."""
    missing = load_ipset("ipsetdoesnotexists")
    assert missing is None