File: test.py

package info (click to toggle)
postgresql-multicorn 1.0.4-1
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 864 kB
  • ctags: 611
  • sloc: ansic: 2,690; python: 1,829; sql: 645; makefile: 93; sh: 29
file content (368 lines) | stat: -rw-r--r-- 11,543 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
# coding: utf8

"""

Tests for StructuredFS.

"""


import os
import sys
import functools
import tempfile
import shutil
from contextlib import contextmanager
from multicorn.compat import unicode_
import pytest

from .structuredfs import StructuredDirectory, Item
from .docutils_meta import mtime_lru_cache, extract_meta


def with_tempdir(function):
    """
    Decorator for tests that need a scratch directory.

    The returned wrapper takes no argument. It creates a fresh temporary
    directory, calls ``function`` with the path of that directory as its
    only argument and returns whatever ``function`` returned. The directory
    is removed afterwards, whether ``function`` returned or raised.
    """
    def run_in_scratch_dir():
        scratch = tempfile.mkdtemp()
        try:
            outcome = function(scratch)
        finally:
            # Clean up even when the wrapped function raised.
            shutil.rmtree(scratch)
        return outcome

    return functools.wraps(function)(run_in_scratch_dir)


@contextmanager
def assert_raises(exception_class, message_part):
    """
    Context manager asserting that its body raises ``exception_class``.

    The first argument of the caught exception must contain
    ``message_part``; the comparison ignores case. An ``AssertionError`` is
    raised if the message does not match or if the body did not raise at
    all. Exceptions of any other type propagate unchanged.
    """
    try:
        yield
    except exception_class as error:
        assert message_part.lower() in error.args[0].lower()
        # The expected exception was seen: swallow it and stop here.
        return
    assert 0, 'Did not raise %s' % exception_class


@with_tempdir
def test_parser(tempdir):
    """
    Check the pattern parser: invalid patterns are rejected with a
    meaningful message, valid ones expose the expected properties.
    """
    def make(pattern):
        return StructuredDirectory(tempdir, pattern)

    # Invalid patterns, each paired with a fragment of the expected message.
    rejected = [
        ('', 'slash-separated part is empty'),
        ('/a', 'slash-separated part is empty'),
        ('a/', 'slash-separated part is empty'),
        ('a//b', 'slash-separated part is empty'),
        ('{foo}/{foo}', 'more than once'),
        ('{}', 'Invalid property name'),
        ('{0foo}', 'Invalid property name'),
        ('{foo/bar}', 'Invalid property name'),
        ('{foo!r}', 'Invalid property name'),
        ('{foo:s}', 'Invalid property name'),
        ('foo{bar', "unmatched '{'"),
        ('foo}bar', "single '}'"),
    ]
    for pattern, message_part in rejected:
        with assert_raises(ValueError, message_part):
            assert make(pattern)

    # Every property counts, grouped by slash-separated part.
    plain = make('{category}/{num}_{name}.bin')
    assert plain.properties == set(['category', 'num', 'name'])
    assert plain._path_parts_properties == (('category',), ('num', 'name'))

    # '{{num}}' is expected to stand for the literal text '{num}' rather
    # than for a property.
    escaped = make('{category}/{{num}}_{name}.bin')
    assert escaped.properties == set(['category', 'name'])
    assert escaped._path_parts_properties == (('category',), ('name',))
    regex_patterns = [regex.pattern for regex in escaped._path_parts_re]
    assert regex_patterns == [
        '^(?P<category>.*)$',
        r'^\{num\}\_(?P<name>.*)\.bin$',
    ]


@with_tempdir
def test_filenames(tempdir):
    """
    Check that :meth:`StructuredDirectory.get_items` only returns the files
    whose path matches the pattern of the directory.
    """
    binary = StructuredDirectory(tempdir, '{category}/{num}_{name}.bin')
    text = StructuredDirectory(tempdir, '{category}/{num}_{name}.txt')

    # Nothing exists on disk before the files below are written.
    assert os.listdir(tempdir) == []

    matching = [
        ['lipsum', '4_foo.bin'],
        ['lipsum', '4_foo.txt'],
    ]
    not_matching = [
        ['lipsum', '4_foo'],
        ['lipsum', '4-foo.txt'],
        ['lipsum', '4_bar.txt', 'baz'],
        ['lipsum', '4'],
        ['dolor'],
    ]
    for parts in matching + not_matching:
        path = os.path.join(tempdir, *parts)
        parent = os.path.dirname(path)
        # Parent directories are created on demand.
        if not os.path.exists(parent):
            os.makedirs(parent)
        # Opening for writing and closing right away leaves an empty file.
        with open(path, 'wb'):
            pass

    def found(directory):
        return [item.filename for item in directory.get_items()]

    assert found(text) == ['lipsum/4_foo.txt']
    assert found(binary) == ['lipsum/4_foo.bin']


