File: timeflies_tkinter.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (50 lines) | stat: -rw-r--r-- 1,342 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import tkinter
from tkinter import Event, Frame, Label, Tk
from typing import Any, Tuple

import reactivex
from reactivex import Observable
from reactivex import operators as ops
from reactivex.scheduler.mainloop import TkinterScheduler
from reactivex.subject import Subject


def main() -> None:
    """Run the "time flies like an arrow" Tkinter demo.

    Builds a 600x600 frame, turns every character of the banner text into
    its own ``Label`` and re-places each label relative to the pointer on
    every ``<Motion>`` event delivered to the frame.  The character at
    position ``i`` receives each event through ``ops.delay(i * 0.1)``, so
    characters further along the text react later than earlier ones.

    The call blocks inside ``mainloop()`` and returns once that loop ends.
    Errors raised in the stream are handed to ``print``.
    """
    window = Tk()
    window.title("Rx for Python rocks")
    tk_scheduler = TkinterScheduler(window)

    # Every <Motion> event bound on the frame below is pushed into this subject.
    motion_events: Subject[Event[Any]] = Subject()

    board = Frame(window, width=600, height=600)
    board.bind("<Motion>", motion_events.on_next)

    def make_label(char: str) -> Label:
        """Create a borderless, unpadded label showing ``char`` in the frame."""
        return Label(board, text=char, borderwidth=0, padx=0, pady=0)

    def follow_mouse(
        label: tkinter.Label, index: int
    ) -> Observable[Tuple[tkinter.Label, "Event[Frame]", int]]:
        """Pair ``label`` and ``index`` with each motion event, after a delay.

        The delay grows with ``index`` (``index * 0.1``), so the first
        character is not delayed at all.
        """

        def tag(ev: "Event[Frame]") -> Tuple[tkinter.Label, "Event[Frame]", int]:
            # Carry the label and its index along with the raw event.
            return (label, ev, index)

        lag = index * 0.1
        return motion_events.pipe(ops.map(tag), ops.delay(lag))

    def move_label(info: Tuple[tkinter.Label, "Event[Frame]", int]) -> None:
        """Place one label next to the pointer position stored in the event."""
        widget, event, position = info
        # Shift each character 12 units right per index, plus a fixed
        # 15-unit gap from the pointer; y follows the pointer directly.
        widget.place(x=event.x + position * 12 + 15, y=event.y)

    # One label per character, each merged into a single stream of
    # (label, event, index) tuples.
    letters = reactivex.from_("TIME FLIES LIKE AN ARROW").pipe(
        ops.map(make_label),
        ops.flat_map_indexed(follow_mouse),
    )
    letters.subscribe(move_label, on_error=print, scheduler=tk_scheduler)

    board.pack()
    window.mainloop()


# Launch the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()