File: _task.py

package info (click to toggle)
oscrypto 1.3.0-7
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 2,164 kB
  • sloc: python: 22,115; makefile: 7
file content (163 lines) | stat: -rw-r--r-- 4,196 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# coding: utf-8
from __future__ import unicode_literals, division, absolute_import, print_function

import ast
import _ast
import os
import sys

from . import package_root, task_keyword_args
from ._import import _import_from


# The byte string type for the running interpreter: str on Python 2,
# bytes on Python 3
byte_cls = str if sys.version_info < (3,) else bytes


def _list_tasks():
    """
    Builds the list of runnable tasks by statically inspecting the .py files
    in the "dev" directory under package_root. Files are parsed with the ast
    module instead of being imported, so a task whose dependencies are not
    installed can still be listed.

    Files whose name starts with "." or "_", or does not end in ".py", are
    ignored. The args for a task are taken from the first top-level
    "run_args = ..." assignment found in the file; if there is none, an empty
    tuple is used.

    :return:
        A list of 2-element tuples, sorted by file name:
         0: a unicode string of the task name
         1: a list of dicts containing the parameter definitions
    """

    dev_path = os.path.join(package_root, 'dev')
    is_py3 = sys.version_info >= (3,)

    tasks = []
    for fname in sorted(os.listdir(dev_path)):
        # Hidden files and private helper modules are not tasks
        if fname.startswith(('.', '_')):
            continue
        if not fname.endswith('.py'):
            continue

        source_path = os.path.join(dev_path, fname)
        with open(source_path, 'rb') as handle:
            source = handle.read()
        if is_py3:
            source = source.decode('utf-8')

        task_args = ()
        module_node = ast.parse(source, filename=source_path)
        for node in ast.iter_child_nodes(module_node):
            if not isinstance(node, _ast.Assign):
                continue
            # Only a plain, single-target "run_args = <literal>" is accepted
            if len(node.targets) != 1:
                continue
            target = node.targets[0]
            if isinstance(target, _ast.Name) and target.id == 'run_args':
                task_args = ast.literal_eval(node.value)
                break

        # Strip the ".py" suffix to get the task name
        tasks.append((fname[:-3], task_args))

    return tasks


def show_usage():
    """
    Prints a one-line usage summary to stderr, listing the global keyword
    args followed by every available task and its args, then exits the
    process with status 1.

    Required task args are shown as {name}, optional ones as [name].
    """

    task_usages = []
    for task_name, run_args in _list_tasks():
        pieces = [task_name]
        for run_arg in run_args:
            arg_name = run_arg.get('name', '')
            if run_arg.get('required', False):
                pieces.append('{%s}' % arg_name)
            else:
                pieces.append('[%s]' % arg_name)
        task_usages.append(' '.join(pieces))

    keyword_usage = ''.join(
        ' [%s=%s]' % (karg['name'], karg['placeholder'])
        for karg in task_keyword_args
    )
    tasks_usage = ' (%s)' % ' | '.join(task_usages)

    print('Usage: run.py' + keyword_usage + tasks_usage, file=sys.stderr)
    sys.exit(1)


def _get_arg(num):
    """
    Reads a single entry from sys.argv, decoding it from UTF-8 if the
    interpreter provided it as a byte string.

    :param num:
        An integer index into sys.argv

    :return:
        A unicode string of the requested command line arg, or None if
        sys.argv has fewer than num + 1 entries
    """

    if num + 1 > len(sys.argv):
        return None

    value = sys.argv[num]
    return value.decode('utf-8') if isinstance(value, byte_cls) else value


def run_task():
    """
    Parses the command line args, invoking the requested task

    Leading args of the form NAME=VALUE, where NAME matches the "name" of an
    entry in task_keyword_args, are exported to the environment using that
    entry's "env_var". The first arg that is not such a keyword arg is used
    as the task name, and the module dev.<task> is imported from
    package_root. The remaining args are mapped, in order, onto the
    definitions in the task module's run_args.

    show_usage() is called (printing usage and exiting) when no task is
    given, the task module can not be imported, the module has no callable
    run(), too many args are passed, or a required arg is missing.

    Otherwise the process exits with status 0 if the task's run() returns a
    truthy value, or 1 if it does not.
    """

    arg_num = 1
    task = None
    args = []
    kwargs = {}

    # We look for the task name, processing any global task keyword args
    # by setting the appropriate env var
    while True:
        val = _get_arg(arg_num)
        if val is None:
            break

        matched_keyword = False
        for karg in task_keyword_args:
            prefix = karg['name'] + '='
            if val.startswith(prefix):
                os.environ[karg['env_var']] = val[len(prefix):]
                matched_keyword = True
                break

        if not matched_keyword:
            # The first arg that is not a keyword arg is the task name
            task = val
            break

        arg_num += 1

    if task is None:
        show_usage()

    task_mod = _import_from('dev.%s' % task, package_root, allow_error=True)
    if task_mod is None:
        show_usage()

    # A module without a callable run() (such as a private helper module in
    # dev/) is not a task. Treat it like an unknown task rather than failing
    # later with a TypeError when trying to call None.
    run = task_mod.__dict__.get('run')
    if not callable(run):
        show_usage()

    run_args = task_mod.__dict__.get('run_args', [])

    # sys.argv[0], any keyword args and the task name occupy the first
    # arg_num + 1 positions; everything past that must map to a run_arg
    max_args = arg_num + 1 + len(run_args)
    if len(sys.argv) > max_args:
        show_usage()

    for i, run_arg in enumerate(run_args):
        val = _get_arg(arg_num + 1 + i)
        if val is None:
            if run_arg.get('required', False):
                show_usage()
            # Args are positional, so nothing can follow a missing one
            break

        # Values that are not purely digits are passed through as strings
        if run_arg.get('cast') == 'int' and val.isdigit():
            val = int(val)

        kwarg = run_arg.get('kwarg')
        if kwarg:
            kwargs[kwarg] = val
        else:
            args.append(val)

    result = run(*args, **kwargs)
    sys.exit(int(not result))