File: _common.py

package info (click to toggle)
python-txaio 2.5.1%2B2016.10.03.git.623ef68776-1
  • links: PTS, VCS
  • area: main
  • in suites: stretch
  • size: 384 kB
  • ctags: 429
  • sloc: python: 1,968; makefile: 221
file content (117 lines) | stat: -rw-r--r-- 4,134 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117

import math
from txaio.interfaces import IBatchedTimer


class _BatchedCall(object):
    """
    Wraps IDelayedCall-implementing objects, implementing only the API
    which txaio promised in the first place: .cancel

    Do not create these yourself; use _BatchedTimer.call_later()
    """

    def __init__(self, timer, index, the_call):
        """
        :param timer: the owning timer; its ``_remove_call(index, call)``
            is invoked when this call is cancelled
        :param index: the bucket key this call was filed under (passed
            back verbatim to ``timer._remove_call``)
        :param the_call: no-argument callable run by ``__call__``
        """
        # XXX WeakRef?
        self._timer = timer
        self._index = index
        self._call = the_call

    def cancel(self):
        """
        Cancel this call. Safe to invoke more than once; every
        invocation after the first is a no-op.
        """
        timer = self._timer
        if timer is None:
            # already cancelled
            return
        # Mark as cancelled *before* talking to the timer. Dropping
        # the callable guarantees the call cannot run even when the
        # timer no longer has it in a bucket (e.g. the bucket already
        # fired and this call is waiting in a later chunk).
        self._timer = None
        self._call = None
        timer._remove_call(self._index, self)

    def __call__(self):
        """
        Run the wrapped callable and return its result; returns None
        without running anything if this call has been cancelled.
        """
        the_call = self._call
        if the_call is None:
            return None
        return the_call()


class _BatchedTimer(IBatchedTimer):
    """
    Internal helper.

    Instances of this are returned from
    :meth:`txaio.make_batched_timer` and that is the only way they
    should be instantiated. You may depend on methods from the
    interface class only (:class:`txaio.IBatchedTimer`)

    **NOTE** that the times are in milliseconds in this class!
    """

    def __init__(self, bucket_milliseconds, chunk_size,
                 seconds_provider, delayed_call_creator):
        """
        :param bucket_milliseconds: width of one bucket, in milliseconds
        :param chunk_size: maximum number of calls run in one go when
            a bucket fires
        :param seconds_provider: no-argument callable returning the
            current time in seconds
        :param delayed_call_creator: callable taking
            ``(delay_seconds, func, *args)`` which schedules ``func``
            and returns an object with a ``.cancel()`` method
        """
        self._bucket_milliseconds = float(bucket_milliseconds)
        self._chunk_size = chunk_size
        self._get_seconds = seconds_provider
        self._create_delayed_call = delayed_call_creator
        # bucket time in milliseconds -> (IDelayedCall, list of _BatchedCall)
        self._buckets = dict()

    def call_later(self, delay, func, *args, **kwargs):
        """
        IBatchedTimer API

        :param delay: seconds from now at which to run ``func``
        :param func: callable to run; invoked as ``func(*args, **kwargs)``
        :returns: a :class:`_BatchedCall` whose ``.cancel()`` removes
            the call from its bucket
        """
        # read the clock once so the bucket key and the delay handed to
        # the underlying scheduler are derived from the same instant
        now = self._get_seconds()
        # "quantize" the delay to the nearest bucket
        # NOTE(review): int() is applied before the "* 1000", so the
        # target time is truncated to whole seconds before bucketing;
        # confirm whether sub-second bucket sizes are meant to work.
        real_time = int(now + delay) * 1000
        real_time -= int(real_time % self._bucket_milliseconds)
        call = _BatchedCall(self, real_time, lambda: func(*args, **kwargs))
        try:
            self._buckets[real_time][1].append(call)
        except KeyError:
            # new bucket; need to add "actual" underlying IDelayedCall
            diff = (real_time / 1000.0) - now
            # the bucket time is rounded *down*, so it can lie
            # (slightly) before "now"; clamp so the underlying
            # scheduler is never handed a negative delay
            delayed_call = self._create_delayed_call(
                max(0.0, diff),
                self._notify_bucket, real_time,
            )
            self._buckets[real_time] = (delayed_call, [call])
        return call

    def _notify_bucket(self, real_time):
        """
        Internal helper. This 'does' the callbacks in a particular bucket.

        The bucket is removed from the timer first; its calls are then
        run in chunks of at most ``chunk_size``, with follow-up chunks
        scheduled through the delayed-call creator so that all chunks
        are spread across one bucket width.

        :param real_time: the bucket to do callbacks on
        :raises RuntimeError: (from the run of the final chunk) if any
            call in the bucket raised; the message lists every error
        """
        _, calls = self._buckets.pop(real_time)
        errors = []

        def notify_one_chunk(calls, chunk_size, chunk_delay_ms):
            for call in calls[:chunk_size]:
                try:
                    call()
                except Exception as e:
                    # keep going; errors are reported once at the end
                    errors.append(e)
            calls = calls[chunk_size:]
            if calls:
                self._create_delayed_call(
                    chunk_delay_ms / 1000.0,
                    notify_one_chunk, calls, chunk_size, chunk_delay_ms,
                )
            else:
                # done all calls; make sure there were no errors
                if errors:
                    msg = u"Error(s) processing call_later bucket:\n"
                    msg += u"".join(u"{}\n".format(e) for e in errors)
                    raise RuntimeError(msg)
        # ceil()ing because we want the number of chunks, and a
        # partial chunk is still a chunk
        delay_ms = self._bucket_milliseconds / math.ceil(float(len(calls)) / self._chunk_size)
        notify_one_chunk(calls, self._chunk_size, delay_ms)

    def _remove_call(self, real_time, call):
        """
        Internal helper. Removes a (possibly still pending) call from a
        bucket. It is *not* an error if the bucket is gone (e.g. the
        call has already happened), nor if the bucket exists but does
        not contain the call.

        :param real_time: the bucket key the call was filed under
        :param call: the :class:`_BatchedCall` to remove
        """
        try:
            (delayed_call, calls) = self._buckets[real_time]
        except KeyError:
            # no such bucket ... error? swallow?
            return
        # remove call; if we're empty, cancel underlying
        # bucket-timeout IDelayedCall
        try:
            calls.remove(call)
        except ValueError:
            # a bucket with this key exists but does not hold the call
            # (e.g. the call's own bucket already fired and a new
            # bucket with the same key was created since)
            return
        if not calls:
            del self._buckets[real_time]
            delayed_call.cancel()