@with_tempdir
def test_items(tempdir):
    """
    Exercise :class:`Item`: argument validation in ``create()``, filename
    computation, and the write / read / remove life cycle on disk.
    """
    binary = StructuredDirectory(tempdir, '{category}/{num}_{name}.bin')
    text = StructuredDirectory(tempdir, '{category}/{num}_{name}.txt')

    # Property sets that create() is expected to refuse, with the exception
    # type and a fragment of its message.
    refused = [
        (ValueError, 'Missing properties',
         dict(category='lipsum')),
        (ValueError, 'Unknown properties',
         dict(category='lipsum', num='4', name='foo', bang='bar')),
        (TypeError, 'must be of type unicode',
         dict(category='lipsum', num=4, name='foo')),
        (ValueError, 'can not contain a slash',
         dict(category='lipsum', num='4', name='foo/bar')),
    ]
    for exception_class, message_part, properties in refused:
        with assert_raises(exception_class, message_part):
            text.create(**properties)

    # The same values give a different filename under each pattern.
    values = dict(category='lipsum', num='4', name='foo')
    assert Item(binary, values).filename == 'lipsum/4_foo.bin'
    assert Item(text, values).filename == 'lipsum/4_foo.txt'

    # Building Item objects did not touch the disk.
    assert os.listdir(tempdir) == []

    # First file: created by hand and left empty.
    category_dir = os.path.join(text.root_dir, 'lipsum')
    os.mkdir(category_dir)
    with open(os.path.join(category_dir, '4_foo.txt'), 'wb'):
        pass

    # Second file: created through an Item.
    bar = text.create(category='lipsum', num='5', name='bar')
    bar.write('BAR')

    by_num = sorted(text.get_items(), key=lambda item: item['num'])
    item_foo, item_bar = by_num

    assert len(item_foo) == 3
    assert dict(item_foo) == {'category': 'lipsum', 'num': '4', 'name': 'foo'}
    assert item_foo.read() == ''

    assert len(item_bar) == 3
    assert dict(item_bar) == {'category': 'lipsum', 'num': '5', 'name': 'bar'}
    assert item_bar.read() == 'BAR'

    # Writing decoded text is expected to fail; encoded bytes round-trip.
    content = b'Hello,\xc2\xa0W\xc3\xb6rld!'.decode('utf-8')
    with pytest.raises(UnicodeError):
        item_foo.write(content)
    encoded = content.encode('utf8')
    item_foo.write(encoded)
    assert item_foo.read().decode('utf8') == content

    # Once removed, the item can neither be read nor removed again.
    item_foo.remove()
    with pytest.raises(IOError):
        item_foo.read()
    with pytest.raises(OSError):
        item_foo.remove()

    def remaining():
        return [item.filename for item in text.get_items()]

    assert remaining() == ['lipsum/5_bar.txt']
    item_bar.remove()
    assert remaining() == []
    # The root is empty again: the 'lipsum' directory went away as well.
    assert os.listdir(tempdir) == []


@with_tempdir
def test_get_items(tempdir):
    """
    Check the filtering done by :meth:`StructuredDirectory.get_items` when
    property values are given.
    """
    text = StructuredDirectory(tempdir, '{category}/{num}_{name}.txt')

    for num, name, content in [('4', 'foo', 'FOO'), ('5', 'bar', 'BAR')]:
        text.create(category='lipsum', num=num, name=name).write(content)

    def found(**properties):
        items = text.get_items(**properties)
        return [item.filename for item in items]

    # No item has these values.
    assert found(num='9') == []
    assert found(num='5', name='UUU') == []
    # One or several properties narrow the result down to a single item.
    assert found(num='5') == ['lipsum/5_bar.txt']
    assert found(num='5', name='bar') == ['lipsum/5_bar.txt']
    # Without any filter every item comes back; the order is not checked.
    assert sorted(found()) == ['lipsum/4_foo.txt', 'lipsum/5_bar.txt']

    # A property that is not part of the pattern is an error.
    with assert_raises(ValueError, 'Unknown properties'):
        found(fiz='5')


@with_tempdir
def test_from_filename(tempdir):
    """
    Check :meth:`StructuredDirectory.from_filename`: ``None`` for names that
    do not match the pattern, an item with parsed properties otherwise.
    """
    text = StructuredDirectory(tempdir, '{category}/{num}_{name}.txt')

    not_matching = [
        'lipsum/4_foo.txt/bar',
        'lipsum',
        'lipsum/4',
        'lipsum/4_foo.bin',
    ]
    for filename in not_matching:
        assert text.from_filename(filename) is None

    item = text.from_filename('lipsum/4_foo.txt')
    assert dict(item) == {'category': 'lipsum', 'num': '4', 'name': 'foo'}
    assert item.filename == 'lipsum/4_foo.txt'


