File: dnslookup.py

package info (click to toggle)
golang-github-google-certificate-transparency 0.0~git20160709.0.0f6e3d1~ds1-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bullseye, buster
  • size: 5,676 kB
  • sloc: cpp: 35,278; python: 11,838; java: 1,911; sh: 1,885; makefile: 950; xml: 520; ansic: 225
file content (121 lines) | stat: -rwxr-xr-x 3,851 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#!/usr/bin/env python

# Implement DNS inclusion proof checking, see [TBD].
#
# Unfortunately, getting at the SCTs in general is hard in Python, so this
# does not start with an SSL connection, but instead fetches a log entry by
# index and then verifies the proof over DNS.

# You will need to install DNSPython (http://www.dnspython.org/)

import base64
import dns.resolver
import hashlib
import json
import logging
import os
import sys
import urllib2

basepath = os.path.dirname(sys.argv[0])
sys.path.append(os.path.join(basepath, '../../../python'))
from ct.crypto import merkle, pem, verify
from ct.proto import client_pb2

class CTDNSLookup:
    """Fetch data published by a CT log as DNS TXT records under one domain.

    Every lookup is a TXT query for ``<label>.<domain>``.  Answers come from
    the network, so they are validated with explicit checks that keep
    working under ``python -O`` (plain ``assert`` statements are stripped
    there).  Failed checks raise ``AssertionError`` so that callers written
    against the earlier assert-based version see the same exception type.
    """

    def __init__(self, domain, verifier, resolver=None):
        """Remember the lookup domain, the STH verifier and the resolver.

        Args:
            domain: DNS suffix appended to every name that is looked up.
            verifier: object providing ``verify_sth(sth)``; used by GetSTH.
            resolver: object providing ``query(name, 'TXT')``.  When None,
                dnspython's default resolver is used.
        """
        self.verifier = verifier
        self.domain = domain
        # Test against None rather than truthiness so that a caller-supplied
        # resolver is never silently replaced just because it is falsy.
        if resolver is None:
            resolver = dns.resolver.get_default_resolver()
        self.resolver = resolver

    def Get(self, name):
        """Run a TXT query for the fully qualified ``name``.

        Returns:
            The answer object returned by the resolver.

        Raises:
            AssertionError: if the answer's record type is not TXT.
        """
        logging.info('get %s', name)
        answers = self.resolver.query(name, 'TXT')
        if answers.rdtype != dns.rdatatype.TXT:
            raise AssertionError(
                'expected a TXT answer for %s, got rdtype %s'
                % (name, answers.rdtype))
        return answers

    def GetOne(self, name):
        """Look up ``<name>.<domain>`` and return its single TXT payload.

        A TXT record may be split into several character strings; they are
        concatenated in order.

        Raises:
            AssertionError: if the answer does not hold exactly one record.
        """
        name += '.%s' % self.domain
        answers = self.Get(name)
        if len(answers) != 1:
            raise AssertionError(
                'expected exactly one TXT record for %s, got %d'
                % (name, len(answers)))
        txt = answers[0]
        return ''.join(txt.strings)

    def GetSTH(self):
        """Fetch ``sth.<domain>``, parse it and pass it to the verifier.

        The record is split on '.' into, in order: tree size, timestamp,
        base64 root hash and base64 tree head signature.

        Returns:
            A ``client_pb2.SthResponse`` populated from the record.

        Raises:
            IndexError: if the record has fewer than four fields.
            ValueError: if tree size or timestamp is not an integer.
            Whatever ``verifier.verify_sth`` raises.  Its return value is
            not inspected here, so a bad signature has to surface as an
            exception -- NOTE(review): confirm against the verifier.
        """
        sth_str = self.GetOne('sth')
        sth = client_pb2.SthResponse()
        parts = str(sth_str).split('.')
        sth.tree_size = int(parts[0])
        sth.timestamp = int(parts[1])
        sth.sha256_root_hash = base64.b64decode(parts[2])
        sth.tree_head_signature = base64.b64decode(parts[3])

        self.verifier.verify_sth(sth)

        return sth

    def GetEntry(self, level, index, size):
        """Return the TXT payload of ``<level>.<index>.<size>.tree.<domain>``.

        All three arguments are formatted as decimal integers.
        """
        return self.GetOne('%d.%d.%d.tree' % (level, index, size))

    def GetIndexFromHash(self, hash):
        """Return the TXT payload of ``<b32(hash)>.hash.<domain>``.

        The hash is base32 encoded and the trailing '=' padding is removed
        before it is used as a DNS label.
        """
        return self.GetOne('%s.hash' % base64.b32encode(hash).rstrip('='))

