File: server.py

package info (click to toggle)
rpyc 6.0.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,324 kB
  • sloc: python: 6,442; makefile: 122
file content (88 lines) | stat: -rw-r--r-- 2,545 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import rpyc
from rpyc.utils.server import ThreadedServer
import time
import threading


class Web8Service(rpyc.Service):
    """RPyC service that renders small demo "pages" as GTK widgets.

    A page is rendered by calling ``get_page`` with a ``gtk`` object and a
    ``content`` container; the service creates labels and buttons through
    ``gtk`` and packs them into ``content``.  Both are presumably netrefs to
    objects living in the connected client -- confirm against the client code.
    """

    def on_connect(self, conn):
        """Keep the connection so page callbacks can call back into the peer."""
        self._conn = conn

    def exposed_get_page(self, gtk, content, page):
        """Fill ``content`` with the widgets of the requested page.

        :param gtk: object providing the ``Label`` and ``Button`` constructors
        :param content: container exposing ``pack_start``; receives the widgets
        :param page: page name; spaces are replaced by underscores and the
            name is lower-cased, then it is resolved to a ``page_<name>``
            method of this service

        If no such method exists, a single label reporting the missing page is
        added instead.
        """
        self.gtk = gtk
        self.content = content
        page = page.replace(" ", "_").lower()
        pagefunc = getattr(self, f"page_{page}", None)
        if pagefunc:
            pagefunc()
        else:
            self._add(self.gtk.Label(f"Page {page!r} does not exist"))

    def _add(self, widget):
        """Show ``widget`` and append it to the current content container."""
        widget.show()
        self.content.pack_start(widget)

    def page_main(self):
        """Build the main page: greeting, click counter, navigation, timer."""
        self._add(self.gtk.Label("Hello mate, this is the main page"))

        # --- click counter -------------------------------------------------
        clicks = 0

        def on_btn1_clicked(src):
            nonlocal clicks
            clicks += 1
            lbl2.set_text(f"You have clicked the button {clicks} times")

        btn1 = self.gtk.Button("Add 1")
        btn1.connect("clicked", on_btn1_clicked)
        self._add(btn1)

        lbl2 = self.gtk.Label("You have clicked the button 0 times")
        self._add(lbl2)

        # --- navigation ----------------------------------------------------
        def on_btn2_clicked(src):
            # Ask the peer's root service to switch pages.
            self._conn.root.navigate("/hello_world")

        btn2 = self.gtk.Button("Go to the 'hello world' page")
        btn2.connect("clicked", on_btn2_clicked)
        self._add(btn2)

        # --- server-time timer ---------------------------------------------
        # An Event (rather than a plain flag + sleep) lets "Stop timer" wake
        # the worker immediately, so join() below does not stall the thread
        # that is serving this connection for up to a full tick.
        stop_timer = threading.Event()
        timer_thread = None

        def bg_timer_thread():
            while not stop_timer.is_set():
                try:
                    # Fire-and-forget: the result of set_text is not awaited.
                    rpyc.async_(lbl3.set_text)(f"Server time is: {time.ctime()}")
                except EOFError:
                    # The connection was closed while the timer was running;
                    # there is nobody left to update, so end quietly.
                    return
                stop_timer.wait(1)

        def on_btn3_clicked(src):
            nonlocal timer_thread
            if btn3.get_label() == "Start timer":
                stop_timer.clear()
                # daemon=True: a running timer must not keep the server
                # process alive at shutdown.
                timer_thread = threading.Thread(target=bg_timer_thread, daemon=True)
                timer_thread.start()
                btn3.set_label("Stop timer")
            else:
                stop_timer.set()
                if timer_thread is not None:
                    timer_thread.join()
                    timer_thread = None
                btn3.set_label("Start timer")

        btn3 = self.gtk.Button("Start timer")
        btn3.connect("clicked", on_btn3_clicked)
        self._add(btn3)

        lbl3 = self.gtk.Label("Server time is: ?")
        self._add(lbl3)

    def page_hello_world(self):
        """Build the 'hello world' page: a single greeting label."""
        self._add(self.gtk.Label("Hello world!"))


if __name__ == "__main__":
    # Script entry point: serve Web8Service on TCP port 18833.
    # start() is expected to block while the server accepts clients.
    ThreadedServer(Web8Service, port=18833).start()