File: blaeu-cert

package info (click to toggle)
blaeu 2.2-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 272 kB
  • sloc: python: 2,014; makefile: 3
file content (220 lines) | stat: -rwxr-xr-x 8,398 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

""" Python code to start a RIPE Atlas UDM (User-Defined
Measurement). This one is to test X.509/PKIX certificates in TLS servers.

You'll need an API key in ~/.atlas/auth.

After launching the measurement, it downloads the results and analyzes
them, displaying the name ("subject" in X.509 parlance) or issuer.

Stéphane Bortzmeyer <stephane+frama@bortzmeyer.org>
"""

import json
import time
import os
import string
import sys
import time
import socket
import collections
import copy

import Blaeu
from Blaeu import Host_Type

# https://github.com/pyca/pyopenssl https://pyopenssl.readthedocs.org/en/stable/
import OpenSSL.crypto
import cryptography

config = Blaeu.Config()
# Default values
config.display = "n" #Name
config.sni = True
config.hostname = None
config.entire_chain = False
# Override what's in the Blaeu package
config.port = 443

X509DATETIME = "%Y%m%d%H%M%SZ"
RFC3339DATETIME = "%Y-%m-%dT%H:%M:%SZ"

class Set():
    def __init__(self):
        self.total = 0

def usage(msg=None):
    print("Usage: %s target-name-or-IP" % sys.argv[0], file=sys.stderr)
    config.usage(msg)
    print("""Also:
    --issuer or -I : displays the issuer (default is to display the name)
    --key or -k : displays the public key (default is to display the name)
    --serial or -S : displays the serial number (default is to display the name)
    --expiration or -E : displays the expiration datetime (default is to display the name)
    --entire-chain : displays all the certificates (default is to display the first)
    --hostname : hostname to send for SNI (default is to use the target)
    --no-sni : do not send the SNI (Server Name Indication) (default is to send it)""",
          file=sys.stderr)

def format_name(n):
    result = ""
    components = n.get_components()
    for (k, v) in components:
        result += "/%s=%s" % (k.decode(), v.decode())
    return result

def specificParse(config, option, value):
    result = True
    if option == "--issuer" or option == "-I":
        config.display = "i"
    elif option == "--key" or option == "-k":
        config.display = "k"
    elif option == "--serial" or option == "-S":
        config.display = "s"
    elif option == "--expiration" or option == "-E":
        config.display = "e"
    elif option == "--entire-chain":
        config.entire_chain = True
    elif option == "--hostname":
        config.hostname = value
    elif option == "--no-sni":
        config.sni = False
    else:
        result = False
    return result

(args, data) = config.parse("IkSE", ["issuer", "serial", "expiration", "key", "entire-chain",
                                     "hostname=", "no-sni"],
                            specificParse, usage)

if len(args) != 1:
    usage("Not the good number of arguments")
    sys.exit(1)
target = args[0]

