File: importer_service_broker.py

package info (click to toggle)
git-ubuntu 1.1-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,688 kB
  • sloc: python: 13,378; sh: 480; makefile: 2
file content (175 lines) | stat: -rw-r--r-- 7,019 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
"""Implementation of the "broker" IPC component"""
import argparse
import os

import zmq

import gitubuntu.importer_service as importer_service
from gitubuntu.importer_service_ipc import WorkerRequest, WorkerReply

def assign_work(service_state, available_workers):
    """Pair idle workers with pending import requests

    Every pairing is recorded in ``service_state`` (via
    ``assign_ipc_worker()``) before this function returns, so the caller only
    has to deliver the returned requests to the workers.

    :param importer_service.State service_state: the state object of the
        importer service; supplies pending imports and records assignments.
    :param dict(str, Any) available_workers: the workers currently idle.
        Key: identity; value: zmq_address.
    :returns: the work assignments as (identity, zmq_address, request)
        triples. There are as many as the smaller of the number of idle
        workers and the number of pending imports.
    :rtype: list(tuple(str, Any, WorkerRequest))
    """
    # Materialize the pairing up front, then record every assignment before
    # building any of the requests to send.
    pairings = list(zip(
        available_workers.items(),
        service_state.get_pending_imports(),
    ))
    for worker, pending in pairings:
        worker_identity, _ = worker
        service_state.assign_ipc_worker(worker_identity, pending)

    return [
        (
            worker_identity,
            worker_address,
            WorkerRequest(pending.package, pending.level.value),
        )
        for (worker_identity, worker_address), pending in pairings
    ]


def handle_reply(service_state, reply):
    """Handle a reply from a worker

    Update service state with the result of an import request that a worker has
    finished with. This may be nothing, in the case of a worker that has just
    started and is just requesting work. Otherwise, the worker's reply tells us
    whether the import request was successful or failed.

    The outcome is only recorded when the package and level in the reply match
    the assignment the service state has on record for the worker's identity.
    Otherwise the reported outcome is ignored.

    :param importer_service.State service_state: the state object of the
        importer service.
    :param importer_service_ipc.WorkerReply reply: the reply from a worker.
    :returns: None
    """
    if reply.previous_result is None:
        # Nothing to report, so the other "previous" fields must be empty too.
        assert reply.previous_package is None
        assert reply.previous_level is None
        return

    # Some assertions to check types on reply as it came in JSON-serialized.
    # NOTE(review): assertions are stripped under "python -O"; consider
    # raising explicitly if the broker may ever run optimized.
    assert isinstance(reply.previous_result, bool)
    assert isinstance(reply.previous_package, str)
    assert isinstance(reply.previous_level, int)

    request = importer_service.ImportRequest(
        package=reply.previous_package,
        level=importer_service.ImportRequest.Level(reply.previous_level),
    )
    if reply.previous_result:
        completed_requests, failed_requests = [request], []
    else:
        completed_requests, failed_requests = [], [request]

    # We only update the status of the request if the worker is telling us
    # about the request we have recorded as having given it. It's possible that
    # the worker is reporting work from an old identity and we've already moved
    # on from that, so in that case we just ignore the worker's work and rely
    # on the cleanup done later using service_state.clear_ipc_worker() to get
    # back to a good state.
    package, level, reckoning_time = service_state.lookup_ipc_worker(
        reply.identity,
    )
    # Short-circuit evaluation so that level.value is only read once the
    # package matches and a level is actually on record. An eager check would
    # raise AttributeError for an identity with no recorded assignment.
    # NOTE(review): assumes lookup_ipc_worker() reports "no assignment" with
    # None values rather than by raising; confirm against importer_service.
    if (
        package == reply.previous_package
        and level is not None
        and level.value == reply.previous_level
    ):
        # We're lacking good monitoring/observability here. In the long term,
        # this is the place we'd like to insert into something like InfluxDB
        # the success/failure together with some statistics like the import
        # time taken, which could be sent by the worker in its reply. In the
        # past, before we had queuing, we'd email failures for investigation.
        # See gitubuntu/mailer.py and gitubuntu/mailer_test.py prior to commit
        # 7ef484a for the old code, and
        # https://code.launchpad.net/~racb/git-ubuntu/+git/usd-importer/+merge/425379
        # for discussion.
        service_state.mark_import_outcomes(
            reckoning_time=reckoning_time,
            completed_requests=completed_requests,
            failed_requests=failed_requests,
        )

