1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339
|
"""Implementation of the "worker" IPC component"""
import argparse
import subprocess
import time
import systemd.daemon
import zmq
from gitubuntu.importer_service import ImportRequest
from gitubuntu.importer_service_ipc import WorkerRequest, WorkerReply
from gitubuntu.run import run
class BrokerTimeout(RuntimeError):
    """Signal that no message arrived from the broker within the timeout

    Raised by zmq_recv_with_timeout() when polling the broker socket yields
    no events, and caught in main() to recreate the socket and ask the broker
    for work again.
    """
def zmq_recv_with_timeout(poller, socket, timeout):
    """Block to receive a complete ZeroMQ message with a timeout

    The timeout bounds the whole call. If a poll reports activity but the
    non-blocking receive then finds no message available, polling resumes
    with only the time still remaining instead of a fresh full timeout.

    :param zmq.Poller poller: the ZeroMQ Poller that is set up to block for
        data to be available on the socket.
    :param zmq.Socket socket: the ZeroMQ socket on which to receive the
        message.
    :param int timeout: how long to block before timing out, in milliseconds.
        None or a negative value is handed to the poller unchanged on every
        poll and no deadline is tracked (NOTE(review): pyzmq is expected to
        treat these as "wait indefinitely" - confirm).
    :raises BrokerTimeout: if the call times out.
    :rtype: bytes
    :returns: the ZeroMQ message received
    """
    bounded = timeout is not None and timeout >= 0
    if bounded:
        # Monotonic clock so that wall-clock adjustments cannot stretch or
        # shrink the wait.
        deadline = time.monotonic() + timeout / 1000.0
    remaining = timeout
    while True:
        if not poller.poll(remaining):
            raise BrokerTimeout()
        try:
            return socket.recv(zmq.NOBLOCK)
        except zmq.error.Again:
            # The poller woke us but no complete message could be read;
            # fall through and wait again.
            pass
        if bounded:
            remaining = int((deadline - time.monotonic()) * 1000)
            if remaining <= 0:
                raise BrokerTimeout()
def import_srcpkg(
    request,
    dry_run,
    lp_user,
    lp_owner,
    push=True,
    set_as_default_repository=True,
    max_attempts=3,
    retry_delay=10,
):
    """Invoke git ubuntu import on an import request

    The command is retried on failure, in case of racing with the publisher
    finalizing files and/or a transient download failure.

    :param ImportRequest request: the package request, as returned
        by importer_service.State.get_pending_imports().
    :param bool dry_run: if True, only print the command that would be run
        and report success without running it.
    :param str lp_user: the LP user to authenticate against for the import, as
        passed to "git ubuntu import -l". Omitted from the command if falsy.
    :param str lp_owner: the LP user or team holding the target imported
        repository, as passed to "git ubuntu import -o". Omitted from the
        command if falsy.
    :param bool push: if the import should push to Launchpad when done. This
        defaults to True because it is the purpose of this function, but False
        can be used to avoid --push being passed to "git ubuntu import",
        resulting in a deeper dry run than just --dry-run.
    :param bool set_as_default_repository: True to set the repository in
        Launchpad as default for its target in Launchpad after push. This only
        has any effect if :py:attr:`push` is also True. This results in "git
        ubuntu import" being called with "--set-as-default-repository".
    :param int max_attempts: how many times to try the import before
        declaring failure. With a value below 1 nothing is attempted and
        failure is reported.
    :param retry_delay: seconds to sleep between a failed attempt and the
        next one. There is no sleep after the final attempt.
    :rtype: tuple(ImportRequest, bool)
    :returns: the original request together with a boolean describing the
        success (True) or failure (False) of the import.
    """
    # The command line does not depend on the attempt number, so build it
    # once up front.
    cmd = ['git', 'ubuntu', 'import']
    if push:
        cmd.append('--push')
        if set_as_default_repository:
            cmd.append('--set-as-default-repository')
    if lp_user:
        cmd.extend(['-l', lp_user])
    if lp_owner:
        cmd.extend(['-o', lp_owner])
    if request.level == request.level.REIMPORT:
        cmd.append('--reimport')
    cmd.append(request.package)

    for attempt in range(1, max_attempts + 1):
        print(' '.join(cmd))
        if dry_run:
            return request, True
        try:
            run(cmd, check=True)
        except subprocess.CalledProcessError:
            print(
                "Failed to import %s (attempt %d/%d)" % (
                    request.package,
                    attempt,
                    max_attempts,
                )
            )
        else:
            return request, True
        # Only wait when another attempt follows; sleeping after the last
        # failure would just delay reporting it.
        if attempt < max_attempts:
            time.sleep(retry_delay)
    return request, False
