1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
|
"""Keys and certificates for tests (KEY1 is a private key of CERT1, etc.)
Generated with:
.. code-block:: console
$ openssl genrsa -des3 -passout pass:test -out key1.key 1024
$ openssl req -new -key key1.key -out key1.csr -passin pass:test
$ cp key1.key key1.key.org
$ openssl rsa -in key1.key.org -out key1.key -passin pass:test
$ openssl x509 -req -days 365 -in cert1.csr \
-signkey key1.key -out cert1.crt
$ rm key1.key.org cert1.csr
"""
import builtins
import os
import tempfile
from unittest.mock import Mock, patch
import sys
import pytest
from kombu.exceptions import SerializerNotInstalled
from kombu.serialization import disable_insecure_serializers, registry
from celery.exceptions import ImproperlyConfigured, SecurityError
from celery.security import disable_untrusted_serializers, setup_security
from celery.security.utils import reraise_errors
from t.unit import conftest
from . import CERT1, ENCKEY1, KEY1, KEYPASSWORD
from .case import SecurityCase
class test_security(SecurityCase):
def teardown_method(self):
registry._disabled_content_types.clear()
registry._set_default_serializer('json')
try:
registry.unregister('auth')
except SerializerNotInstalled:
pass
def test_disable_insecure_serializers(self):
try:
disabled = registry._disabled_content_types
assert disabled
disable_insecure_serializers(
['application/json', 'application/x-python-serialize'],
)
assert 'application/x-yaml' in disabled
assert 'application/json' not in disabled
assert 'application/x-python-serialize' not in disabled
disabled.clear()
disable_insecure_serializers(allowed=None)
assert 'application/x-yaml' in disabled
assert 'application/json' in disabled
assert 'application/x-python-serialize' in disabled
finally:
disable_insecure_serializers(allowed=['json'])
@patch('celery.security._disable_insecure_serializers')
def test_disable_untrusted_serializers(self, disable):
disable_untrusted_serializers(['foo'])
disable.assert_called_with(allowed=['foo'])
def test_setup_security(self):
with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmp_key1:
tmp_key1.write(KEY1)
with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmp_cert1:
tmp_cert1.write(CERT1)
self.app.conf.update(
task_serializer='auth',
accept_content=['auth'],
security_key=tmp_key1.name,
security_certificate=tmp_cert1.name,
security_cert_store='*.pem',
)
self.app.setup_security()
os.remove(tmp_key1.name)
os.remove(tmp_cert1.name)
def test_setup_security_encrypted_key_file(self):
with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmp_key1:
tmp_key1.write(ENCKEY1)
with tempfile.NamedTemporaryFile(mode='w', delete=False) as tmp_cert1:
tmp_cert1.write(CERT1)
self.app.conf.update(
task_serializer='auth',
accept_content=['auth'],
security_key=tmp_key1.name,
security_key_password=KEYPASSWORD,
security_certificate=tmp_cert1.name,
security_cert_store='*.pem',
)
self.app.setup_security()
os.remove(tmp_key1.name)
os.remove(tmp_cert1.name)
def test_setup_security_disabled_serializers(self):
disabled = registry._disabled_content_types
assert len(disabled) == 0
self.app.conf.task_serializer = 'json'
with pytest.raises(ImproperlyConfigured):
self.app.setup_security()
assert 'application/x-python-serialize' in disabled
disabled.clear()
self.app.conf.task_serializer = 'auth'
with pytest.raises(ImproperlyConfigured):
self.app.setup_security()
assert 'application/json' in disabled
disabled.clear()
@patch('celery.current_app')
@pytest.mark.skipif(sys.version_info > (3, 8), reason='breaks with python 3.8')
def test_setup_security__default_app(self, current_app):
with pytest.raises(ImproperlyConfigured):
setup_security()
@patch('celery.security.register_auth')
@patch('celery.security._disable_insecure_serializers')
def test_setup_registry_complete(self, dis, reg, key='KEY', cert='CERT'):
calls = [0]
def effect(*args):
try:
m = Mock()
m.read.return_value = 'B' if calls[0] else 'A'
return m
finally:
calls[0] += 1
self.app.conf.task_serializer = 'auth'
self.app.conf.accept_content = ['auth']
with conftest.open(side_effect=effect):
with patch('celery.security.registry') as registry:
store = Mock()
self.app.setup_security(['json'], key, None, cert, store)
dis.assert_called_with(['json'])
reg.assert_called_with('A', None, 'B', store, 'sha256', 'json')
registry._set_default_serializer.assert_called_with('auth')
def test_security_conf(self):
self.app.conf.task_serializer = 'auth'
with pytest.raises(ImproperlyConfigured):
self.app.setup_security()
self.app.conf.accept_content = ['auth']
with pytest.raises(ImproperlyConfigured):
self.app.setup_security()
_import = builtins.__import__
def import_hook(name, *args, **kwargs):
if name == 'cryptography':
raise ImportError
return _import(name, *args, **kwargs)
builtins.__import__ = import_hook
with pytest.raises(ImproperlyConfigured):
self.app.setup_security()
builtins.__import__ = _import
def test_reraise_errors(self):
with pytest.raises(SecurityError):
with reraise_errors(errors=(KeyError,)):
raise KeyError('foo')
with pytest.raises(KeyError):
with reraise_errors(errors=(ValueError,)):
raise KeyError('bar')
|