1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175
|
"""Implementation of the "broker" IPC component"""
import argparse
import os
import zmq
import gitubuntu.importer_service as importer_service
from gitubuntu.importer_service_ipc import WorkerRequest, WorkerReply
def assign_work(service_state, available_workers):
    """Pair idle workers with pending import requests

    Workers and pending requests are matched one-to-one, in iteration order,
    until either side runs out. Every pairing is recorded in the service
    state first; only afterwards are the outgoing requests built.

    :param service_state: the state object of the importer service. It must
        provide get_pending_imports() and assign_ipc_worker().
    :param dict available_workers: the workers currently free to take work.
        Key: worker identity; value: the ZeroMQ address to reach it on.
    :returns: one (identity, zmq_address, request) triple for each pairing
        that was made.
    :rtype: list(tuple(Any, Any, WorkerRequest))
    """
    # Materialise the pairings up front so that recording them below cannot
    # interfere with the iteration over pending imports.
    pairings = [
        (worker, pending)
        for worker, pending in zip(
            available_workers.items(),
            service_state.get_pending_imports(),
        )
    ]

    # Phase one: tell the service state who is working on what.
    for worker, pending in pairings:
        service_state.assign_ipc_worker(worker[0], pending)

    # Phase two: build the message to send to each assigned worker.
    outgoing = []
    for (identity, zmq_address), pending in pairings:
        message = WorkerRequest(pending.package, pending.level.value)
        outgoing.append((identity, zmq_address, message))
    return outgoing
def handle_reply(service_state, reply):
    """Record the outcome a worker reports for its previous import request

    A worker that has only just started has nothing to report and is merely
    asking for work; such a reply carries no previous result and is ignored.
    Any other reply states whether the worker's previous import request
    succeeded or failed, and the service state is updated to match, provided
    the reply agrees with what we have on record for that worker.

    :param importer_service.State service_state: the state object of the
        importer service.
    :param importer_service_ipc.WorkerReply reply: the reply from a worker.
    :returns: None
    """
    outcome = reply.previous_result
    if outcome is None:
        # Nothing to report, so the other "previous" fields must be unset.
        assert reply.previous_package is None
        assert reply.previous_level is None
        return

    # The reply reached us JSON-serialized, so sanity-check the field types.
    assert isinstance(outcome, bool)
    assert isinstance(reply.previous_package, str)
    assert isinstance(reply.previous_level, int)

    reported_level = importer_service.ImportRequest.Level(
        reply.previous_level,
    )
    reported_request = importer_service.ImportRequest(
        package=reply.previous_package,
        level=reported_level,
    )
    if outcome:
        succeeded, failed = [reported_request], []
    else:
        succeeded, failed = [], [reported_request]

    # The request's status is only updated when the worker is reporting on
    # the request we have recorded as having handed to it. A worker may be
    # reporting work done under an old identity that we have already moved on
    # from; in that case its work is ignored and we rely on the later cleanup
    # via service_state.clear_ipc_worker() to return to a good state.
    package, level, reckoning_time = service_state.lookup_ipc_worker(
        reply.identity,
    )
    same_package = package == reply.previous_package
    same_level = level.value == reply.previous_level
    if not (same_package and same_level):
        return

    # We're lacking good monitoring/observability here. In the long term,
    # this is where we'd like to push the success/failure into something like
    # InfluxDB, together with statistics such as the import time taken, which
    # the worker could send in its reply. In the past, before we had queuing,
    # failures were emailed for investigation. See gitubuntu/mailer.py and
    # gitubuntu/mailer_test.py prior to commit 7ef484a for the old code, and
    # https://code.launchpad.net/~racb/git-ubuntu/+git/usd-importer/+merge/425379
    # for discussion.
    service_state.mark_import_outcomes(
        reckoning_time=reckoning_time,
        completed_requests=succeeded,
        failed_requests=failed,
    )
