1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
|
import json
import logging
import os
import re
import time
import pytest
from celery.contrib.pytest import celery_app, celery_session_worker
from celery.contrib.testing.manager import Manager
from t.integration.tasks import get_redis_connection
# we have to import the pytest plugin fixtures here,
# in case user did not do the `python setup.py develop` yet,
# that installs the pytest plugin into the setuptools registry.
logger = logging.getLogger(__name__)
TEST_BROKER = os.environ.get('TEST_BROKER', 'pyamqp://')
TEST_BACKEND = os.environ.get('TEST_BACKEND', 'redis://')
__all__ = (
'celery_app',
'celery_session_worker',
'get_active_redis_channels',
)
def get_active_redis_channels():
return get_redis_connection().execute_command('PUBSUB CHANNELS')
def check_for_logs(caplog, message: str, max_wait: float = 1.0, interval: float = 0.1) -> bool:
start_time = time.monotonic()
while time.monotonic() - start_time < max_wait:
if any(re.search(message, record.message) for record in caplog.records):
return True
time.sleep(interval)
return False
@pytest.fixture(scope='session')
def celery_config(request):
config = {
'broker_url': TEST_BROKER,
'result_backend': TEST_BACKEND,
'result_extended': True,
'cassandra_servers': ['localhost'],
'cassandra_keyspace': 'tests',
'cassandra_table': 'tests',
'cassandra_read_consistency': 'ONE',
'cassandra_write_consistency': 'ONE',
}
try:
# To override the default configuration, create the integration-tests-config.json file
# in Celery's root directory.
# The file must contain a dictionary of valid configuration name/value pairs.
with open(str(request.config.rootdir / "integration-tests-config.json")) as file:
overrides = json.load(file)
config.update(overrides)
except OSError:
pass
return config
@pytest.fixture(scope='session')
def celery_enable_logging():
return True
@pytest.fixture(scope='session')
def celery_worker_pool():
return 'prefork'
@pytest.fixture(scope='session')
def celery_includes():
return {'t.integration.tasks'}
@pytest.fixture
def app(celery_app):
yield celery_app
@pytest.fixture
def manager(app, celery_session_worker):
manager = Manager(app)
yield manager
try:
manager.wait_until_idle()
except Exception as e:
logger.warning("Failed to stop Celery test manager cleanly: %s", e)
@pytest.fixture(autouse=True)
def ZZZZ_set_app_current(app):
app.set_current()
app.set_default()
@pytest.fixture(scope='session')
def celery_class_tasks():
from t.integration.tasks import ClassBasedAutoRetryTask
return [ClassBasedAutoRetryTask]
|