File: dispatcher-master.py

Package: lava-server 2016.12-3
# Copyright (C) 2015 Linaro Limited
#
# Author: Remi Duraffort <remi.duraffort@linaro.org>
#
# This file is part of LAVA Dispatcher.
#
# LAVA Dispatcher is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# LAVA Dispatcher is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses>.

# pylint: disable=wrong-import-order

import errno
import sys
import fcntl
import jinja2
import logging
import lzma
import os
import signal
import time
import yaml
import zmq
import zmq.auth
from zmq.auth.thread import ThreadAuthenticator

from django.core.management.base import BaseCommand
from django.db import transaction
from django.db.models import Q
from django.db.utils import OperationalError, InterfaceError
from lava_scheduler_app.models import TestJob
from lava_scheduler_app.utils import mkdir
from lava_scheduler_app.dbutils import (
    create_job, start_job,
    fail_job, cancel_job,
    parse_job_description,
    select_device,
)
from lava_results_app.dbutils import map_scanned_results, create_metadata_store


# pylint: disable=no-member,too-many-branches,too-many-statements,too-many-locals

# Current version of the protocol
# The slave sends the protocol version along with the HELLO and HELLO_RETRY
# messages. If the two versions are not identical, the connection is refused
# by the master.
PROTOCOL_VERSION = 1
# TODO constants to move into external files
FD_TIMEOUT = 60
TIMEOUT = 10
DB_LIMIT = 10

# TODO: share this value with dispatcher-slave
# This should be 3 times the slave ping timeout
DISPATCHER_TIMEOUT = 3 * 10
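# i.e. a slave that has not been heard from for roughly three ping periods
# (about 30 seconds) is marked offline.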


class SlaveDispatcher(object):  # pylint: disable=too-few-public-methods
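    """Track the liveness of a single slave: last message time and online flag."""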

    def __init__(self, hostname, online=False):
        self.hostname = hostname
        self.last_msg = time.time() if online else 0
        self.online = online

    def alive(self):
        self.last_msg = time.time()


class FileHandler(object):  # pylint: disable=too-few-public-methods
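    """Hold an open log file descriptor; garbage-collected after FD_TIMEOUT."""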

    def __init__(self, name):
        self.filename = name
        self.fd = open(name, 'a+')  # pylint: disable=invalid-name
        self.last_usage = time.time()

    def close(self):
        self.fd.close()


def get_env_string(filename):
    """
    Return the file contents after checking that they parse as YAML, so that
    errors are caught here rather than later in the dispatcher.
    """
    logger = logging.getLogger('dispatcher-master')
    try:
        with open(filename, "r") as f_in:
            env_str = f_in.read()
        yaml.safe_load(env_str)
        return env_str
    except IOError as exc:
        # This is ok if the file does not exist
        if exc.errno == errno.ENOENT:
            return ''
        raise
    except yaml.YAMLError:
        # Raise an IOError instead: the caller reserves yaml.YAMLError for
        # job definition parsing. This also lets us name the faulty file.
        raise IOError("", "Not a valid YAML file", filename)


class Command(BaseCommand):
    """
    worker_host is the hostname of the worker. This field is set by the
    admin and could therefore be empty in a misconfigured instance.
    """
    logger = None
    help = "LAVA dispatcher master"

    def __init__(self, *args, **options):
        super(Command, self).__init__(*args, **options)
        self.pull_socket = None
        self.controler = None
        # Open log file handlers, indexed by job id
        self.logs = {}
        # List of known dispatchers. Do not load this from the database at
        # startup: starting empty lets us detect whether a slave has restarted.
        self.dispatchers = {}
        self.logging_support()

    def add_arguments(self, parser):
        parser.add_argument('--master-socket',
                            default='tcp://*:5556',
                            help="Socket for master-slave communication. Default: tcp://*:5556")
        parser.add_argument('--log-socket',
                            default='tcp://*:5555',
                            help="Socket waiting for logs. Default: tcp://*:5555")
        parser.add_argument('--encrypt', default=False, action='store_true',
                            help="Encrypt messages")
        parser.add_argument('--master-cert',
                            default='/etc/lava-dispatcher/certificates.d/master.key_secret',
                            help="Certificate for the master socket")
        parser.add_argument('--slaves-certs',
                            default='/etc/lava-dispatcher/certificates.d',
                            help="Directory for slaves certificates")
        parser.add_argument('-l', '--level',
                            default='DEBUG',
                            help="Logging level (ERROR, WARN, INFO, DEBUG) Default: DEBUG")
        parser.add_argument('--templates',
                            default="/etc/lava-server/dispatcher-config/",
                            help="Base directory for device configuration templates. "
                                 "Default: /etc/lava-server/dispatcher-config/")
        # Important: ensure share/env.yaml is put into /etc/ by setup.py in packaging.
        parser.add_argument('--env',
                            default="/etc/lava-server/env.yaml",
                            help="Environment variables for the dispatcher processes. "
                                 "Default: /etc/lava-server/env.yaml")
        parser.add_argument('--env-dut',
                            default="/etc/lava-server/env.dut.yaml",
                            help="Environment variables for device under test. "
                                 "Default: /etc/lava-server/env.dut.yaml")
        parser.add_argument('--output-dir',
                            default='/var/lib/lava-server/default/media/job-output',
                            help="Directory where to store job outputs. "
                                 "Default: /var/lib/lava-server/default/media/job-output")

    def send_status(self, hostname):
        """
        The master crashed: send a STATUS message for each job recorded as
        running on this slave to query its current state.
        """
        jobs = TestJob.objects.filter(actual_device__worker_host__hostname=hostname,
                                      is_pipeline=True,
                                      status=TestJob.RUNNING)
        for job in jobs:
            self.logger.info("[%d] STATUS => %s (%s)", job.id, hostname,
                             job.actual_device.hostname)
            self.controler.send_multipart([hostname, 'STATUS', str(job.id)])

    def dispatcher_alive(self, hostname):
        if hostname not in self.dispatchers:
            # The server crashed: send a STATUS message
            self.logger.warning("Unknown dispatcher <%s> (server crashed)", hostname)
            self.dispatchers[hostname] = SlaveDispatcher(hostname, online=True)
            self.send_status(hostname)

        # Mark the dispatcher as alive
        self.dispatchers[hostname].alive()

    def logging_socket(self, options):
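        """Process one log message received on the PULL socket.

        Expected multipart frames: [job_id, level, name, message], where
        "level" is the dotted action level (e.g. "1.2.3") used to build the
        per-action log path and "message" is a YAML dictionary carrying
        "lvl" and "msg" keys.
        """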
        msg = self.pull_socket.recv_multipart()
        try:
            (job_id, level, name, message) = msg
        except ValueError:
            # do not let a bad message stop the master.
            self.logger.error("Failed to parse log message, skipping: %s", msg)
            return

        try:
            scanned = yaml.load(message)
        except yaml.YAMLError:
            self.logger.error("[%s] data are not valid YAML, dropping", job_id)
            return

        # Look for "results" level
        try:
            message_lvl = scanned["lvl"]
            message_msg = scanned["msg"]
        except KeyError:
            self.logger.error(
                "[%s] Invalid log line, missing \"lvl\" or \"msg\" keys: %s",
                job_id, message)
            return

        # Reject path separators in level or name: both are used below to
        # build the log file path.
        if '/' in level or '/' in name:
            self.logger.error("[%s] Invalid level or name received, dropping the message", job_id)
            return
        filename = "%s/job-%s/pipeline/%s/%s-%s.yaml" % (options['output_dir'],
                                                         job_id, level.split('.')[0],
                                                         level, name)

        # Find the handler (if available)
        if job_id in self.logs:
            if filename != self.logs[job_id].filename:
                # Close the old file handler
                self.logs[job_id].close()
                mkdir(os.path.dirname(filename))
                self.logs[job_id] = FileHandler(filename)
        else:
            self.logger.info("[%s] Receiving logs from a new job", job_id)
            mkdir(os.path.dirname(filename))
            self.logs[job_id] = FileHandler(filename)

        if message_lvl == "results":
            try:
                job = TestJob.objects.get(pk=job_id)
            except TestJob.DoesNotExist:
                self.logger.error("[%s] Unknown job id", job_id)
                return
            meta_filename = create_metadata_store(message_msg, job, level)
            ret = map_scanned_results(results=message_msg, job=job, meta_filename=meta_filename)
            if not ret:
                self.logger.warning(
                    "[%s] Unable to map scanned results: %s",
                    job_id, message)

        # Mark the file handler as used
        # TODO: try to use a more pythonic way
        self.logs[job_id].last_usage = time.time()

        # n.b. logging here would produce a log entry for every message in every job.
        # Prepend "- " so each line becomes a YAML list item; the complete
        # log file then parses as a list of dictionaries.
        message = "- %s" % message

        # Write data
        f_handler = self.logs[job_id].fd
        f_handler.write(message)
        f_handler.write('\n')
        f_handler.flush()

        # TODO: keep the file handler to avoid calling open for each line
        filename = os.path.join(options['output_dir'],
                                "job-%s" % job_id,
                                'output.yaml')
        with open(filename, 'a+') as f_out:
            f_out.write(message)
            f_out.write('\n')

    def controler_socket(self):
        msg = self.controler.recv_multipart()
        # This is way too verbose for production and should only be activated
        # by (and for) developers
        # self.logger.debug("[CC] Receiving: %s", msg)

        # 1: the hostname (see ZMQ documentation)
        hostname = msg[0]
        # 2: the action
        action = msg[1]
        # Handle the actions
        if action == 'HELLO' or action == 'HELLO_RETRY':
            self.logger.info("%s => %s", hostname, action)

            # Check the protocol version
            try:
                slave_version = int(msg[2])
            except (IndexError, ValueError):
                self.logger.error("Invalid message from <%s> '%s'", hostname, msg)
                return False
            if slave_version != PROTOCOL_VERSION:
                self.logger.error("<%s> using protocol v%d while master is using v%d",
                                  hostname, slave_version, PROTOCOL_VERSION)
                return False

            self.controler.send_multipart([hostname, 'HELLO_OK'])
            # If the dispatcher is known and sent a HELLO, this means that
            # the slave has restarted
            if hostname in self.dispatchers:
                if action == 'HELLO':
                    self.logger.warning("Dispatcher <%s> has RESTARTED",
                                        hostname)
                else:
                    # HELLO_RETRY: assume the earlier HELLO was received
                    # but our HELLO_OK reply was lost.
                    self.logger.warning("Dispatcher <%s> was not confirmed",
                                        hostname)
            else:
                # No dispatcher, treat HELLO and HELLO_RETRY as a normal HELLO
                # message.
                self.logger.warning("New dispatcher <%s>", hostname)
                self.dispatchers[hostname] = SlaveDispatcher(hostname, online=True)

            if action == 'HELLO':
                # FIXME: slaves need to be allowed to restart cleanly without affecting jobs
                # as well as handling unexpected crashes.
                self._cancel_slave_dispatcher_jobs(hostname)

            # Mark the dispatcher as alive
            self.dispatchers[hostname].alive()

        elif action == 'PING':
            self.logger.debug("%s => PING", hostname)
            # Send back a signal
            self.controler.send_multipart([hostname, 'PONG'])
            self.dispatcher_alive(hostname)

        elif action == 'END':
            try:
                job_id = int(msg[2])
                job_status = int(msg[3])
                error_msg = msg[4]
                description = msg[5]
            except (IndexError, ValueError):
                self.logger.error("Invalid message from <%s> '%s'", hostname, msg)
                return False
            if job_status:
                status = TestJob.INCOMPLETE
                self.logger.info("[%d] %s => END with error %d", job_id, hostname, job_status)
                self.logger.error("[%d] Error: %s", job_id, error_msg)
            else:
                status = TestJob.COMPLETE
                self.logger.info("[%d] %s => END", job_id, hostname)

            # Find the corresponding job and update the status
            try:
                with transaction.atomic():
                    job = TestJob.objects.select_for_update().get(id=job_id)
                    if job.status == TestJob.CANCELING:
                        cancel_job(job)
                    fail_job(job, fail_msg=error_msg, job_status=status)

                # Save the description
                filename = os.path.join(job.output_dir, 'description.yaml')
                try:
                    with open(filename, 'w') as f_description:
                        f_description.write(lzma.decompress(description))
                except (IOError, lzma.error) as exc:
                    self.logger.error("[%d] Unable to dump 'description.yaml'",
                                      job_id)
                    self.logger.exception(exc)
                parse_job_description(job)

            except TestJob.DoesNotExist:
                self.logger.error("[%d] Unknown job", job_id)
            # ACK even if the job is unknown to let the dispatcher
            # forget about it
            self.controler.send_multipart([hostname, 'END_OK', str(job_id)])
            self.dispatcher_alive(hostname)

        elif action == 'START_OK':
            try:
                job_id = int(msg[2])
            except (IndexError, ValueError):
                self.logger.error("Invalid message from <%s> '%s'", hostname, msg)
                return False
            self.logger.info("[%d] %s => START_OK", job_id, hostname)
            try:
                with transaction.atomic():
                    job = TestJob.objects.select_for_update() \
                                         .get(id=job_id)
                    start_job(job)
            except TestJob.DoesNotExist:
                self.logger.error("[%d] Unknown job", job_id)

            self.dispatcher_alive(hostname)

        else:
            self.logger.error("<%s> sent unknown action=%s, args=(%s)",
                              hostname, action, msg[1:])
        return True

    def _cancel_slave_dispatcher_jobs(self, hostname):
        """Get dispatcher jobs and cancel them.

        :param hostname: The name of the dispatcher host.
        :type hostname: string
        """
        # TODO: DB: mark the dispatcher as online in the database.
        # For the moment this should not be done by this process as
        # some workers are still running both the old and the new dispatcher.

        # Mark all jobs on this dispatcher as canceled.
        # The dispatcher had (re)started, so all jobs have to be
        # finished.
        with transaction.atomic():
            jobs = TestJob.objects.filter(
                actual_device__worker_host__hostname=hostname,
                is_pipeline=True,
                status=TestJob.RUNNING).select_for_update()

            for job in jobs:
                self.logger.info("[%d] Canceling", job.id)
                cancel_job(job)

    def export_definition(self, job):  # pylint: disable=no-self-use
        job_def = yaml.load(job.definition)
        job_def['compatibility'] = job.pipeline_compatibility

        # no need for the dispatcher to retain comments
        return str(yaml.dump(job_def))

    def process_jobs(self, options):
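        # Schedule SUBMITTED pipeline jobs that already have a device
        # assigned: health checks first, then descending priority, then
        # FIFO by submission time (multinode groups are kept together).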
        for job in TestJob.objects.filter(
                Q(status=TestJob.SUBMITTED) & Q(is_pipeline=True) & ~Q(actual_device=None))\
                .order_by('-health_check', '-priority', 'submit_time', 'target_group', 'id'):
            if job.dynamic_connection:
                # A secondary connection must be made from a dispatcher local to the host device
                # to allow for local firewalls etc. So the secondary connection is started on the
                # remote worker of the "nominated" host.
                # FIXME:
                device = None
                worker_host = job.lookup_worker
                self.logger.info("[%d] START => %s (connection)", job.id,
                                 worker_host.hostname)
            else:
                device = select_device(job, self.dispatchers)
                if not device:
                    return
                # selecting a device can modify the job, so reload it from the database
                job = TestJob.objects.get(id=job.id)
                self.logger.info("[%d] Assigning %s device", job.id, device)
                if job.actual_device is None:
                    device = job.requested_device
                    if not device.worker_host:
                        msg = "Infrastructure error: Invalid worker information"
                        self.logger.error("[%d] %s", job.id, msg)
                        fail_job(job, msg, TestJob.INCOMPLETE)
                        return

                    # Launch the job
                    create_job(job, device)
                    self.logger.info("[%d] START => %s (%s)", job.id,
                                     device.worker_host.hostname, device.hostname)
                    worker_host = device.worker_host
                else:
                    device = job.actual_device
                    if not device.worker_host:
                        msg = "Infrastructure error: Invalid worker information"
                        self.logger.error("[%d] %s", job.id, msg)
                        fail_job(job, msg, TestJob.INCOMPLETE)
                        return
                    self.logger.info("[%d] START => %s (%s) (retrying)", job.id,
                                     device.worker_host.hostname, device.hostname)
                    worker_host = device.worker_host
            try:
                # Load job definition to get the variables for template
                # rendering
                job_def = yaml.load(job.definition)
                job_ctx = job_def.get('context', {})

                # Load device configuration
                device_configuration = '' \
                    if job.dynamic_connection else device.load_device_configuration(job_ctx)

                env_str = get_env_string(options['env'])
                env_dut_str = get_env_string(options['env_dut'])

                if job.is_multinode:
                    for group_job in job.sub_jobs_list:
                        if group_job.dynamic_connection:
                            # to get this far, the rest of the multinode group must also be ready
                            # so start the dynamic connections
                            # FIXME: rationalise and streamline
                            self.controler.send_multipart(
                                [str(worker_host.hostname),
                                 'START', str(group_job.id),
                                 self.export_definition(group_job),
                                 str(device_configuration),
                                 env_str, env_dut_str])

                self.controler.send_multipart(
                    [str(worker_host.hostname),
                     'START', str(job.id),
                     self.export_definition(job),
                     str(device_configuration),
                     env_str, env_dut_str])
                return

            except jinja2.TemplateNotFound as exc:
                self.logger.error("[%d] Template not found: '%s'",
                                  job.id, exc.message)
                msg = "Infrastructure error: Template not found: '%s'" % \
                      exc.message
            except jinja2.TemplateSyntaxError as exc:
                self.logger.error("[%d] Template syntax error in '%s', line %d: %s",
                                  job.id, exc.name, exc.lineno, exc.message)
                msg = "Infrastructure error: Template syntax error in '%s', line %d: %s" % \
                      (exc.name, exc.lineno, exc.message)
            except IOError as exc:
                self.logger.error("[%d] Unable to read '%s': %s",
                                  job.id, exc.filename, exc.strerror)
                msg = "Infrastructure error: cannot open '%s': %s" % \
                      (exc.filename, exc.strerror)
            except yaml.YAMLError as exc:
                self.logger.error("[%d] Unable to parse job definition: %s",
                                  job.id, exc)
                msg = "Infrastructure error: cannot parse job definition: %s" % \
                      exc
            except Exception as exc:  # pylint: disable=broad-except
                self.logger.exception(exc)
                msg = "Infrastructure error: %s" % exc.message

            self.logger.error("[%d] INCOMPLETE job", job.id)
            fail_job(job=job, fail_msg=msg, job_status=TestJob.INCOMPLETE)

    def handle_canceling(self):
        for job in TestJob.objects.filter(status=TestJob.CANCELING, is_pipeline=True):
            worker_host = job.lookup_worker if job.dynamic_connection else job.actual_device.worker_host
            if not worker_host:
                self.logger.warning("[%d] Invalid worker information", job.id)
                # shouldn't happen
                fail_job(job, 'invalid worker information', TestJob.CANCELED)
                continue
            self.logger.info("[%d] CANCEL => %s", job.id,
                             worker_host.hostname)
            self.controler.send_multipart([str(worker_host.hostname),
                                           'CANCEL', str(job.id)])

    def logging_support(self):
        del logging.root.handlers[:]
        del logging.root.filters[:]
        # Create the logger
        log_format = '%(asctime)-15s %(levelname)s %(message)s'
        logging.basicConfig(format=log_format, filename='/var/log/lava-server/lava-master.log')
        self.logger = logging.getLogger('dispatcher-master')

    def handle(self, *args, **options):
        if options['level'] == 'ERROR':
            self.logger.setLevel(logging.ERROR)
        elif options['level'] == 'WARN':
            self.logger.setLevel(logging.WARN)
        elif options['level'] == 'INFO':
            self.logger.setLevel(logging.INFO)
        else:
            self.logger.setLevel(logging.DEBUG)

        # Create the sockets
        context = zmq.Context()
        self.pull_socket = context.socket(zmq.PULL)
        self.controler = context.socket(zmq.ROUTER)

        if options['encrypt']:
            self.logger.info("Starting encryption")
            try:
                auth = ThreadAuthenticator(context)
                auth.start()
                self.logger.debug("Opening master certificate: %s", options['master_cert'])
                master_public, master_secret = zmq.auth.load_certificate(options['master_cert'])
                self.logger.debug("Using slaves certificates from: %s", options['slaves_certs'])
                auth.configure_curve(domain='*', location=options['slaves_certs'])
            except IOError as err:
                self.logger.error(err)
                auth.stop()
                return
            self.controler.curve_publickey = master_public
            self.controler.curve_secretkey = master_secret
            self.controler.curve_server = True
            self.pull_socket.curve_publickey = master_public
            self.pull_socket.curve_secretkey = master_secret
            self.pull_socket.curve_server = True
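            # Both sockets now act as CURVE servers: slaves must encrypt
            # with the master public key and authenticate with a client
            # certificate from the --slaves-certs directory.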

        self.pull_socket.bind(options['log_socket'])
        self.controler.bind(options['master_socket'])

        # Last access to the database for new jobs and cancellations
        last_db_access = 0

        # Poll on the sockets. This allows us to have a nice timeout along
        # with the polling.
        poller = zmq.Poller()
        poller.register(self.pull_socket, zmq.POLLIN)
        poller.register(self.controler, zmq.POLLIN)

        # Mask signals and create a pipe that will receive one byte for each
        # signal received (the self-pipe trick). Poll the pipe along with the
        # zmq sockets so that we can only be interrupted while reading data.
        (pipe_r, pipe_w) = os.pipe()
        flags = fcntl.fcntl(pipe_w, fcntl.F_GETFL, 0)
        fcntl.fcntl(pipe_w, fcntl.F_SETFL, flags | os.O_NONBLOCK)

        def signal_to_pipe(signum, frame):
            # Send the signal number on the pipe
            os.write(pipe_w, chr(signum))

        signal.signal(signal.SIGHUP, signal_to_pipe)
        signal.signal(signal.SIGINT, signal_to_pipe)
        signal.signal(signal.SIGTERM, signal_to_pipe)
        signal.signal(signal.SIGQUIT, signal_to_pipe)
        poller.register(pipe_r, zmq.POLLIN)

        if os.path.exists('/etc/lava-server/worker.conf'):
            self.logger.error("[FAIL] lava-master must not be run on a remote worker!")
            self.controler.close(linger=0)
            self.pull_socket.close(linger=0)
            context.term()
            sys.exit(2)

        self.logger.info("[INIT] LAVA dispatcher-master has started.")

        while True:
            try:
                try:
                    # TODO: Fix the timeout computation
                    # Wait for data or a timeout
                    sockets = dict(poller.poll(TIMEOUT * 1000))
                except zmq.error.ZMQError:
                    continue

                if sockets.get(pipe_r) == zmq.POLLIN:
                    signum = ord(os.read(pipe_r, 1))
                    if signum == signal.SIGHUP:
                        self.logger.info("[POLL] SIGHUP received, restarting loggers")
                        self.logging_support()
                    else:
                        self.logger.info("[POLL] Received a signal, leaving")
                        break

                # Logging socket
                if sockets.get(self.pull_socket) == zmq.POLLIN:
                    self.logging_socket(options)

                # Garbage collect file handlers
                now = time.time()
                for job_id in self.logs.keys():
                    if now - self.logs[job_id].last_usage > FD_TIMEOUT:
                        self.logger.info("[%s] Closing log file", job_id)
                        self.logs[job_id].close()
                        del self.logs[job_id]

                # Command socket
                if sockets.get(self.controler) == zmq.POLLIN:
                    if not self.controler_socket():
                        continue

                # Check dispatchers status
                now = time.time()
                for hostname in self.dispatchers.keys():
                    dispatcher = self.dispatchers[hostname]
                    if dispatcher.online and now - dispatcher.last_msg > DISPATCHER_TIMEOUT:
                        self.logger.error("[STATE] Dispatcher <%s> goes OFFLINE", hostname)
                        self.dispatchers[hostname].online = False
                        # TODO: DB: mark the dispatcher as offline and attached
                        # devices

                # Limit accesses to the database. This will also limit the rate of
                # CANCEL and START messages
                if now - last_db_access > DB_LIMIT:
                    last_db_access = now

                    # TODO: make this atomic
                    # Dispatch pipeline jobs with devices in Reserved state
                    self.process_jobs(options)

                    # Handle canceling jobs
                    self.handle_canceling()
            except (OperationalError, InterfaceError):
                self.logger.info("[RESET] database connection reset.")
                continue

        # Closing sockets and dropping messages.
        self.logger.info("[CLOSE] Closing the sockets and dropping messages")
        self.controler.close(linger=0)
        self.pull_socket.close(linger=0)
        if options['encrypt']:
            auth.stop()
        context.term()