1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
|
import os
import re
from unittest.mock import patch
import pytest
from click.testing import CliRunner
from celery.bin.celery import celery
from celery.platforms import EX_UNAVAILABLE
_GLOBAL_OPTIONS = ['-A', 't.unit.bin.proj.app_with_custom_cmds', '--broker', 'memory://']
_INSPECT_OPTIONS = ['--timeout', '0'] # Avoid waiting for the zero workers to reply
@pytest.fixture(autouse=True)
def clean_os_environ():
# Celery modifies os.environ when given the CLI option --broker memory://
# This interferes with other tests, so we need to reset os.environ
with patch.dict(os.environ, clear=True):
yield
@pytest.mark.parametrize(
('celery_cmd', 'custom_cmd'),
[
('inspect', ('custom_inspect_cmd', '123')),
('control', ('custom_control_cmd', '123', '456')),
],
)
def test_custom_remote_command(celery_cmd, custom_cmd, isolated_cli_runner: CliRunner):
res = isolated_cli_runner.invoke(
celery,
[*_GLOBAL_OPTIONS, celery_cmd, *_INSPECT_OPTIONS, *custom_cmd],
catch_exceptions=False,
)
assert res.exit_code == EX_UNAVAILABLE, (res, res.output)
assert res.output.strip() == 'Error: No nodes replied within time constraint'
@pytest.mark.parametrize(
('celery_cmd', 'remote_cmd'),
[
# Test nonexistent commands
('inspect', 'this_command_does_not_exist'),
('control', 'this_command_does_not_exist'),
# Test commands that exist, but are of the wrong type
('inspect', 'custom_control_cmd'),
('control', 'custom_inspect_cmd'),
],
)
def test_unrecognized_remote_command(celery_cmd, remote_cmd, isolated_cli_runner: CliRunner):
res = isolated_cli_runner.invoke(
celery,
[*_GLOBAL_OPTIONS, celery_cmd, *_INSPECT_OPTIONS, remote_cmd],
catch_exceptions=False,
)
assert res.exit_code == 2, (res, res.output)
assert f'Error: Command {remote_cmd} not recognized. Available {celery_cmd} commands: ' in res.output
_expected_inspect_regex = (
'\n custom_inspect_cmd x\\s+Ask the workers to reply with x\\.\n'
)
_expected_control_regex = (
'\n custom_control_cmd a b\\s+Ask the workers to reply with a and b\\.\n'
)
@pytest.mark.parametrize(
('celery_cmd', 'expected_regex'),
[
('inspect', re.compile(_expected_inspect_regex, re.MULTILINE)),
('control', re.compile(_expected_control_regex, re.MULTILINE)),
],
)
def test_listing_remote_commands(celery_cmd, expected_regex, isolated_cli_runner: CliRunner):
res = isolated_cli_runner.invoke(
celery,
[*_GLOBAL_OPTIONS, celery_cmd, '--list'],
)
assert res.exit_code == 0, (res, res.stdout)
assert expected_regex.search(res.stdout)
|