File: main.py

package info (click to toggle)
python-sshoot 1.6.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 344 kB
  • sloc: python: 1,579; makefile: 23; sh: 1
file content (407 lines) | stat: -rw-r--r-- 13,042 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
"""Command-line interface to handle sshuttle VPN sessions."""

from argparse import (
    ArgumentParser,
    Namespace,
)
from functools import partial
from typing import Set

from argcomplete import autocomplete

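# NOTE: ErrorExitMessage and Script are defined inline below instead of being
# imported from the toolrack package, i.e. instead of:
#
#   from toolrack.script import (
#       ErrorExitMessage,
#       Script,
#   )
#
# Reference for the original implementation:
# https://raw.githubusercontent.com/albertodonato/toolrack/main/toolrack/script.py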

import sys
from typing import IO


class ErrorExitMessage(Exception):
    """Raised to exit the process with the specified message and exit code.

    :param message: the error message.
    :param code: the script exit code.

    """

    def __init__(self, message: str, code: int = 1) -> None:
        # Forward only the message to Exception so that ``str(error)`` and
        # ``error.args`` always carry the message alone, regardless of
        # whether ``code`` was passed positionally or by keyword.
        super().__init__(message)
        self.message = message
        self.code = code


class Script:
    """Wraps a python script handling argument parsing.

    Subclasses must implement :func:`get_parser` and :func:`main` methods.

    Inside :func:`main`, :exc:`ErrorExitMessage` can be raised with the
    appropriate ``message`` and ``code`` to cause the script termination, with
    the message outputted to standard error.

    Script instances are callable, and can be passed the argument list (which
    defaults to :data:`sys.argv` if not provided).

    """

    def __init__(
        self, stdout: IO | None = None, stderr: IO | None = None
    ) -> None:
        self._stdout = stdout or sys.stdout
        self._stderr = stderr or sys.stderr

    def get_parser(self) -> ArgumentParser:
        """Return a configured :class:`argparse.ArgumentParser` instance.

        .. note::
            Subclasses must implement this method.

        """
        raise NotImplementedError()

    def main(self, args: Namespace) -> int | None:
        """The body of the script.

        It gets called with the :class:`argparse.Namespace` instance returned
        by :func:`get_parser`.

        :param args: command line arguments.

        .. note::
            Subclasses must implement this method.

        """
        raise NotImplementedError()

    def exit(self, code: int = 0) -> None:
        """Exit with the specified return code."""
        sys.exit(code)

    def handle_keyboard_interrupt(self, interrupt: KeyboardInterrupt):
        """Called when a :class:`KeyboardInterrupt` is raised.

        By default it just traps the exception and exits with success.
        It can be overridden to perform additional cleanups.

        """
        self.exit()

    def __call__(self, args: list[str] | None = None) -> int:
        """Call the script, passing :data:`sys.argv` by default."""
        parser = self.get_parser()
        parsed_args = parser.parse_args(args=args)
        try:
            return self.main(parsed_args) or 0
        except KeyboardInterrupt as interrupt:
            self.handle_keyboard_interrupt(interrupt)
        except ErrorExitMessage as error:
            self._stderr.write(f"{error.message}\n")
            self.exit(error.code)
        return 0

from . import __version__
from .autocomplete import (
    complete_argument,
    profile_completer,
)
from .i18n import _
from .listing import (
    profile_details,
    ProfileListing,
)
from .manager import (
    DEFAULT_CONFIG_PATH,
    Manager,
    ManagerProfileError,
)


class Sshoot(Script):
    """Manage multiple sshuttle VPN sessions.

    Every ``action_<name>`` method implements the sub-command with the same
    name (dashes in the command replaced by underscores); :meth:`main`
    dispatches to them based on the parsed ``action`` argument.

    """

    def main(self, args: Namespace):
        """Load the configuration and run the requested action.

        :param args: parsed command line, including the global options.
        :return: whatever the action method returns.
        :raises ErrorExitMessage: with code ``3`` if creating the manager or
            loading its configuration raises :exc:`OSError`, and with code
            ``2`` if the action raises :exc:`ManagerProfileError`.

        """
        try:
            manager = Manager(config_path=args.config)
            manager.load_config()
        except OSError as error:
            # ErrorExitMessage expects a string message; keep the original
            # exception attached as the explicit cause.
            raise ErrorExitMessage(str(error), code=3) from error
        action = args.action.replace("-", "_")
        method = getattr(self, "action_" + action)
        # Actions only get their own arguments: everything registered on the
        # top-level parser (collected by get_parser()) is stripped.
        action_args = Namespace(
            **{
                key: value
                for key, value in vars(args).items()
                if key not in self.global_args
            }
        )
        try:
            return method(manager, action_args)
        except ManagerProfileError as error:
            raise ErrorExitMessage(str(error), code=2) from error

    def print(self, *args, **kwargs):
        """Print out message to the configured standard output stream."""
        print(*args, **kwargs, file=self._stdout)

    def action_list(self, manager: Manager, args: Namespace):
        """Print out the list of profiles in the requested format."""
        listing = ProfileListing(manager)
        # No trailing newline is added to the listing output.
        self.print(
            listing.get_output(args.format, verbose=args.verbose), end=""
        )

    def action_show(self, manager: Manager, args: Namespace):
        """Show details on a profile."""
        self.print(profile_details(manager, args.name))

    def action_create(self, manager: Manager, args: Namespace):
        """Create a new profile.

        All action arguments except ``name`` are passed as profile details.

        """
        details = vars(args).copy()
        name = details.pop("name")
        manager.create_profile(name, details)

    def action_delete(self, manager: Manager, args: Namespace):
        """Delete profile with the given name."""
        manager.remove_profile(args.name)

    def action_start(self, manager: Manager, args: Namespace):
        """Start sshuttle for the specified profile."""
        manager.start_profile(
            args.name,
            extra_args=args.args,
            disable_global_extra_options=args.disable_global_extra_options,
        )
        self.print(_("Profile started"))

    def action_stop(self, manager: Manager, args: Namespace):
        """Stop sshuttle for the specified profile."""
        manager.stop_profile(args.name)
        self.print(_("Profile stopped"))

    def action_restart(self, manager: Manager, args: Namespace):
        """Restart sshuttle for the specified profile."""
        manager.restart_profile(
            args.name,
            extra_args=args.args,
            disable_global_extra_options=args.disable_global_extra_options,
        )
        self.print(_("Profile restarted"))

    def action_is_running(self, manager: Manager, args: Namespace):
        """Exit with status 0 if the specified profile is running, else 1."""
        # raise an error if profile is unknown
        manager.get_profile(args.name)
        retval = 0 if manager.is_running(args.name) else 1
        self.exit(retval)

    def action_get_command(self, manager: Manager, args: Namespace):
        """Print the sshuttle command for the specified profile."""
        cmdline = manager.get_cmdline(
            args.name,
            disable_global_extra_options=args.disable_global_extra_options,
        )
        self.print(" ".join(cmdline))

    @staticmethod
    def _add_profile_argument(
        parser: ArgumentParser, help_text: str, completer=profile_completer
    ) -> None:
        """Add the positional ``name`` argument with shell completion."""
        complete_argument(
            parser.add_argument("name", help=help_text), completer
        )

    @staticmethod
    def _add_no_global_extra_options(parser: ArgumentParser) -> None:
        """Add the ``--no-global-extra-options`` flag to a sub-parser."""
        parser.add_argument(
            "--no-global-extra-options",
            dest="disable_global_extra_options",
            action="store_true",
            help=_("disable global extra-options set in config.yaml"),
        )

    @staticmethod
    def _add_sshuttle_args(parser: ArgumentParser) -> None:
        """Add the trailing positional ``args`` passed on to sshuttle."""
        parser.add_argument(
            "args",
            nargs="*",
            help=_("additional arguments passed to sshuttle command line"),
        )

    def get_parser(self) -> ArgumentParser:
        """Return a configured argparse.ArgumentParser instance.

        As a side effect this sets ``self.global_args`` to the destinations
        of all arguments registered on the top-level parser, which
        :meth:`main` uses to strip them from the action namespace.

        """
        parser = ArgumentParser(
            prog="sshoot",
            description=_("Manage multiple sshuttle VPN sessions"),
        )
        parser.add_argument(
            "-V",
            "--version",
            action="version",
            version=f"%(prog)s {__version__}",
        )
        parser.add_argument(
            "-C",
            "--config",
            default=DEFAULT_CONFIG_PATH,
            help=_("configuration directory (default: $HOME/.config/sshoot)"),
        )
        subparsers = parser.add_subparsers(
            metavar="ACTION", dest="action", help=_("action to perform")
        )
        subparsers.required = True

        # List profiles
        list_parser = subparsers.add_parser(
            "list", help=_("list defined profiles")
        )
        list_parser.add_argument(
            "-v", "--verbose", action="store_true", help=_("verbose listing")
        )
        list_parser.add_argument(
            "-f",
            "--format",
            choices=ProfileListing.supported_formats(),
            default="table",
            help=_("listing format (default %(default)s)"),
        )

        # Show profile
        show_parser = subparsers.add_parser(
            "show", help=_("show profile configuration")
        )
        self._add_profile_argument(show_parser, _("profile name"))

        # Add profile (no completion on the name: the profile is new)
        create_parser = subparsers.add_parser(
            "create", help=_("define a new profile")
        )
        create_parser.add_argument("name", help=_("profile name"))
        create_parser.add_argument(
            "subnets", nargs="+", help=_("subnets to route over the VPN")
        )
        create_parser.add_argument(
            "-r", "--remote", help=_("remote host to connect to")
        )
        create_parser.add_argument(
            "-H",
            "--auto-hosts",
            action="store_true",
            help=_("automatically update /etc/hosts with hosts from VPN"),
        )
        create_parser.add_argument(
            "-N",
            "--auto-nets",
            action="store_true",
            help=_("automatically route additional nets from server"),
        )
        create_parser.add_argument(
            "-d",
            "--dns",
            action="store_true",
            help=_("forward DNS queries through the VPN"),
        )
        create_parser.add_argument(
            "-x",
            "--exclude-subnets",
            nargs="+",
            help=_("exclude subnets from VPN forward"),
        )
        create_parser.add_argument(
            "-S",
            "--seed-hosts",
            nargs="+",
            help=_("comma-separated list of hosts to seed to auto-hosts"),
        )
        create_parser.add_argument(
            "--extra-opts",
            type=str.split,
            help=_("extra arguments to pass to sshuttle command line"),
        )

        # Remove profile
        delete_parser = subparsers.add_parser(
            "delete", help=_("delete an existing profile")
        )
        self._add_profile_argument(
            delete_parser, _("name of the profile to remove")
        )

        # Start profile
        start_parser = subparsers.add_parser(
            "start", help=_("start a VPN session for a profile")
        )
        self._add_profile_argument(
            start_parser,
            _("name of the profile to start"),
            partial(profile_completer, running=False),
        )
        self._add_no_global_extra_options(start_parser)
        self._add_sshuttle_args(start_parser)

        # Stop profile
        stop_parser = subparsers.add_parser(
            "stop", help=_("stop a running VPN session for a profile")
        )
        self._add_profile_argument(
            stop_parser,
            _("name of the profile to stop"),
            partial(profile_completer, running=True),
        )

        # Restart profile
        restart_parser = subparsers.add_parser(
            "restart", help=_("restart a VPN session for a profile")
        )
        self._add_profile_argument(
            restart_parser,
            _("name of the profile to restart"),
            partial(profile_completer, running=True),
        )
        self._add_no_global_extra_options(restart_parser)
        self._add_sshuttle_args(restart_parser)

        # Return whether profile is running
        is_running_parser = subparsers.add_parser(
            "is-running", help=_("return whether a profile is running")
        )
        self._add_profile_argument(
            is_running_parser, _("name of the profile to query")
        )

        # Get profile command
        get_command_parser = subparsers.add_parser(
            "get-command", help=_("return the sshuttle command for a profile")
        )
        self._add_profile_argument(
            get_command_parser, _("name of the profile")
        )
        self._add_no_global_extra_options(get_command_parser)

        # track global arguments/options so they can be stripped from action
        # namespace (this relies on private argparse attributes)
        self.global_args: Set[str] = set()
        for group in parser._action_groups:
            self.global_args.update(
                action.dest for action in group._group_actions
            )

        # Setup autocompletion
        autocomplete(parser)
        return parser


# Module-level script instance. Script instances are callable: calling it
# parses the command line and runs the selected action.
# NOTE(review): presumably referenced as the console entry point -- confirm
# against the packaging metadata.
sshoot = Sshoot()