File: flux_instance_manager.py

package info (click to toggle)
python-parsl 2025.01.13%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 12,072 kB
  • sloc: python: 23,817; makefile: 349; sh: 276; ansic: 45
file content (56 lines) | stat: -rw-r--r-- 2,129 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
"""Script meant to be the initial program of a Flux instance."""

import argparse
import logging
import os
from os.path import dirname
from socket import gethostbyname, gethostname

import zmq


def main():
    """Bootstrap a Flux instance and keep it alive until told to shut down.

    Three positional command-line arguments (``protocol``, ``hostname``
    and ``port``) locate the parent executor's socket.  The exchange over
    a ZeroMQ ``REQ`` socket is:

    1. send the directory that contains the ``flux`` Python package;
    2. once that is acknowledged, send the URI of the enclosing Flux
       instance -- the ``local-uri`` attribute as-is when the
       ``hostname`` argument equals this machine's hostname, otherwise
       an ``ssh://`` URI built from it;
    3. block until the parent replies again, then issue a
       ``job-manager.drain`` RPC and wait for it before returning.
    """
    # The flux bindings are only importable when launched under a Flux instance.
    import flux
    import flux.job

    logging.basicConfig(
        format="%(asctime)s [%(levelname)s] %(message)s", level=logging.DEBUG
    )

    cli = argparse.ArgumentParser()
    for arg_name, arg_help in (
        ("protocol", "Protocol of the parent executor's socket"),
        ("hostname", "hostname of the parent executor's socket"),
        ("port", "Port of the parent executor's socket"),
    ):
        cli.add_argument(arg_name, help=arg_help)
    args = cli.parse_args()

    with zmq.Context() as ctx:
        with ctx.socket(zmq.REQ) as sock:
            parent_ip = gethostbyname(args.hostname)
            sock.connect(f"{args.protocol}://{parent_ip}:{args.port}")

            # Step 1: the directory two levels above flux/__init__.py, i.e.
            # the one that holds the ``flux`` package.
            flux_init = os.path.realpath(flux.__file__)
            package_root = dirname(dirname(flux_init))
            sock.send(package_root.encode())
            logging.debug("Flux package path sent.")

            # Step 2: work out how the parent should address this instance.
            local_uri = flux.Flux().attr_get("local-uri")
            if args.hostname == gethostname():
                flux_uri = local_uri
            else:
                # Different host: swap the local:// scheme for ssh://<this host>.
                flux_uri = "".join(
                    ("ssh://", gethostname(), local_uri.replace("local://", ""))
                )
            logging.debug("Flux URI is %s", flux_uri)

            # REQ sockets alternate send/recv, so consume the acknowledgment
            # of the first message before sending the URI.
            ack = sock.recv()
            logging.debug("Received acknowledgment %s", ack)
            sock.send(flux_uri.encode())
            logging.debug("URI sent. Blocking for response...")

            # Step 3: the next reply is the shutdown message.
            shutdown_msg = sock.recv()
            logging.debug(
                "Response %s received, draining flux jobs...", shutdown_msg
            )
            flux.Flux().rpc("job-manager.drain").get()
            logging.debug("Flux jobs drained, exiting.")


# Run main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()