1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
|
import json
import os
import pytest
# we have to import the pytest plugin fixtures here,
# in case user did not do the `python setup.py develop` yet,
# that installs the pytest plugin into the setuptools registry.
from celery.contrib.pytest import celery_app, celery_session_worker
from celery.contrib.testing.manager import Manager
from t.integration.tasks import get_redis_connection
TEST_BROKER = os.environ.get('TEST_BROKER', 'pyamqp://')
TEST_BACKEND = os.environ.get('TEST_BACKEND', 'redis://')
# Tricks flake8 into silencing redefining fixtures warnings.
__all__ = (
'celery_app',
'celery_session_worker',
'get_active_redis_channels',
)
def get_active_redis_channels():
return get_redis_connection().execute_command('PUBSUB CHANNELS')
@pytest.fixture(scope='session')
def celery_config(request):
config = {
'broker_url': TEST_BROKER,
'result_backend': TEST_BACKEND,
'cassandra_servers': ['localhost'],
'cassandra_keyspace': 'tests',
'cassandra_table': 'tests',
'cassandra_read_consistency': 'ONE',
'cassandra_write_consistency': 'ONE',
'result_extended': True
}
try:
# To override the default configuration, create the integration-tests-config.json file
# in Celery's root directory.
# The file must contain a dictionary of valid configuration name/value pairs.
config_overrides = json.load(open(str(request.config.rootdir / "integration-tests-config.json")))
config.update(config_overrides)
except OSError:
pass
return config
@pytest.fixture(scope='session')
def celery_enable_logging():
return True
@pytest.fixture(scope='session')
def celery_worker_pool():
return 'prefork'
@pytest.fixture(scope='session')
def celery_includes():
return {'t.integration.tasks'}
@pytest.fixture
def app(celery_app):
yield celery_app
@pytest.fixture
def manager(app, celery_session_worker):
manager = Manager(app)
yield manager
manager.wait_until_idle()
@pytest.fixture(autouse=True)
def ZZZZ_set_app_current(app):
app.set_current()
app.set_default()
@pytest.fixture(scope='session')
def celery_class_tasks():
from t.integration.tasks import ClassBasedAutoRetryTask
return [ClassBasedAutoRetryTask]
|