File: cli.py

package info (click to toggle)
python-hupper 1.12-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 344 kB
  • sloc: python: 1,668; makefile: 149
file content (70 lines) | stat: -rw-r--r-- 2,119 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import argparse
import os
import sys
import time

import hupper

# Directory containing this module; main() joins it with 'foo.ini' to build
# one of the paths it asks the reloader to watch.
here = os.path.dirname(__file__)


def parse_options(args):
    """Parse the command-line arguments for this script.

    ``args`` is the list of argument strings to parse. The returned
    ``argparse.Namespace`` has these attributes:

    * ``reload``, ``watchman``, ``watchdog``, ``poll`` -- booleans,
      ``False`` unless the matching flag is present.
    * ``callback_file`` -- a string, or ``None`` when not supplied.
    * ``watch_files`` -- list with one entry per ``--watch-file``
      occurrence, empty when the option is never given.
    * ``poll_interval``, ``reload_interval``, ``shutdown_interval`` --
      integers, or ``None`` when not supplied.
    """
    cli = argparse.ArgumentParser()

    cli.add_argument('--reload', action='store_true')
    cli.add_argument('--callback-file')
    # Repeatable option: every occurrence is appended to ``watch_files``.
    cli.add_argument(
        '--watch-file', dest='watch_files', action='append', default=[]
    )

    # Plain on/off switches.
    for switch in ('--watchman', '--watchdog', '--poll'):
        cli.add_argument(switch, action='store_true')

    # Options taking a single integer value.
    for option in (
        '--poll-interval',
        '--reload-interval',
        '--shutdown-interval',
    ):
        cli.add_argument(option, type=int)

    return cli.parse_args(args)


def main(args=None):
    """Entry point for this command-line script.

    Parses ``args`` (``sys.argv[1:]`` when ``args`` is ``None``) and then:

    * with ``--reload``, calls ``hupper.start_reloader`` pointing at this
      module's ``main``, forwarding whichever monitor factory and
      interval options were supplied;
    * when ``hupper.is_active()`` is true, registers ``foo.ini`` (located
      next to this module) and every ``--watch-file`` path with the
      reloader;
    * with ``--callback-file``, appends the current Unix time, as an
      integer followed by a newline, to that file;
    * finally sleeps in one-second steps until ``KeyboardInterrupt``.
    """
    opts = parse_options(sys.argv[1:] if args is None else args)

    if opts.reload:
        reloader_kw = {}

        # The monitor flags are checked in a fixed order and each one
        # overwrites the previous choice, so --watchman beats --watchdog,
        # which beats --poll. Each backend is imported only if requested.
        if opts.poll:
            from hupper.polling import PollingFileMonitor

            monitor_kw = {}
            # Truthiness check: an interval of 0 is treated like "unset".
            if opts.poll_interval:
                monitor_kw['poll_interval'] = opts.poll_interval

            def polling_factory(cb):
                return PollingFileMonitor(cb, **monitor_kw)

            reloader_kw['monitor_factory'] = polling_factory

        if opts.watchdog:
            from hupper.watchdog import WatchdogFileMonitor

            reloader_kw['monitor_factory'] = WatchdogFileMonitor

        if opts.watchman:
            from hupper.watchman import WatchmanFileMonitor

            reloader_kw['monitor_factory'] = WatchmanFileMonitor

        # Forward the interval options only when given (0 is forwarded).
        for name in ('reload_interval', 'shutdown_interval'):
            value = getattr(opts, name)
            if value is not None:
                reloader_kw[name] = value

        hupper.start_reloader(__name__ + '.main', **reloader_kw)

    if hupper.is_active():
        # Register the fixed foo.ini path first, then the user's paths.
        for paths in ([os.path.join(here, 'foo.ini')], opts.watch_files):
            hupper.get_reloader().watch_files(paths)

    if opts.callback_file:
        # Append mode: the file gains one timestamp line per invocation.
        with open(opts.callback_file, 'ab') as fp:
            stamp = '{:d}\n'.format(int(time.time()))
            fp.write(stamp.encode('utf8'))

    # Idle until interrupted; Ctrl-C ends the function quietly.
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        return