File: test_certificate.py

package info (click to toggle)
celery 5.5.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,008 kB
  • sloc: python: 64,346; sh: 795; makefile: 378
file content (109 lines) | stat: -rw-r--r-- 3,377 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import datetime
import os
from unittest.mock import Mock, patch

import pytest

from celery.exceptions import SecurityError
from celery.security.certificate import Certificate, CertStore, FSCertStore
from t.unit import conftest

from . import CERT1, CERT2, CERT_ECDSA, KEY1
from .case import SecurityCase


class test_Certificate(SecurityCase):

    def test_valid_certificate(self):
        Certificate(CERT1)
        Certificate(CERT2)

    def test_invalid_certificate(self):
        with pytest.raises((SecurityError, TypeError)):
            Certificate(None)
        with pytest.raises(SecurityError):
            Certificate('')
        with pytest.raises(SecurityError):
            Certificate('foo')
        with pytest.raises(SecurityError):
            Certificate(CERT1[:20] + CERT1[21:])
        with pytest.raises(SecurityError):
            Certificate(KEY1)
        with pytest.raises(SecurityError):
            Certificate(CERT_ECDSA)

    @pytest.mark.skip('TODO: cert expired')
    def test_has_expired(self):
        assert not Certificate(CERT1).has_expired()

    def test_has_expired_mock(self):
        x = Certificate(CERT1)

        x._cert = Mock(name='cert')
        time_after = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=-1)
        x._cert.not_valid_after_utc = time_after

        assert x.has_expired() is True

    def test_has_not_expired_mock(self):
        x = Certificate(CERT1)

        x._cert = Mock(name='cert')
        time_after = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=1)
        x._cert.not_valid_after_utc = time_after

        assert x.has_expired() is False


class test_CertStore(SecurityCase):

    def test_itercerts(self):
        cert1 = Certificate(CERT1)
        cert2 = Certificate(CERT2)
        certstore = CertStore()
        for c in certstore.itercerts():
            assert False
        certstore.add_cert(cert1)
        certstore.add_cert(cert2)
        for c in certstore.itercerts():
            assert c in (cert1, cert2)

    def test_duplicate(self):
        cert1 = Certificate(CERT1)
        certstore = CertStore()
        certstore.add_cert(cert1)
        with pytest.raises(SecurityError):
            certstore.add_cert(cert1)


class test_FSCertStore(SecurityCase):

    @patch('os.path.isdir')
    @patch('glob.glob')
    @patch('celery.security.certificate.Certificate')
    def test_init(self, Certificate, glob, isdir):
        cert = Certificate.return_value = Mock()
        cert.has_expired.return_value = False
        isdir.return_value = True
        glob.return_value = ['foo.cert']
        with conftest.open():
            cert.get_id.return_value = 1

            path = os.path.join('var', 'certs')
            x = FSCertStore(path)
            assert 1 in x._certs
            glob.assert_called_with(os.path.join(path, '*'))

            # they both end up with the same id
            glob.return_value = ['foo.cert', 'bar.cert']
            with pytest.raises(SecurityError):
                x = FSCertStore(path)
            glob.return_value = ['foo.cert']

            cert.has_expired.return_value = True
            with pytest.raises(SecurityError):
                x = FSCertStore(path)

            isdir.return_value = False
            with pytest.raises(SecurityError):
                x = FSCertStore(path)