1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
|
"""GDB Python API bridge."""
import gdb
import socket
from threading import Condition
import time
from rpyc.core.protocol import Connection
from rpyc.core.service import Service
from rpyc.lib import spawn
from rpyc.lib.compat import select_error
from rpyc.utils.server import ThreadedServer
class ServeResult:
    """Outcome of one round of serving, handed over from the GDB thread.

    One thread publishes the outcome with :meth:`set`; another blocks in
    :meth:`wait` until it is available.
    """

    def __init__(self):
        self.cv = Condition()  # Guards ``done`` and ``exc``.
        self.done = False  # Becomes True once ``set`` has been called.
        self.exc = None  # Exception passed to ``set``, or None.

    def set(self, exc):
        """Record the outcome and wake up a waiting thread.

        ``exc`` is the exception to re-raise from :meth:`wait`, or ``None``
        when there is nothing to raise.
        """
        with self.cv:
            self.exc = exc
            self.done = True
            self.cv.notify()

    def wait(self):
        """Block until :meth:`set` was called; re-raise its exception, if any."""
        with self.cv:
            # wait_for() re-checks the predicate after every wakeup, so a
            # spurious wakeup does not end the wait early.
            self.cv.wait_for(lambda: self.done)
            failure = self.exc
            if failure is not None:
                raise failure
class GdbConnection(Connection):
    """A Connection implementation that serves requests on GDB thread.

    Serving on GDB thread might not be ideal from the responsiveness
    perspective, however, it is simple and reliable.
    """

    SERVE_TIME = 0.1  # Number of seconds to serve.
    IDLE_TIME = 0.1  # Number of seconds to wait after serving.

    def serve_gdb_thread(self, serve_result):
        """Serve requests on GDB thread for up to ``SERVE_TIME`` seconds.

        Args:
            serve_result: ``ServeResult`` that receives the outcome:
                ``None`` when serving finished normally, otherwise the
                exception that was raised while serving.
        """
        try:
            # Use a monotonic clock for the deadline: a wall-clock adjustment
            # must neither stretch nor cut short the serving window.
            deadline = time.monotonic() + self.SERVE_TIME
            while True:
                timeout = deadline - time.monotonic()
                if timeout < 0:
                    break
                super().serve(timeout=timeout)
        except Exception as exc:
            # Hand the failure to the thread blocked in ServeResult.wait().
            serve_result.set(exc)
        else:
            serve_result.set(None)

    def serve_all(self):
        """Modified version of rpyc.core.protocol.Connection.serve_all.

        Repeatedly posts a serving round to GDB via ``gdb.post_event``,
        waits for that round to finish, then sleeps for ``IDLE_TIME``
        seconds. The connection is always closed on exit.

        Raises:
            socket.error, select_error, IOError: if one of them occurs while
                the connection is not closed. ``EOFError`` is swallowed.
        """
        try:
            while not self.closed:
                serve_result = ServeResult()
                # The lambda is executed before the next iteration rebinds
                # ``serve_result``, because wait() blocks until it has run.
                gdb.post_event(lambda: self.serve_gdb_thread(serve_result))
                serve_result.wait()
                time.sleep(self.IDLE_TIME)
        except (socket.error, select_error, IOError):
            # Errors on an already closed connection are not reported.
            if not self.closed:
                raise
        except EOFError:
            pass
        finally:
            self.close()
class GdbService(Service):
    """A public interface for Pwntools."""

    _protocol = GdbConnection  # Connection subclass.
    exposed_gdb = gdb  # ``gdb`` module.

    def exposed_set_breakpoint(self, client, has_stop, *args, **kwargs):
        """Create a breakpoint and connect it with the client-side mirror.

        When ``has_stop`` is true, the breakpoint's ``stop()`` forwards to
        ``client.stop()``; otherwise a plain ``gdb.Breakpoint`` is created.
        ``args`` and ``kwargs`` are passed to the breakpoint constructor.
        """
        if not has_stop:
            return gdb.Breakpoint(*args, **kwargs)

        class Breakpoint(gdb.Breakpoint):
            def stop(self):
                return client.stop()

        return Breakpoint(*args, **kwargs)

    def exposed_set_finish_breakpoint(self, client, has_stop, has_out_of_scope, *args, **kwargs):
        """Create a finish breakpoint and connect it with the client-side mirror.

        ``stop()`` and ``out_of_scope()`` are only defined on the created
        breakpoint when the corresponding flag is true; they forward to the
        same-named methods of ``client``.
        """

        def forward_stop(_breakpoint):
            return client.stop()

        def forward_out_of_scope(_breakpoint):
            client.out_of_scope()

        class FinishBreakpoint(gdb.FinishBreakpoint):
            # Attach only the callbacks requested by the caller.
            if has_stop:
                stop = forward_stop
            if has_out_of_scope:
                out_of_scope = forward_out_of_scope

        return FinishBreakpoint(*args, **kwargs)

    def exposed_quit(self):
        """Terminate GDB."""

        def run_quit():
            return gdb.execute('quit')

        # The command is posted as a GDB event rather than executed inline.
        gdb.post_event(run_quit)
# Create the RPyC server that exposes GdbService and pass its ``start``
# method to ``spawn``.
# NOTE(review): ``socket_path`` is not defined in the visible part of this
# script; presumably it is provided by whoever loads it -- confirm.
spawn(
    ThreadedServer(
        service=GdbService(),
        socket_path=socket_path,
        protocol_config=dict(allow_all_attrs=True, allow_setattr=True),
    ).start
)
|