File: shell_plus.py

package info (click to toggle)
python-django-extensions 4.1-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 2,820 kB
  • sloc: python: 18,601; javascript: 7,354; makefile: 108; xml: 17
file content (701 lines) | stat: -rw-r--r-- 25,055 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
# -*- coding: utf-8 -*-
import inspect
import os
import sys
import traceback
import warnings

from django.db import connections
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.utils.datastructures import OrderedSet

from django_extensions.management.shells import import_objects
from django_extensions.management.utils import signalcommand
from django_extensions.management.debug_cursor import monkey_patch_cursordebugwrapper


def use_vi_mode():
    editor = os.environ.get("EDITOR")
    if not editor:
        return False
    editor = os.path.basename(editor)
    return editor.startswith("vi") or editor.endswith("vim")


def shell_runner(flags, name, help=None):
    """
    Decorates methods with information about the application they are starting

    :param flags: The flags used to start this runner via the ArgumentParser.
    :param name: The name of this runner for the help text for the ArgumentParser.
    :param help: The optional help for the ArgumentParser if the dynamically generated help is not sufficient.
    """  # noqa: E501

    def decorator(fn):
        fn.runner_flags = flags
        fn.runner_name = name
        fn.runner_help = help

        return fn

    return decorator


class Command(BaseCommand):
    help = "Like the 'shell' command but autoloads the models of all installed Django apps."  # noqa: E501
    extra_args = None
    tests_mode = False

    def __init__(self):
        super().__init__()
        self.runners = [
            member
            for name, member in inspect.getmembers(self)
            if hasattr(member, "runner_flags")
        ]

    def add_arguments(self, parser):
        super().add_arguments(parser)

        group = parser.add_mutually_exclusive_group()
        for runner in self.runners:
            if runner.runner_help:
                help = runner.runner_help
            else:
                help = "Tells Django to use %s." % runner.runner_name

            group.add_argument(
                *runner.runner_flags,
                action="store_const",
                dest="runner",
                const=runner,
                help=help,
            )

        parser.add_argument(
            "--connection-file",
            action="store",
            dest="connection_file",
            help="Specifies the connection file to use if using the --kernel option",
        )
        parser.add_argument(
            "--no-startup",
            action="store_true",
            dest="no_startup",
            default=False,
            help=(
                "When using plain Python, ignore the PYTHONSTARTUP environment "
                "variable and ~/.pythonrc.py script."
            ),
        )
        parser.add_argument(
            "--use-pythonrc",
            action="store_true",
            dest="use_pythonrc",
            default=False,
            help=(
                "When using plain Python, load the PYTHONSTARTUP environment variable "
                "and ~/.pythonrc.py script."
            ),
        )
        parser.add_argument(
            "--print-sql",
            action="store_true",
            default=False,
            help="Print SQL queries as they're executed",
        )
        parser.add_argument(
            "--truncate-sql",
            action="store",
            type=int,
            help="Truncate SQL queries to a number of characters.",
        )
        parser.add_argument(
            "--print-sql-location",
            action="store_true",
            default=False,
            help="Show location in code where SQL query generated from",
        )
        parser.add_argument(
            "--dont-load",
            action="append",
            dest="dont_load",
            default=[],
            help="Ignore autoloading of some apps/models. Can be used several times.",
        )
        parser.add_argument(
            "--quiet-load",
            action="store_true",
            default=False,
            dest="quiet_load",
            help="Do not display loaded models messages",
        )
        parser.add_argument(
            "--vi",
            action="store_true",
            default=use_vi_mode(),
            dest="vi_mode",
            help="Load Vi key bindings (for --ptpython and --ptipython)",
        )
        parser.add_argument(
            "--no-browser",
            action="store_true",
            default=False,
            dest="no_browser",
            help="Don't open the notebook in a browser after startup.",
        )
        parser.add_argument(
            "-c",
            "--command",
            help=(
                "Instead of opening an interactive shell, "
                "run a command as Django and exit."
            ),
        )

    def run_from_argv(self, argv):
        if "--" in argv[2:]:
            idx = argv.index("--")
            self.extra_args = argv[idx + 1 :]
            argv = argv[:idx]
        return super().run_from_argv(argv)

    def get_ipython_arguments(self, options):
        ipython_args = "IPYTHON_ARGUMENTS"
        arguments = getattr(settings, ipython_args, [])
        if not arguments:
            arguments = os.environ.get(ipython_args, "").split()
        return arguments

    def get_notebook_arguments(self, options):
        notebook_args = "NOTEBOOK_ARGUMENTS"
        arguments = getattr(settings, notebook_args, [])
        if not arguments:
            arguments = os.environ.get(notebook_args, "").split()
        return arguments

    def get_imported_objects(self, options):
        imported_objects = import_objects(options, self.style)
        if self.tests_mode:
            # save imported objects so we can run tests against it later
            self.tests_imported_objects = imported_objects
        return imported_objects

    @shell_runner(flags=["--kernel"], name="IPython Kernel")
    def get_kernel(self, options):
        try:
            from IPython import release

            if release.version_info[0] < 2:
                print(
                    self.style.ERROR("--kernel requires at least IPython version 2.0")
                )
                return
            from IPython import start_kernel
        except ImportError:
            return traceback.format_exc()

        def run_kernel():
            imported_objects = self.get_imported_objects(options)
            kwargs = dict(
                argv=[],
                user_ns=imported_objects,
            )
            connection_file = options["connection_file"]
            if connection_file:
                kwargs["connection_file"] = connection_file
            start_kernel(**kwargs)

        return run_kernel

    def load_base_kernel_spec(self, app):
        """Finds and returns the base Python kernelspec to extend from."""
        ksm = app.kernel_spec_manager
        try_spec_names = getattr(
            settings,
            "NOTEBOOK_KERNEL_SPEC_NAMES",
            [
                "python3",
                "python",
            ],
        )

        if isinstance(try_spec_names, str):
            try_spec_names = [try_spec_names]

        ks = None
        for spec_name in try_spec_names:
            try:
                ks = ksm.get_kernel_spec(spec_name)
                break
            except Exception:
                continue
        if not ks:
            raise CommandError(
                "No notebook (Python) kernel specs found. Tried %r" % try_spec_names
            )

        return ks

    def generate_kernel_specs(self, app, ipython_arguments):
        """Generate an IPython >= 3.0 kernelspec that loads django extensions"""
        ks = self.load_base_kernel_spec(app)
        ks.argv.extend(ipython_arguments)
        ks.display_name = getattr(
            settings, "IPYTHON_KERNEL_DISPLAY_NAME", "Django Shell-Plus"
        )

        manage_py_dir, manage_py = os.path.split(os.path.realpath(sys.argv[0]))
        if manage_py == "manage.py" and os.path.isdir(manage_py_dir):
            pythonpath = ks.env.get("PYTHONPATH", os.environ.get("PYTHONPATH", ""))
            pythonpath = pythonpath.split(os.pathsep)
            if manage_py_dir not in pythonpath:
                pythonpath.append(manage_py_dir)

            ks.env["PYTHONPATH"] = os.pathsep.join(filter(None, pythonpath))

        return {"django_extensions": ks}

    def run_notebookapp(self, app_init, options, use_kernel_specs=True, history=True):
        no_browser = options["no_browser"]

        if self.extra_args:
            # if another '--' is found split the arguments notebook, ipython
            if "--" in self.extra_args:
                idx = self.extra_args.index("--")
                notebook_arguments = self.extra_args[:idx]
                ipython_arguments = self.extra_args[idx + 1 :]
            # otherwise pass the arguments to the notebook
            else:
                notebook_arguments = self.extra_args
                ipython_arguments = []
        else:
            notebook_arguments = self.get_notebook_arguments(options)
            ipython_arguments = self.get_ipython_arguments(options)

        # Treat IPYTHON_ARGUMENTS from settings
        if "django_extensions.management.notebook_extension" not in ipython_arguments:
            ipython_arguments.extend(
                ["--ext", "django_extensions.management.notebook_extension"]
            )

        # Treat NOTEBOOK_ARGUMENTS from settings
        if no_browser and "--no-browser" not in notebook_arguments:
            notebook_arguments.append("--no-browser")
        if "--notebook-dir" not in notebook_arguments and not any(
            e.startswith("--notebook-dir=") for e in notebook_arguments
        ):
            notebook_arguments.extend(["--notebook-dir", "."])

        # IPython < 3 passes through kernel args from notebook CLI
        if not use_kernel_specs:
            notebook_arguments.extend(ipython_arguments)

        # disable history if not already configured in some other way
        if not history and not any(
            arg.startswith("--HistoryManager") for arg in ipython_arguments
        ):
            ipython_arguments.append("--HistoryManager.enabled=False")

        if not callable(app_init):
            app = app_init
            warnings.warn(
                "Initialize should be a callable not an app instance",
                DeprecationWarning,
            )
            app.initialize(notebook_arguments)
        else:
            app = app_init(notebook_arguments)

        # IPython >= 3 uses kernelspecs to specify kernel CLI args
        if use_kernel_specs:
            ksm = app.kernel_spec_manager
            for kid, ks in self.generate_kernel_specs(app, ipython_arguments).items():
                roots = [os.path.dirname(ks.resource_dir), ksm.user_kernel_dir]

                for root in roots:
                    kernel_dir = os.path.join(root, kid)
                    try:
                        if not os.path.exists(kernel_dir):
                            os.makedirs(kernel_dir)
                        with open(os.path.join(kernel_dir, "kernel.json"), "w") as f:
                            f.write(ks.to_json())
                        break
                    except OSError:
                        continue
                else:
                    raise CommandError(
                        "Could not write kernel %r in directories %r" % (kid, roots)
                    )

        app.start()

    @shell_runner(flags=["--notebook"], name="IPython Notebook")
    def get_notebook(self, options):
        try:
            from IPython import release
        except ImportError:
            return traceback.format_exc()
        try:
            from notebook.notebookapp import NotebookApp
        except ImportError:
            if release.version_info[0] >= 7:
                return traceback.format_exc()
            try:
                from IPython.html.notebookapp import NotebookApp
            except ImportError:
                if release.version_info[0] >= 3:
                    return traceback.format_exc()
                try:
                    from IPython.frontend.html.notebook import notebookapp

                    NotebookApp = notebookapp.NotebookApp
                except ImportError:
                    return traceback.format_exc()

        use_kernel_specs = release.version_info[0] >= 3

        def app_init(*args, **kwargs):
            app = NotebookApp.instance()
            app.initialize(*args, **kwargs)
            return app

        def run_notebook():
            self.run_notebookapp(app_init, options, use_kernel_specs)

        return run_notebook

    @shell_runner(flags=["--lab"], name="JupyterLab Notebook")
    def get_jupyterlab(self, options):
        try:
            from jupyterlab.labapp import LabApp
        except ImportError:
            return traceback.format_exc()

        # check for JupyterLab 3.0
        try:
            from notebook.notebookapp import NotebookApp
        except ImportError:
            NotebookApp = None

        if not NotebookApp or not issubclass(LabApp, NotebookApp):
            app_init = LabApp.initialize_server
        else:

            def app_init(*args, **kwargs):
                app = LabApp.instance()
                app.initialize(*args, **kwargs)
                return app

        def run_jupyterlab():
            self.run_notebookapp(app_init, options, history=False)

        return run_jupyterlab

    @shell_runner(flags=["--plain"], name="plain Python")
    def get_plain(self, options):
        # Using normal Python shell
        import code

        # Set up a dictionary to serve as the environment for the shell.
        imported_objects = self.get_imported_objects(options)

        use_pythonrc = options["use_pythonrc"]
        no_startup = options["no_startup"]

        # We want to honor both $PYTHONSTARTUP and .pythonrc.py, so follow system
        # conventions and get $PYTHONSTARTUP first then .pythonrc.py.
        if use_pythonrc or not no_startup:
            for pythonrc in OrderedSet(
                [os.environ.get("PYTHONSTARTUP"), os.path.expanduser("~/.pythonrc.py")]
            ):
                if not pythonrc:
                    continue
                if not os.path.isfile(pythonrc):
                    continue
                with open(pythonrc) as handle:
                    pythonrc_code = handle.read()
                # Match the behavior of the cpython shell where an error in
                # PYTHONSTARTUP prints an exception and continues.
                try:
                    exec(compile(pythonrc_code, pythonrc, "exec"), imported_objects)
                except Exception:
                    traceback.print_exc()
                    if self.tests_mode:
                        raise

        # By default, this will set up readline to do tab completion and to read and
        # write history to the .python_history file, but this can be overridden by
        # $PYTHONSTARTUP or ~/.pythonrc.py.
        try:
            hook = sys.__interactivehook__
        except AttributeError:
            # Match the behavior of the cpython shell where a missing
            # sys.__interactivehook__ is ignored.
            pass
        else:
            try:
                hook()
            except Exception:
                # Match the behavior of the cpython shell where an error in
                # sys.__interactivehook__ prints a warning and the exception
                # and continues.
                print("Failed calling sys.__interactivehook__")
                traceback.print_exc()

        try:
            # Try activating rlcompleter, because it's handy.
            import readline
        except ImportError:
            pass
        else:
            # We don't have to wrap the following import in a 'try', because
            # we already know 'readline' was imported successfully.
            import rlcompleter

            readline.set_completer(rlcompleter.Completer(imported_objects).complete)
            # Enable tab completion on systems using libedit (e.g. macOS).
            # These lines are copied from Lib/site.py on Python 3.4.
            readline_doc = getattr(readline, "__doc__", "")
            if readline_doc is not None and "libedit" in readline_doc:
                readline.parse_and_bind("bind ^I rl_complete")
            else:
                readline.parse_and_bind("tab:complete")

        def run_plain():
            code.interact(local=imported_objects)

        return run_plain

    @shell_runner(flags=["--bpython"], name="BPython")
    def get_bpython(self, options):
        try:
            from bpython import embed
        except ImportError:
            return traceback.format_exc()

        def run_bpython():
            imported_objects = self.get_imported_objects(options)
            kwargs = {}
            if self.extra_args:
                kwargs["args"] = self.extra_args
            embed(imported_objects, **kwargs)

        return run_bpython

    @shell_runner(flags=["--ipython"], name="IPython")
    def get_ipython(self, options):
        try:
            from IPython import start_ipython

            def run_ipython():
                imported_objects = self.get_imported_objects(options)
                ipython_arguments = self.extra_args or self.get_ipython_arguments(
                    options
                )
                start_ipython(argv=ipython_arguments, user_ns=imported_objects)

            return run_ipython
        except ImportError:
            str_exc = traceback.format_exc()
            # IPython < 0.11
            # Explicitly pass an empty list as arguments, because otherwise
            # IPython would use sys.argv from this script.
            # Notebook not supported for IPython < 0.11.
            try:
                from IPython.Shell import IPShell
            except ImportError:
                return str_exc + "\n" + traceback.format_exc()

            def run_ipython():
                imported_objects = self.get_imported_objects(options)
                shell = IPShell(argv=[], user_ns=imported_objects)
                shell.mainloop()

            return run_ipython

    @shell_runner(flags=["--ptpython"], name="PTPython")
    def get_ptpython(self, options):
        try:
            from ptpython.repl import embed, run_config
        except ImportError:
            tb = traceback.format_exc()
            try:  # prompt_toolkit < v0.27
                from prompt_toolkit.contrib.repl import embed, run_config
            except ImportError:
                return tb

        def run_ptpython():
            imported_objects = self.get_imported_objects(options)
            history_filename = os.path.expanduser("~/.ptpython_history")
            embed(
                globals=imported_objects,
                history_filename=history_filename,
                vi_mode=options["vi_mode"],
                configure=run_config,
            )

        return run_ptpython

    @shell_runner(flags=["--ptipython"], name="PT-IPython")
    def get_ptipython(self, options):
        try:
            from ptpython.repl import run_config
            from ptpython.ipython import embed
        except ImportError:
            tb = traceback.format_exc()
            try:  # prompt_toolkit < v0.27
                from prompt_toolkit.contrib.repl import run_config
                from prompt_toolkit.contrib.ipython import embed
            except ImportError:
                return tb

        def run_ptipython():
            imported_objects = self.get_imported_objects(options)
            history_filename = os.path.expanduser("~/.ptpython_history")
            embed(
                user_ns=imported_objects,
                history_filename=history_filename,
                vi_mode=options["vi_mode"],
                configure=run_config,
            )

        return run_ptipython

    @shell_runner(flags=["--idle"], name="Idle")
    def get_idle(self, options):
        from idlelib.pyshell import main

        def run_idle():
            sys.argv = [
                sys.argv[0],
                "-c",
                """
from django_extensions.management import shells
from django.core.management.color import no_style
for k, m in shells.import_objects({}, no_style()).items():
    globals()[k] = m
""",
            ]
            main()

        return run_idle

    def set_application_name(self, options):
        """
        Set the application_name on PostgreSQL connection

        Use the fallback_application_name to let the user override
        it with PGAPPNAME env variable

        https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS
        """  # noqa: E501
        supported_backends = (
            "django.db.backends.postgresql",
            "django.db.backends.postgresql_psycopg2",
        )
        opt_name = "fallback_application_name"
        default_app_name = "django_shell"
        dbs = getattr(settings, "DATABASES", [])

        for connection in connections.all():
            alias = connection.alias
            mro = inspect.getmro(connection.__class__)
            if any(klass.__module__.startswith(supported_backends) for klass in mro):
                if "OPTIONS" not in dbs[alias] or opt_name not in dbs[alias]["OPTIONS"]:
                    dbs[alias].setdefault("OPTIONS", {}).update(
                        {opt_name: default_app_name}
                    )

    @signalcommand
    def handle(self, *args, **options):
        verbosity = options["verbosity"]
        get_runner = options["runner"]
        print_sql = getattr(settings, "SHELL_PLUS_PRINT_SQL", False)
        runner = None
        runner_name = None
        truncate = None if options["truncate_sql"] == 0 else options["truncate_sql"]

        with monkey_patch_cursordebugwrapper(
            print_sql=options["print_sql"] or print_sql,
            truncate=truncate,
            print_sql_location=options["print_sql_location"],
            confprefix="SHELL_PLUS",
        ):
            SETTINGS_SHELL_PLUS = getattr(settings, "SHELL_PLUS", None)

            def get_runner_by_flag(flag):
                for runner in self.runners:
                    if flag in runner.runner_flags:
                        return runner
                return None

            self.set_application_name(options)

            if not get_runner and SETTINGS_SHELL_PLUS:
                get_runner = get_runner_by_flag("--%s" % SETTINGS_SHELL_PLUS)
                if not get_runner:
                    runner = None
                    runner_name = SETTINGS_SHELL_PLUS

            if get_runner:
                runner = get_runner(options)
                runner_name = get_runner.runner_name
            else:

                def try_runner(get_runner):
                    runner_name = get_runner.runner_name
                    if verbosity > 2:
                        print(self.style.NOTICE("Trying: %s" % runner_name))

                    runner = get_runner(options)
                    if callable(runner):
                        if verbosity > 1:
                            print(self.style.NOTICE("Using: %s" % runner_name))
                        return runner
                    return None

                tried_runners = set()

                # try the runners that are least unexpected (normal shell runners)
                preferred_runners = [
                    "ptipython",
                    "ptpython",
                    "bpython",
                    "ipython",
                    "plain",
                ]
                for flag_suffix in preferred_runners:
                    get_runner = get_runner_by_flag("--%s" % flag_suffix)
                    tried_runners.add(get_runner)
                    runner = try_runner(get_runner)
                    if runner:
                        runner_name = get_runner.runner_name
                        break

                # try any remaining runners if needed
                if not runner:
                    for get_runner in self.runners:
                        if get_runner not in tried_runners:
                            runner = try_runner(get_runner)
                            if runner:
                                runner_name = get_runner.runner_name
                                break

            if not callable(runner):
                if runner:
                    print(runner)
                if not runner_name:
                    raise CommandError("No shell runner could be found.")
                raise CommandError("Could not load shell runner: '%s'." % runner_name)

            if self.tests_mode:
                return 130

            if options["command"]:
                imported_objects = self.get_imported_objects(options)
                exec(options["command"], imported_objects)
                return None

            runner()