File: test_filesystem.py

package info (click to toggle)
celery 5.5.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,008 kB
  • sloc: python: 64,346; sh: 795; makefile: 378
file content (130 lines) | stat: -rw-r--r-- 4,435 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import os
import pickle
import shutil
import sys
import tempfile
import time
from unittest.mock import patch

import pytest

import t.skip
from celery import states, uuid
from celery.backends import filesystem
from celery.backends.filesystem import FilesystemBackend
from celery.exceptions import ImproperlyConfigured


@t.skip.if_win32
class test_FilesystemBackend:
    """Tests for ``FilesystemBackend`` run against a scratch directory.

    Every test gets its own temporary directory (see ``setup_method``),
    which is removed again in ``teardown_method``.

    NOTE(review): ``self.app`` is not assigned anywhere in this class; it is
    presumably injected by the test suite's fixtures -- confirm in conftest.
    """

    def setup_method(self):
        """Create a fresh temporary directory and derive its URL and path."""
        self.directory = tempfile.mkdtemp()
        self.url = 'file://' + self.directory
        # The tests compare ``tb.path`` against this bytes value.
        self.path = self.directory.encode('ascii')

    def teardown_method(self):
        """Remove the temporary directory created by ``setup_method``."""
        # mkdtemp() leaves deletion to the caller; without this every test
        # would leak a directory. ignore_errors keeps teardown from failing
        # a test whose directory is already gone.
        shutil.rmtree(self.directory, ignore_errors=True)

    def test_a_path_is_required(self):
        """Constructing the backend without a URL is a configuration error."""
        with pytest.raises(ImproperlyConfigured):
            FilesystemBackend(app=self.app)

    def test_a_path_in_url(self):
        """The directory part of a ``file://`` URL becomes ``tb.path``."""
        tb = FilesystemBackend(app=self.app, url=self.url)
        assert tb.path == self.path

    @pytest.mark.parametrize("url,expected_error_message", [
        ('file:///non-existing', filesystem.E_PATH_INVALID),
        ('url://non-conforming', filesystem.E_PATH_NON_CONFORMING_SCHEME),
        (None, filesystem.E_NO_PATH_SET)
    ])
    def test_raises_meaningful_errors_for_invalid_urls(
        self,
        url,
        expected_error_message
    ):
        """Each kind of invalid URL raises with its dedicated message."""
        with pytest.raises(
            ImproperlyConfigured,
            match=expected_error_message
        ):
            FilesystemBackend(app=self.app, url=url)

    def test_localhost_is_removed_from_url(self):
        """A ``file://localhost/...`` URL yields the same path as without."""
        url = 'file://localhost' + self.directory
        tb = FilesystemBackend(app=self.app, url=url)
        assert tb.path == self.path

    def test_missing_task_is_PENDING(self):
        """A task id that was never stored reports ``PENDING``."""
        tb = FilesystemBackend(app=self.app, url=self.url)
        assert tb.get_state('xxx-does-not-exist') == states.PENDING

    def test_mark_as_done_writes_file(self):
        """Marking one task as done leaves exactly one directory entry."""
        tb = FilesystemBackend(app=self.app, url=self.url)
        tb.mark_as_done(uuid(), 42)
        assert len(os.listdir(self.directory)) == 1

    def test_done_task_is_SUCCESS(self):
        """A task marked as done reports ``SUCCESS``."""
        tb = FilesystemBackend(app=self.app, url=self.url)
        tid = uuid()
        tb.mark_as_done(tid, 42)
        assert tb.get_state(tid) == states.SUCCESS

    def test_correct_result(self):
        """The stored result is returned unchanged by ``get_result``."""
        data = {'foo': 'bar'}

        tb = FilesystemBackend(app=self.app, url=self.url)
        tid = uuid()
        tb.mark_as_done(tid, data)
        assert tb.get_result(tid) == data

    def test_get_many(self):
        """``get_many`` returns the stored result for every requested id."""
        data = {uuid(): 'foo', uuid(): 'bar', uuid(): 'baz'}

        tb = FilesystemBackend(app=self.app, url=self.url)
        for key, value in data.items():
            tb.mark_as_done(key, value)

        results = dict(tb.get_many(data.keys()))
        # Guard against a vacuous pass: an empty or partial result set
        # would otherwise skip the per-item assertions below.
        assert set(results) == set(data)
        for key, result in results.items():
            assert result['result'] == data[key]

    def test_forget_deletes_file(self):
        """Forgetting a stored task leaves the directory empty."""
        tb = FilesystemBackend(app=self.app, url=self.url)
        tid = uuid()
        tb.mark_as_done(tid, 42)
        tb.forget(tid)
        assert len(os.listdir(self.directory)) == 0

    @pytest.mark.usefixtures('depends_on_current_app')
    def test_pickleable(self):
        """A backend instance survives a pickle round trip."""
        tb = FilesystemBackend(app=self.app, url=self.url, serializer='pickle')
        assert pickle.loads(pickle.dumps(tb))

    @pytest.mark.skipif(sys.platform == 'win32', reason='Test can fail on '
                        'Windows/FAT due to low granularity of st_mtime')
    def test_cleanup(self):
        """``cleanup`` honours ``expires``: 0 keeps all, >0 drops old files."""
        tb = FilesystemBackend(app=self.app, url=self.url)
        yesterday_task_ids = [uuid() for _ in range(10)]
        today_task_ids = [uuid() for _ in range(10)]
        for tid in yesterday_task_ids:
            tb.mark_as_done(tid, 42)
        day_length = 0.2
        time.sleep(day_length)  # let FS mark some difference in mtimes
        for tid in today_task_ids:
            tb.mark_as_done(tid, 42)
        with patch.object(tb, 'expires', 0):
            tb.cleanup()
        # test that zero expiration time prevents any cleanup
        filenames = set(os.listdir(tb.path))
        assert all(
            tb.get_key_for_task(tid) in filenames
            for tid in yesterday_task_ids + today_task_ids
        )
        # test that non-zero expiration time enables cleanup by file mtime
        with patch.object(tb, 'expires', day_length):
            tb.cleanup()
        filenames = set(os.listdir(tb.path))
        assert not any(
            tb.get_key_for_task(tid) in filenames
            for tid in yesterday_task_ids
        )
        assert all(
            tb.get_key_for_task(tid) in filenames
            for tid in today_task_ids
        )