File: function_call.py

package info (click to toggle)
python-pyforge 1.3.0-7
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 576 kB
  • sloc: python: 3,729; makefile: 13; sh: 7
file content (77 lines) | stat: -rw-r--r-- 2,623 bytes parent folder | download | duplicates (5)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
from numbers import Number
from sentinels import NOTHING
from .exceptions import ConflictingActions
from .queued_object import QueuedObject


class FunctionCall(QueuedObject):
    """A recorded call to a forged function, with the actions attached to it.

    Holds the call target, its normalized arguments, and the side effects
    (callbacks, return value or exception) configured through the
    ``and_*`` / ``then_*`` methods.
    """

    def __init__(self, target, args, kwargs, caller_info):
        """Record a call to ``target`` made with ``args`` and ``kwargs``.

        The arguments are normalized through the target's signature
        (``target.__forge__.signature.get_normalized_args``) so that two
        calls can later be compared with ``==`` on ``self.args``.
        """
        super(FunctionCall, self).__init__(caller_info)
        self.target = target
        self.args = target.__forge__.signature.get_normalized_args(args, kwargs)
        # (func, args, kwargs) triples registered through and_call().
        self._call_funcs = []
        # Callables registered through and_call_with_args().
        self._call_funcs_with_args = []
        # NOTHING marks "not configured"; None is a legitimate return value.
        self._return_value = NOTHING
        self._raised_exception = NOTHING

    def matches(self, call):
        """Return True if ``call`` is a FunctionCall on the same target object
        with equal normalized arguments."""
        if not isinstance(call, FunctionCall):
            return False
        return self.target is call.target and self.args == call.args

    def describe(self):
        """Return a ``target(arguments)`` style description of this call."""
        return "%s(%s)" % (
            self.target.__forge__.describe(),
            self._get_argument_string(),
            )

    def _get_argument_string(self):
        """Format ``self.args`` as a comma-separated argument list.

        Keys that are numbers are treated as positional arguments and
        rendered first, in ascending key order, as ``repr(value)``; all
        other keys are rendered afterwards, sorted, as ``key=repr(value)``.
        """
        positional_keys = []
        keyword_keys = []
        for key in self.args:
            if isinstance(key, Number):
                positional_keys.append(key)
            else:
                keyword_keys.append(key)
        parts = [repr(self.args[key]) for key in sorted(positional_keys)]
        parts.extend("%s=%r" % (key, self.args[key])
                     for key in sorted(keyword_keys))
        return ", ".join(parts)

    def whenever(self):
        """Register this call with the forge queue's ``allow_whenever`` and
        return ``self`` so further actions can be chained."""
        self.target.__forge__.forge.queue.allow_whenever(self)
        return self

    def and_call(self, func, args=(), kwargs=None):
        """Register ``func`` to be invoked as ``func(*args, **kwargs)`` by
        ``do_side_effects``. Returns ``self`` for chaining."""
        if kwargs is None:
            kwargs = {}
        self._call_funcs.append((func, list(args), kwargs))
        return self
    then_call = and_call

    def and_call_with_args(self, func):
        """Register ``func`` to be invoked by ``do_side_effects`` with the
        arguments passed to ``do_side_effects``. Returns ``self``."""
        self._call_funcs_with_args.append(func)
        return self
    then_call_with_args = and_call_with_args

    def and_raise(self, exc):
        """Configure ``exc`` to be raised by ``do_side_effects``.

        Returns ``exc`` itself (not ``self``).

        Raises:
            ConflictingActions: if a return value was already configured.
        """
        if self._return_value is not NOTHING:
            raise ConflictingActions()
        self._raised_exception = exc
        return exc
    then_raise = and_raise

    def and_return(self, rv):
        """Configure ``rv`` as the value returned by ``get_return_value``.

        Returns ``rv`` itself (not ``self``).

        Raises:
            ConflictingActions: if an exception was already configured.
        """
        if self._raised_exception is not NOTHING:
            raise ConflictingActions()
        self._return_value = rv
        return rv
    then_return = and_return

    def do_side_effects(self, args, kwargs):
        """Run every registered side effect for a call made with ``args``
        and ``kwargs``.

        Order: callbacks from ``and_call`` (with the arguments they were
        registered with), then callbacks from ``and_call_with_args`` (with
        ``args``/``kwargs``), then the configured exception, if any, is
        raised.
        """
        # The loop variables must not be named ``args``/``kwargs``: rebinding
        # the parameters here would make the and_call_with_args callbacks
        # below receive the last and_call callback's arguments instead of
        # the real call arguments.
        for func, func_args, func_kwargs in self._call_funcs:
            func(*func_args, **func_kwargs)
        for func in self._call_funcs_with_args:
            func(*args, **kwargs)
        if self._raised_exception is not NOTHING:
            raise self._raised_exception

    def get_return_value(self):
        """Return the configured return value, or None if none was set."""
        if self._return_value is not NOTHING:
            return self._return_value
        return None