1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
|
#!/usr/bin/env python3
import logging
import functools
import rpyc
import threading
import random
import time
THREAD_SAFE = True # Toggles thread safe and unsafe behavior
def synchronize(lock):
""" Decorator that invokes the lock acquire call before a function call and releases after """
def sync_func(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
lock.acquire()
res = func(*args, **kwargs)
lock.release()
return res
return wrapper
return sync_func
class SharingComponent(object):
""" Initialized in the class definition of SharingService and shared by all instances of SharingService """
lock = threading.Lock()
def __init__(self):
self.sequence_id = 0
def sleepy_sequence_id(self):
""" increment id and sometimes sleep to force race condition """
self.sequence_id += 1
_expected_sequence_id = self.sequence_id
if random.randint(0, 1) == 1:
time.sleep(1)
if self.sequence_id == _expected_sequence_id:
return self.sequence_id
else:
raise RuntimeError("Unexpected sequence_id behavior (race condition).")
@synchronize(lock)
def get_sequence_id(self):
""" provides a thread-safe execution frame to otherwise unsafe functions """
return self.sleepy_sequence_id()
class SharingService(rpyc.Service):
""" A class that allows for sharing components between connection instances """
__shared__ = SharingComponent()
@property
def shared(self):
""" convenient access to an otherwise long object name """
return SharingService.__shared__
def exposed_echo(self, message):
""" example of the potential perils when threading shared state """
if THREAD_SAFE:
seq_id = self.shared.get_sequence_id()
else:
seq_id = self.shared.sleepy_sequence_id()
if message == "Echo":
return f"Echo Reply {seq_id}"
else:
return f"Parameter Problem {seq_id}"
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
debugging_config = {'allow_all_attrs': True, 'sync_request_timeout': None}
echo_svc = rpyc.ThreadedServer(service=SharingService, port=18861, protocol_config=debugging_config)
echo_svc.start()
|