1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
|
import sys
import random
from Pyro5.api import expose, oneway, Daemon, Proxy, config, register_dict_to_class, register_class_to_dict
import robot
import remote
config.SERVERTYPE = "multiplex" # to make sure all calls run in the same thread
class DrunkenGameObserver(remote.GameObserver):
@oneway
@expose
def world_update(self, iteration, world, robotdata):
# change directions randomly
if random.random() > 0.8:
self.robot._pyroClaimOwnership() # lets our thread do the proxy calls
if random.random() >= 0.5:
dx, dy = random.randint(-1, 1), 0
else:
dx, dy = 0, random.randint(-1, 1)
if random.random() > 0.7:
self.robot.emote("..Hic! *burp*")
self.robot.change_direction((dx, dy))
class AngryGameObserver(remote.GameObserver):
def __init__(self):
super(AngryGameObserver, self).__init__()
self.directions = [(1, 0), (0, 1), (-1, 0), (0, -1)] # clockwise motion
self.directioncounter = 0
@oneway
@expose
def world_update(self, iteration, world, robotdata):
# move in a loop yelling angry stuff
self.robot._pyroClaimOwnership() # lets our thread do the proxy calls
if iteration % 50 == 0:
self.robot.emote("I'll kill you all! GRR")
if iteration % 10 == 0:
self.directioncounter = (self.directioncounter + 1) % 4
self.robot.change_direction(self.directions[self.directioncounter])
class ScaredGameObserver(remote.GameObserver):
def __init__(self):
super(ScaredGameObserver, self).__init__()
# run to a corner
self.direction = random.choice([(-1, -1), (1, -1), (1, 1), (-1, 1)])
@oneway
@expose
def start(self):
super(ScaredGameObserver, self).start()
self.robot.change_direction(self.direction)
@oneway
@expose
def world_update(self, iteration, world, robotdata):
if iteration % 50 == 0:
self.robot._pyroClaimOwnership() # lets our thread do the proxy calls
self.robot.emote("I'm scared!")
observers = {
"drunk": DrunkenGameObserver,
"angry": AngryGameObserver,
"scared": ScaredGameObserver,
}
# register the Robot class with Pyro's serializers:
register_class_to_dict(robot.Robot, robot.Robot.robot_to_dict)
register_dict_to_class("robot.Robot", robot.Robot.dict_to_robot)
def main(args):
if len(args) != 3:
print("usage: client.py <robotname> <robottype>")
print(" type is one of: %s" % list(observers.keys()))
return
name = args[1]
observertype = args[2]
with Daemon() as daemon:
observer = observers[observertype]()
daemon.register(observer)
gameserver = Proxy("PYRONAME:example.robotserver")
robot = gameserver.register(name, observer)
with robot: # make sure it disconnects, before the daemon thread uses it later
robot.emote("Hi there! I'm here to kick your ass")
observer.robot = robot
print("Pyro server registered on %s" % daemon.locationStr)
daemon.requestLoop()
if __name__ == "__main__":
main(sys.argv)
|