"""GNUmed scripting listener.

This module implements threaded listening for scripting requests.
"""
#=====================================================================
# $Source: /home/ncq/Projekte/cvs2git/vcs-mirror/gnumed/gnumed/client/pycommon/gmScriptingListener.py,v $
__version__ = "$Revision: 1.7 $"
__author__ = "K.Hilbert <karsten.hilbert@gmx.net>"
import sys, time, threading, SimpleXMLRPCServer, select, logging
_log = logging.getLogger('gm.scripting')
_log.info(__version__)
#=====================================================================
class cScriptingListener:
    """Listen for external scripting requests via XML-RPC.

    An XML-RPC server is bound to 127.0.0.1 on the given port and a
    daemon thread is started which polls the server socket. Incoming
    requests are dispatched to the macro executor registered with the
    server and the results are handed back to the caller.

    Call shutdown() to stop the thread and close the socket.
    """
    # FIXME: this should use /var/run/gnumed/xml-rpc-port.pid
    # FIXME: and store the current port there

    def __init__(self, port = None, macro_executor = None, poll_interval = 3):
        """Create the XML-RPC server and start the listener thread.

        port -- TCP port to listen on; it is passed through int(),
            so leaving it at the default of None raises TypeError
        macro_executor -- instance registered with the server via
            register_instance()
        poll_interval -- seconds select() waits for incoming data
            before the thread re-checks whether it should quit

        Raises threading.ThreadError if the quit lock cannot be
        acquired, plus whatever int(port) or the server constructor
        raise.
        """
        # the creator holds this lock for as long as the listener is
        # supposed to run; the listener thread regularly tries to
        # acquire it and quits as soon as it succeeds
        self._quit_lock = threading.Lock()
        if not self._quit_lock.acquire(False):
            _log.error('cannot acquire thread quit lock !?! aborting')
            raise threading.ThreadError("cannot acquire thread quit-lock")

        # check for data every 'poll_interval' seconds
        self._poll_interval = poll_interval
        # localhost only for somewhat better security
        self._listener_address = '127.0.0.1'
        self._port = int(port)
        self._macro_executor = macro_executor

        self._server = SimpleXMLRPCServer.SimpleXMLRPCServer (
            addr = (self._listener_address, self._port),
            logRequests = False
        )
        self._server.register_instance(self._macro_executor)
        # NOTE(review): this is set after the server has been constructed;
        # presumably the socket is already bound by then, so this may not
        # influence the bind - confirm against the SocketServer docs
        self._server.allow_reuse_address = True

        self._thread = threading.Thread (
            target = self._process_RPCs,
            name = self.__class__.__name__
        )
        # do not keep the interpreter alive just for the listener
        self._thread.setDaemon(True)
        self._thread.start()

        _log.info('scripting listener started on [%s:%s]', self._listener_address, self._port)
        _log.info('macro executor: %s', self._macro_executor)
        _log.info('poll interval: %s seconds', self._poll_interval)
    #-------------------------------
    # public API
    #-------------------------------
    def shutdown(self):
        """Cleanly shut down. Complement to __init__().

        Releases the quit lock so the listener thread terminates,
        waits up to poll_interval + 5 seconds for it, then shuts down
        and closes the server socket. Problems are logged rather than
        raised. Calling this again after a shutdown does nothing.
        """
        if self._thread is None:
            return

        _log.info('stopping frontend scripting listener thread')
        # hand the lock over: the listener quits once it acquires it
        self._quit_lock.release()
        try:
            # give the worker thread time to terminate
            self._thread.join(self._poll_interval + 5)
            if self._thread.isAlive():
                _log.error('listener thread still alive after join()')
                _log.debug('active threads: %s', threading.enumerate())
        except Exception:
            # best effort only: still go on to close the socket below
            _log.exception('cannot wait for listener thread to terminate')
        self._thread = None

        try:
            self._server.socket.shutdown(2)
        except Exception:
            _log.exception('cannot cleanly shutdown(2) scripting listener socket')

        try:
            self._server.socket.close()
        except Exception:
            _log.exception('cannot cleanly close() scripting listener socket')
    #-------------------------------
    # internal helpers
    #-------------------------------
    def _process_RPCs(self):
        """The actual thread code.

        Loops until the quit lock can be acquired (which means
        shutdown() released it) or until serving a request fails.
        The lock is polled without blocking before and after every
        sleep so that a shutdown request is noticed promptly.
        """
        while True:
            if self._quit_lock.acquire(False):
                break
            time.sleep(0.35)		# give others time to acquire lock
            if self._quit_lock.acquire(False):
                break

            # wait at most self._poll_interval for new data
            ready_input_sockets = select.select (
                [self._server.socket],
                [],
                [],
                self._poll_interval
            )[0]

            # no input available ?
            if not ready_input_sockets:
                time.sleep(0.35)
                # the quit lock is checked again at the top of the loop
                continue

            # we may be in __del__ so we might fail here
            try:
                self._server.handle_request()
            except Exception:
                _log.exception('cannot serve RPC')
                break
            if self._quit_lock.acquire(False):
                break
            time.sleep(0.25)
            # the quit lock is checked again at the top of the loop

        # exit thread activity
        return
#=====================================================================
# main
#=====================================================================
if __name__ == "__main__":

    #-------------------------------
    class runner:
        """Minimal macro executor for the self-test below."""
        def tell_time(self):
            """Return the current local time as a string."""
            return time.asctime()
    #-------------------------------
    # self-test: start a listener on port 9999, call tell_time()
    # through XML-RPC, print the answer and shut down again
    if (len(sys.argv) > 1) and (sys.argv[1] == u'test'):
        import xmlrpclib

        try:
            listener = cScriptingListener(macro_executor=runner(), port=9999)
        except Exception:
            _log.exception('cannot instantiate scripting listener')
            sys.exit(1)

        # make sure the listener is stopped no matter how the
        # client side of the test ends
        try:
            s = xmlrpclib.ServerProxy('http://localhost:9999')
            try:
                t = s.tell_time()
                print(t)
            except Exception:
                _log.exception('cannot interact with server')
        finally:
            listener.shutdown()
#=====================================================================
# $Log: gmScriptingListener.py,v $
# Revision 1.7 2009-01-15 11:34:17 ncq
# - improved logging
#
# Revision 1.6 2008/06/23 21:49:58 ncq
# - clean up thread handling
#
# Revision 1.5 2007/12/12 16:17:16 ncq
# - better logger names
#
# Revision 1.4 2007/12/11 15:39:01 ncq
# - use std lib logging
#
# Revision 1.3 2007/12/03 20:43:53 ncq
# - lots of cleanup
# - fix/enhance test suite
#
# Revision 1.2 2004/09/13 08:51:03 ncq
# - make sure port is an integer
# - start XML RPC server with logRequests=False
# - socket allow_reuse_address
#
# Revision 1.1 2004/02/25 09:30:13 ncq
# - moved here from python-common
#
# Revision 1.3 2004/02/05 23:46:21 ncq
# - use serverproxy() instead of server() as is recommended
#
# Revision 1.2 2004/02/05 18:40:01 ncq
# - quit thread if we can't handle_request()
#
# Revision 1.1 2004/02/02 21:58:20 ncq
# - first cut
#