File: DebugClient.py

package info (click to toggle)
boa-constructor 0.3.0-3
  • links: PTS
  • area: main
  • in suites: sarge
  • size: 8,188 kB
  • ctags: 8,857
  • sloc: python: 54,163; sh: 66; makefile: 36
file content (152 lines) | stat: -rw-r--r-- 4,288 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import sys

from Tasks import ThreadedTaskHandler
from wxPython.wx import wxNewId, wxPyCommandEvent

'''wxPython debugging client code.  This runs in the IDE.
A debug client connects to a debug server, generally in a different
process.  The debug server does the dirty work of stepping and
stopping at breakpoints.
'''

wxEVT_DEBUGGER_OK = wxNewId()
wxEVT_DEBUGGER_EXC = wxNewId()
wxEVT_DEBUGGER_START = wxNewId()
wxEVT_DEBUGGER_STOPPED = wxNewId()

def EVT_DEBUGGER_OK(win, id, func):
    win.Connect(id, -1, wxEVT_DEBUGGER_OK, func)

def EVT_DEBUGGER_EXC(win, id, func):
    win.Connect(id, -1, wxEVT_DEBUGGER_EXC, func)

def EVT_DEBUGGER_START(win, id, func):
    win.Connect(id, -1, wxEVT_DEBUGGER_START, func)

def EVT_DEBUGGER_STOPPED(win, id, func):
    win.Connect(id, -1, wxEVT_DEBUGGER_STOPPED, func)


class EmptyResponseError (Exception):
    """Empty debugger response"""


class DebuggerCommEvent(wxPyCommandEvent):
    receiver_name = None
    receiver_args = ()
    result = None
    task = None
    t = None
    v = None
    tb = ('', 0)

    def __init__(self, evtType, id):
        wxPyCommandEvent.__init__(self, evtType, id)

    def SetResult(self, result):
        self.result = result

    def GetResult(self):
        return self.result

    def SetTask(self, task):
        self.task = task

    def GetTask(self):
        return self.task

    def SetReceiverName(self, name):
        self.receiver_name = name

    def GetReceiverName(self):
        return self.receiver_name

    def SetReceiverArgs(self, args):
        self.receiver_args = args

    def GetReceiverArgs(self):
        return self.receiver_args

    def SetExc(self, t, v):
        self.t, self.v = t, v

    def GetExc(self):
        return self.t, self.v


class DebugClient:
    """The base class expected to be used by all DebugClients.
    """
    def __init__(self, win):
        self.win_id = win.GetId()
        self.event_handler = win.GetEventHandler()

    def invokeOnServer(self, m_name, m_args=(), r_name=None, r_args=()):
        """Invokes an event on the debug server."""
        raise NotImplementedError

    def kill(self):
        """Terminates the debugger."""
        raise NotImplementedError

    def getProcessId(self):
        """Returns the process ID if this client is connected to another
        process."""
        return 0

    def createEvent(self, typ):
        """Creates an event."""
        return DebuggerCommEvent(typ, self.win_id)

    def postEvent(self, evt):
        """Adds an event to the event queue."""
        if self.event_handler:
            self.event_handler.AddPendingEvent(evt)

    def pollStreams(self):
        """Returns the data sent to stdout and stderr."""
        return ('', '')


class DebuggerTask:
    """Calls invoke() on a debug client then posts an event on return.
    """
    def __init__(self, client, m_name, m_args=(), r_name='', r_args=()):
        self.client = client
        self.m_name = m_name
        self.m_args = m_args
        self.r_name = r_name
        self.r_args = r_args

    def __repr__(self):
        return '<DebuggerTask: %s:%s:%s:%s>'%(self.m_name, self.m_args,
                                              self.r_name, self.r_args)

    def __call__(self):
        evt = None
        try:
            result = self.client.invoke(self.m_name, self.m_args)
        except:
            t, v = sys.exc_info()[:2]
            evt = self.client.createEvent(wxEVT_DEBUGGER_EXC)
            evt.SetExc(t, v)
        else:
            if self.r_name:
                evt = self.client.createEvent(wxEVT_DEBUGGER_OK)
                evt.SetReceiverName(self.r_name)
                evt.SetReceiverArgs(self.r_args)
                evt.SetResult(result)
        if evt:
            self.client.postEvent(evt)


class MultiThreadedDebugClient (DebugClient):

    taskHandler = ThreadedTaskHandler()

    def invoke(self, m_name, m_args):
        raise NotImplementedError

    def invokeOnServer(self, m_name, m_args=(), r_name=None, r_args=()):
        task = DebuggerTask(self, m_name, m_args, r_name, r_args)
        self.taskHandler.addTask(task)