def main_loop(
    poller,
    broker_socket,
    broker_timeout,
    identity,
    dry_run,
    lp_user,
    lp_owner,
    push,
    set_as_default_repository,
):
    """Main loop for the importer service worker processes

    Repeatedly reports the previous import result to the broker, waits for
    the next work request, and performs that import. This loop has no normal
    exit; it ends only when an exception propagates.

    :param zmq.Poller poller: the ZeroMQ Poller that is set up to block for
        data to be available on broker_socket.
    :param zmq.Socket broker_socket: the ZeroMQ socket to use to speak to the
        broker.
    :param int broker_timeout: time to wait for work from the broker before
        raising BrokerTimeout.
    :param str identity: the identity of this worker, included in every
        reply sent to the broker.
    :param bool dry_run: True if we should just print the underlying "git
        ubuntu import" call instead of actually calling it; False otherwise.
    :param str lp_user: the LP user to authenticate against for the imports, as
        passed to "git ubuntu import -l"
    :param str lp_owner: the LP user or team holding the target imported
        repositories, as passed to "git ubuntu import -o"
    :param bool push: True to confirm that the import should be pushed to
        Launchpad when complete. If False, then this results in "git ubuntu
        import" being called without --push, resulting in a deeper dry run.
    :param bool set_as_default_repository: True to set the repository in
        Launchpad as default for its target in Launchpad after push. This only
        has any effect if :py:attr:`push` is also True. This results in "git
        ubuntu import" being called with "--set-as-default-repository".
    :raises BrokerTimeout: if we time out waiting on work from the broker (see
        broker_timeout).
    """
    # Outcome of the most recent import. Before any import has been done
    # these are all None, which makes the first message to the broker a pure
    # request for work.
    last_package = None
    last_level = None
    last_result = None

    while True:
        # Each message to the broker carries the previous result and, at the
        # same time, asks for further work.
        outgoing = WorkerReply(
            identity=identity,
            previous_package=last_package,
            previous_level=last_level,
            previous_result=last_result,
        )

        # Tell systemd we are still alive. The watchdog is deliberately only
        # kicked once per import rather than in some inner loop, so that the
        # watchdog timer doubles as a poor man's import-takes-too-long
        # timeout, letting reimports and initial imports be batch processed
        # faster.
        systemd.daemon.notify('WATCHDOG=1')

        # ZeroMQ "load balancing pattern": send the request for work, then
        # wait for the broker to hand some out.
        broker_socket.send(outgoing.to_bytes())
        raw_work = zmq_recv_with_timeout(
            poller=poller,
            socket=broker_socket,
            timeout=broker_timeout,
        )
        work = WorkerRequest.from_bytes(raw_work)
        request = ImportRequest(
            package=work.package,
            level=ImportRequest.Level(work.level),
        )

        # Perform the import; only the success flag of the returned pair is
        # needed here.
        _request, succeeded = import_srcpkg(
            request=request,
            dry_run=dry_run,
            lp_user=lp_user,
            lp_owner=lp_owner,
            push=push,
            set_as_default_repository=set_as_default_repository,
        )

        # Remember the outcome so that the next iteration reports it.
        last_package = request.package
        last_level = request.level.value
        last_result = succeeded
def main(
    identity,
    worker_broker_socket_address,
    broker_timeout,
    dry_run,
    lp_user,
    lp_owner,
    push,
    set_as_default_repository,
):
    """Entry point for the importer service worker processes

    Connects a fresh REQ socket to the broker and runs main_loop() on it,
    starting over with a new socket each time the broker times out.

    :param str identity: the unique identity of this worker as an arbitrary
        string.
    :param str worker_broker_socket_address: the ZeroMQ endpoint to use to
        speak to the broker, specified in a manner acceptable to libzmq.
    :param int broker_timeout: time to wait for work from the broker before
        considering the request for work to be lost, recreating the ZeroMQ
        socket and trying again. Specified in milliseconds.
    :param bool dry_run: True if we should just print the underlying "git
        ubuntu import" call instead of actually calling it; False otherwise.
    :param str lp_user: the LP user to authenticate against for the imports, as
        passed to "git ubuntu import -l"
    :param str lp_owner: the LP user or team holding the target imported
        repositories, as passed to "git ubuntu import -o"
    :param bool push: True to confirm that the import should be pushed to
        Launchpad when complete. If False, then this results in "git ubuntu
        import" being called without --push, resulting in a deeper dry run.
    :param bool set_as_default_repository: True to set the repository in
        Launchpad as default for its target in Launchpad after push. This only
        has any effect if :py:attr:`push` is also True. This results in "git
        ubuntu import" being called with "--set-as-default-repository".
    """
    # Everything main_loop() needs apart from the per-connection socket and
    # poller stays the same across reconnects.
    loop_settings = dict(
        broker_timeout=broker_timeout,
        identity=identity,
        dry_run=dry_run,
        lp_user=lp_user,
        lp_owner=lp_owner,
        push=push,
        set_as_default_repository=set_as_default_repository,
    )
    zmq_context = zmq.Context.instance()

    while True:
        # Initialize system notification
        systemd.daemon.notify('READY=1')

        broker_socket = zmq_context.socket(zmq.REQ)
        broker_socket.connect(worker_broker_socket_address)
        poller = zmq.Poller()
        poller.register(broker_socket, zmq.POLLIN)

        try:
            main_loop(
                poller=poller,
                broker_socket=broker_socket,
                **loop_settings
            )
        except BrokerTimeout:
            # The request for work is now considered potentially lost since
            # the broker has not answered. Drop this socket and go round
            # again, which opens a new one and sends a new request for work.
            # This may be normal if there is no work available, in which case
            # the cycle just serves as a regular heartbeat.
            #
            # LINGER must be turned off first; otherwise the underlying file
            # descriptor won't necessarily get freed, which eventually
            # exhausts the file descriptor limit.
            broker_socket.setsockopt(zmq.LINGER, 0)
            broker_socket.close()
        else:
            return
