File: test_integration__io_uri_manager.py

package info (click to toggle)
python-rioxarray 0.19.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 7,304 kB
  • sloc: python: 7,893; makefile: 93
file content (139 lines) | stat: -rw-r--r-- 3,556 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
"""
Tests based on: https://github.com/pydata/xarray/blob/071da2a900702d65c47d265192bc7e424bb57932/xarray/tests/test_backends_file_manager.py
"""
import concurrent.futures
import gc
import pickle
from unittest import mock

import pytest

from rioxarray._io import URIManager


def test_uri_manager_mock_write():
    mock_file = mock.Mock()
    opener = mock.Mock(spec=open, return_value=mock_file)

    manager = URIManager(opener, "filename")
    f = manager.acquire()
    f.write("contents")
    manager.close()

    opener.assert_called_once_with("filename", mode="r")
    mock_file.write.assert_called_once_with("contents")
    mock_file.close.assert_called_once_with()


def test_uri_manager_mock_write__threaded():
    mock_file = mock.Mock()
    opener = mock.Mock(spec=open, return_value=mock_file)

    manager = URIManager(opener, "filename")

    def write(iter):
        nonlocal manager
        fh = manager.acquire()
        fh.write("contents")
        manager._local.thread_manager = None

    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        for result in executor.map(write, range(5)):
            pass

    gc.collect()

    opener.assert_has_calls([mock.call("filename", mode="r") for _ in range(5)])
    mock_file.write.assert_has_calls([mock.call("contents") for _ in range(5)])
    mock_file.close.assert_has_calls([mock.call() for _ in range(5)])


@pytest.mark.parametrize("expected_warning", [None, RuntimeWarning])
def test_uri_manager_autoclose(expected_warning):
    mock_file = mock.Mock()
    opener = mock.Mock(return_value=mock_file)

    manager = URIManager(opener, "filename")
    manager.acquire()

    del manager
    gc.collect()

    mock_file.close.assert_called_once_with()


def test_uri_manager_write_concurrent(tmpdir):
    path = str(tmpdir.join("testing.txt"))
    manager = URIManager(open, path, mode="w")
    f1 = manager.acquire()
    f2 = manager.acquire()
    f3 = manager.acquire()
    assert f1 is f2
    assert f2 is f3
    f1.write("foo")
    f1.flush()
    f2.write("bar")
    f2.flush()
    f3.write("baz")
    f3.flush()

    del manager
    gc.collect()

    with open(path) as f:
        assert f.read() == "foobarbaz"


def test_uri_manager_write_pickle(tmpdir):
    path = str(tmpdir.join("testing.txt"))
    manager = URIManager(open, path, mode="a")
    f = manager.acquire()
    f.write("foo")
    f.flush()
    manager2 = pickle.loads(pickle.dumps(manager))
    f2 = manager2.acquire()
    f2.write("bar")
    del manager
    del manager2
    gc.collect()

    with open(path) as f:
        assert f.read() == "foobar"


def test_uri_manager_read(tmpdir):
    path = str(tmpdir.join("testing.txt"))

    with open(path, "w") as f:
        f.write("foobar")

    manager = URIManager(open, path)
    f = manager.acquire()
    assert f.read() == "foobar"
    manager.close()


def test_uri_manager_acquire_context(tmpdir):
    path = str(tmpdir.join("testing.txt"))

    with open(path, "w") as f:
        f.write("foobar")

    class AcquisitionError(Exception):
        pass

    manager = URIManager(open, path)
    with pytest.raises(AcquisitionError):
        with manager.acquire_context() as f:
            assert f.read() == "foobar"
            raise AcquisitionError

    with manager.acquire_context() as f:
        assert f.read() == "foobar"

    with pytest.raises(AcquisitionError):
        with manager.acquire_context() as f:
            f.seek(0)
            assert f.read() == "foobar"
            raise AcquisitionError
    manager.close()