1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
|
#!/usr/bin/env python
"""
Copyright (c) 2006-2025 sqlmap developers (https://sqlmap.org)
See the file 'LICENSE' for copying permission
"""
from __future__ import print_function
import binascii
import os
import re
import socket
import struct
import threading
import time
class DNSQuery(object):
    """
    Parses the queried domain name out of a raw DNS request packet

    Only standard queries (opcode 0) are parsed. For any other opcode, or
    for a packet that ends before the name's terminating zero-length
    label, the parsed query is left empty (so no response is crafted)

    >>> DNSQuery(b'|K\\x01 \\x00\\x01\\x00\\x00\\x00\\x00\\x00\\x01\\x03www\\x06google\\x03com\\x00\\x00\\x01\\x00\\x01\\x00\\x00)\\x10\\x00\\x00\\x00\\x00\\x00\\x00\\x0c\\x00\\n\\x00\\x08O4|Np!\\x1d\\xb3')._query == b"www.google.com."
    True
    >>> DNSQuery(b'\\x00')._query == b""
    True
    >>> DNSQuery(b'\\x00' * 12 + b'\\x03ww')._query == b""
    True
    """

    def __init__(self, raw):
        """
        Stores the raw packet and extracts the dot-terminated query name
        (e.g. b"www.google.com.") into self._query
        """

        self._raw = raw
        self._query = b""

        labels = []

        try:
            opcode = (ord(raw[2:3]) >> 3) & 15      # Opcode bits

            if opcode == 0:                         # Standard query
                offset = 12                         # Question name starts right after the header
                length = ord(raw[offset:offset + 1])

                while length != 0:
                    labels.append(raw[offset + 1:offset + length + 1])
                    offset += length + 1
                    length = ord(raw[offset:offset + 1])
        except TypeError:
            # ord() was given an empty slice (or raw is not sliceable), meaning
            # that the packet ended before the name did - a partially read name
            # must not be reported as a received query
            return

        self._query = b"".join(label + b'.' for label in labels)

    def response(self, resolution):
        """
        Crafts raw DNS resolution response packet

        resolution is a dotted-decimal IPv4 address (e.g. "127.0.0.1") used
        as the answer. Returns the response packet as bytes, or b"" when no
        query name has been parsed from the request
        """

        if not self._query:
            return b""

        raw = self._raw

        # Index of the name terminator (relative to offset 12), +1 to include it
        # and +4 for the type and class fields following the name
        questionEnd = 12 + raw[12:].find(b"\x00") + 5

        return b"".join((
            raw[:2],                                    # Transaction ID
            b"\x85\x80",                                # Flags (Standard query response, No error)
            raw[4:6] + raw[4:6] + b"\x00\x00\x00\x00",  # Questions and Answers Counts
            raw[12:questionEnd],                        # Original Domain Name Query
            b"\xc0\x0c",                                # Pointer to domain name
            b"\x00\x01",                                # Type A
            b"\x00\x01",                                # Class IN
            b"\x00\x00\x00\x20",                        # TTL (32 seconds)
            b"\x00\x04",                                # Data length
            b"".join(struct.pack('B', int(_)) for _ in resolution.split('.')),  # 4 bytes of IP
        ))
class DNSServer(object):
    """
    Used for making fake DNS resolution responses based on received
    raw request

    Every received request is answered with 127.0.0.1 and its query name
    is stored so that it can later be retrieved with pop()

    Reference(s):
        https://code.activestate.com/recipes/491264-mini-fake-dns-server/
        https://web.archive.org/web/20150418152405/https://code.google.com/p/marlon-tools/source/browse/tools/dnsproxy/dnsproxy.py
    """

    def __init__(self):
        """
        Binds a UDP socket to port 53 on all interfaces

        Raises socket.error if another DNS service answers on that port or
        if the bind itself fails (e.g. because of missing privileges)
        """

        self._check_localhost()
        self._requests = []
        self._lock = threading.Lock()

        # Prefer socket._orig_socket when the module exposes it
        # NOTE(review): presumably set when socket.socket has been replaced by a wrapper - confirm
        try:
            self._socket = socket._orig_socket(socket.AF_INET, socket.SOCK_DGRAM)
        except AttributeError:
            self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._socket.bind(("", 53))
        self._running = False
        self._initialized = False

    def _check_localhost(self):
        """
        Probes UDP port 53 with a query for www.google.com and raises
        socket.error if anything sends back a matching answer
        """

        response = b""
        probe = None

        try:
            probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            probe.connect(("", 53))
            probe.send(binascii.unhexlify("6509012000010000000000010377777706676f6f676c6503636f6d00000100010000291000000000000000"))  # A www.google.com
            response = probe.recv(512)
        except Exception:
            # Best-effort probe: any failure is treated as "nothing is answering"
            pass
        finally:
            # Always release the probe socket
            if probe is not None:
                probe.close()

        if response and b"google" in response:
            raise socket.error("another DNS service already running on '0.0.0.0:53'")

    def pop(self, prefix=None, suffix=None):
        """
        Returns received DNS resolution request (if any) that has given
        prefix/suffix combination (e.g. prefix.<query result>.suffix.domain)

        With neither prefix nor suffix the oldest stored request is
        returned. When only one of them is given, only that side is
        required to match. Matching is case-insensitive and the returned
        request is removed from the store. Returns None if nothing matches

        Note: prefix and suffix are put into the regular expression as they
        are (i.e. they are not escaped)
        """

        retVal = None

        if prefix is not None and hasattr(prefix, "encode"):
            prefix = prefix.encode()

        if suffix is not None and hasattr(suffix, "encode"):
            suffix = suffix.encode()

        if prefix is None and suffix is None:
            pattern = None
        else:
            # Concatenation (instead of bytes %-formatting) so that a missing
            # side is simply left out rather than raising TypeError
            pattern = b".+"

            if prefix is not None:
                pattern = prefix + b"\\." + pattern

            if suffix is not None:
                pattern = pattern + b"\\." + suffix

        with self._lock:
            for index, request in enumerate(self._requests):
                if pattern is None or re.search(pattern, request, re.I):
                    del self._requests[index]
                    retVal = request.decode()
                    break

        return retVal

    def run(self):
        """
        Runs a DNSServer instance as a daemon thread (killed by program exit)
        """

        def _serve():
            try:
                self._running = True
                self._initialized = True

                while True:
                    data, addr = self._socket.recvfrom(1024)
                    query = DNSQuery(data)
                    self._socket.sendto(query.response("127.0.0.1"), addr)

                    with self._lock:
                        self._requests.append(query._query)

            except KeyboardInterrupt:
                raise

            finally:
                # Lets the waiting side know that the serving loop has ended
                self._running = False

        thread = threading.Thread(target=_serve)
        thread.daemon = True
        thread.start()
if __name__ == "__main__":
    # Standalone mode: start the fake DNS server and print every received query
    server = None

    try:
        server = DNSServer()
        server.run()

        # Wait until the serving thread has actually started
        while not server._initialized:
            time.sleep(0.1)

        while server._running:
            # Drain everything received so far, then idle for a second
            for request in iter(server.pop, None):
                print("[i] %s" % request)

            time.sleep(1)

    except socket.error as ex:
        if 'Permission' not in str(ex):
            raise

        print("[x] Please run with sudo/Administrator privileges")

    except KeyboardInterrupt:
        os._exit(0)

    finally:
        if server:
            server._running = False
|