File: importer_service_worker.py

package info (click to toggle)
git-ubuntu 1.1-2
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 1,688 kB
  • sloc: python: 13,378; sh: 480; makefile: 2
file content (339 lines) | stat: -rw-r--r-- 12,671 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
"""Implementation of the "worker" IPC component"""
import argparse
import subprocess
import time

import systemd.daemon
import zmq

from gitubuntu.importer_service import ImportRequest
from gitubuntu.importer_service_ipc import WorkerRequest, WorkerReply
from gitubuntu.run import run

class BrokerTimeout(RuntimeError):
    """Raised when no message arrives from the broker within the timeout."""

def zmq_recv_with_timeout(poller, socket, timeout):
    """Block to receive a complete ZeroMQ message with a timeout

    The timeout bounds the total time spent in this call. A readiness
    notification from the poller does not guarantee that a message can be
    read: the non-blocking receive may still raise zmq.error.Again. After
    such a spurious wakeup we poll again only for whatever is left of the
    original timeout, rather than restarting the full timeout each time.

    :param zmq.Poller poller: the ZeroMQ Poller that is set up to block for
        data to be available on the socket.
    :param zmq.Socket socket: the ZeroMQ socket on which to receive the
        message.
    :param int timeout: how long to block in total before timing out, in
        milliseconds. None or a negative value is handed to the poller
        unchanged on every iteration and no overall deadline is tracked.
    :raises BrokerTimeout: if no message is received before the timeout
        expires.
    :rtype: bytes
    :returns: the ZeroMQ message received
    """
    if timeout is None or timeout < 0:
        # Preserve the previous behaviour for non-finite timeouts: pass the
        # value straight through to the poller every time.
        deadline = None
    else:
        deadline = time.monotonic() + timeout / 1000
    remaining = timeout
    while True:
        events = poller.poll(remaining)
        if not events:
            raise BrokerTimeout()
        try:
            return socket.recv(zmq.NOBLOCK)
        except zmq.error.Again:
            # Spurious wakeup: nothing was actually readable. Only wait for
            # the time still left before the original deadline.
            if deadline is not None:
                remaining = max(
                    0,
                    int((deadline - time.monotonic()) * 1000),
                )

def import_srcpkg(
    request,
    dry_run,
    lp_user,
    lp_owner,
    push=True,
    set_as_default_repository=True,
    attempts=3,
    retry_delay=10,
):
    """Invoke git ubuntu import on an import request

    The import is retried up to ``attempts`` times before declaring failure.
    This covers racing with the publisher finalizing files and/or a transient
    download failure.

    :param ImportRequest request: the package request, as returned
        by importer_service.State.get_pending_imports().
    :param bool dry_run: if True, only print the "git ubuntu import" command
        that would be run and report success without running it.
    :param str lp_user: the LP user to authenticate against for the import, as
        passed to "git ubuntu import -l"
    :param str lp_owner: the LP user or team holding the target imported
        repository, as passed to "git ubuntu import -o"
    :param bool push: if the import should push to Launchpad when done. This
        defaults to True because it is the purpose of this function, but False
        can be used to avoid --push being passed to "git ubuntu import",
        resulting in a deeper dry run than just --dry-run.
    :param bool set_as_default_repository: True to set the repository in
        Launchpad as default for its target in Launchpad after push. This only
        has any effect if :py:attr:`push` is also True. This results in "git
        ubuntu import" being called with "--set-as-default-repository".
    :param int attempts: maximum number of times "git ubuntu import" is run
        before giving up. Defaults to 3.
    :param float retry_delay: seconds to wait after a failed attempt before
        the next one. No delay is applied after the final attempt. Defaults
        to 10.

    :rtype: tuple(ImportRequest, bool)
    :returns: the original request together with a boolean describing the
        success (True) or failure (False) of the import.
    """
    # The command line is identical for every attempt, so build it once.
    cmd = ['git', 'ubuntu', 'import']
    if push:
        cmd.append('--push')
        if set_as_default_repository:
            cmd.append('--set-as-default-repository')
    if lp_user:
        cmd.extend(['-l', lp_user])
    if lp_owner:
        cmd.extend(['-o', lp_owner])
    # Look REIMPORT up on the enum class itself rather than through another
    # member instance.
    if request.level is type(request.level).REIMPORT:
        cmd.append('--reimport')
    cmd.append(request.package)

    for attempt in range(1, attempts + 1):
        print(' '.join(cmd))
        if dry_run:
            return request, True
        try:
            run(cmd, check=True)
        except subprocess.CalledProcessError:
            print(
                "Failed to import %s (attempt %d/%d)" % (
                    request.package,
                    attempt,
                    attempts,
                )
            )
            # Only back off if another attempt follows; sleeping after the
            # last failure would just delay reporting it.
            if attempt < attempts:
                time.sleep(retry_delay)
        else:
            return request, True

    return request, False

