File: mock_fs.py

package info (click to toggle)
pcs 0.12.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 16,148 kB
  • sloc: python: 238,810; xml: 20,833; ruby: 13,203; makefile: 1,595; sh: 484
file content (134 lines) | stat: -rw-r--r-- 4,165 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import site

from pcs_test import PROJECT_ROOT

CALL_TYPE_FS = "CALL_TYPE_FS"

_FUNC_ARGS = {
    "open": ["name", "mode", "buffering"],
    "os.path.exists": ["path"],
    "os.path.isdir": ["path"],
    "os.path.isfile": ["path"],
    "os.chmod": ["fd", "mode"],
    "os.chown": ["fd", "uid", "gid"],
    "os.listdir": ["path"],
    "shutil.rmtree": ["path"],
    "os.makedirs": ["path", "mode"],
}


def _ensure_consistent_args(func_name, call_args, call_kwargs):
    if len(call_args) > len(_FUNC_ARGS[func_name]):
        raise AssertionError(
            "{0}() too many positional arguments ({1} > {2})".format(
                func_name,
                len(call_args),
                len(_FUNC_ARGS[func_name]),
            )
        )

    param_intersection = set(
        _FUNC_ARGS[func_name][: len(call_args)]
    ).intersection(call_kwargs.keys())
    if param_intersection:
        raise TypeError(
            "{0}() got multiple values for keyword argument(s) '{1}'".format(
                func_name, "', '".join(param_intersection)
            )
        )


def _get_all_args_as_kwargs(func_name, call_args, call_kwargs):
    _ensure_consistent_args(func_name, call_args, call_kwargs)
    kwargs = call_kwargs.copy()
    for i, arg in enumerate(call_args):
        kwargs[_FUNC_ARGS[func_name][i]] = arg
    return kwargs


class Call:
    type = CALL_TYPE_FS

    def __init__(
        self, func_name, return_value=None, side_effect=None, call_kwargs=None
    ):
        """
        callable check_stdin raises AssertionError when given stdin doesn't
            match
        """
        call_kwargs = call_kwargs if call_kwargs else {}

        # TODO side effect with return_value - mutually exclusive

        self.type = CALL_TYPE_FS
        self.func_name = func_name
        self.call_kwargs = call_kwargs
        self.return_value = return_value
        self.side_effect = side_effect

    def finish(self):
        if self.side_effect:
            if isinstance(self.side_effect, Exception):
                raise self.side_effect
            raise AssertionError(
                "side_effect other than instance of exception not supported yet"
            )

        return self.return_value

    def __repr__(self):
        return str("<Fs '{0}' kwargs={1}>").format(
            self.func_name,
            self.call_kwargs,
        )

    def __ne__(self, other):
        return (
            self.func_name != other.func_name
            or self.call_kwargs != other.call_kwargs
        )


def get_fs_mock(call_queue):
    package_dir_list = site.getsitepackages()
    package_dir_list.append(PROJECT_ROOT)

    def get_fs_call(func_name, original_call):
        def call_fs(*args, **kwargs):
            # Standard python unittest tries to open some python code (e.g. the
            # test file for caching  when the test raises AssertionError).
            # It is before it the cleanup is called so at this moment the
            # function open is still mocked.
            # Pcs should not open file inside python package in the command so
            # attempt to open file inside pcs package is almost certainly
            # outside of library command and we will provide the original
            # function.
            if func_name == "open":
                for python_package_dir in package_dir_list:
                    if args[0].startswith(python_package_dir):
                        return original_call(*args, **kwargs)

            real_call = Call(
                func_name,
                call_kwargs=_get_all_args_as_kwargs(func_name, args, kwargs),
            )
            dummy_i, expected_call = call_queue.take(
                CALL_TYPE_FS, repr(real_call)
            )

            if expected_call != real_call:
                raise call_queue.error_with_context(
                    "\n  expected: '{0}'\n  but was:  '{1}'".format(
                        expected_call, real_call
                    )
                )

            return expected_call.finish()

        return call_fs

    return get_fs_call


def is_fs_call_in(call_queue):
    return call_queue.has_type(CALL_TYPE_FS)