File: dns.py

package info (click to toggle)
sqlmap 1.9.6-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 12,792 kB
  • sloc: python: 51,991; xml: 13,943; ansic: 989; sh: 304; makefile: 62; sql: 61; perl: 30; cpp: 27; asm: 7
file content (184 lines) | stat: -rw-r--r-- 6,016 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
#!/usr/bin/env python

"""
Copyright (c) 2006-2025 sqlmap developers (https://sqlmap.org)
See the file 'LICENSE' for copying permission
"""

from __future__ import print_function

import binascii
import os
import re
import socket
import struct
import threading
import time

class DNSQuery(object):
    """
    Parses the requested domain name out of a raw DNS request packet

    The name is kept in '_query' as bytes in dotted form with a trailing
    dot (e.g. b"www.google.com."). It stays empty when the packet is not
    a standard query (opcode 0) or when the packet ends before the name
    is properly terminated (i.e. no partially parsed name is ever kept)

    >>> DNSQuery(b'|K\\x01 \\x00\\x01\\x00\\x00\\x00\\x00\\x00\\x01\\x03www\\x06google\\x03com\\x00\\x00\\x01\\x00\\x01\\x00\\x00)\\x10\\x00\\x00\\x00\\x00\\x00\\x00\\x0c\\x00\\n\\x00\\x08O4|Np!\\x1d\\xb3')._query == b"www.google.com."
    True
    >>> DNSQuery(b'\\x00')._query == b""
    True
    >>> DNSQuery(b'\\x00' * 12 + b'\\x03www\\x06goo')._query == b""
    True
    """

    def __init__(self, raw):
        self._raw = raw
        self._query = b""

        labels = []

        try:
            # Note: ord() on a one-byte slice works on both Python 2 and 3 and
            # raises TypeError (empty slice) once the packet has run out of data
            opcode = (ord(raw[2:3]) >> 3) & 15                  # Opcode bits

            if opcode == 0:                                     # Standard query
                offset = 12                                     # Name starts right after the 12-byte header
                length = ord(raw[offset:offset + 1])

                while length != 0:                              # Zero-length label terminates the name
                    labels.append(raw[offset + 1:offset + length + 1])
                    offset += length + 1
                    length = ord(raw[offset:offset + 1])

                # Reached only when the terminator has been found, hence a
                # truncated packet never leaves a partial name behind
                self._query = b"".join(label + b'.' for label in labels)
        except TypeError:
            pass

    def response(self, resolution):
        """
        Crafts raw DNS resolution response packet

        'resolution' is the dotted-quad IPv4 address (e.g. "127.0.0.1") to
        put into the answer. Returns the response packet as bytes, or empty
        bytes when no domain name has been parsed out of the request
        """

        retVal = b""

        if self._query:
            retVal += self._raw[:2]                                                         # Transaction ID
            retVal += b"\x85\x80"                                                           # Flags (Standard query response, No error)
            retVal += self._raw[4:6] + self._raw[4:6] + b"\x00\x00\x00\x00"                 # Questions and Answers Counts
            retVal += self._raw[12:(12 + self._raw[12:].find(b"\x00") + 5)]                 # Original Domain Name Query
            retVal += b"\xc0\x0c"                                                           # Pointer to domain name
            retVal += b"\x00\x01"                                                           # Type A
            retVal += b"\x00\x01"                                                           # Class IN
            retVal += b"\x00\x00\x00\x20"                                                   # TTL (32 seconds)
            retVal += b"\x00\x04"                                                           # Data length
            retVal += b"".join(struct.pack('B', int(_)) for _ in resolution.split('.'))     # 4 bytes of IP

        return retVal

