File: startup.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (143 lines) | stat: -rw-r--r-- 3,422 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# Helpers.
# Run this cell always after kernel restarts. All other cells are autonomous.
from __future__ import print_function

import inspect
import logging

# getting the current thread
import threading
import time
from random import randint

import reactivex

# Prefix every record emitted through the logging module with the name of the
# thread that produced it.
logging.basicConfig(format="%(threadName)s:%(message)s")
# Logger named "Rx", limited to WARNING and above.
# NOTE: the module-level name `log` is rebound further down in this file by the
# `log(*msg)` print helper, so after the module has loaded `log` refers to
# that function and no longer to this logger.
log = logging.getLogger("Rx")
log.setLevel(logging.WARNING)

# Short aliases for time.sleep and time.time.
sleep, now = time.sleep, time.time
# Short alias for the Observable class.
O = reactivex.Observable

# Global start time (seconds since the epoch). Rewritten by reset_start_time();
# read by log() and Subscriber._on() to print elapsed milliseconds.
ts_glob = 0  # global start time


def _signature_end(text):
    """Return the index of the ")" that closes the first "(" in *text*.

    Nested parentheses (for example a tuple default such as ``args=()``) are
    skipped. Returns -1 when no closing parenthesis is found.
    """
    depth = 0
    for pos, char in enumerate(text):
        if char == "(":
            depth += 1
        elif char == ")":
            depth -= 1
            if depth <= 0:
                return pos
    return -1


def reset_start_time(show_doc_for=None, title=True, sleep=None):
    """Reset the global start time and optionally print a header and docs.

    Args:
        show_doc_for: optional function. When given, its module name, its
            signature (with any decorator lines preceding the ``def``) and its
            docstring are printed, followed by a separator line.
        title: header text. ``True`` means "use ``show_doc_for.__name__``";
            a falsy value suppresses the header. With ``True`` and no
            ``show_doc_for`` the header text is the literal ``True``.
        sleep: optional number of seconds to sleep before the clock is reset.
            (This parameter shadows the module-level ``sleep`` alias inside
            the function, hence the explicit ``time.sleep`` call.)
    """
    global ts_glob
    if sleep:
        log("main thread sleeping %ss" % sleep)
        time.sleep(sleep)
    ts_glob, d = time.time(), show_doc_for
    if title:
        # Only the literal True requests the automatic title.
        if d and title is True:
            title = d.__name__
        header(title)
    if not d:
        return
    # Print the function signature and doc. The signature is taken from the
    # source text: everything before the first "def " is kept (decorators),
    # then the text up to the parenthesis that closes the parameter list.
    deco, rest = inspect.getsource(d).split("def ", 1)
    end = _signature_end(rest)
    # Without any ")" fall back to the whole remaining text, as before.
    signature = rest[: end + 1] if end != -1 else rest + ")"
    fdef = deco + "def " + signature + ":"
    print(
        "module %s\n%s\n%s\n%s"
        % (
            d.__module__,
            fdef.strip(),
            ("    " + (d.__doc__ or "n.a.").strip()),
            "-" * 80,
        )
    )


# Short alias for reset_start_time.
rst = reset_start_time


def log(*msg):
    """Print the given values on one line, prefixed with timing and thread.

    The line consists of the milliseconds elapsed since the global start
    time, the label of the current thread, and the space-joined string forms
    of all arguments.
    """
    text = " ".join(map(str, msg))
    print("%s %s %s" % (dt(ts_glob), cur_thread(), text))


def header(msg):
    """Print *msg* as a banner framed by "=" bars and surrounded by blank lines."""
    bar = "=" * 10
    print("\n\n%s %s %s\n" % (bar, msg, bar))


def rand():
    """Return a random three-digit integer (100 to 999, both inclusive)."""
    lowest, highest = 100, 999
    return randint(lowest, highest)


def to_int(s):
    """Return ``int(s)`` when *s* consists only of digits, otherwise *s* itself.

    Signs, whitespace and the empty string do not count as "digits only", so
    such inputs are returned unchanged.
    """
    if not s.isdigit():
        return s
    try:
        return int(s)
    except ValueError:
        # str.isdigit() is also true for characters such as superscript
        # digits ("\u00b2") that int() cannot parse; hand those back unchanged
        # instead of raising.
        return s


