1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
|
from __future__ import absolute_import, unicode_literals
import celery
import djcelery
import django
import sys
from django.core.management.base import BaseCommand
from djcelery.compat import setenv
DB_SHARED_THREAD = """\
DatabaseWrapper objects created in a thread can only \
be used in that same thread. The object with alias '{0}' \
was created in thread id {1} and this is thread id {2}.\
"""
VALIDATE_MODELS = not django.VERSION >= (1, 7)
def patch_thread_ident():
# monkey patch django.
# This patch make sure that we use real threads to get the ident which
# is going to happen if we are using gevent or eventlet.
# -- patch taken from gunicorn
if getattr(patch_thread_ident, 'called', False):
return
try:
from django.db.backends import BaseDatabaseWrapper, DatabaseError
if 'validate_thread_sharing' in BaseDatabaseWrapper.__dict__:
import thread
_get_ident = thread.get_ident
__old__init__ = BaseDatabaseWrapper.__init__
def _init(self, *args, **kwargs):
__old__init__(self, *args, **kwargs)
self._thread_ident = _get_ident()
def _validate_thread_sharing(self):
if (not self.allow_thread_sharing and
self._thread_ident != _get_ident()):
raise DatabaseError(
DB_SHARED_THREAD % (
self.alias, self._thread_ident, _get_ident()),
)
BaseDatabaseWrapper.__init__ = _init
BaseDatabaseWrapper.validate_thread_sharing = \
_validate_thread_sharing
patch_thread_ident.called = True
except ImportError:
pass
patch_thread_ident()
class CeleryCommand(BaseCommand):
options = ()
if hasattr(BaseCommand, 'option_list'):
options = BaseCommand.option_list
else:
def add_arguments(self, parser):
option_typemap = {
"string": str,
"int": int,
"float": float
}
for opt in self.option_list:
option = {k: v
for k, v in opt.__dict__.items()
if v is not None}
flags = (option.get("_long_opts", []) +
option.get("_short_opts", []))
del option["_long_opts"]
del option["_short_opts"]
if "type" in option:
opttype = option["type"]
option["type"] = option_typemap.get(opttype, opttype)
parser.add_argument(*flags, **option)
skip_opts = ['--app', '--loader', '--config', '--no-color']
requires_model_validation = VALIDATE_MODELS
keep_base_opts = False
stdout, stderr = sys.stdout, sys.stderr
def get_version(self):
return 'celery {c.__version__}\ndjango-celery {d.__version__}'.format(
c=celery, d=djcelery,
)
def execute(self, *args, **options):
broker = options.get('broker')
if broker:
self.set_broker(broker)
super(CeleryCommand, self).execute(*args, **options)
def set_broker(self, broker):
setenv('CELERY_BROKER_URL', broker)
def run_from_argv(self, argv):
self.handle_default_options(argv[2:])
return super(CeleryCommand, self).run_from_argv(argv)
def handle_default_options(self, argv):
acc = []
broker = None
for i, arg in enumerate(argv):
# --settings and --pythonpath are also handled
# by BaseCommand.handle_default_options, but that is
# called with the resulting options parsed by optparse.
if '--settings=' in arg:
_, settings_module = arg.split('=')
setenv('DJANGO_SETTINGS_MODULE', settings_module)
elif '--pythonpath=' in arg:
_, pythonpath = arg.split('=')
sys.path.insert(0, pythonpath)
elif '--broker=' in arg:
_, broker = arg.split('=')
elif arg == '-b':
broker = argv[i + 1]
else:
acc.append(arg)
if broker:
self.set_broker(broker)
return argv if self.keep_base_opts else acc
def die(self, msg):
sys.stderr.write(msg)
sys.stderr.write('\n')
sys.exit()
def _is_unwanted_option(self, option):
return option._long_opts and option._long_opts[0] in self.skip_opts
@property
def option_list(self):
return [x for x in self.options if not self._is_unwanted_option(x)]
|