File: nftables_sets.py

package info (click to toggle)
pyroute2 0.8.1-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,700 kB
  • sloc: python: 50,245; makefile: 280; javascript: 183; ansic: 81; sh: 44; awk: 17
file content (53 lines) | stat: -rw-r--r-- 1,522 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import time

from pyroute2.netlink.nfnetlink.nftsocket import NFPROTO_IPV4
from pyroute2.nftables.main import NFTables
from pyroute2.nftables.main import NFTSetElem


def test_ipv4_addr_set():
    with NFTables(nfgen_family=NFPROTO_IPV4) as nft:
        nft.table("add", name="filter")
        my_set = nft.sets("add", table="filter", name="test0", key_type="ipv4_addr",
                          comment="my test set", timeout=0)

        # With str
        nft.set_elems(
            "add",
            table="filter",
            set="test0",
            elements={"10.2.3.4", "10.4.3.2"},
        )

        # With NFTSet & NFTSetElem classes
        nft.set_elems(
            "add",
            set=my_set,
            elements={NFTSetElem(value="9.9.9.9", timeout=1000)},
        )

        try:
            assert {e.value for e in nft.set_elems("get", table="filter", set="test0")} == {
                "10.2.3.4",
                "10.4.3.2",
                "9.9.9.9",
            }
            assert nft.sets("get", table="filter", name="test0").comment == b"my test set"

            time.sleep(1.2)
            # timeout for elem 9.9.9.9 (1000ms)
            assert {e.value for e in nft.set_elems("get", table="filter", set="test0")} == {
                "10.2.3.4",
                "10.4.3.2",
            }
        finally:
            nft.sets("del", table="filter", name="test0")
            nft.table("del", name="filter")


def main():
    test_ipv4_addr_set()


if __name__ == "__main__":
    main()