File: generate_ev_whitelist.py

package info (click to toggle)
golang-github-google-certificate-transparency 0.0~git20160709.0.0f6e3d1~ds1-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, bullseye, buster
  • size: 5,676 kB
  • sloc: cpp: 35,278; python: 11,838; java: 1,911; sh: 1,885; makefile: 950; xml: 520; ansic: 225
file content (149 lines) | stat: -rw-r--r-- 5,337 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#!/usr/bin/env python
"""Generates a list of hashes of EV certificates found in a log."""

import hashlib
import functools
import pickle
import os
import sys

import ev_metadata
import gflags

from ct.client import scanner
from ct.crypto import cert
from ct.proto import client_pb2

FLAGS = gflags.FLAGS

gflags.DEFINE_integer("hash_trim", 8, "Number of bytes of the SHA-256 digest "
                      "to use in the whitelist.")

gflags.DEFINE_integer("multi", 2, "Number of cert fetching and parsing "
                      "processes to use, in addition to the main process.")

gflags.DEFINE_string("output_directory", None,
                     "Output directory for individual EV certificates. "
                     "If provided, individual EV certs will be written there.")


def calculate_certificate_hash(certificate):
    """Hash the input's DER representation and trim it."""
    hasher = hashlib.sha256(certificate.to_der())
    return hasher.digest()[0:FLAGS.hash_trim]


def find_matching_policies(certificate):
    """Returns the certificate's EV policy OID, if exists."""
    try:
        matching_policies = []
        for policy in certificate.policies():
            if policy['policyIdentifier'] in ev_metadata.EV_POLICIES:
                matching_policies.append(policy['policyIdentifier'])
        return matching_policies
    except cert.CertificateError:
        pass
    return []


def does_root_match_policy(policy_oid, cert_chain):
    """Returns true if the fingerprint of the root certificate matches the
    expected fingerprint for this EV policy OID."""
    if not cert_chain: # Empty chain
        return False
    root_fingerprint = hashlib.sha1(cert_chain[-1]).digest()
    return root_fingerprint in ev_metadata.EV_POLICIES[policy_oid]


def _write_cert_and_chain(
        output_dir, certificate, extra_data, certificate_index):
    """Writes the certificate and its chain to files for later analysis."""
    open(
        os.path.join(output_dir,
                     "cert_%d.der" % certificate_index), "wb"
        ).write(certificate.to_der())

    pickle.dump(
        list(extra_data.certificate_chain),
        open(os.path.join(output_dir,
                          "cert_%d_extra_data.pickle" % certificate_index),
             "wb"))

def _ev_match(
        output_dir, last_acceptable_entry_index, certificate, entry_type,
        extra_data, certificate_index):
    """Matcher function for the scanner. Returns the certificate's hash if
    it is a valid, non-expired, EV certificate, None otherwise."""
    # Only generate whitelist for non-precertificates. It is expected that if
    # a precertificate was submitted then the issued SCT would be embedded
    # in the final certificate.
    if entry_type != client_pb2.X509_ENTRY:
        return None
    # No point in including expired certificates.
    if certificate.is_expired():
        return None
    # Do not include entries beyond the last entry included in the whitelist
    # generated on January 1st, 2015.
    if certificate_index > last_acceptable_entry_index:
        return None

    # Only include certificates that have an EV OID.
    matching_policies = find_matching_policies(certificate)
    if not matching_policies:
        return None

    # Removed the requirement that the root of the chain matches the root that
    # should be used for the EV policy OID.
    # See https://code.google.com/p/chromium/issues/detail?id=524635 for
    # details.

    # Matching certificate
    if output_dir:
        _write_cert_and_chain(
            output_dir, certificate, extra_data, certificate_index)

    return calculate_certificate_hash(certificate)


def generate_ev_cert_hashes_from_log(
        log_url, num_processes, output_directory, last_acceptable_entry_index):
    """Scans the given log and generates a list of hashes for all EV
    certificates in it.

    Returns a tuple of (scan_results, hashes_list)"""
    if output_directory and not os.path.exists(output_directory):
        os.makedirs(output_directory)

    ev_hashes = set()
    def add_hash(cert_hash):
        """Store the hash. Always called from the main process, so safe."""
        ev_hashes.add(cert_hash)
    bound_ev_match = functools.partial(_ev_match, output_directory,
                                       last_acceptable_entry_index)
    res = scanner.scan_log(bound_ev_match, log_url, num_processes, add_hash)
    return (res, ev_hashes)

def main(output_file):
    """Scan and save results to a file."""
    res, hashes_set = generate_ev_cert_hashes_from_log(
        "https://ct.googleapis.com/pilot",
        FLAGS.multi,
        FLAGS.output_directory)
    print "Scanned %d, %d matched and %d failed strict or partial parsing" % (
        res.total, res.matches, res.errors)
    print "There are %d EV hashes." % (len(hashes_set))
    with open(output_file, "wb") as hashes_file:
        hashes_list = list(hashes_set)
        hashes_list.sort()
        for trimmed_hash in hashes_list:
            hashes_file.write(trimmed_hash)


if __name__ == "__main__":
    sys.argv = FLAGS(sys.argv)
    if len(sys.argv) < 2:
        sys.stderr.write(
            "Usage: %s <output file>\n  <output file> will contain the "
            "sorted, truncated hashes list.\n" % sys.argv[0])
        sys.exit(1)
    main(sys.argv[1])