def main_loop(worker_socket, service_state):
    """Run the broker loop forever

    Each iteration blocks until one worker message arrives. It then:
    1. records any outcome the message reports;
    2. clears the worker's recorded assignment and marks it idle;
    3. sends pending work to as many idle workers as assign_work() pairs up,
       removing each of them from the idle set.

    :param zmq.Socket worker_socket: the ZeroMQ socket to use to speak to
        workers.
    :param importer_service.State service_state: the state object of the
        importer service.
    """
    idle_workers = {}

    while True:
        # No systemd watchdog is used here on purpose. The broker is not
        # expected to hang in any known case; if it does, that is a bug to be
        # found and fixed rather than hidden by an automatic restart. This
        # differs from the components that talk to Launchpad, where
        # launchpadlib is known to hang for unknown reasons and a watchdog is
        # the pragmatic workaround.
        frames = worker_socket.recv_multipart()
        return_address, _, payload = frames
        reply = WorkerReply.from_bytes(payload)

        handle_reply(service_state, reply)
        service_state.clear_ipc_worker(reply.identity)
        idle_workers[reply.identity] = return_address

        # assign_work() returns a fully built list, so deleting from
        # idle_workers while iterating over it is safe.
        for worker_identity, worker_address, request in assign_work(
            service_state, idle_workers,
        ):
            worker_socket.send_multipart(
                [worker_address, b'', request.to_bytes()],
            )
            del idle_workers[worker_identity]

def main(data_directory, worker_broker_socket_address):
    """Start the importer service broker process

    Opens the service state stored in the 'db' file under data_directory,
    binds a ZeroMQ ROUTER socket to the given endpoint and hands both to
    main_loop().

    :param str data_directory: the path where persistent data on the status of
        the import may be stored.
    :param str worker_broker_socket_address: the ZeroMQ endpoint to use to
        speak to workers, specified in a manner acceptable to libzmq.
    """
    db_path = os.path.join(data_directory, 'db')
    service_state = importer_service.State(db_path)

    worker_socket = zmq.Context.instance().socket(zmq.ROUTER)
    worker_socket.bind(worker_broker_socket_address)

    # main_loop() contains an unconditional "while True", so this call only
    # ends by raising an exception.
    main_loop(worker_socket, service_state)

def parse_args(subparsers=None, base_subparsers=None):
    """Build the command line interface of the importer service broker

    :param subparsers: if given, the object returned by
        argparse.ArgumentParser.add_subparsers() to register the
        'importer-service-broker' subcommand on. Otherwise a standalone parser
        is built and used to parse the process's command line.
    :param base_subparsers: if given, a list of parent parsers whose arguments
        the broker's parser inherits (passed to argparse as ``parents``).
    :returns: the parsed arguments when no subparsers object is given;
        otherwise a one-line "name - description" summary of the subcommand.
    """
    # Single source of truth for the subcommand name, so that the registered
    # subcommand and the returned summary cannot disagree.
    command_name = 'importer-service-broker'
    kwargs = {
        'description': "Coordinate importer service workers",
        'formatter_class': argparse.ArgumentDefaultsHelpFormatter,
    }
    if base_subparsers:
        kwargs['parents'] = base_subparsers
    if subparsers:
        parser = subparsers.add_parser(command_name, **kwargs)
        parser.set_defaults(func=cli_main)
    else:
        parser = argparse.ArgumentParser(**kwargs)

    parser.add_argument(
        '--data-directory',
        help="Directory to store persistent data",
        default='/var/local/git-ubuntu',
    )
    parser.add_argument(
        'worker_broker_socket_address',
        type=str,
        help=(
            "the ZeroMQ endpoint to use to speak to the workers (eg. "
            "tcp://localhost:5555 or ipc:///run/git-ubuntu/socket)"
        ),
    )
    if not subparsers:
        return parser.parse_args()
    return '%s - %s' % (command_name, kwargs['description'])

def cli_main(args):
    """CLI entry point for the importer service broker process

    :param argparse.Namespace args: parsed arguments providing
        ``data_directory`` and ``worker_broker_socket_address``.
    """
    main(
        data_directory=args.data_directory,
        worker_broker_socket_address=args.worker_broker_socket_address,
    )