class DNSServer(object):
    """
    Used for making fake DNS resolution responses based on received
    raw request

    Every received request is answered with an A record resolving to
    127.0.0.1, while the requested name is stored so that it can be
    collected later on by calling pop()

    Reference(s):
        https://code.activestate.com/recipes/491264-mini-fake-dns-server/
        https://web.archive.org/web/20150418152405/https://code.google.com/p/marlon-tools/source/browse/tools/dnsproxy/dnsproxy.py
    """

    def __init__(self):
        """
        Binds UDP socket to port 53 on all interfaces

        Raises socket.error when another DNS service is detected or when
        the socket can't be bound (e.g. because of missing privileges)
        """

        self._check_localhost()
        self._requests = []                 # Received domain names (bytes) waiting for pop()
        self._lock = threading.Lock()       # Guards self._requests (shared with the server thread)

        try:
            # NOTE(review): '_orig_socket' is not a stdlib attribute - presumably set
            # elsewhere in the project when socket.socket gets wrapped (to be confirmed)
            self._socket = socket._orig_socket(socket.AF_INET, socket.SOCK_DGRAM)
        except AttributeError:
            self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._socket.bind(("", 53))
        self._running = False
        self._initialized = False

    def _check_localhost(self):
        """
        Checks whether another DNS service is already answering on port 53
        by sending it a resolution request for www.google.com

        Raises socket.error if a response carrying the requested name comes
        back. Any failure of the probe itself is taken as "nothing there"
        """

        response = b""
        s = None

        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            s.connect(("", 53))
            s.send(binascii.unhexlify("6509012000010000000000010377777706676f6f676c6503636f6d00000100010000291000000000000000"))  # A www.google.com
            response = s.recv(512)
        except Exception:
            # Best-effort probe (Note: KeyboardInterrupt is deliberately not swallowed)
            pass
        finally:
            if s is not None:
                s.close()                   # Probe socket is of no further use

        if response and b"google" in response:
            raise socket.error("another DNS service already running on '0.0.0.0:53'")

    def pop(self, prefix=None, suffix=None):
        """
        Returns received DNS resolution request (if any) that has given
        prefix/suffix combination (e.g. prefix.<query result>.suffix.domain)

        Oldest matching request is removed from the internal store and
        returned as a decoded string, while None is returned when there is
        no match. If neither prefix nor suffix is given, the oldest request
        is returned. If only one of them is given, only that part has to be
        matched. Note: matching is case-insensitive and values are used
        inside of a regular expression without being escaped
        """

        retVal = None

        if prefix is not None and hasattr(prefix, "encode"):
            prefix = prefix.encode()

        if suffix is not None and hasattr(suffix, "encode"):
            suffix = suffix.encode()

        # Built once and only out of the given parts (formatting None into a
        # bytes pattern raises TypeError on Python 3)
        pattern = None

        if prefix is not None or suffix is not None:
            pattern = b".+"

            if prefix is not None:
                pattern = prefix + b"\\." + pattern

            if suffix is not None:
                pattern = pattern + b"\\." + suffix

        with self._lock:
            for request in self._requests:
                if pattern is None or re.search(pattern, request, re.I):
                    self._requests.remove(request)
                    retVal = request.decode()
                    break

        return retVal

    def run(self):
        """
        Runs a DNSServer instance as a daemon thread (killed by program exit)
        """

        def _serve():
            try:
                self._running = True
                self._initialized = True

                while True:
                    data, addr = self._socket.recvfrom(1024)
                    query = DNSQuery(data)
                    self._socket.sendto(query.response("127.0.0.1"), addr)

                    with self._lock:
                        self._requests.append(query._query)

            finally:
                # Lets pollers of '_running' know that the thread is gone
                self._running = False

        thread = threading.Thread(target=_serve)
        thread.daemon = True
        thread.start()

if __name__ == "__main__":
    # Standalone mode: start the fake DNS server and print every received request
    server = None

    try:
        server = DNSServer()
        server.run()

        # Wait for the server thread to enter its receive loop
        while not server._initialized:
            time.sleep(0.1)

        while server._running:
            # Drain everything collected since the last poll (pop() gives None when empty)
            for request in iter(server.pop, None):
                print("[i] %s" % request)

            time.sleep(1)

    except socket.error as ex:
        if 'Permission' not in str(ex):
            raise

        print("[x] Please run with sudo/Administrator privileges")
    except KeyboardInterrupt:
        os._exit(0)
    finally:
        if server:
            server._running = False