@with_tempdir
def test_optimizations(tempdir):
    """
    Check that :meth:`StructuredDirectory.get_items` does not list more
    directories than needed for the properties it is given.
    """
    text = StructuredDirectory(tempdir, '{cat}/{org}_{name}/{id}')

    # Replace the _listdir attribute of this instance with a recorder that
    # notes every directory it is asked for before delegating to the
    # original.
    recorded = []
    original_listdir = text._listdir

    def recording_listdir(parts):
        recorded.append('/'.join(parts))
        return original_listdir(parts)

    text._listdir = recording_listdir

    # Content written for each item, keyed by the item's id.
    contents = {}

    def create(**values):
        item = Item(text, values)
        item_id = values['id']
        assert item_id not in contents  # Ids must be unique
        data = item.filename.encode('ascii')
        item.write(data)
        contents[item_id] = data

    def assert_listed(properties, expected_ids, expected_listed):
        # Forget what earlier queries listed.
        recorded[:] = []
        expected_contents = set([contents[key] for key in expected_ids])
        results = [item.read() for item in text.get_items(**properties)]
        assert set(results) == expected_contents
        assert set(recorded) == set(expected_listed)

    create(cat='lipsum', org='a', name='foo', id='1')

    # Nothing is fixed: every directory on the path is listed.
    assert_listed({}, ['1'], ['', 'lipsum', 'lipsum/a_foo'])

    # The category is fixed: the root is not listed.
    assert_listed({'cat': 'lipsum'}, ['1'], ['lipsum', 'lipsum/a_foo'])

    # org and name are fixed: the 'lipsum' directory is not listed.
    assert_listed({'org': 'a', 'name': 'foo'}, ['1'], ['', 'lipsum/a_foo'])

    # Every property is fixed: nothing is listed at all.
    assert_listed(
        {'cat': 'lipsum', 'org': 'a', 'name': 'foo', 'id': '1'},
        ['1'],
        [])

    create(cat='lipsum', org='b', name='foo', id='2')
    create(cat='dolor', org='c', name='bar', id='3')

    assert_listed(
        {},
        ['1', '2', '3'],
        ['', 'lipsum', 'dolor', 'lipsum/a_foo', 'lipsum/b_foo',
         'dolor/c_bar'])

    # The root is not listed.
    assert_listed(
        {'cat': 'lipsum'},
        ['1', '2'],
        ['lipsum', 'lipsum/a_foo', 'lipsum/b_foo'])

    # The root is not listed.
    assert_listed({'cat': 'dolor'}, ['3'], ['dolor', 'dolor/c_bar'])

    # org='b' alone does not make up a whole path part, so 'lipsum' and
    # 'dolor' are still listed, but deeper directories with another org
    # are skipped.
    assert_listed(
        {'org': 'b'},
        ['2'],
        ['', 'lipsum', 'dolor', 'lipsum/b_foo'])

    # The root is not listed; 'nonexistent' is tried directly.
    assert_listed({'cat': 'nonexistent'}, [], ['nonexistent'])


@with_tempdir
def test_docutils_meta(tempdir):
    """
    Check the metadata returned by :func:`extract_meta` for two
    reStructuredText documents, and how often the extractor really runs
    when wrapped with :func:`mtime_lru_cache` (``max_size=2``).
    """
    def counting(filename):
        # Count each real extraction so that cache hits can be told apart.
        counting.n_calls += 1
        return extract_meta(filename)
    counting.n_calls = 0
    cached = mtime_lru_cache(counting, max_size=2)

    def path_of(filename):
        return os.path.join(tempdir, filename)

    def extract(filename):
        return cached(path_of(filename))

    def write(filename, content):
        with open(path_of(filename), 'w') as target:
            target.write(content)

    def check(filename, expected_meta, expected_calls):
        assert extract(filename) == expected_meta
        assert counting.n_calls == expected_calls

    meta_1 = {'title': 'The main title', 'subtitle': 'Second title',
              'author': 'Me'}
    rest_1 = '''
The main title
==============

Second title
------------

:Author: Me

Content
'''
    meta_2 = {'title': 'First title', 'author': 'Myself',
              'summary': 'Lorem ipsum\ndolor sit amet'}
    rest_2 = '''
First title
===========

:Author: Myself
:Summary:
    Lorem ipsum
    dolor sit amet

Not a subtitle
--------------

Content
'''
    write('first.rst', rest_1)
    write('second.rst', rest_2)
    assert counting.n_calls == 0

    check('first.rst', meta_1, 1)
    check('first.rst', meta_1, 1)  # served from the cache
    check('second.rst', meta_2, 2)

    write('third.rst', rest_1)
    check('third.rst', meta_1, 3)  # a third file exceeds the cache size

    # NOTE(review): the file is rewritten right after being read; presumably
    # the cache notices through the modification time, which may depend on
    # the timestamp granularity of the filesystem -- confirm.
    write('third.rst', rest_2)
    check('third.rst', meta_2, 4)

    check('first.rst', meta_1, 5)  # not in the cache anymore


if __name__ == '__main__':
    # sys.argv[0] is the path of this script itself: forwarding it would
    # hand the file to pytest a second time, so only the actual command-line
    # arguments are passed on. The status returned by pytest becomes the
    # exit status of the process, so that failures are visible to callers.
    sys.exit(pytest.main([__file__] + sys.argv[1:]))