File: pdbcapture.py

package info (click to toggle)
python-weberror 0.13.1%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: buster, stretch
  • size: 408 kB
  • ctags: 441
  • sloc: python: 2,774; makefile: 18
file content (144 lines) | stat: -rw-r--r-- 5,006 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
from webob import Request, Response
import threading
from paste.util import threadedprint
from itertools import count
import tempita
from paste.urlparser import StaticURLParser
from paste.util.filemixin import FileMixin
import os
import sys

try:
    import json
except ImportError: # pragma: no cover
    import simplejson as json

here = os.path.dirname(os.path.abspath(__file__))

#def debug(msg, *args):
#    args = '%s %s' % (msg, ' '.join(map(repr, args)))
#    print >> sys.stderr, args

class PdbCapture(object):

    def __init__(self, app):
        self.app = app
        threadedprint.install(leave_stdout=True)
        threadedprint.install_stdin()
        self.counter = count()
        self.static_app = StaticURLParser(os.path.join(here, 'pdbcapture/static'))
        self.media_app = StaticURLParser(os.path.join(here, 'eval-media'))
        self.states = {}

    def get_template(self, template_name):
        filename = os.path.join(os.path.dirname(__file__), template_name)
        return tempita.HTMLTemplate.from_filename(filename)
        
    def __call__(self, environ, start_response):
        req = Request(environ)
        if req.GET.get('__pdbid__'):
            id = int(req.GET['__pdbid__'])
            response = self.states[id]['response']
            return response(environ, start_response)
        if req.path_info_peek() == '.pdbcapture':
            req.path_info_pop()
            if req.path_info_peek() == 'static':
                req.path_info_pop()
                return self.static_app(environ, start_response)
            if req.path_info_peek() == 'media':
                req.path_info_pop()
                return self.media_app(environ, start_response)
            resp = self.internal_request(req)
            return resp(environ, start_response)
        id = self.counter.next()
        state = dict(id=id,
                     event=threading.Event(),
                     base_url=req.application_url,
                     stdout=[],
                     stdin=[],
                     stdin_event=threading.Event())
        t = threading.Thread(target=self.call_app, args=(req, state))
        t.setDaemon(True)
        t.start()
        state['event'].wait()
        if 'response' in state:
            # Normal request, nothing happened
            resp = state['response']
            return resp(environ, start_response)
        if 'exc_info' in state:
            raise state['exc_info'][0], state['exc_info'][1], state['exc_info'][2]
        self.states[id] = state
        tmpl = self.get_template('pdbcapture_response.html')
        body = tmpl.substitute(req=req, state=state, id=id)
        resp = Response(body)
        return resp(environ, start_response)

    def internal_request(self, req):
        id = int(req.params['id'])
        state = self.states[id]
        if 'response' in state:
            body = {'response': 1}
        else:
            if req.params.get('stdin'):
                state['stdin'].append(req.params['stdin'])
                state['stdin_event'].set()
            stdout = ''.join(state['stdout'])
            state['stdout'][:] = []
            body = {'stdout': stdout}
        if not state['stdin_event'].isSet():
            body['stdinPending'] = 1
        resp = Response(content_type='application/json',
                        body=json.dumps(body))
        return resp

    def call_app(self, req, state):
        event = state['event']
        stream_handler = StreamHandler(stdin=state['stdin'], stdin_event=state['stdin_event'], stdout=state['stdout'],
                                       signal_event=state['event'])
        threadedprint.register(stream_handler)
        threadedprint.register_stdin(stream_handler)
        try:
            resp = req.get_response(self.app)
            state['response'] = resp
        except:
            state['exc_info'] = sys.exc_info()
        event.set()

class StreamHandler(FileMixin):
    """File-like stdin/stdout stand-in backed by shared lists.

    ``stdin`` and ``stdout`` are lists of text chunks shared with whoever
    created the handler; ``stdin_event`` is used to wait for more input and
    ``signal_event`` is set whenever a read is attempted.
    """

    def __init__(self, stdin, stdout, stdin_event, signal_event):
        """Remember the shared buffers and events; nothing is copied."""
        self.signal_event = signal_event
        self.stdin_event = stdin_event
        self.stdout = stdout
        self.stdin = stdin

    def write(self, text):
        """Queue ``text`` on the shared output list."""
        self.stdout.append(text)

    def read(self, size=None):
        """Read from the shared input list.

        With ``size`` of ``None`` or ``-1`` everything currently buffered is
        returned without waiting.  Otherwise the call blocks on
        ``stdin_event`` until at least ``size`` characters are buffered and
        returns exactly the first ``size`` of them, keeping the rest.

        The consumed text is also written to ``sys.stdout`` (presumably so
        the input is echoed in the captured output -- depends on how
        ``sys.stdout`` is redirected for this thread).
        """
        # Announce that input is being requested before anything else.
        self.signal_event.set()
        buffered = ''.join(self.stdin)
        wants_everything = size is None or size == -1
        if not wants_everything:
            while len(buffered) < size:
                # Keep the clear -> wait -> re-join order intact.
                self.stdin_event.clear()
                self.stdin_event.wait()
                buffered = ''.join(self.stdin)
            chunk, leftover = buffered[:size], buffered[size:]
            self.stdin[:] = [leftover]
            sys.stdout.write(chunk)
            return chunk
        del self.stdin[:]
        sys.stdout.write(buffered)
        return buffered

def test_app(environ, start_response):
    """Tiny demo WSGI app: stop in ``pdb``, then answer with a text greeting."""
    import pdb
    message = "Hey, what's up?"
    # Breakpoint for the demo; ``message`` is in scope at the prompt.
    pdb.set_trace()
    status = '200 OK'
    headers = [('Content-type', 'text/plain')]
    start_response(status, headers)
    return [message]

if __name__ == '__main__':
    # Run the demo application behind the capture middleware using Paste's
    # built-in HTTP server (imported lazily: only needed for this script mode).
    from paste.httpserver import serve
    serve(PdbCapture(test_app))