1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
|
import logging
from ZeekControl import config
errmsg = ""
try:
import broker
import broker.zeek
except ImportError as e:
broker = None
errmsg = e
# Broker communication with running nodes.
# Sends event to a set of nodes in parallel.
#
# events is a list of tuples of the form (node, event, args, result_event).
# node: the destination node.
# event: the name of the event to send (node that receiver must subscribe
# to it as well).
# args: a list of event args; each arg must be a data type understood by
# the Broker module.
# result_event: name of a event the node sends back. None if no event is
# sent back.
#
# Returns a list of tuples (node, success, results_args).
# If success is True, result_args is a list of arguments as shipped with the
# result event, or [] if no result_event was specified.
# If success is False, results_args is a string with an error message.
def send_events_parallel(events, topic):
results = []
sent = []
for (node, event, args, result_event) in events:
if not broker:
results += [(node, False, "Python bindings for Broker: %s" % errmsg)]
continue
success, endpoint, sub = _send_event_init(node, event, args, result_event, topic)
if success and result_event:
sent += [(node, result_event, endpoint, sub)]
else:
results += [(node, success, endpoint, sub)]
for (node, result_event, endpoint, sub) in sent:
success, result_args = _send_event_wait(node, result_event, endpoint, sub)
endpoint.shutdown()
results += [(node, success, result_args)]
return results
def _send_event_init(node, event, args, result_event, topic):
host = node.addr
endpoint = broker.Endpoint()
subscriber = endpoint.make_subscriber(topic)
status_subscriber = endpoint.make_status_subscriber(True)
endpoint.peer(host, node.getPort(), 1)
tries = 0
while True:
msgs = status_subscriber.get(1, 1)
for msg in msgs:
if isinstance(msg, broker.Status):
if msg.code() == broker.SC.PeerAdded:
ev = broker.zeek.Event(event, *args)
endpoint.publish(topic + "/" + repr(msg.context()), ev)
logging.debug("broker: %s(%s) to node %s", event,
", ".join(args), node.name)
return (True, endpoint, subscriber)
tries += 1
if tries > config.Config.commtimeout:
return (False, "time-out", None)
def _send_event_wait(node, result_event, bc, sub):
if not result_event:
return (True, [])
# Wait for reply event.
tries = 0
while True:
msgs = sub.get(1, 1)
for msg in msgs:
(topic, event) = msg
ev = broker.zeek.Event(event)
args = ev.args()
logging.debug("broker: %s(%s) from node %s", result_event,
", ".join(args), node.name)
return (True, args)
tries += 1
if tries > config.Config.commtimeout:
logging.debug("broker: timeout during receive from node %s", node.name)
return (False, "time-out")
|