1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
|
"""Abstract I/O dispatcher. Defines standard dispatcher API"""
from pysnmp.carrier import error
class AbstractTransportDispatcher:
    """Base class defining the transport dispatcher API.

    Keeps a registry of transports keyed by transport domain, a single
    receive callback, a single timer callback and a per-job reference
    counter. `runDispatcher` is left for subclasses to implement.
    """

    def __init__(self):
        """Create a dispatcher with no transports, jobs or callbacks."""
        self.__transports = {}
        self.__jobs = {}
        self.__recvCbFun = self.__timerCbFun = None

    def _cbFun(self, incomingTransport, transportAddress, incomingMessage):
        """Pass a message received by a transport to the receive callback.

        This method is handed to each transport via `registerCbFun()` in
        `registerTransport`. The transport domain is recovered by finding
        the registry entry that holds the very same transport object.

        Raises:
            error.CarrierError: if `incomingTransport` is not registered
                or if no receive callback is registered.
        """
        for name, transport in self.__transports.items():
            if transport is incomingTransport:
                transportDomain = name
                break
        else:
            # Loop finished without `break`: transport is unknown to us.
            raise error.CarrierError(
                'Unregistered transport %s' % (incomingTransport,)
            )
        if self.__recvCbFun is None:
            raise error.CarrierError(
                'Receive callback not registered -- loosing incoming event'
            )
        self.__recvCbFun(
            self, transportDomain, transportAddress, incomingMessage
        )

    # Dispatcher API

    def registerRecvCbFun(self, recvCbFun):
        """Install the receive callback.

        The callback is invoked as
        `recvCbFun(dispatcher, transportDomain, transportAddress, message)`.

        Raises:
            error.CarrierError: if a receive callback is already set.
        """
        if self.__recvCbFun is not None:
            raise error.CarrierError(
                'Receive callback already registered: %s'
                % (self.__recvCbFun,)
            )
        self.__recvCbFun = recvCbFun

    def unregisterRecvCbFun(self):
        """Remove the receive callback (no-op when none is set)."""
        self.__recvCbFun = None

    def registerTimerCbFun(self, timerCbFun):
        """Install the timer callback, invoked as `timerCbFun(timeNow)`.

        Raises:
            error.CarrierError: if a timer callback is already set.
        """
        if self.__timerCbFun is not None:
            raise error.CarrierError(
                'Callback already registered: %s' % (self.__timerCbFun,)
            )
        self.__timerCbFun = timerCbFun

    def unregisterTimerCbFun(self):
        """Remove the timer callback (no-op when none is set)."""
        self.__timerCbFun = None

    def registerTransport(self, tDomain, transport):
        """Register `transport` under the `tDomain` key.

        The transport's `registerCbFun()` is called with this
        dispatcher's `_cbFun` before the transport is stored.

        Raises:
            error.CarrierError: if `tDomain` is already registered.
        """
        # `in` replaces dict.has_key(), which does not exist on Python 3.
        if tDomain in self.__transports:
            # One-element tuple keeps '%' formatting safe for tuple domains.
            raise error.CarrierError(
                'Transport %s already registered' % (tDomain,)
            )
        transport.registerCbFun(self._cbFun)
        self.__transports[tDomain] = transport

    def unregisterTransport(self, tDomain):
        """Detach and forget the transport registered under `tDomain`.

        Calls the transport's `unregisterCbFun()` and drops it from the
        registry.

        Raises:
            error.CarrierError: if `tDomain` is not registered.
        """
        if tDomain not in self.__transports:
            raise error.CarrierError(
                'Transport %s not registered' % (tDomain,)
            )
        self.__transports[tDomain].unregisterCbFun()
        del self.__transports[tDomain]

    def getTransport(self, transportDomain):
        """Return the transport for `transportDomain`, or None if absent."""
        return self.__transports.get(transportDomain)

    def sendMessage(
        self, outgoingMessage, transportDomain, transportAddress
    ):
        """Send `outgoingMessage` through the transport of the given domain.

        Delegates to the transport's
        `sendMessage(outgoingMessage, transportAddress)`.

        Raises:
            error.CarrierError: if no transport is registered for
                `transportDomain`.
        """
        transport = self.__transports.get(transportDomain)
        if transport is None:
            raise error.CarrierError(
                'No suitable transport domain for %s' % (transportDomain,)
            )
        transport.sendMessage(outgoingMessage, transportAddress)

    def handleTimerTick(self, timeNow):
        """Forward `timeNow` to the timer callback, if one is set."""
        if self.__timerCbFun:
            self.__timerCbFun(timeNow)

    def jobStarted(self, jobId):
        """Increment the reference count kept for `jobId`."""
        self.__jobs[jobId] = self.__jobs.get(jobId, 0) + 1

    def jobFinished(self, jobId):
        """Decrement the reference count for `jobId`.

        The entry is removed once its count reaches zero.

        Raises:
            KeyError: if `jobId` has no entry in the job table.
        """
        self.__jobs[jobId] = self.__jobs[jobId] - 1
        if self.__jobs[jobId] == 0:
            del self.__jobs[jobId]

    def jobsArePending(self):
        """Return 1 if any job entry is still present, otherwise 0."""
        if self.__jobs:
            return 1
        else:
            return 0

    def runDispatcher(self, timeout=0.0):
        """Run the dispatcher; not implemented in this base class.

        Raises:
            error.CarrierError: always.
        """
        raise error.CarrierError('Method not implemented')

    def closeDispatcher(self):
        """Close and unregister every transport, then drop both callbacks."""
        # Iterate over a snapshot of the keys: unregisterTransport() deletes
        # entries, and mutating a dict while iterating its live key view
        # raises RuntimeError on Python 3.
        for tDomain in list(self.__transports):
            self.__transports[tDomain].closeTransport()
            self.unregisterTransport(tDomain)
        self.unregisterRecvCbFun()
        self.unregisterTimerCbFun()
|