def main_loop(
    poller,
    broker_socket,
    broker_timeout,
    identity,
    dry_run,
    lp_user,
    lp_owner,
    push,
    set_as_default_repository,
):
    """Repeatedly ask the broker for work, perform it and report the result

    Each message sent to the broker carries the outcome of the previous
    import and simultaneously asks for the next one. This function only
    returns by raising.

    :param zmq.Poller poller: the ZeroMQ Poller that is set up to block for
        data to be available on broker_socket.
    :param zmq.Socket broker_socket: the ZeroMQ socket to use to speak to the
        broker.
    :param int broker_timeout: milliseconds to wait for work from the broker
        before raising BrokerTimeout.
    :param str identity: the unique identity of this worker, included in
        every reply sent to the broker.
    :param bool dry_run: True if we should just print the underlying "git
        ubuntu import" call instead of actually calling it; False otherwise.
    :param str lp_user: the LP user to authenticate against for the imports, as
        passed to "git ubuntu import -l"
    :param str lp_owner: the LP user or team holding the target imported
        repositories, as passed to "git ubuntu import -o"
    :param bool push: True to confirm that the import should be pushed to
        Launchpad when complete. If False, then this results in "git ubuntu
        import" being called without --push, resulting in a deeper dry run.
    :param bool set_as_default_repository: True to set the repository in
        Launchpad as default for its target in Launchpad after push. This only
        has any effect if :py:attr:`push` is also True. This results in "git
        ubuntu import" being called with "--set-as-default-repository".
    :raises BrokerTimeout: if no work arrives from the broker within
        broker_timeout.
    """
    def make_reply(package, level, result):
        # Every reply both reports on the last job and requests the next one.
        return WorkerReply(
            identity=identity,
            previous_package=package,
            previous_level=level,
            previous_result=result,
        )

    # Nothing has been imported yet, so the first message is a pure request
    # for work with an empty (None) result.
    reply = make_reply(None, None, None)
    while True:
        # Kick the systemd watchdog once per job only, never from inside an
        # import. This way an import that runs too long trips the watchdog,
        # which serves as a crude per-import timeout so that reimports and
        # initial imports can be batch processed faster.
        systemd.daemon.notify('WATCHDOG=1')

        # ZeroMQ "load balancing pattern": hand over the previous result and
        # wait for the next assignment.
        broker_socket.send(reply.to_bytes())
        work = WorkerRequest.from_bytes(
            zmq_recv_with_timeout(
                poller=poller,
                socket=broker_socket,
                timeout=broker_timeout,
            )
        )
        import_request = ImportRequest(
            package=work.package,
            level=ImportRequest.Level(work.level),
        )

        _, succeeded = import_srcpkg(
            request=import_request,
            dry_run=dry_run,
            lp_user=lp_user,
            lp_owner=lp_owner,
            push=push,
            set_as_default_repository=set_as_default_repository,
        )

        reply = make_reply(
            import_request.package,
            import_request.level.value,
            succeeded,
        )

def main(
    identity,
    worker_broker_socket_address,
    broker_timeout,
    dry_run,
    lp_user,
    lp_owner,
    push,
    set_as_default_repository,
):
    """Run an importer service worker, reconnecting whenever the broker is quiet

    A fresh REQ socket is connected for each pass of the outer loop. If the
    worker hears nothing from the broker within broker_timeout, the socket is
    discarded and a new one is created. When no work is available, this
    doubles as a periodic heartbeat.

    :param str identity: the unique identity of this worker as an arbitrary
        string.
    :param str worker_broker_socket_address: the ZeroMQ endpoint to use to
        speak to the broker, specified in a manner acceptable to libzmq.
    :param int broker_timeout: milliseconds to wait for work from the broker
        before treating the outstanding request as lost, recreating the
        ZeroMQ socket and asking again.
    :param bool dry_run: True if we should just print the underlying "git
        ubuntu import" call instead of actually calling it; False otherwise.
    :param str lp_user: the LP user to authenticate against for the imports, as
        passed to "git ubuntu import -l"
    :param str lp_owner: the LP user or team holding the target imported
        repositories, as passed to "git ubuntu import -o"
    :param bool push: True to confirm that the import should be pushed to
        Launchpad when complete. If False, then this results in "git ubuntu
        import" being called without --push, resulting in a deeper dry run.
    :param bool set_as_default_repository: True to set the repository in
        Launchpad as default for its target in Launchpad after push. This only
        has any effect if :py:attr:`push` is also True. This results in "git
        ubuntu import" being called with "--set-as-default-repository".
    """
    context = zmq.Context.instance()
    while True:
        # Tell systemd we are up; this is re-sent on every reconnection.
        systemd.daemon.notify('READY=1')

        sock = context.socket(zmq.REQ)
        sock.connect(worker_broker_socket_address)
        sock_poller = zmq.Poller()
        sock_poller.register(sock, zmq.POLLIN)

        try:
            main_loop(
                poller=sock_poller,
                broker_socket=sock,
                broker_timeout=broker_timeout,
                identity=identity,
                dry_run=dry_run,
                lp_user=lp_user,
                lp_owner=lp_owner,
                push=push,
                set_as_default_repository=set_as_default_repository,
            )
        except BrokerTimeout:
            # The request for work may have been lost. Throw this socket away
            # and start over with a new request. LINGER must be zeroed first,
            # otherwise the underlying file descriptor is not necessarily
            # released and repeated reconnects would eventually exhaust the
            # descriptor limit.
            sock.setsockopt(zmq.LINGER, 0)
            sock.close()
        else:
            return

