1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136
|
import datetime
import re
import subprocess
import sys
import time
from threading import Thread
from .env import HttpdTestEnv
class CurlPiper:
def __init__(self, env: HttpdTestEnv, url: str):
self.env = env
self.url = url
self.proc = None
self.args = None
self.headerfile = None
self._stderr = []
self._stdout = []
self.stdout_thread = None
self.stderr_thread = None
self._exitcode = -1
self._r = None
@property
def exitcode(self):
return self._exitcode
@property
def response(self):
return self._r.response if self._r else None
def __repr__(self):
return f'CurlPiper[exitcode={self._exitcode}, stderr={self._stderr}, stdout={self._stdout}]'
def start(self):
self.args, self.headerfile = self.env.curl_complete_args([self.url], timeout=5, options=[
"-T", "-", "-X", "POST", "--trace-ascii", "%", "--trace-time"
])
self.args.append(self.url)
sys.stderr.write("starting: {0}\n".format(self.args))
self.proc = subprocess.Popen(self.args, stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=0)
def read_output(fh, buffer):
while True:
chunk = fh.read()
if not chunk:
break
buffer.append(chunk.decode())
# collect all stdout and stderr until we are done
# use separate threads to not block ourself
self._stderr = []
self._stdout = []
if self.proc.stderr:
self.stderr_thread = Thread(target=read_output, args=(self.proc.stderr, self._stderr))
self.stderr_thread.start()
if self.proc.stdout:
self.stdout_thread = Thread(target=read_output, args=(self.proc.stdout, self._stdout))
self.stdout_thread.start()
return self.proc
def send(self, data: str):
self.proc.stdin.write(data.encode())
self.proc.stdin.flush()
def close(self) -> ([str], [str]):
self.proc.stdin.close()
self.stdout_thread.join()
self.stderr_thread.join()
self._end()
return self._stdout, self._stderr
def _end(self):
if self.proc:
# noinspection PyBroadException
try:
if self.proc.stdin:
# noinspection PyBroadException
try:
self.proc.stdin.close()
except Exception:
pass
if self.proc.stdout:
self.proc.stdout.close()
if self.proc.stderr:
self.proc.stderr.close()
except Exception:
self.proc.terminate()
finally:
self.proc.wait()
self.stdout_thread = None
self.stderr_thread = None
self._exitcode = self.proc.returncode
self.proc = None
self._r = self.env.curl_parse_headerfile(self.headerfile)
def stutter_check(self, chunks: [str], stutter: datetime.timedelta):
if not self.proc:
self.start()
for chunk in chunks:
self.send(chunk)
time.sleep(stutter.total_seconds())
recv_out, recv_err = self.close()
# assert we got everything back
assert "".join(chunks) == "".join(recv_out)
# now the tricky part: check *when* we got everything back
recv_times = []
for line in "".join(recv_err).split('\n'):
m = re.match(r'^\s*(\d+:\d+:\d+(\.\d+)?) <= Recv data, (\d+) bytes.*', line)
if m and int(m.group(3)) > 0:
recv_times.append(datetime.time.fromisoformat(m.group(1)))
# received as many chunks as we sent
assert len(chunks) == len(recv_times), "received response not in {0} chunks, but {1}".format(
len(chunks), len(recv_times))
def microsecs(tdelta):
return ((tdelta.hour * 60 + tdelta.minute) * 60 + tdelta.second) * 1000000 + tdelta.microsecond
recv_deltas = []
last_mics = microsecs(recv_times[0])
for ts in recv_times[1:]:
mics = microsecs(ts)
delta_mics = mics - last_mics
if delta_mics < 0:
delta_mics += datetime.time(23, 59, 59, 999999)
recv_deltas.append(datetime.timedelta(microseconds=delta_mics))
last_mics = mics
stutter_td = datetime.timedelta(seconds=stutter.total_seconds() * 0.75) # 25% leeway
for idx, td in enumerate(recv_deltas[1:]):
assert stutter_td < td, \
f"chunk {idx} arrived too early \n{recv_deltas}\nafter {td}\n{recv_err}"
|