File: events.py

package info (click to toggle)
zeekctl 2.2.0%2Bds1-2
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 2,544 kB
  • sloc: python: 5,639; sh: 1,374; makefile: 71; awk: 24
file content (107 lines) | stat: -rw-r--r-- 3,283 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import logging

from ZeekControl import config

errmsg = ""

try:
    import broker
    import broker.zeek
except ImportError as e:
    broker = None
    errmsg = e

# Broker communication with running nodes.

# Sends event to a set of nodes in parallel.
#
# events is a list of tuples of the form (node, event, args, result_event).
#   node:    the destination node.
#   event:   the name of the event to send (note that the receiver must
#            subscribe to it as well).
#   args:    a list of event args; each arg must be a data type understood by
#            the Broker module.
#   result_event: name of an event the node sends back. None if no event is
#                 sent back.
#
# Returns a list of tuples (node, success, result_args).
#   If success is True, result_args is a list of arguments as shipped with the
#   result event, or [] if no result_event was specified.
#   If success is False, result_args is a string with an error message.

def send_events_parallel(events, topic):
    """Send events to a set of nodes, then collect their replies.

    Every event is sent first; replies are only awaited afterwards, so
    waiting on one node does not hold up sending to the others.

    Args:
        events: iterable of (node, event, args, result_event) tuples.
            result_event is the name of the reply event, or None if the
            node sends nothing back.
        topic: topic handed on to _send_event_init.

    Returns:
        A list of (node, success, result_args) tuples. If success is True,
        result_args is the list of reply arguments ([] when no result_event
        was given). If success is False, result_args is an error string.
    """
    results = []
    pending = []

    for (node, event, args, result_event) in events:

        if not broker:
            results.append((node, False, "Python bindings for Broker: %s" % errmsg))
            continue

        success, endpoint, sub = _send_event_init(node, event, args, result_event, topic)

        if success:
            pending.append((node, result_event, endpoint, sub))
        else:
            # On failure _send_event_init returns the error message in the
            # endpoint slot; report it as a regular three-element result
            # instead of leaking the raw return tuple to the caller.
            results.append((node, False, endpoint))

    for (node, result_event, endpoint, sub) in pending:
        try:
            # Returns (True, []) right away when no result_event is expected.
            success, result_args = _send_event_wait(node, result_event, endpoint, sub)
        finally:
            # Release the endpoint even if waiting for the reply raised.
            endpoint.shutdown()

        results.append((node, success, result_args))

    return results

def _send_event_init(node, event, args, result_event, topic):
    """Peer with a node and publish a single event to it.

    Args:
        node: destination node; its addr, name and getPort() are used.
        event: name of the event to send.
        args: list of event arguments, passed on to broker.zeek.Event.
        result_event: unused here; kept for a uniform call signature.
        topic: topic to subscribe to; the event is published to
            topic + "/" + repr(<context of the PeerAdded status>).

    Returns:
        (True, endpoint, subscriber) once the event has been published, or
        (False, "time-out", None) if no PeerAdded status was seen within
        config.Config.commtimeout polling rounds.
    """
    host = node.addr
    endpoint = broker.Endpoint()
    subscriber = endpoint.make_subscriber(topic)
    status_subscriber = endpoint.make_status_subscriber(True)
    # NOTE(review): the trailing 1 is presumably the peering retry interval
    # in seconds -- confirm against the Broker Python bindings.
    endpoint.peer(host, node.getPort(), 1)

    tries = 0

    while True:
        # NOTE(review): get(1, 1) presumably fetches at most one status
        # message with a one-second timeout, which would make `tries`
        # roughly a count of seconds -- confirm against the bindings.
        msgs = status_subscriber.get(1, 1)

        for msg in msgs:
            if isinstance(msg, broker.Status) and msg.code() == broker.SC.PeerAdded:
                ev = broker.zeek.Event(event, *args)
                endpoint.publish(topic + "/" + repr(msg.context()), ev)
                # Event args need not be strings, so convert each one
                # before joining; a bare join raises TypeError otherwise.
                logging.debug("broker: %s(%s) to node %s", event,
                              ", ".join(str(arg) for arg in args), node.name)
                return (True, endpoint, subscriber)

        tries += 1

        if tries > config.Config.commtimeout:
            return (False, "time-out", None)

def _send_event_wait(node, result_event, bc, sub):
    if not result_event:
        return (True, [])

    # Wait for reply event.
    tries = 0

    while True:
        msgs = sub.get(1, 1)

        for msg in msgs:
            (topic, event) = msg
            ev = broker.zeek.Event(event)
            args = ev.args()
            logging.debug("broker: %s(%s) from node %s", result_event,
                          ", ".join(args), node.name)
            return (True, args)

        tries += 1

        if tries > config.Config.commtimeout:
            logging.debug("broker: timeout during receive from node %s", node.name)
            return (False, "time-out")