File: blockingdemo.py

package info (click to toggle)
twisted 25.5.0-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 20,560 kB
  • sloc: python: 203,171; makefile: 200; sh: 92; javascript: 36; xml: 31
file content (107 lines) | stat: -rw-r--r-- 2,549 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.


from twisted.internet import _threadedselect

_threadedselect.install()

from itertools import count

from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.python.failure import Failure
from twisted.python.runtime import seconds

try:
    # Python 3
    from queue import Empty, Queue
except ImportError:
    # Python 2
    from Queue import Empty, Queue


class TwistedManager:
    """
    Drive the Twisted reactor from ordinary blocking code.

    C{start} passes the queue's C{put} method to C{reactor.interleave}; the
    callables that end up in the queue are then executed by C{poll} or
    C{iterate} in whichever thread calls those methods.
    """

    def __init__(self):
        # key -> value stored by _stopIterating and consumed by iterate()
        self.results = {}
        # counter that hands out the keys used in self.results
        self.key = count()
        # callables waiting to be run by poll() / iterate()
        self.twistedQueue = Queue()

    def getKey(self):
        """Return the next unused identifier from the counter."""
        fresh = next(self.key)
        return fresh

    def start(self):
        """Start the reactor, giving it the queue's C{put} as its waker."""
        enqueue = self.twistedQueue.put
        reactor.interleave(enqueue)

    def _stopIterating(self, value, key):
        """Store C{value} under C{key} so that C{iterate(key)} can finish."""
        self.results[key] = value

    def stop(self):
        """
        Stop the reactor and keep running queued work until the
        "after shutdown" trigger has fired.
        """
        doneKey = self.getKey()
        # Register the trigger before asking the reactor to stop: it stores
        # True under doneKey, which is what ends the iterate() call below.
        reactor.addSystemEventTrigger(
            "after",
            "shutdown",
            self._stopIterating,
            True,
            doneKey,
        )
        reactor.stop()
        self.iterate(doneKey)

    def getDeferred(self, d):
        """
        Block until C{d} has a result and return it.

        If the result is a C{Failure}, its exception is raised instead.
        """
        waitKey = self.getKey()
        d.addBoth(self._stopIterating, waitKey)
        outcome = self.iterate(waitKey)
        if not isinstance(outcome, Failure):
            return outcome
        outcome.raiseException()
        return outcome

    def poll(self, noLongerThan=1.0):
        """
        Run queued reactor work without blocking.

        Returns as soon as the queue is empty, or once more than
        C{noLongerThan} seconds have elapsed since the call began.
        """
        started = seconds()
        while (seconds() - started) <= noLongerThan:
            try:
                pending = self.twistedQueue.get_nowait()
                pending()
            except Empty:
                # nothing left to run right now
                break

    def iterate(self, key=None):
        """
        Run queued reactor work, blocking on the queue as needed, until a
        result has been stored under C{key}; remove and return that result.
        """
        while True:
            if key in self.results:
                return self.results.pop(key)
            pending = self.twistedQueue.get()
            pending()


def fakeDeferred(msg):
    """
    Return a new Deferred that the reactor calls back with C{msg} after a
    two second delay, printing a notice when it does so.
    """
    pending = Deferred()

    def fire():
        print("deferred called back")
        pending.callback(msg)

    reactor.callLater(2, fire)
    return pending


def fakeCallback():
    """Print a notice showing that the reactor is still running callbacks."""
    print("twisted is still running")


def main():
    """
    Run the demo: start the reactor through a TwistedManager, schedule a
    one second callback, block on a Deferred, then stop the reactor.
    """
    manager = TwistedManager()
    print("starting")
    manager.start()

    print("setting up a 1sec callback")
    reactor.callLater(1, fakeCallback)

    print("getting a deferred")
    # Blocks here; the manager keeps running reactor work while it waits.
    result = manager.getDeferred(fakeDeferred("got it!"))
    print("got the deferred:", result)

    print("stopping")
    manager.stop()
    print("stopped")


# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()