1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
|
from __future__ import print_function
import re
import select
import threading
import traceback
import codecs
from six.moves import queue
def _handle_output_packet_string(packet_contents):
if (not packet_contents) or (len(packet_contents) < 1):
return None
elif packet_contents[0] != "O":
return None
elif packet_contents == "OK":
return None
else:
return packet_contents[1:].decode("hex")
def _dump_queue(the_queue):
while not the_queue.empty():
print(codecs.encode(the_queue.get(True), "string_escape"))
print("\n")
class PumpQueues(object):
def __init__(self):
self._output_queue = queue.Queue()
self._packet_queue = queue.Queue()
def output_queue(self):
return self._output_queue
def packet_queue(self):
return self._packet_queue
def verify_queues_empty(self):
# Warn if there is any content left in any of the queues.
# That would represent unmatched packets.
if not self.output_queue().empty():
print("warning: output queue entries still exist:")
_dump_queue(self.output_queue())
print("from here:")
traceback.print_stack()
if not self.packet_queue().empty():
print("warning: packet queue entries still exist:")
_dump_queue(self.packet_queue())
print("from here:")
traceback.print_stack()
class SocketPacketPump(object):
"""A threaded packet reader that partitions packets into two streams.
All incoming $O packet content is accumulated with the current accumulation
state put into the OutputQueue.
All other incoming packets are placed in the packet queue.
A select thread can be started and stopped, and runs to place packet
content into the two queues.
"""
_GDB_REMOTE_PACKET_REGEX = re.compile(r'^\$([^\#]*)#[0-9a-fA-F]{2}')
def __init__(self, pump_socket, pump_queues, logger=None):
if not pump_socket:
raise Exception("pump_socket cannot be None")
self._thread = None
self._stop_thread = False
self._socket = pump_socket
self._logger = logger
self._receive_buffer = ""
self._accumulated_output = ""
self._pump_queues = pump_queues
def __enter__(self):
"""Support the python 'with' statement.
Start the pump thread."""
self.start_pump_thread()
return self
def __exit__(self, exit_type, value, the_traceback):
"""Support the python 'with' statement.
Shut down the pump thread."""
self.stop_pump_thread()
def start_pump_thread(self):
if self._thread:
raise Exception("pump thread is already running")
self._stop_thread = False
self._thread = threading.Thread(target=self._run_method)
self._thread.start()
def stop_pump_thread(self):
self._stop_thread = True
if self._thread:
self._thread.join()
def _process_new_bytes(self, new_bytes):
if not new_bytes:
return
if len(new_bytes) < 1:
return
# Add new bytes to our accumulated unprocessed packet bytes.
self._receive_buffer += new_bytes
# Parse fully-formed packets into individual packets.
has_more = len(self._receive_buffer) > 0
while has_more:
if len(self._receive_buffer) <= 0:
has_more = False
# handle '+' ack
elif self._receive_buffer[0] == "+":
self._pump_queues.packet_queue().put("+")
self._receive_buffer = self._receive_buffer[1:]
if self._logger:
self._logger.debug(
"parsed packet from stub: +\n" +
"new receive_buffer: {}".format(
self._receive_buffer))
else:
packet_match = self._GDB_REMOTE_PACKET_REGEX.match(
self._receive_buffer)
if packet_match:
# Our receive buffer matches a packet at the
# start of the receive buffer.
new_output_content = _handle_output_packet_string(
packet_match.group(1))
if new_output_content:
# This was an $O packet with new content.
self._accumulated_output += new_output_content
self._pump_queues.output_queue().put(self._accumulated_output)
else:
# Any packet other than $O.
self._pump_queues.packet_queue().put(packet_match.group(0))
# Remove the parsed packet from the receive
# buffer.
self._receive_buffer = self._receive_buffer[
len(packet_match.group(0)):]
if self._logger:
self._logger.debug(
"parsed packet from stub: " +
packet_match.group(0))
self._logger.debug(
"new receive_buffer: " +
self._receive_buffer)
else:
# We don't have enough in the receive bufferto make a full
# packet. Stop trying until we read more.
has_more = False
def _run_method(self):
self._receive_buffer = ""
self._accumulated_output = ""
if self._logger:
self._logger.info("socket pump starting")
# Keep looping around until we're asked to stop the thread.
while not self._stop_thread:
can_read, _, _ = select.select([self._socket], [], [], 0)
if can_read and self._socket in can_read:
try:
new_bytes = self._socket.recv(4096)
if self._logger and new_bytes and len(new_bytes) > 0:
self._logger.debug(
"pump received bytes: {}".format(new_bytes))
except:
# Likely a closed socket. Done with the pump thread.
if self._logger:
self._logger.debug(
"socket read failed, stopping pump read thread\n" +
traceback.format_exc(3))
break
self._process_new_bytes(new_bytes)
if self._logger:
self._logger.info("socket pump exiting")
def get_accumulated_output(self):
return self._accumulated_output
def get_receive_buffer(self):
return self._receive_buffer
|