def main_loop(worker_socket, service_state):
    """Run the importer service broker forever

    Each iteration receives one worker reply, records its outcome, marks the
    worker as idle and then hands pending work to whichever workers are idle.

    :param zmq.Socket worker_socket: the ZeroMQ socket to use to speak to
        workers.
    :param importer_service.State service_state: the state object of the
        importer service.
    """
    # Idle workers, keyed by identity, with the address to reply to as value.
    idle_workers = {}
    while True:
        # A systemd watchdog is deliberately not used here. I know of no case
        # where the broker should ever hang. If it does, it's a bug that I
        # want to identify and fix, rather than have it papered over by
        # systemd in production. This is in contrast to the watchdog kicks in
        # components that speak to Launchpad, where launchpadlib is known to
        # hang for an unknown reason and the watchdog is our pragmatic
        # workaround.
        frames = worker_socket.recv_multipart()
        sender_address, _, payload = frames
        reply = WorkerReply.from_bytes(payload)

        handle_reply(service_state, reply)
        service_state.clear_ipc_worker(reply.identity)
        idle_workers[reply.identity] = sender_address

        for identity, target_address, work in assign_work(
            service_state,
            idle_workers,
        ):
            # The empty frame separates the routing address from the payload.
            worker_socket.send_multipart(
                [target_address, b'', work.to_bytes()],
            )
            # The worker is busy now, so it is no longer available.
            del idle_workers[identity]
def main(data_directory, worker_broker_socket_address):
    """Set up and run the importer service broker process

    Opens the service state stored under the data directory, binds a ROUTER
    socket to the given endpoint and then enters the main loop.

    :param str data_directory: the path where persistent data on the status
        of the import may be stored.
    :param str worker_broker_socket_address: the ZeroMQ endpoint to use to
        speak to workers, specified in a manner acceptable to libzmq.
    """
    database_path = os.path.join(data_directory, 'db')
    service_state = importer_service.State(database_path)

    context = zmq.Context.instance()
    router = context.socket(zmq.ROUTER)
    router.bind(worker_broker_socket_address)

    # main_loop() never returns.
    main_loop(router, service_state)
def parse_args(subparsers=None, base_subparsers=None):
    """Define and, when run standalone, parse the broker's CLI arguments

    :param subparsers: if given, the object on which add_parser() is called
        to register this command as a subcommand. If omitted, a standalone
        argparse.ArgumentParser is built and the command line is parsed
        immediately.
    :param base_subparsers: if given and non-empty, passed as the 'parents'
        of the parser that is created.
    :returns: the parsed arguments when no subparsers object was given;
        otherwise a "<command name> - <description>" summary string.
    :rtype: argparse.Namespace or str
    """
    # Single source of truth for the command name, so that the registered
    # subcommand and the summary string returned below cannot drift apart.
    # Previously the summary said 'import-service-broker', which did not
    # match the 'importer-service-broker' subcommand actually registered.
    command_name = 'importer-service-broker'
    kwargs = {
        'description': "Coordinate importer service workers",
        'formatter_class': argparse.ArgumentDefaultsHelpFormatter,
    }
    if base_subparsers:
        kwargs['parents'] = base_subparsers
    if subparsers:
        parser = subparsers.add_parser(command_name, **kwargs)
        parser.set_defaults(func=cli_main)
    else:
        parser = argparse.ArgumentParser(**kwargs)
    parser.add_argument(
        '--data-directory',
        help="Directory to store persistent data",
        default='/var/local/git-ubuntu',
    )
    parser.add_argument(
        'worker_broker_socket_address',
        type=str,
        help=(
            "the ZeroMQ endpoint to use to speak to the workers (eg. "
            "tcp://localhost:5555 or ipc:///run/git-ubuntu/socket)"
        ),
    )
    if not subparsers:
        return parser.parse_args()
    return '%s - %s' % (command_name, kwargs['description'])
def cli_main(args):
    """Run the importer service broker from parsed CLI arguments

    :param args: the parsed arguments; the data_directory and
        worker_broker_socket_address attributes are read from it.
    """
    data_directory = args.data_directory
    endpoint = args.worker_broker_socket_address
    main(data_directory, endpoint)
|