1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
|
import rpyc
from rpyc.utils.server import ThreadedServer
import time
import threading
class Web8Service(rpyc.Service):
    """RPyC service that renders simple "pages" into a caller-supplied container.

    The caller hands over a ``gtk`` object (used here only for its ``Label``
    and ``Button`` factories) and a ``content`` container (used only for its
    ``pack_start`` method).  In normal use these are presumably RPyC proxies
    to client-side GTK objects, so every widget call happens on the client.

    Each page is a ``page_<name>`` method on this class.
    """

    def on_connect(self, conn):
        """Keep the connection so page callbacks can reach the peer's root service."""
        self._conn = conn

    def exposed_get_page(self, gtk, content, page):
        """Populate ``content`` with the widgets of the requested page.

        Args:
            gtk: Object providing ``Label(text)`` and ``Button(text)``.
            content: Container exposing ``pack_start(widget)``.
            page: Page name.  Spaces are replaced by underscores and the name
                is lower-cased before it is looked up as ``page_<name>``.

        If no matching ``page_<name>`` attribute exists, a single label saying
        the page does not exist is packed into ``content`` instead.
        """
        # Stored on the instance because the page_* methods read them from self.
        self.gtk = gtk
        self.content = content
        page = page.replace(" ", "_").lower()
        pagefunc = getattr(self, f"page_{page}", None)
        if pagefunc:
            pagefunc()
        else:
            self._add_widget(self.gtk.Label(f"Page {page!r} does not exist"))

    def _add_widget(self, widget):
        """Show ``widget``, pack it into the current content container, return it."""
        widget.show()
        self.content.pack_start(widget)
        return widget

    def page_main(self):
        """Build the main page: greeting, click counter, navigation and timer.

        Widgets are packed in this order: greeting label, "Add 1" button,
        click-count label, "hello world" navigation button, timer toggle
        button, server-time label.
        """
        click_count = 0

        self._add_widget(self.gtk.Label("Hello mate, this is the main page"))

        def on_btn1_clicked(src):
            # lbl2 is created further down; it exists by the time a click arrives.
            nonlocal click_count
            click_count += 1
            lbl2.set_text(f"You have clicked the button {click_count} times")

        btn1 = self.gtk.Button("Add 1")
        btn1.connect("clicked", on_btn1_clicked)
        self._add_widget(btn1)

        lbl2 = self._add_widget(
            self.gtk.Label("You have clicked the button 0 times")
        )

        def on_btn2_clicked(src):
            # Ask the peer's root service to switch pages.
            self._conn.root.navigate("/hello_world")

        btn2 = self.gtk.Button("Go to the 'hello world' page")
        btn2.connect("clicked", on_btn2_clicked)
        self._add_widget(btn2)

        # Set when the timer should stop.  An Event (rather than a plain flag
        # plus time.sleep) lets the worker wake up immediately, so stopping
        # does not block the caller for the remainder of the one-second tick.
        stop_requested = threading.Event()
        timer_thread = None

        def bg_timer_thread():
            while not stop_requested.is_set():
                try:
                    # Fire-and-forget: do not wait for the label update reply.
                    rpyc.async_(lbl3.set_text)(f"Server time is: {time.ctime()}")
                except EOFError:
                    # NOTE(review): relies on RPyC reporting a closed
                    # connection as EOFError -- confirm for the RPyC version
                    # in use.  Nobody is left to update, so just finish.
                    return
                stop_requested.wait(1)

        def on_btn3_clicked(src):
            nonlocal timer_thread
            if btn3.get_label() == "Start timer":
                stop_requested.clear()
                # Daemon thread: a forgotten running timer must not keep the
                # server process alive on shutdown.
                timer_thread = threading.Thread(
                    target=bg_timer_thread, daemon=True
                )
                timer_thread.start()
                btn3.set_label("Stop timer")
            else:
                stop_requested.set()
                if timer_thread is not None:
                    timer_thread.join()
                btn3.set_label("Start timer")

        btn3 = self.gtk.Button("Start timer")
        btn3.connect("clicked", on_btn3_clicked)
        self._add_widget(btn3)

        lbl3 = self._add_widget(self.gtk.Label("Server time is: ?"))

    def page_hello_world(self):
        """Build the 'hello world' page: a single greeting label."""
        self._add_widget(self.gtk.Label("Hello world!"))
if __name__ == "__main__":
    # Run the service in a threaded RPyC server on port 18833 when this file
    # is executed as a script.
    server = ThreadedServer(Web8Service, port=18833)
    server.start()
|