File: multi_reverse_lookup.py

package info (click to toggle)
twisted 25.5.0-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 20,560 kB
  • sloc: python: 203,171; makefile: 200; sh: 92; javascript: 36; xml: 31
file content (107 lines) | stat: -rw-r--r-- 2,927 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#!/usr/bin/env python
# -*- test-case-name: twisted.names.test.test_examples -*-

# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Lookup the reverse DNS pointer records for one or more IP addresses.

 python  multi_reverse_lookup.py 127.0.0.1  192.0.2.100

IPADDRESS: An IPv4 or IPv6 address.
"""
import socket
import sys

from twisted.internet import defer, task
from twisted.names import client
from twisted.python import usage


class Options(usage.Options):
    """
    Command line options for the script: every positional argument is
    collected as an address to look up.
    """

    synopsis = "Usage: multi_reverse_lookup.py IPADDRESS [IPADDRESS]"

    def parseArgs(self, *ipAddresses):
        """
        Store all positional arguments, as a tuple, under the C{"addresses"}
        key.
        """
        self["addresses"] = ipAddresses


def reverseNameFromIPv4Address(address):
    """
    Build the C{in-addr.arpa} pointer name for a dotted IPv4 address.

    The dot separated fields of C{address} are emitted in reverse order,
    followed by the C{in-addr.arpa} suffix and a trailing dot.
    """
    fields = address.split(".")
    fields.reverse()
    # The final empty token makes the joined name end with a dot.
    return ".".join(fields + ["in-addr", "arpa", ""])


def reverseNameFromIPv6Address(address):
    """
    Return a reverse domain name for the given IPv6 address.

    @param address: An IPv6 address in any textual form accepted by
        C{socket.inet_pton}, including compressed forms such as C{"::1"}.

    @return: The hexadecimal digits of the packed address in reverse order,
        dot separated, followed by C{ip6.arpa} and a trailing dot.

    @raise OSError: Propagated from C{socket.inet_pton} when C{address} is
        not accepted as an IPv6 address.
    """
    # Packing the address expands compressed notation (e.g. ::1) to its full
    # length before it is rendered as hexadecimal digits.
    packed = socket.inet_pton(socket.AF_INET6, address)
    # Iterating over bytes yields ints on Python 3, so calling ord() on each
    # item raises TypeError; bytes.hex() gives the two-digit-per-byte string
    # directly.
    fullHex = packed.hex()
    # The final empty token makes the joined name end with a dot.
    tokens = list(reversed(fullHex)) + ["ip6", "arpa", ""]
    return ".".join(tokens)


def reverseNameFromIPAddress(address):
    """
    Build the reverse lookup name for an address of either IP family.

    The address is treated as IPv4 when C{socket.inet_pton} accepts it for
    C{AF_INET}; any input rejected with C{OSError} is handed to the IPv6
    conversion instead.
    """
    try:
        # Used purely as a validity probe; the packed value is discarded.
        socket.inet_pton(socket.AF_INET, address)
    except OSError:
        return reverseNameFromIPv6Address(address)
    return reverseNameFromIPv4Address(address)


def printResult(result):
    """
    Write the answer records of one lookup result to stdout on a single line.

    C{result} is unpacked as three items: answers, authority and additional.
    Only the answers are reported, as comma separated C{<name> IN <payload>}
    entries; nothing is written when there are no answers.
    """
    answers, _authority, _additional = result
    if not answers:
        return
    entries = [f"{record.name.name} IN {record.payload}" for record in answers]
    sys.stdout.write(", ".join(entries) + "\n")


def printSummary(results):
    """
    Print a summary showing the total number of responses and queries.

    @param results: A sequence with one entry per query, where the first
        item of each entry is the status of that query; entries whose status
        equals C{True} are counted as responses.  An empty sequence is
        reported as zero responses to zero queries.
    """
    # Collect the statuses explicitly: zip() returns a non-subscriptable
    # iterator on Python 3, and indexing its first element also fails when
    # there are no results at all.
    statuses = [entry[0] for entry in results]
    sys.stdout.write(
        f"{statuses.count(True)} responses to {len(statuses)} queries" + "\n"
    )


def main(reactor, *argv):
    """
    Parse the command line, start one pointer lookup per address and return
    a Deferred covering all of them.

    C{reactor} is accepted but not used directly.  C{argv} holds the command
    line arguments to parse.  On a usage error the usage text and the error
    are written to stderr and C{SystemExit(1)} is raised.  Otherwise the
    returned C{DeferredList} has C{printSummary} attached as a callback.
    """
    config = Options()
    try:
        config.parseOptions(argv)
    except usage.UsageError as usageProblem:
        sys.stderr.write(str(config) + "\n")
        sys.stderr.write(f"ERROR: {usageProblem}\n")
        raise SystemExit(1)

    def startLookup(address):
        # Force a single 1s timeout, so that slow or offline servers don't
        # adversely slow down the script.
        lookup = client.lookupPointer(
            reverseNameFromIPAddress(address), timeout=(1,)
        )
        lookup.addCallback(printResult)
        return lookup

    lookups = [startLookup(address) for address in config["addresses"]]

    summary = defer.DeferredList(lookups, consumeErrors=False)
    summary.addCallback(printSummary)
    return summary


# Script entry point: hand main to task.react together with the command line
# arguments (the script name itself is dropped).
if __name__ == "__main__":
    task.react(main, sys.argv[1:])