File: base.py

package info (click to toggle)
django-celery 3.1.17-4
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 916 kB
  • ctags: 1,135
  • sloc: python: 4,149; makefile: 88; sh: 64
file content (139 lines) | stat: -rw-r--r-- 4,685 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
from __future__ import absolute_import, unicode_literals

import celery
import djcelery
import django
import sys

from django.core.management.base import BaseCommand

from djcelery.compat import setenv

DB_SHARED_THREAD = """\
DatabaseWrapper objects created in a thread can only \
be used in that same thread.  The object with alias '{0}' \
was created in thread id {1} and this is thread id {2}.\
"""

VALIDATE_MODELS = not django.VERSION >= (1, 7)


def patch_thread_ident():
    # monkey patch django.
    # This patch make sure that we use real threads to get the ident which
    # is going to happen if we are using gevent or eventlet.
    # -- patch taken from gunicorn
    if getattr(patch_thread_ident, 'called', False):
        return
    try:
        from django.db.backends import BaseDatabaseWrapper, DatabaseError

        if 'validate_thread_sharing' in BaseDatabaseWrapper.__dict__:
            import thread
            _get_ident = thread.get_ident

            __old__init__ = BaseDatabaseWrapper.__init__

            def _init(self, *args, **kwargs):
                __old__init__(self, *args, **kwargs)
                self._thread_ident = _get_ident()

            def _validate_thread_sharing(self):
                if (not self.allow_thread_sharing and
                        self._thread_ident != _get_ident()):
                    raise DatabaseError(
                        DB_SHARED_THREAD % (
                            self.alias, self._thread_ident, _get_ident()),
                    )

            BaseDatabaseWrapper.__init__ = _init
            BaseDatabaseWrapper.validate_thread_sharing = \
                _validate_thread_sharing

        patch_thread_ident.called = True
    except ImportError:
        pass
patch_thread_ident()


class CeleryCommand(BaseCommand):
    options = ()
    if hasattr(BaseCommand, 'option_list'):
        options = BaseCommand.option_list
    else:
        def add_arguments(self, parser):
            option_typemap = {
                "string": str,
                "int": int,
                "float": float
            }
            for opt in self.option_list:
                option = {k: v
                          for k, v in opt.__dict__.items()
                          if v is not None}
                flags = (option.get("_long_opts", []) +
                         option.get("_short_opts", []))
                del option["_long_opts"]
                del option["_short_opts"]
                if "type" in option:
                    opttype = option["type"]
                    option["type"] = option_typemap.get(opttype, opttype)
                parser.add_argument(*flags, **option)

    skip_opts = ['--app', '--loader', '--config', '--no-color']
    requires_model_validation = VALIDATE_MODELS
    keep_base_opts = False
    stdout, stderr = sys.stdout, sys.stderr

    def get_version(self):
        return 'celery {c.__version__}\ndjango-celery {d.__version__}'.format(
            c=celery, d=djcelery,
        )

    def execute(self, *args, **options):
        broker = options.get('broker')
        if broker:
            self.set_broker(broker)
        super(CeleryCommand, self).execute(*args, **options)

    def set_broker(self, broker):
        setenv('CELERY_BROKER_URL', broker)

    def run_from_argv(self, argv):
        self.handle_default_options(argv[2:])
        return super(CeleryCommand, self).run_from_argv(argv)

    def handle_default_options(self, argv):
        acc = []
        broker = None
        for i, arg in enumerate(argv):
            # --settings and --pythonpath are also handled
            # by BaseCommand.handle_default_options, but that is
            # called with the resulting options parsed by optparse.
            if '--settings=' in arg:
                _, settings_module = arg.split('=')
                setenv('DJANGO_SETTINGS_MODULE', settings_module)
            elif '--pythonpath=' in arg:
                _, pythonpath = arg.split('=')
                sys.path.insert(0, pythonpath)
            elif '--broker=' in arg:
                _, broker = arg.split('=')
            elif arg == '-b':
                broker = argv[i + 1]
            else:
                acc.append(arg)
        if broker:
            self.set_broker(broker)
        return argv if self.keep_base_opts else acc

    def die(self, msg):
        sys.stderr.write(msg)
        sys.stderr.write('\n')
        sys.exit()

    def _is_unwanted_option(self, option):
        return option._long_opts and option._long_opts[0] in self.skip_opts

    @property
    def option_list(self):
        return [x for x in self.options if not self._is_unwanted_option(x)]