def fetch_audit_path(lookup, index, tree_size, path_length):
    """Collect the audit path for a leaf over DNS, cross-checking answers.

    For every level below ``path_length`` this calls
    ``lookup.GetEntry(level, index, tree_size)``.  Each answer is treated as
    a run of 32-byte hashes starting at that level and holding at most seven
    of them; only the first hash of each answer goes into the returned path.
    Consecutive answers overlap, and the overlap is compared so that a server
    cannot return windows that disagree with each other.

    Args:
        lookup: object providing ``GetEntry(level, index, size)``.
        index: leaf index the proof is for.
        tree_size: size of the tree the proof is relative to.
        path_length: number of hashes in the audit path.

    Returns:
        A list of ``path_length`` 32-byte hashes, lowest level first.

    Raises:
        AssertionError: if the first answer has an unexpected length or a
            later answer disagrees with the one before it.  These are
            explicit raises rather than ``assert`` statements so the checks
            still run under ``python -O``.
    """
    audit_path = []
    prev = None
    for level in range(path_length):
        h = lookup.GetEntry(level, index, tree_size)
        logging.info('hash = %s', base64.b64encode(h))
        audit_path.append(h[:32])

        if prev is None:
            # The first answer carries up to seven hashes, fewer only when
            # the whole path is shorter than that.
            expected_len = 32 * min(7, path_length)
            if len(h) != expected_len:
                raise AssertionError(
                    'level %d: expected %d bytes, got %d'
                    % (level, expected_len, len(h)))
        else:
            # While a full seven-hash window still fits, the window slides
            # by one: drop the new last hash before comparing.  Near the top
            # of the path the window only shrinks, so the whole answer must
            # equal the tail of the previous one.
            overlap = h[:-32] if level < path_length - 6 else h
            if prev[32:] != overlap:
                raise AssertionError(
                    'level %d: answer is inconsistent with level %d'
                    % (level, level - 1))

        prev = h

    return audit_path


def main(argv):
    """Fetch one log entry over HTTP and verify its inclusion proof via DNS.

    Args:
        argv: command line; ``argv[1]`` is the index of the log entry.

    Raises:
        AssertionError: if the inclusion proof does not verify or the log's
            hash-to-index record disagrees with the requested index.
    """
    logging.basicConfig(level='INFO')

    if len(argv) < 2:
        sys.exit('usage: %s <leaf-index>' % argv[0])
    # Parse the index before it is placed in a URL or sent anywhere.
    index = int(argv[1])
    if index < 0:
        sys.exit('leaf index must be non-negative, got %d' % index)

    # Public key used to verify the signature on the log's STH.
    keypem = ('-----BEGIN PUBLIC KEY-----\n'
              'MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEfahLEimAoz2t01p\n'
              '3uMziiLOl/fHTDM0YDOhBRuiBARsV4UvxG2LdNgoIGLrtCzWE0J\n'
              '5APC2em4JlvR8EEEFMoA==\n'
              '-----END PUBLIC KEY-----\n')
    logurl = 'http://ct.googleapis.com/pilot'
    logdns = 'pilot.ct.googleapis.com'

    response = urllib2.urlopen('%s/ct/v1/get-entries?start=%d&end=%d'
                               % (logurl, index, index))
    try:
        body = response.read()
    finally:
        response.close()
    leaf_input = json.loads(body)['entries'][0]['leaf_input']
    logging.info('leaf = %s', leaf_input)
    leaf = base64.b64decode(leaf_input)
    # Leaf hashes are SHA-256 over a single zero byte followed by the leaf.
    leaf_hash = hashlib.sha256(b'\x00' + leaf).digest()

    keyinfo = verify.create_key_info_from_raw_key(
        pem.from_pem(keypem, 'PUBLIC KEY')[0])
    log_verifier = verify.LogVerifier(keyinfo)

    lookup = CTDNSLookup(logdns, log_verifier)
    sth = lookup.GetSTH()
    logging.info('sth = %s', sth)

    logging.info('hash = %s', base64.b64encode(leaf_hash))
    verifier = merkle.MerkleVerifier()
    apl = verifier.audit_path_length(index, sth.tree_size)
    audit_path = fetch_audit_path(lookup, index, sth.tree_size, apl)

    logging.info('path = %s', [base64.b64encode(h) for h in audit_path])

    # The verification call must not live inside an ``assert``: under
    # ``python -O`` the whole statement, call included, would be removed.
    if not verifier.verify_leaf_hash_inclusion(leaf_hash, index, audit_path,
                                               sth):
        raise AssertionError('inclusion proof for leaf %d did not verify'
                             % index)

    hash_info = lookup.GetIndexFromHash(leaf_hash)
    if hash_info != str(index):
        raise AssertionError('log maps the leaf hash to index %s, expected %d'
                             % (hash_info, index))


if __name__ == '__main__':
    main(sys.argv)