def dt(ts):
    """Return the time elapsed since *ts* as a right-aligned string.

    The value is in milliseconds with one decimal place, padded on the left
    with spaces to a minimum width of 6 characters.
    """
    elapsed_ms = (time.time() - ts) * 1000
    text = "%.1f" % elapsed_ms
    return text.rjust(6)


class ItemGetter:
    """Expose an object's attributes through item access.

    This lets an arbitrary object serve as the mapping in %-style formatting,
    e.g. ``"%(name)s" % ItemGetter(obj)``.
    """

    def __init__(self, obj):
        # The wrapped object whose attributes are looked up.
        self.obj = obj

    def __getitem__(self, k, d=None):
        """Return attribute *k* of the wrapped object, or *d* if it is missing."""
        try:
            return getattr(self.obj, k)
        except AttributeError:
            return d


class Subscriber:
    """Observer-style object that prints every event it receives.

    Each printed event line shows the milliseconds since the global start
    time, the current thread label, the event kind, the milliseconds since
    this subscriber was created, the event value and the subscriber's name.
    """

    def __init__(self, observed_stream, **kw):
        """Announce the new subscription and remember its creation time.

        Args:
            observed_stream: the stream being subscribed to; only its hash is
                printed here.
            **kw: ``name`` (optional) is the label shown in the output. It
                defaults to the last five characters of this object's hash.
                Other keys are ignored.
        """
        print("")
        label = kw.get("name", str(hash(self))[-5:])
        announcement = "New subscription (%s) on stream" % str(label).strip()
        log(announcement, hash(observed_stream))
        # Creation time; event lines report their delay relative to it.
        self.ts = time.time()
        # The name is stored as given (it sometimes ends with '\n'), so no
        # postfix is appended to it.
        self.name = label

    def _on(self, what, v=""):
        """Print one event line for event kind *what* carrying value *v*."""
        fields = (dt(ts_glob), cur_thread(), what, dt(self.ts), v, self.name)
        print("%s %s [%s] %s: %s -> %s" % fields)

    def on_next(self, v):
        """Report a regular item."""
        return self._on("next", v)

    def on_error(self, v):
        """Report an error."""
        return self._on("err ", v)

    def on_completed(self):
        """Report completion of the stream."""
        return self._on("cmpl", "fin")


def subs(src, **kw):
    """Subscribe a printing Subscriber to *src*.

    All keyword arguments are forwarded to Subscriber (which reads ``name``).

    Returns:
        The value returned by ``src.subscribe``; or, when the keyword
        ``return_subscriber`` is truthy, the tuple
        ``(subscription, subscriber)``. Getting the subscriber back is needed
        for e.g. ``.multicast``.
    """
    observer = Subscriber(src, **kw)
    handle = src.subscribe(observer)
    want_subscriber = kw.get("return_subscriber", None)
    return (handle, observer) if want_subscriber else handle


# Module-level list, initially empty. It is not referenced by any other code
# visible in this file. NOTE(review): presumably kept for interactive use.
threads = []


def cur_thread():
    """Return a short label for the current thread, right-aligned to 5 columns.

    A thread whose name contains "Main" is shown as "M". Any other thread is
    shown as "T" followed by the part of its name after the last "-", e.g.
    "Thread-3" gives "T3".
    """
    name = threading.current_thread().name
    if "Main" in name:
        return "    M"
    suffix = name.rsplit("-", 1)[-1]
    # Since Python 3.10 default thread names look like "Thread-3 (target)".
    # Drop the " (target)" part so the label stays short and the output
    # columns stay aligned.
    suffix = suffix.split(" (", 1)[0]
    # Hint: all running threads could be shown as well, e.g. via
    #   ' '.join(t.name for t in threading.enumerate())
    return "%5s" % ("T" + suffix)


def marble_stream(s):
    """Build a stream from the marble string *s* and return its blocking form.

    NOTE(review): ``from_marbles`` on the Observable class and
    ``to_blocking`` on the result look like the RxPY 1.x API; confirm that
    both exist in the installed reactivex version.
    """
    source = O.from_marbles(s)
    return source.to_blocking()