def parse_args(subparsers=None, base_subparsers=None):
    """Define the command-line interface of the importer service worker

    :param subparsers: if given, the argparse subparsers action on which an
        "importer-service-worker" subcommand is registered (dispatching to
        cli_main). Otherwise a standalone argparse.ArgumentParser is created.
    :param base_subparsers: optional parent parsers whose arguments are
        inherited, passed to argparse as ``parents``.
    :returns: when used standalone, the argparse.Namespace parsed from the
        process command line. When registered as a subcommand, a one-line
        "<subcommand> - <description>" summary string.
    """
    name = 'importer-service-worker'
    kwargs = {
        'description': "Work on imports as instructed by a broker",
        'formatter_class': argparse.ArgumentDefaultsHelpFormatter,
    }
    if base_subparsers:
        kwargs['parents'] = base_subparsers
    if subparsers:
        parser = subparsers.add_parser(name, **kwargs)
        parser.set_defaults(func=cli_main)
    else:
        parser = argparse.ArgumentParser(**kwargs)

    parser.add_argument(
        '--broker-timeout',
        type=int,
        default=60000,
        help=(
            "How long to block on waiting for work from the broker before "
            "asking again (in milliseconds)"
        ),
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help=(
            "Only print the 'git ubuntu import' commands instead of running "
            "them"
        ),
    )
    parser.add_argument(
        '-l',
        '--lp-user',
        type=str,
        help=(
            "The Launchpad user to authenticate against for the imports, as "
            "passed to 'git ubuntu import -l'"
        ),
        default='git-ubuntu-bot',
    )
    parser.add_argument(
        '-o',
        '--lp-owner',
        type=str,
        help=(
            "The Launchpad user or team that will own the git repository "
            "being imported, as passed to 'git ubuntu import -o'"
        ),
    )
    parser.add_argument(
        'identity',
        type=str,
        help="the unique identity of this worker as an arbitrary string",
    )
    parser.add_argument(
        'worker_broker_socket_address',
        type=str,
        help=(
            "the ZeroMQ endpoint to use to speak to the broker (eg. "
            "tcp://localhost:5555 or ipc:///run/git-ubuntu/socket)"
        ),
    )
    parser.add_argument(
        '--no-push',
        action='store_true',
        help='Do not push to the remote',
    )
    parser.add_argument(
        '--no-set-as-default-repository',
        action='store_true',
        help='Do not set as default repository for its target after push',
    )

    if not subparsers:
        return parser.parse_args()
    # Use the name this subcommand is actually registered under (this was
    # previously a copy-paste of the broker's "import-service-broker").
    return '%s - %s' % (name, kwargs['description'])

def cli_main(args):
    """Start an importer service worker from parsed command-line options

    :param argparse.Namespace args: the options produced by the parser built
        in parse_args().
    """
    # The CLI exposes opt-out flags (--no-push, --no-set-as-default-repository)
    # whereas main() takes positive booleans, so invert them here.
    should_push = not args.no_push
    make_default = not args.no_set_as_default_repository
    main(
        identity=args.identity,
        worker_broker_socket_address=args.worker_broker_socket_address,
        broker_timeout=args.broker_timeout,
        dry_run=args.dry_run,
        lp_user=args.lp_user,
        lp_owner=args.lp_owner,
        push=should_push,
        set_as_default_repository=make_default,
    )