if config.measurement_id is None:
    data["definitions"][0]["target"] = target
    data["definitions"][0]["type"] = "sslcert"
    data["definitions"][0]["description"] = "X.509 cert of %s" % target
    del data["definitions"][0]["size"] # Meaningless argument
    target_type = Blaeu.host_type(target)
    # RFC 6066 say that we cannot accept a literal IP
    # address as hostname.
    if target_type != Host_Type.Name and config.hostname is None and config.sni:
        usage("If the target is an IP address, we need --hostname (or --no-sni)")
        sys.exit(1)
    if target_type == Host_Type.IPv6:
        config.ipv4 = False
        af = 6
        if config.include is not None:
            data["probes"][0]["tags"]["include"] = copy.copy(config.include)
            data["probes"][0]["tags"]["include"].append("system-ipv6-works")
        else:
            data["probes"][0]["tags"]["include"] = ["system-ipv6-works",]
    elif target_type == Host_Type.IPv4:
        config.ipv4 = True
        af = 4
        if config.include is not None:
            data["probes"][0]["tags"]["include"] = copy.copy(config.include)
            data["probes"][0]["tags"]["include"].append("system-ipv4-works")
        else:
            data["probes"][0]["tags"]["include"] = ["system-ipv4-works",]
    else:
        # Hostname
        if config.ipv4:
            af = 4
        else:
            af = 6
    data["definitions"][0]['af'] = af
    if config.sni: # See above about RFC 6066
        if config.hostname is not None: # Even if the target is a host
                                        # name, we honor --hostname.
            data["definitions"][0]['hostname'] = config.hostname
        else:
            data["definitions"][0]['hostname'] = target

    if config.verbose:
        print(data)

    try:
        measurement = Blaeu.Measurement(data)
    except Blaeu.RequestSubmissionError as error:
        print(Blaeu.format_error(error), file=sys.stderr)
        sys.exit(1)        
    if config.verbose:
            print("Measurement #%s to %s uses %i probes" % (measurement.id, target,
                                                        measurement.num_probes))
    rdata = measurement.results(wait=True, percentage_required=config.percentage_required)
else:
    measurement = Blaeu.Measurement(data=None, id=config.measurement_id)
    rdata = measurement.results(wait=False)

sets = collections.defaultdict(Set)
if config.display_probe_asns:
    config.display_probes = True
if config.display_probes:
    probes_sets = collections.defaultdict(Set)
print(("%s probes reported" % len(rdata)))
for result in rdata:
        if config.display_probes:
            probe_id = result["prb_id"]
        if config.display_probe_asns:
            details = Blaeu.ProbeCache.cache_probe_id(config.cache_probes, probe_id) \
                if config.cache_probes else Blaeu.Probe(probe_id)
            asn = getattr(details, "asn_v%i" % (4 if config.ipv4 else 6), None)
        if 'cert' in result:
            value = []
            if config.entire_chain:
                num = len(result['cert'])
            else:
                num = 1
            for i in range(0, num):
                x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                       str(result['cert'][i]))
                detail = ""
                content = format_name(x509.get_subject())
                if config.display == "i":
                    content += format_name(x509.get_issuer())
                elif config.display == "k":
                    key = x509.get_pubkey()
                    content = "%s, type %s, %s bits" % \
                        (key.to_cryptography_key().public_bytes(cryptography.hazmat.primitives.serialization.Encoding.PEM, cryptography.hazmat.primitives.serialization.PublicFormat.SubjectPublicKeyInfo).decode().replace("\n", " ").replace("-----BEGIN PUBLIC KEY----- ", "")[:80] + "...",
                         key.type(), key.bits())
                elif config.display == "s":
                    content = format(x509.get_serial_number(), '05x')
                elif config.display == "e":
                    if x509.has_expired():
                        detail = " (EXPIRED)"
                    t = time.strptime(x509.get_notAfter().decode(), X509DATETIME)
                    content = "%s%s" % (time.strftime(RFC3339DATETIME, t), detail)
                value.append("%s%s" % (content, detail))
            value = "; ".join(value)
        else:
            if 'err' in result:
                error = result['err']
            elif 'alert' in result:
                error = result['alert']
            else:
                error = "UNKNOWN ERROR"
            value = "FAILED TO GET A CERT: %s" % error
        sets[value].total += 1
        if config.display_probes:
            if config.display_probe_asns:
                info = [probe_id, asn]
            else:
                info = probe_id
            if value in probes_sets:
                probes_sets[value].append(info)
            else:
                probes_sets[value] = [info,]
sets_data = sorted(sets, key=lambda s: sets[s].total, reverse=False)
for myset in sets_data:
    detail = ""
    if config.display_probes:
        detail = "(probes %s)" % probes_sets[myset]
    print("[%s] : %i occurrences %s" % (myset, sets[myset].total, detail))

print(("Test #%s done at %s" % (measurement.id,
                                time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()))))