1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
|
# Helpers.
# Run this cell always after kernel restarts. All other cells are autonomous.
from __future__ import print_function
import inspect
import logging
# getting the current thread
import threading
import time
from random import randint
import reactivex
# Every logging record is prefixed with the name of the emitting thread.
logging.basicConfig(format="%(threadName)s:%(message)s")
# NOTE: the name `log` is rebound further down in this file by `def log(*msg)`;
# after that definition, `log(...)` refers to the print-based helper.
log = logging.getLogger("Rx")
log.setLevel(logging.WARNING)
# Short aliases for the two time functions.
sleep, now = time.sleep, time.time
# Shorthand for the Observable class.
O = reactivex.Observable
ts_glob = 0 # global start time
def reset_start_time(show_doc_for=None, title=True, sleep=None):
    """Reset the global start time and optionally print a header and docs.

    Args:
        show_doc_for: optional function; when given, its module, signature
            line and docstring are printed.
        title: ``True`` prints ``show_doc_for.__name__`` as the header (and
            no header at all when ``show_doc_for`` is missing); any other
            truthy value is printed as the header itself; a falsy value
            prints no header.
        sleep: optional number of seconds to sleep (after logging it) before
            the start time is reset.

    Returns:
        None.
    """
    global ts_glob
    if sleep:
        log("main thread sleeping %ss" % sleep)
        time.sleep(sleep)
    ts_glob, d = time.time(), show_doc_for
    # Identity check: only the literal default selects the automatic title.
    # Without a documented object there is no name to derive, so no header
    # is printed instead of a literal "True" banner.
    if title is True:
        title = d.__name__ if d else None
    if title:
        header(title)
    if not d:
        return
    # print the function sig and doc if given
    try:
        deco, fdef = inspect.getsource(d).split("def ", 1)
    except (OSError, TypeError, ValueError):
        # No retrievable source (builtins, C extensions) or no "def " in the
        # source (e.g. lambdas): fall back to a placeholder signature rather
        # than aborting the whole call.
        fdef = "def %s(...):" % getattr(d, "__name__", d)
    else:
        # Keep any decorator lines plus the signature up to its first ")".
        fdef = "def ".join((deco, fdef.split(")", 1)[0])) + "):"
    print(
        "module %s\n%s\n%s\n%s"
        % (
            d.__module__,
            fdef.strip(),
            (" " + (d.__doc__ or "n.a.").strip()),
            "-" * 80,
        )
    )


rst = reset_start_time
def log(*msg):
    """Print the space-joined ``msg`` parts behind a time/thread prefix.

    The prefix is the elapsed time since the global start time followed by
    the tag of the current thread.
    """
    text = " ".join(map(str, msg))
    elapsed = dt(ts_glob)
    thread_tag = cur_thread()
    print("%s %s %s" % (elapsed, thread_tag, text))
def header(msg):
    """Print ``msg`` as a banner framed by ten ``=`` characters on each side.

    Two blank lines precede the banner and one follows it.
    """
    rule = "=" * 10
    banner = "\n\n%s %s %s\n" % (rule, msg, rule)
    print(banner)
def rand():
    """Return a random three-digit integer (100 to 999, both inclusive)."""
    lowest, highest = 100, 999
    return randint(lowest, highest)
def to_int(s):
    """Convert a string of decimal digits to ``int``; return anything else as is.

    Args:
        s: the string to inspect.

    Returns:
        ``int(s)`` when ``s`` consists solely of decimal digits, otherwise
        ``s`` unchanged (this includes the empty string and signed numbers).
    """
    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as superscript digits, which int() cannot parse (ValueError).
    return int(s) if s.isdecimal() else s
def dt(ts):
    """Return the time elapsed since ``ts`` in milliseconds, as text.

    The value is formatted with one decimal place and right-justified to a
    minimum width of six characters.
    """
    elapsed_ms = (time.time() - ts) * 1000
    text = "%.1f" % elapsed_ms
    return text.rjust(6)
class ItemGetter:
    """Adapter that serves an object's attributes through ``[]`` lookup.

    This lets an object act as the mapping of a ``%(attr)s`` format string.
    """

    def __init__(self, obj):
        # The wrapped object whose attributes are looked up.
        self.obj = obj

    def __getitem__(self, k, d=None):
        # Attributes the wrapped object lacks resolve to ``d``, not an error.
        target = self.obj
        return getattr(target, k, d)
class Subscriber:
    """Event sink that prints every notification it receives.

    Each printed line carries the time since the global start, the current
    thread tag, the event kind, the time since this subscriber was created,
    the payload and the subscriber's name.
    """

    def __init__(self, observed_stream, **kw):
        print("")
        # Default label: the last five characters of this object's hash.
        fallback = str(hash(self))[-5:]
        name = kw.get("name", fallback)
        label = str(name).strip()
        log("New subscription (%s) on stream" % label, hash(observed_stream))
        # Creation time; per-event time deltas are measured against it.
        self.ts = time.time()
        # Stored as given, without a postfix: it sometimes ends with '\n'.
        self.name = name

    def _on(self, what, v=""):
        # One output line per event.
        fields = (dt(ts_glob), cur_thread(), what, dt(self.ts), v, self.name)
        print("%s %s [%s] %s: %s -> %s" % fields)

    def on_next(self, v):
        """Report a regular item."""
        return self._on("next", v)

    def on_error(self, v):
        """Report an error notification."""
        return self._on("err ", v)

    def on_completed(self):
        """Report the end of the stream."""
        return self._on("cmpl", "fin")
def subs(src, **kw):
    """Subscribe a new ``Subscriber`` to ``src``.

    All keyword arguments are forwarded to ``Subscriber``.

    Returns:
        The result of ``src.subscribe(...)``, or the pair
        ``(subscription, subscriber)`` when ``return_subscriber`` is passed
        with a truthy value.
    """
    # required for e.g. .multicast:
    observer = Subscriber(src, **kw)
    subscription = src.subscribe(observer)
    want_observer = kw.get("return_subscriber")
    return (subscription, observer) if want_observer else subscription
threads = []


def cur_thread():
    """Return a short tag for the current thread, right-aligned to 5 columns.

    The main thread is tagged ``M``. Any other thread is tagged ``T``
    followed by whatever comes after the last ``-`` in its name
    (``Thread-3`` becomes ``T3``).
    """
    n = threading.current_thread().name
    tag = "M" if "Main" in n else "T" + n.rsplit("-", 1)[-1]
    # you could show all running threads via this:
    # threads = ' '.join([t.name for t in threading.enumerate()])
    # return '%s of %s' % (tag, threads)
    # Pad the main-thread tag exactly like the worker tags so that the
    # thread column of the printed log lines stays aligned.
    return "%5s" % tag
def marble_stream(s):
    """Build an observable from the marble string ``s`` in blocking form."""
    # NOTE(review): relies on `O.from_marbles` and `.to_blocking()` being
    # available on the imported Observable class - confirm against the
    # installed reactivex version.
    observable = O.from_marbles(s)
    return observable.to_blocking()
|