1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
|
import sys
from Tasks import ThreadedTaskHandler
from wxPython.wx import wxNewId, wxPyCommandEvent
'''wxPython debugging client code. This runs in the IDE.
A debug client connects to a debug server, generally in a different
process. The debug server does the dirty work of stepping and
stopping at breakpoints.
'''
wxEVT_DEBUGGER_OK = wxNewId()
wxEVT_DEBUGGER_EXC = wxNewId()
wxEVT_DEBUGGER_START = wxNewId()
wxEVT_DEBUGGER_STOPPED = wxNewId()
def EVT_DEBUGGER_OK(win, id, func):
win.Connect(id, -1, wxEVT_DEBUGGER_OK, func)
def EVT_DEBUGGER_EXC(win, id, func):
win.Connect(id, -1, wxEVT_DEBUGGER_EXC, func)
def EVT_DEBUGGER_START(win, id, func):
win.Connect(id, -1, wxEVT_DEBUGGER_START, func)
def EVT_DEBUGGER_STOPPED(win, id, func):
win.Connect(id, -1, wxEVT_DEBUGGER_STOPPED, func)
class EmptyResponseError (Exception):
"""Empty debugger response"""
class DebuggerCommEvent(wxPyCommandEvent):
receiver_name = None
receiver_args = ()
result = None
task = None
t = None
v = None
tb = ('', 0)
def __init__(self, evtType, id):
wxPyCommandEvent.__init__(self, evtType, id)
def SetResult(self, result):
self.result = result
def GetResult(self):
return self.result
def SetTask(self, task):
self.task = task
def GetTask(self):
return self.task
def SetReceiverName(self, name):
self.receiver_name = name
def GetReceiverName(self):
return self.receiver_name
def SetReceiverArgs(self, args):
self.receiver_args = args
def GetReceiverArgs(self):
return self.receiver_args
def SetExc(self, t, v):
self.t, self.v = t, v
def GetExc(self):
return self.t, self.v
class DebugClient:
"""The base class expected to be used by all DebugClients.
"""
def __init__(self, win):
self.win_id = win.GetId()
self.event_handler = win.GetEventHandler()
def invokeOnServer(self, m_name, m_args=(), r_name=None, r_args=()):
"""Invokes an event on the debug server."""
raise NotImplementedError
def kill(self):
"""Terminates the debugger."""
raise NotImplementedError
def getProcessId(self):
"""Returns the process ID if this client is connected to another
process."""
return 0
def createEvent(self, typ):
"""Creates an event."""
return DebuggerCommEvent(typ, self.win_id)
def postEvent(self, evt):
"""Adds an event to the event queue."""
if self.event_handler:
self.event_handler.AddPendingEvent(evt)
def pollStreams(self):
"""Returns the data sent to stdout and stderr."""
return ('', '')
class DebuggerTask:
"""Calls invoke() on a debug client then posts an event on return.
"""
def __init__(self, client, m_name, m_args=(), r_name='', r_args=()):
self.client = client
self.m_name = m_name
self.m_args = m_args
self.r_name = r_name
self.r_args = r_args
def __repr__(self):
return '<DebuggerTask: %s:%s:%s:%s>'%(self.m_name, self.m_args,
self.r_name, self.r_args)
def __call__(self):
evt = None
try:
result = self.client.invoke(self.m_name, self.m_args)
except:
t, v = sys.exc_info()[:2]
evt = self.client.createEvent(wxEVT_DEBUGGER_EXC)
evt.SetExc(t, v)
else:
if self.r_name:
evt = self.client.createEvent(wxEVT_DEBUGGER_OK)
evt.SetReceiverName(self.r_name)
evt.SetReceiverArgs(self.r_args)
evt.SetResult(result)
if evt:
self.client.postEvent(evt)
class MultiThreadedDebugClient (DebugClient):
taskHandler = ThreadedTaskHandler()
def invoke(self, m_name, m_args):
raise NotImplementedError
def invokeOnServer(self, m_name, m_args=(), r_name=None, r_args=()):
task = DebuggerTask(self, m_name, m_args, r_name, r_args)
self.taskHandler.addTask(task)
|