def parse_args(subparsers=None, base_subparsers=None):
    """Define the worker's command line and, when standalone, parse it

    :param subparsers: if given, the worker is registered on it as the
        'importer-service-worker' subcommand via its add_parser() method
        (presumably the object returned by argparse's add_subparsers();
        confirm against the caller), with cli_main set as the 'func'
        default. If not given, a standalone argparse.ArgumentParser is
        created instead.
    :param base_subparsers: if given, passed as the 'parents' of the parser
        being created.
    :returns: when subparsers is not given, the argparse.Namespace obtained
        by parsing the process's command line; otherwise a string of the
        form "<subcommand name> - <description>".
    """
    # Single source for the subcommand name so that the registered name and
    # the returned summary cannot drift apart.
    subcommand = 'importer-service-worker'
    kwargs = {
        'description': "Work on imports as instructed by a broker",
        'formatter_class': argparse.ArgumentDefaultsHelpFormatter,
    }
    if base_subparsers:
        kwargs['parents'] = base_subparsers
    if subparsers:
        parser = subparsers.add_parser(subcommand, **kwargs)
        parser.set_defaults(func=cli_main)
    else:
        parser = argparse.ArgumentParser(**kwargs)

    parser.add_argument(
        '--broker-timeout',
        type=int,
        default=60000,
        help=(
            "How long to block on waiting for work from the broker before "
            "asking again (in milliseconds)"
        ),
    )
    parser.add_argument('--dry-run', action='store_true')
    parser.add_argument(
        '-l',
        '--lp-user',
        type=str,
        help=(
            "The Launchpad user to authenticate against for the imports, as "
            "passed to 'git ubuntu import -l'"
        ),
        default='git-ubuntu-bot',
    )
    parser.add_argument(
        '-o',
        '--lp-owner',
        type=str,
        help=(
            "The Launchpad user or team that will own the git repository "
            "being imported, as passed to 'git ubuntu import -o'"
        ),
    )
    parser.add_argument(
        'identity',
        type=str,
        help="the unique identity of this worker as an arbitrary string",
    )
    parser.add_argument(
        'worker_broker_socket_address',
        type=str,
        help=(
            "the ZeroMQ endpoint to use to speak to the broker (eg. "
            "tcp://localhost:5555 or ipc:///run/git-ubuntu/socket)"
        ),
    )
    parser.add_argument(
        '--no-push',
        action='store_true',
        help='Do not push to the remote',
    )
    parser.add_argument(
        '--no-set-as-default-repository',
        action='store_true',
        help='Do not set as default repository for its target after push',
    )

    if not subparsers:
        return parser.parse_args()
    return '%s - %s' % (subcommand, kwargs['description'])
def cli_main(args):
    """CLI entry point for the importer service worker processes

    :param args: the parsed command line; its attributes are mapped onto the
        keyword arguments of main(), with the negative --no-push and
        --no-set-as-default-repository flags inverted into the positive
        push and set_as_default_repository options.
    """
    options = {
        'identity': args.identity,
        'worker_broker_socket_address': args.worker_broker_socket_address,
        'broker_timeout': args.broker_timeout,
        'dry_run': args.dry_run,
        'lp_user': args.lp_user,
        'lp_owner': args.lp_owner,
        'push': not args.no_push,
        'set_as_default_repository': not args.no_set_as_default_repository,
    }
    main(**options)
|