File: test_stream_decision_client.py

package info (click to toggle)
python-pycrowdsec 0.0.5-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 332 kB
  • sloc: python: 879; sh: 6; makefile: 4
file content (99 lines) | stat: -rw-r--r-- 3,320 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import ipaddress
import threading
import unittest

from pycrowdsec.client import StreamDecisionClient


class TestStreamDecisionClient(unittest.TestCase):
    """Tests for the new/deleted decision getters of ``StreamDecisionClient``."""

    def setUp(self):
        """Build a fresh client for every test.

        NOTE(review): "abcd" is presumably a placeholder API key; confirm
        against the ``StreamDecisionClient`` constructor.
        """
        self.client = StreamDecisionClient("abcd")

    def test_process_response(self):
        """Processed decisions are yielded once; afterwards the getters are empty."""
        response = {
            "deleted": [
                {
                    "duration": "-40h37m10.022674981s",
                    "id": 1,
                    "origin": "cscli",
                    "scenario": "manual 'ban' from 'b436842423d302bb11cb6f1160d6cb30q9EGL7irEAzdUu1z'",
                    "scope": "Ip",
                    "type": "ban",
                    "value": "18.22.10.20",
                },
                {
                    "duration": "-37m7.335622172s",
                    "id": 97,
                    "origin": "CAPI",
                    "scenario": "crowdsecurity/http-crawl-non_statics",
                    "scope": "Ip",
                    "type": "ban",
                    "value": "185.220.101.204",
                },
            ],
            "new": [
                {
                    "duration": "-37m7.335622172s",
                    "id": 97,
                    "origin": "CAPI",
                    "scenario": "crowdsecurity/http-crawl-non_statics",
                    "scope": "Ip",
                    "type": "ban",
                    "value": "18.22.10.20",
                },
            ],
        }
        self.client.process_response(response)

        # First pass hands out exactly what the response contained.
        self.assertEqual(len(list(self.client.get_new_decision())), 1)
        self.assertEqual(len(list(self.client.get_deleted_decision())), 2)

        # Second pass: everything was consumed by the first one.
        self.assertEqual(len(list(self.client.get_new_decision())), 0)
        self.assertEqual(len(list(self.client.get_deleted_decision())), 0)

    def test_empty(self):
        """A fresh client has empty containers and its getters yield nothing."""
        self.assertTrue(self.client.new_decisions.empty())
        self.assertTrue(self.client.deleted_decisions.empty())

        # Iterating with nothing stored must terminate without yielding.
        self.assertEqual(list(self.client.get_deleted_decision()), [])
        self.assertEqual(list(self.client.get_new_decision()), [])

    def test_read_write_race(self):
        """Drain both getters while another thread keeps feeding responses.

        The writer thread is always joined and any exception it raised is
        reported as a test failure, so the test cannot pass while the writer
        crashed or is still running in the background.
        """
        response = {
            "deleted": [
                {
                    "duration": "-40h37m10.022674981s",
                    "id": 1,
                    "origin": "cscli",
                    "scenario": "manual 'ban' from 'b436842423d302bb11cb6f1160d6cb30q9EGL7irEAzdUu1z'",
                    "scope": "Ip",
                    "type": "ban",
                    "value": str(ipaddress.IPv4Address(v)),
                }
                for v in range(100)
            ],
            "new": [
                {
                    "duration": "-37m7.335622172s",
                    "id": 97,
                    "origin": "CAPI",
                    "scenario": "crowdsecurity/http-crawl-non_statics",
                    "scope": "Ip",
                    "type": "ban",
                    "value": str(ipaddress.IPv4Address(v)),
                }
                for v in range(100)
            ],
        }

        # Exceptions raised in a worker thread do not propagate to the test
        # runner on their own, so collect them and check after the join.
        writer_errors = []

        def response_filler():
            try:
                for _ in range(100):
                    self.client.process_response(response)
            except Exception as exc:  # re-surfaced via the assertion below
                writer_errors.append(exc)

        t = threading.Thread(target=response_filler)
        t.start()
        try:
            for _ in range(1000):
                list(self.client.get_deleted_decision())
                list(self.client.get_new_decision())
        finally:
            # Never leave the writer running past the end of this test.
            t.join(timeout=10)

        self.assertFalse(t.is_alive(), "writer thread did not finish")
        self.assertEqual(writer_errors, [], "writer thread raised")

        # The reader loop may have ended before the writer did; consume the
        # remainder, after which both getters must be empty again.
        list(self.client.get_deleted_decision())
        list(self.client.get_new_decision())
        self.assertEqual(len(list(self.client.get_deleted_decision())), 0)
        self.assertEqual(len(list(self.client.get_new_decision())), 0)