1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233
|
#!/usr/bin/env python
"""
Copyright (c) 2006-2025 sqlmap developers (https://sqlmap.org)
See the file 'LICENSE' for copying permission
"""
import base64
import datetime
import io
import re
import time
from lib.core.bigarray import BigArray
from lib.core.convert import getBytes
from lib.core.convert import getText
from lib.core.settings import VERSION
from thirdparty.six.moves import BaseHTTPServer as _BaseHTTPServer
from thirdparty.six.moves import http_client as _http_client
# Reference: https://dvcs.w3.org/hg/webperf/raw-file/tip/specs/HAR/Overview.html
# http://www.softwareishard.com/har/viewer/
class HTTPCollectorFactory(object):
def __init__(self, harFile=False):
self.harFile = harFile
def create(self):
return HTTPCollector()
class HTTPCollector(object):
def __init__(self):
self.messages = BigArray()
self.extendedArguments = {}
def setExtendedArguments(self, arguments):
self.extendedArguments = arguments
def collectRequest(self, requestMessage, responseMessage, startTime=None, endTime=None):
self.messages.append(RawPair(requestMessage, responseMessage,
startTime=startTime, endTime=endTime,
extendedArguments=self.extendedArguments))
def obtain(self):
return {"log": {
"version": "1.2",
"creator": {"name": "sqlmap", "version": VERSION},
"entries": [pair.toEntry().toDict() for pair in self.messages],
}}
class RawPair(object):
def __init__(self, request, response, startTime=None, endTime=None, extendedArguments=None):
self.request = getBytes(request)
self.response = getBytes(response)
self.startTime = startTime
self.endTime = endTime
self.extendedArguments = extendedArguments or {}
def toEntry(self):
return Entry(request=Request.parse(self.request), response=Response.parse(self.response),
startTime=self.startTime, endTime=self.endTime,
extendedArguments=self.extendedArguments)
class Entry(object):
def __init__(self, request, response, startTime, endTime, extendedArguments):
self.request = request
self.response = response
self.startTime = startTime or 0
self.endTime = endTime or 0
self.extendedArguments = extendedArguments
def toDict(self):
out = {
"request": self.request.toDict(),
"response": self.response.toDict(),
"cache": {},
"timings": {
"send": -1,
"wait": -1,
"receive": -1,
},
"time": int(1000 * (self.endTime - self.startTime)),
"startedDateTime": "%s%s" % (datetime.datetime.fromtimestamp(self.startTime).isoformat(), time.strftime("%z")) if self.startTime else None
}
out.update(self.extendedArguments)
return out
class Request(object):
def __init__(self, method, path, httpVersion, headers, postBody=None, raw=None, comment=None):
self.method = method
self.path = path
self.httpVersion = httpVersion
self.headers = headers or {}
self.postBody = postBody
self.comment = comment.strip() if comment else comment
self.raw = raw
@classmethod
def parse(cls, raw):
request = HTTPRequest(raw)
return cls(method=request.command,
path=request.path,
httpVersion=request.request_version,
headers=request.headers,
postBody=request.rfile.read(),
comment=request.comment,
raw=raw)
@property
def url(self):
host = self.headers.get("Host", "unknown")
return "http://%s%s" % (host, self.path)
def toDict(self):
out = {
"httpVersion": self.httpVersion,
"method": self.method,
"url": self.url,
"headers": [dict(name=key.capitalize(), value=value) for key, value in self.headers.items()],
"cookies": [],
"queryString": [],
"headersSize": -1,
"bodySize": -1,
"comment": getText(self.comment),
}
if self.postBody:
contentType = self.headers.get("Content-Type")
out["postData"] = {
"mimeType": contentType,
"text": getText(self.postBody).rstrip("\r\n"),
}
return out
class Response(object):
extract_status = re.compile(b'\\((\\d{3}) (.*)\\)')
def __init__(self, httpVersion, status, statusText, headers, content, raw=None, comment=None):
self.raw = raw
self.httpVersion = httpVersion
self.status = status
self.statusText = statusText
self.headers = headers
self.content = content
self.comment = comment.strip() if comment else comment
@classmethod
def parse(cls, raw):
altered = raw
comment = b""
if altered.startswith(b"HTTP response [") or altered.startswith(b"HTTP redirect ["):
stream = io.BytesIO(raw)
first_line = stream.readline()
parts = cls.extract_status.search(first_line)
status_line = "HTTP/1.0 %s %s" % (getText(parts.group(1)), getText(parts.group(2)))
remain = stream.read()
altered = getBytes(status_line) + b"\r\n" + remain
comment = first_line
response = _http_client.HTTPResponse(FakeSocket(altered))
response.begin()
try:
content = response.read()
except _http_client.IncompleteRead:
content = raw[raw.find(b"\r\n\r\n") + 4:].rstrip(b"\r\n")
return cls(httpVersion="HTTP/1.1" if response.version == 11 else "HTTP/1.0",
status=response.status,
statusText=response.reason,
headers=response.msg,
content=content,
comment=comment,
raw=raw)
def toDict(self):
content = {
"mimeType": self.headers.get("Content-Type"),
"text": self.content,
"size": len(self.content or "")
}
binary = set([b'\0', b'\1'])
if any(c in binary for c in self.content):
content["encoding"] = "base64"
content["text"] = getText(base64.b64encode(self.content))
else:
content["text"] = getText(content["text"])
return {
"httpVersion": self.httpVersion,
"status": self.status,
"statusText": self.statusText,
"headers": [dict(name=key.capitalize(), value=value) for key, value in self.headers.items() if key.lower() != "uri"],
"cookies": [],
"content": content,
"headersSize": -1,
"bodySize": -1,
"redirectURL": "",
"comment": getText(self.comment),
}
class FakeSocket(object):
# Original source:
# https://stackoverflow.com/questions/24728088/python-parse-http-response-string
def __init__(self, response_text):
self._file = io.BytesIO(response_text)
def makefile(self, *args, **kwargs):
return self._file
class HTTPRequest(_BaseHTTPServer.BaseHTTPRequestHandler):
# Original source:
# https://stackoverflow.com/questions/4685217/parse-raw-http-headers
def __init__(self, request_text):
self.comment = None
self.rfile = io.BytesIO(request_text)
self.raw_requestline = self.rfile.readline()
if self.raw_requestline.startswith(b"HTTP request ["):
self.comment = self.raw_requestline
self.raw_requestline = self.rfile.readline()
self.error_code = self.error_message = None
self.parse_request()
def send_error(self, code, message):
self.error_code = code
self.error_message = message
|