File: interchange.py

#!/usr/bin/env python
import datetime
import logging
import os
import pickle
import platform
import sys
import threading
import time
from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, cast

import zmq
from sortedcontainers import SortedList

from parsl import curvezmq
from parsl.addresses import tcp_url
from parsl.app.errors import RemoteExceptionWrapper
from parsl.executors.high_throughput.errors import ManagerLost, VersionMismatch
from parsl.executors.high_throughput.manager_record import ManagerRecord
from parsl.executors.high_throughput.manager_selector import ManagerSelector
from parsl.monitoring.message_type import MessageType
from parsl.monitoring.radios.base import MonitoringRadioSender
from parsl.monitoring.radios.zmq import ZMQRadioSender
from parsl.process_loggers import wrap_with_logs
from parsl.serialize import serialize as serialize_object
from parsl.version import VERSION as PARSL_VERSION

# Sentinel payloads sent to managers on the worker channel: one acknowledges a
# heartbeat, the other tells a drained manager to shut down.
PKL_HEARTBEAT_CODE = pickle.dumps((2 ** 32) - 1)
PKL_DRAINED_CODE = pickle.dumps((2 ** 32) - 2)

LOGGER_NAME = "interchange"
logger = logging.getLogger(LOGGER_NAME)


class Interchange:
    """ Interchange is a task orchestrator for distributed systems.

    1. Asynchronously queue large volumes of tasks (>100K)
    2. Allow workers to join and leave the pool at any time
    3. Detect workers that have failed, using heartbeats
    """
    def __init__(self,
                 *,
                 client_address: str,
                 interchange_address: Optional[str],
                 client_ports: Tuple[int, int, int],
                 worker_port: Optional[int],
                 worker_port_range: Tuple[int, int],
                 hub_address: Optional[str],
                 hub_zmq_port: Optional[int],
                 heartbeat_threshold: int,
                 logdir: str,
                 logging_level: int,
                 poll_period: int,
                 cert_dir: Optional[str],
                 manager_selector: ManagerSelector,
                 run_id: str,
                 _check_python_mismatch: bool,
                 ) -> None:
        """
        Parameters
        ----------
        client_address : str
             The IP address at which the parsl client can be reached. Default: "127.0.0.1"

        interchange_address : Optional str
             If specified, the interchange listens only on this address for connections
             from workers; otherwise it binds to all addresses.

        client_ports : tuple(int, int, int)
             The ports at which the client can be reached

        worker_port : Optional int
             The specific port on which the interchange listens for worker connections.
             If None, a port is chosen at random from worker_port_range.

        worker_port_range : tuple(int, int)
             The range from which the interchange picks a port, at random, for worker
             connections. This is ignored when the worker_port option is set.

        hub_address : Optional str
             The IP address to which the interchange sends information about managers
             when monitoring is enabled. When None, monitoring is disabled.

        hub_zmq_port : Optional int
             The port to which the interchange sends information about managers
             when monitoring is enabled. When None, monitoring is disabled.

        heartbeat_threshold : int
             Number of seconds since the last heartbeat after which a manager is considered lost.

        logdir : str
             Parsl log directory path. Logs and temp files go here.

        logging_level : int
             Logging level as defined in the logging module.

        poll_period : int
             The main thread polling period, in milliseconds.

        cert_dir : str | None
            Path to the certificate directory.

        manager_selector : ManagerSelector
            Strategy object used to order ready managers when dispatching tasks.

        run_id : str
            Identifier of the current workflow run; attached to monitoring messages.

        _check_python_mismatch : bool
            If True, the interchange and worker managers must run the same version of
            Python. Running different versions can cause inter-process communication
            errors, so proceed with caution.
        """
        self.cert_dir = cert_dir
        self.logdir = logdir
        os.makedirs(self.logdir, exist_ok=True)

        start_file_logger("{}/interchange.log".format(self.logdir), level=logging_level)
        logger.debug("Initializing Interchange process")

        self.client_address = client_address
        self.interchange_address: str = interchange_address or "*"
        self.poll_period = poll_period

        logger.info("Attempting connection to client at {} on ports: {},{},{}".format(
            client_address, client_ports[0], client_ports[1], client_ports[2]))
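        # Client-facing sockets, all created from the curvezmq server context:
        # a DEALER socket for incoming tasks, a DEALER socket for outgoing
        # results, and a REP socket for command (request/reply) traffic.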
        self.zmq_context = curvezmq.ServerContext(self.cert_dir)
        self.task_incoming = self.zmq_context.socket(zmq.DEALER)
        self.task_incoming.set_hwm(0)
        self.task_incoming.connect(tcp_url(client_address, client_ports[0]))
        self.results_outgoing = self.zmq_context.socket(zmq.DEALER)
        self.results_outgoing.set_hwm(0)
        self.results_outgoing.connect(tcp_url(client_address, client_ports[1]))

        self.command_channel = self.zmq_context.socket(zmq.REP)
        self.command_channel.connect(tcp_url(client_address, client_ports[2]))
        logger.info("Connected to client")

        self.run_id = run_id
        self._check_python_mismatch = _check_python_mismatch

        self.hub_address = hub_address
        self.hub_zmq_port = hub_zmq_port

        # Entries are (-priority, -task_id, message) tuples (see
        # process_task_incoming), kept sorted so that the end of the list is
        # always the highest-priority, oldest pending task.
        self.pending_task_queue: SortedList[Any] = SortedList(key=lambda tup: (tup[0], tup[1]))

        # count of tasks that have been sent out to worker pools
        self.count = 0

        self.manager_sock = self.zmq_context.socket(zmq.ROUTER)
        self.manager_sock.set_hwm(0)

        if worker_port:
            task_addy = tcp_url(self.interchange_address, worker_port)
            self.manager_sock.bind(task_addy)

        else:
            worker_port = self.manager_sock.bind_to_random_port(
                tcp_url(self.interchange_address),
                min_port=worker_port_range[0],
                max_port=worker_port_range[1],
                max_tries=100,
            )
        self.worker_port = worker_port

        logger.info(f"Bound to port {worker_port} for incoming worker connections")

        self._ready_managers: Dict[bytes, ManagerRecord] = {}
        self._logged_manager_count_token: object = None
        self.connected_block_history: List[str] = []

        self.heartbeat_threshold = heartbeat_threshold

        self.manager_selector = manager_selector

        self.current_platform = {'parsl_v': PARSL_VERSION,
                                 'python_v': "{}.{}.{}".format(sys.version_info.major,
                                                               sys.version_info.minor,
                                                               sys.version_info.micro),
                                 'os': platform.system(),
                                 'hostname': platform.node(),
                                 'dir': os.getcwd()}

        logger.info("Platform info: {}".format(self.current_platform))

    def get_tasks(self, count: int) -> Sequence[dict]:
        """ Obtains a batch of tasks from the internal pending_task_queue

        Parameters
        ----------
        count: int
            Count of tasks to get from the queue

        Returns
        -------
        A list of up to `count` tasks. May contain fewer than `count` entries, down to an empty list,
            e.g. [{'task_id':<x>, 'buffer':<buf>} ... ]
        """
        tasks = []
        try:
            for _ in range(count):
                *_, task = self.pending_task_queue.pop()
                tasks.append(task)
        except IndexError:
            pass

        return tasks

    def _send_monitoring_info(self, monitoring_radio: Optional[MonitoringRadioSender], manager: ManagerRecord) -> None:
        if monitoring_radio:
            logger.info("Sending message {} to MonitoringHub".format(manager))

            d: Dict = cast(Dict, manager.copy())
            d['timestamp'] = datetime.datetime.now()
            d['last_heartbeat'] = datetime.datetime.fromtimestamp(d['last_heartbeat'])
            d['run_id'] = self.run_id

            monitoring_radio.send((MessageType.NODE_INFO, d))

    def process_command(self, monitoring_radio: Optional[MonitoringRadioSender]) -> None:
        """ Command server to run async command to the interchange
        """

        reply: Any  # the type of reply depends on the command_req received (aka this needs dependent types...)

        if self.socks.get(self.command_channel) == zmq.POLLIN:
            logger.debug("entering command_server section")

            command_req = self.command_channel.recv_pyobj()
            logger.debug("Received command request: {}".format(command_req))
            if command_req == "CONNECTED_BLOCKS":
                reply = self.connected_block_history

            elif command_req == "WORKERS":
                reply = sum(m['worker_count'] for m in self._ready_managers.values())

            elif command_req == "MANAGERS":
                reply = []
                now = time.time()
                for manager_id, m in self._ready_managers.items():
                    idle_duration = now - (m['idle_since'] or now)
                    resp = {
                        'manager': manager_id.decode('utf-8'),
                        'block_id': m['block_id'],
                        'worker_count': m['worker_count'],
                        'tasks': len(m['tasks']),
                        'idle_duration': idle_duration,
                        'active': m['active'],
                        'parsl_version': m['parsl_version'],
                        'python_version': m['python_version'],
                        'draining': m['draining']
                    }
                    reply.append(resp)

            elif command_req == "MANAGERS_PACKAGES":
                reply = {}
                for manager_id, m in self._ready_managers.items():
                    manager_id_str = manager_id.decode('utf-8')
                    reply[manager_id_str] = m["packages"]

            elif command_req.startswith("HOLD_WORKER"):
                cmd, s_manager = command_req.split(';')
                manager_id = s_manager.encode('utf-8')
                logger.info("Received HOLD_WORKER for {!r}".format(manager_id))
                if manager_id in self._ready_managers:
                    m = self._ready_managers[manager_id]
                    m['active'] = False
                    self._send_monitoring_info(monitoring_radio, m)
                else:
                    logger.warning("Worker to hold was not in ready managers list")

                reply = None

            elif command_req == "WORKER_BINDS":
                reply = self.worker_port

            else:
                logger.error(f"Received unknown command: {command_req}")
                reply = None

            logger.debug("Reply: {}".format(reply))
            self.command_channel.send_pyobj(reply)

    @wrap_with_logs
    def start(self) -> None:
        """ Start the interchange
        """

        logger.info("Starting main interchange method")

        if self.hub_address is not None and self.hub_zmq_port is not None:
            logger.debug("Creating monitoring radio to %s:%s", self.hub_address, self.hub_zmq_port)
            monitoring_radio = ZMQRadioSender(self.hub_address, self.hub_zmq_port)
            logger.debug("Created monitoring radio")
        else:
            monitoring_radio = None

        poll_period = self.poll_period

        start = time.time()

        kill_event = threading.Event()

        poller = zmq.Poller()
        poller.register(self.manager_sock, zmq.POLLIN)
        poller.register(self.task_incoming, zmq.POLLIN)
        poller.register(self.command_channel, zmq.POLLIN)

        # These are managers which we should examine in an iteration
        # for scheduling a job (or maybe any other attention?).
        # Anything altering the state of the manager should add it
        # onto this list.
        interesting_managers: Set[bytes] = set()

        while not kill_event.is_set():
            self.socks = dict(poller.poll(timeout=poll_period))

            self.process_command(monitoring_radio)
            self.process_task_incoming()
            self.process_manager_socket_message(interesting_managers, monitoring_radio, kill_event)
            self.expire_bad_managers(interesting_managers, monitoring_radio)
            self.expire_drained_managers(interesting_managers, monitoring_radio)
            self.log_manager_counts(interesting_managers)
            self.process_tasks_to_send(interesting_managers, monitoring_radio)

        self.zmq_context.destroy()
        delta = time.time() - start
        logger.info(f"Processed {self.count} tasks in {delta} seconds")
        logger.warning("Exiting")

    def process_task_incoming(self) -> None:
        """Process incoming task message(s).
        """

        if self.socks.get(self.task_incoming) == zmq.POLLIN:
            logger.debug("start task_incoming section")
            msg = self.task_incoming.recv_pyobj()

            # Task priority: a higher number means lower priority. Both fields
            # are negated so that the SortedList keeps the highest-priority,
            # oldest task at the end, where get_tasks() pops from.
            task_id = msg['task_id']
            resource_spec = msg['context'].get('resource_spec', {})
            priority = resource_spec.get('priority', float('inf'))
            queue_entry = (-priority, -task_id, msg)

            logger.debug("Putting task %s onto pending_task_queue", task_id)

            self.pending_task_queue.add(queue_entry)
            logger.debug("Put task %s onto pending_task_queue", task_id)

    def process_manager_socket_message(
        self,
        interesting_managers: Set[bytes],
        monitoring_radio: Optional[MonitoringRadioSender],
        kill_event: threading.Event,
    ) -> None:
        """Process one message from manager on the manager_sock channel."""
        if self.socks.get(self.manager_sock) != zmq.POLLIN:
            return

        logger.debug('starting worker message section')
        msg_parts = self.manager_sock.recv_multipart()
        try:
            manager_id, meta_b, *msgs = msg_parts
            meta = pickle.loads(meta_b)
            mtype = meta['type']
        except Exception as e:
            logger.warning(
                'Failed to read manager message; ignoring message'
                f' (Exception: [{type(e).__name__}] {e})'
            )
            logger.debug('Raw message bytes:\n   %r\n', msg_parts, exc_info=e)
            return

        logger.debug(
            'Processing message type %r from manager %r', mtype, manager_id
        )

        if mtype == 'connection_probe':
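            # A probe only checks that the interchange is reachable: reply
            # with an empty frame and do not register the manager.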
            self.manager_sock.send_multipart([manager_id, b''])
            return

        elif mtype == 'registration':
            ix_minor_py = self.current_platform['python_v'].rsplit('.', 1)[0]
            ix_parsl_v = self.current_platform['parsl_v']
            mgr_minor_py = meta['python_v'].rsplit('.', 1)[0]
            mgr_parsl_v = meta['parsl_v']

            new_rec = ManagerRecord(
                block_id=None,
                start_time=meta['start_time'],
                tasks=[],
                worker_count=0,
                max_capacity=0,
                active=True,
                draining=False,
                last_heartbeat=time.time(),
                idle_since=time.time(),
                parsl_version=mgr_parsl_v,
                python_version=meta['python_v'],
            )

            # new_rec is a ManagerRecord, but meta is a dict[Any,Any] and so can
            # contain arbitrary fields beyond those in ManagerRecord (and
            # indeed does - for example, python_v) which are then ignored
            # later.
            new_rec.update(meta)

            logger.info(f'Registration info for manager {manager_id!r}: {meta}')
            self._send_monitoring_info(monitoring_radio, new_rec)

            python_mismatch: bool = ix_minor_py != mgr_minor_py
            parsl_mismatch: bool = ix_parsl_v != mgr_parsl_v
            if parsl_mismatch or (self._check_python_mismatch and python_mismatch):
                kill_event.set()
                vm_exc = VersionMismatch(
                    f"py.v={ix_minor_py} parsl.v={ix_parsl_v}",
                    f"py.v={mgr_minor_py} parsl.v={mgr_parsl_v}",
                )
                result_package = {
                    'type': 'result',
                    'task_id': -1,
                    'exception': serialize_object(vm_exc),
                }
                pkl_package = pickle.dumps(result_package)
                self.results_outgoing.send(pkl_package)
                logger.error(
                    'Manager has incompatible version info with the interchange;'
                    ' sending failure reports and shutting down:'
                    f'\n  Interchange: {vm_exc.interchange_version}'
                    f'\n  Manager:     {vm_exc.manager_version}'
                )

            else:
                # We really should update the associated data structure; but not
                # at this time.  *kicks can down the road*
                assert new_rec['block_id'] is not None, 'Verified externally'

                # set up entry only if we accept the registration
                self._ready_managers[manager_id] = new_rec
                self.connected_block_history.append(new_rec['block_id'])

                interesting_managers.add(manager_id)

                logger.info(
                    f"Registered manager {manager_id!r} (py{mgr_minor_py},"
                    f" {mgr_parsl_v}) and added to ready queue"
                )
                logger.debug("Manager %r -> %s", manager_id, new_rec)

            return

        if not (m := self._ready_managers.get(manager_id)):
            logger.warning(f"Ignoring message from unknown manager: {manager_id!r}")
            return

        if mtype == 'result':
            logger.debug("Number of results in batch: %d", len(msgs))
            b_messages_to_send = []

            for p_message in msgs:
                r = pickle.loads(p_message)
                r_type = r['type']
                if r_type == 'result':
                    # process this for task ID and forward to executor
                    tid = r['task_id']
                    logger.debug("Removing task %s from manager", tid)
                    try:
                        m['tasks'].remove(tid)
                        b_messages_to_send.append(p_message)
                    except Exception:
                        logger.exception(
                            'Ignoring exception removing task_id %s from manager'
                            ' task list %s',
                            tid,
                            m['tasks']
                        )
                elif r_type == 'monitoring':
                    # the monitoring code assumes that no monitoring messages
                    # will be received if monitoring is not configured, and
                    # that monitoring_radio will only be None when monitoring
                    # is not configured.
                    assert monitoring_radio is not None

                    monitoring_radio.send(r['payload'])

                else:
                    logger.error(
                        f'Discarding result message of unknown type: {r_type}'
                    )

            if b_messages_to_send:
                logger.debug(
                    'Sending messages (%d) on results_outgoing',
                    len(b_messages_to_send),
                )
                self.results_outgoing.send_multipart(b_messages_to_send)
                logger.debug('Sent messages on results_outgoing')

                # At least one result received, so manager now has idle capacity
                interesting_managers.add(manager_id)

                if len(m['tasks']) == 0 and m['idle_since'] is None:
                    m['idle_since'] = time.time()

                self._send_monitoring_info(monitoring_radio, m)

        elif mtype == 'heartbeat':
            m['last_heartbeat'] = time.time()
            self.manager_sock.send_multipart([manager_id, PKL_HEARTBEAT_CODE])

        elif mtype == 'drain':
            m['draining'] = True

        else:
            logger.error(f"Unexpected message type received from manager: {mtype}")

        logger.debug("leaving worker message section")

    def expire_drained_managers(self, interesting_managers: Set[bytes], monitoring_radio: Optional[MonitoringRadioSender]) -> None:

        for manager_id in list(interesting_managers):
            # Is it always true that a draining manager will be in
            # interesting_managers? Probably, because it will have outstanding capacity.
            m = self._ready_managers[manager_id]
            if m['draining'] and len(m['tasks']) == 0:
                logger.info(f"Manager {manager_id!r} is drained - sending drained message to manager")
                self.manager_sock.send_multipart([manager_id, PKL_DRAINED_CODE])
                interesting_managers.remove(manager_id)
                self._ready_managers.pop(manager_id)

                m['active'] = False
                self._send_monitoring_info(monitoring_radio, m)

    def log_manager_counts(self, interesting_managers: Set[bytes]) -> None:
        count_interesting = len(interesting_managers)
        count_ready = len(self._ready_managers)

        new_logged_manager_count_token = (count_interesting, count_ready)

        if self._logged_manager_count_token != new_logged_manager_count_token:

            logger.debug(
                "Managers count (interesting/total): %d/%d",
                count_interesting,
                count_ready
            )
            self._logged_manager_count_token = new_logged_manager_count_token

    def process_tasks_to_send(self, interesting_managers: Set[bytes], monitoring_radio: Optional[MonitoringRadioSender]) -> None:
        # Check if there are tasks that could be sent to managers
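        # Managers are ordered by the configured ManagerSelector; each is
        # offered up to (max_capacity - tasks already in flight) tasks, and
        # managers that end up saturated, inactive or draining are dropped
        # from interesting_managers until their state changes again.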

        if interesting_managers and self.pending_task_queue:
            shuffled_managers = self.manager_selector.sort_managers(self._ready_managers, interesting_managers)

            while shuffled_managers and self.pending_task_queue:  # cf. the if statement above...
                manager_id = shuffled_managers.pop()
                m = self._ready_managers[manager_id]
                tasks_inflight = len(m['tasks'])
                real_capacity = m['max_capacity'] - tasks_inflight

                if real_capacity and m["active"] and not m["draining"]:
                    tasks = self.get_tasks(real_capacity)
                    if tasks:
                        self.manager_sock.send_multipart([manager_id, pickle.dumps(tasks)])
                        task_count = len(tasks)
                        self.count += task_count
                        tids = [t['task_id'] for t in tasks]
                        m['tasks'].extend(tids)
                        m['idle_since'] = None
                        logger.debug("Sent tasks: %s to manager %r", tids, manager_id)
                        # recompute real_capacity after sending tasks
                        real_capacity -= task_count
                        if real_capacity > 0:
                            logger.debug("Manager %r has free capacity %s", manager_id, real_capacity)
                            # ... so keep it in the interesting_managers list
                        else:
                            logger.debug("Manager %r is now saturated", manager_id)
                            interesting_managers.remove(manager_id)
                    self._send_monitoring_info(monitoring_radio, m)
                else:
                    interesting_managers.remove(manager_id)
            logger.debug("leaving _ready_managers section, with %s managers still interesting", len(interesting_managers))

    def expire_bad_managers(self, interesting_managers: Set[bytes], monitoring_radio: Optional[MonitoringRadioSender]) -> None:
        bad_managers = [(manager_id, m) for (manager_id, m) in self._ready_managers.items() if
                        time.time() - m['last_heartbeat'] > self.heartbeat_threshold]
        for (manager_id, m) in bad_managers:
            logger.debug("Last: {} Current: {}".format(m['last_heartbeat'], time.time()))
            logger.warning(f"Too many heartbeats missed for manager {manager_id!r} - removing manager")
            if m['active']:
                m['active'] = False
                self._send_monitoring_info(monitoring_radio, m)

            logger.warning(f"Cancelling htex tasks {m['tasks']} on removed manager")
            for tid in m['tasks']:
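                # Raise and immediately catch ManagerLost so that sys.exc_info()
                # provides a live exception and traceback for RemoteExceptionWrapper.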
                try:
                    raise ManagerLost(manager_id, m['hostname'])
                except Exception:
                    result_package = {'type': 'result', 'task_id': tid, 'exception': serialize_object(RemoteExceptionWrapper(*sys.exc_info()))}
                    pkl_package = pickle.dumps(result_package)
                    self.results_outgoing.send(pkl_package)
            logger.warning("Sent failure reports, unregistering manager")
            self._ready_managers.pop(manager_id, 'None')
            if manager_id in interesting_managers:
                interesting_managers.remove(manager_id)


def start_file_logger(filename: str, level: int = logging.DEBUG, format_string: Optional[str] = None) -> None:
    """Add a stream log handler.

    Parameters
    ---------

    filename: string
        Name of the file to write logs to. Required.
    level: logging.LEVEL
        Set the logging level. Default=logging.DEBUG
        - format_string (string): Set the format string
    format_string: string
        Format string to use.

    Returns
    -------
        None.
    """
    if format_string is None:
        format_string = (
            "%(asctime)s.%(msecs)03d %(name)s:%(lineno)d "
            "%(processName)s(%(process)d) %(threadName)s "
            "%(funcName)s [%(levelname)s] %(message)s"
        )

    logger.setLevel(level)
    handler = logging.FileHandler(filename)
    handler.setLevel(level)
    formatter = logging.Formatter(format_string, datefmt='%Y-%m-%d %H:%M:%S')
    handler.setFormatter(formatter)
    logger.addHandler(handler)


if __name__ == "__main__":
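    # The interchange runs as a standalone process: the parent process writes
    # a pickled dict of Interchange keyword arguments to this process's stdin.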
    from parsl.utils import setproctitle

    setproctitle("parsl: HTEX interchange")

    config = pickle.load(sys.stdin.buffer)

    ic = Interchange(**config)
    ic.start()