File: test_WebHooks.py

package info (click to toggle)
weakforced 3.0.0-4
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 3,196 kB
  • sloc: cpp: 20,397; python: 2,002; sh: 700; makefile: 432
file content (86 lines) | stat: -rw-r--r-- 3,647 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import requests
import mmap
import re
import time
import os
import json
from test_helper import ApiTestCase

class TestWebHooks(ApiTestCase):

    def test_webhooks(self):
        self.writeCmdToConsole("addWebHook(events, ck)")
        self.reportFunc('webhooktest', '1.4.3.1', '1234', False)
        self.allowFunc('webhooktest', '1.4.3.2', '1234')
        self.resetFunc('webhooktest', '1.4.3.3'); 
        self.addBLEntryLogin('webhooktest', 10, 'This is not a reason')
        time.sleep(11)
        self.addBLEntryLogin('webhooktest', 10, 'This is not a reason')
        self.delBLEntryLogin('webhooktest')
        time.sleep(5)
        logfile = open('/tmp/webhook-server.log', 'r')
        s = mmap.mmap(logfile.fileno(), 0, access=mmap.ACCESS_READ)
        search_str = s.read(s.size()).decode()
        for event in [ 'report', 'allow', 'reset', 'addbl', 'delbl', 'expirebl' ]:
            regex = r"digest_match=True, event=" + re.escape(event)
            result = re.search(regex, search_str);
            self.assertNotEqual(result, None)
        s.close()
        logfile.close()

    def test_mtls_webhook(self):
        self.writeCmdToConsole("addWebHook(events, mtls_ck)")
        self.reportFunc('mtls_webhook_test', '1.4.3.1', '1234', False)
        self.allowFunc('mtls_webhook_test2', '1.4.3.2', '1234')
        self.resetFunc('mtls_webhook_test2', '1.4.3.3');
        self.addBLEntryLogin('mtls_webhook_test2', 10, 'This is not a reason')
        self.addBLEntryLogin('mtls_webhook_test2', 10, 'This is not a reason')
        self.delBLEntryLogin('mtls_webhook_test2')
        time.sleep(4)
        logfile = open('/var/log/nginx/access.log', 'r')
        s = mmap.mmap(logfile.fileno(), 0, access=mmap.ACCESS_READ)
        search_str = s.read(s.size()).decode()
        self.assertIn('"POST / HTTP/1.1" 200', search_str)
        s.close()
        logfile.close()

    def test_kafka_webhooks(self):
        self.writeCmdToConsole("addWebHook(events, kafka_ck)")
        self.reportFunc('kafkawebhooktest', '4.4.3.1', '1234', False)
        self.reportFunc('kafkawebhooktest', '4.4.3.1', '1234', False)
        time.sleep(5)
        r = self.kafkaProducer()
        j = r.json()
        print(json.dumps(j))
        self.assertGreater(j['offsets'][0]['offset'], 0)
        
    def test_customwebhooks(self):
        self.writeCmdToConsole("addCustomWebHook(\"customwebhook\", ck)")
        r = self.customFunc("custom1")
        j = r.json()
        self.assertEqual(j['r_attrs']['login'], 'custom1')
        time.sleep(5)
        logfile = open('/tmp/webhook-server.log', 'r')
        s = mmap.mmap(logfile.fileno(), 0, access=mmap.ACCESS_READ)
        search_str = s.read(s.size()).decode()
        for event in [ 'customwebhook' ]:
            regex = r"digest_match=True, event=" + re.escape(event)
            result = re.search(regex, search_str);
            self.assertNotEqual(result, None)
        s.close()
        logfile.close()

    def test_namedreportsinks(self):
        self.writeCmdToConsole("addNamedReportSink(\"trackalert\", \"127.0.0.1:4502\")")
        r = self.reportFunc('namedreportsink', '1.2.3.4', '1234', False)
        r = self.customFunc("customargs")
        time.sleep(1)
        logfile = open('/tmp/udp-sink.log', 'r')
        s = mmap.mmap(logfile.fileno(), 0, access=mmap.ACCESS_READ)
        search_str = s.read(s.size()).decode()
        for event in [ 'namedreportsink', 'customargs' ]:
            regex = r"\"login\": \"" + re.escape(event)
            result = re.search(regex, search_str);
            self.assertNotEqual(result, None)
        s.close()
        logfile.close()