1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
|
#!/usr/bin/python
#
from pyasn1.codec.der import decoder, encoder
from pyasn1_modules import rfc2560, rfc2459, pem
from pyasn1.type import univ
import sys, hashlib
try:
import urllib2
except ImportError:
import urllib.request as urllib2
# Object identifier 1.3.14.3.2.26; it is placed in the hashAlgorithm field
# of the request's CertID, next to digests produced with hashlib.sha1.
sha1oid = univ.ObjectIdentifier('1.3.14.3.2.26')
class ValueOnlyBitStringEncoder(encoder.encoder.BitStringEncoder):
    """BIT STRING encoder that yields only the value part of the encoding.

    The tag and length fields are replaced by empty byte strings and the
    first octet of the encoded value (the "unused bit count") is dropped.
    Calling an instance with a bit-string value returns the result of the
    inherited ``encode()`` under those overrides.
    """

    # These methods just do not encode tag and length fields of TLV.
    # They return empty *bytes* (not text): on Python 3 a text '' cannot be
    # joined with the bytes substrate, while on Python 2 b'' is the very
    # same object type as ''.
    def encodeTag(self, *args):
        return b''

    def encodeLength(self, *args):
        return b''

    # Deliberately declared without an explicit ``self``: all positional
    # arguments (the instance included) are forwarded untouched to the
    # base-class implementation.
    def encodeValue(*args):
        substrate, isConstructed = encoder.encoder.BitStringEncoder.encodeValue(*args)
        # OCSP-specific hack follows: cut off the "unused bit count"
        # encoded bit-string value.
        return substrate[1:], isConstructed

    def __call__(self, bitStringValue):
        """Encode *bitStringValue*, returning what ``encode()`` produces."""
        return self.encode(None, bitStringValue, defMode=1, maxChunkSize=0)


# Shared module-level instance, used as a plain callable.
valueOnlyBitStringEncoder = ValueOnlyBitStringEncoder()
def mkOcspRequest(issuerCert, userCert):
    """Build an OCSP request asking about *userCert* issued by *issuerCert*.

    Both arguments are decoded certificate objects exposing
    ``getComponentByName('tbsCertificate')``.

    Returns an ``rfc2560.OCSPRequest`` whose ``tbsRequest`` has version
    ``'v1'`` and a single entry in ``requestList``.

    Raises ``AssertionError`` when the subject of *issuerCert* differs from
    the issuer recorded in *userCert*.
    """
    caTbs = issuerCert.getComponentByName('tbsCertificate')
    caSubject = caTbs.getComponentByName('subject')

    eeTbs = userCert.getComponentByName('tbsCertificate')
    eeIssuer = eeTbs.getComponentByName('issuer')

    # The user certificate must have been issued by the supplied CA.
    assert caSubject == eeIssuer, '%s\n%s' % (
        caSubject.prettyPrint(), eeIssuer.prettyPrint()
    )

    # SHA-1 over the encoded issuer name.
    encodedIssuerName = encoder.encode(eeIssuer)
    nameDigest = hashlib.sha1(encodedIssuerName).digest()

    # SHA-1 over the issuer public key as produced by the value-only
    # bit-string encoder.
    caKeyInfo = caTbs.getComponentByName('subjectPublicKeyInfo')
    caPublicKey = caKeyInfo.getComponentByName('subjectPublicKey')
    keyDigest = hashlib.sha1(valueOnlyBitStringEncoder(caPublicKey)).digest()

    serial = eeTbs.getComponentByName('serialNumber')

    # Build request object
    singleRequest = rfc2560.Request()
    certId = singleRequest.setComponentByName('reqCert').getComponentByName('reqCert')
    algId = certId.setComponentByName('hashAlgorithm').getComponentByName('hashAlgorithm')
    algId.setComponentByName('algorithm', sha1oid)
    certId.setComponentByName('issuerNameHash', nameDigest)
    certId.setComponentByName('issuerKeyHash', keyDigest)
    certId.setComponentByName('serialNumber', serial)

    # Wrap the single request into the top-level OCSPRequest structure.
    envelope = rfc2560.OCSPRequest()
    tbs = envelope.setComponentByName('tbsRequest').getComponentByName('tbsRequest')
    tbs.setComponentByName('version', 'v1')
    entries = tbs.setComponentByName('requestList').getComponentByName('requestList')
    entries.setComponentByPosition(0, singleRequest)

    return envelope
def parseOcspResponse(ocspResponse):
    """Extract the status of the first single response from an OCSP response.

    *ocspResponse* is a decoded ``rfc2560.OCSPResponse`` object.

    Returns a 4-tuple ``(producedAt, certID, certStatusName, thisUpdate)``
    where ``certStatusName`` is the name of the chosen ``certStatus``
    alternative of the first entry in ``responses``.

    Raises ``AssertionError`` when the response status is not
    ``'successful'`` or the response type is not
    ``rfc2560.id_pkix_ocsp_basic``.
    """
    responseStatus = ocspResponse.getComponentByName('responseStatus')
    assert responseStatus == rfc2560.OCSPResponseStatus('successful'), responseStatus.prettyPrint()

    responseBytes = ocspResponse.getComponentByName('responseBytes')
    responseType = responseBytes.getComponentByName('responseType')
    assert responseType == rfc2560.id_pkix_ocsp_basic, responseType.prettyPrint()

    # The 'response' component is itself an encoded BasicOCSPResponse and
    # needs a second decoding pass.
    response = responseBytes.getComponentByName('response')
    basicOCSPResponse, _ = decoder.decode(
        response, asn1Spec=rfc2560.BasicOCSPResponse()
    )

    tbsResponseData = basicOCSPResponse.getComponentByName('tbsResponseData')
    # Only the first single response is examined.
    response0 = tbsResponseData.getComponentByName('responses').getComponentByPosition(0)

    return (
        tbsResponseData.getComponentByName('producedAt'),
        response0.getComponentByName('certID'),
        response0.getComponentByName('certStatus').getName(),
        response0.getComponentByName('thisUpdate')
    )


def parseOcspRequest(ocspRequest):
    """Backward-compatible name for :func:`parseOcspResponse`.

    Despite the name, the argument is an OCSP *response* object; it is
    passed through unchanged and the same 4-tuple is returned.
    """
    return parseOcspResponse(ocspRequest)
# Command-line driver: expects exactly one argument, the OCSP responder URL,
# and two PEM certificates (CA first, then user) on standard input.
if len(sys.argv) != 2:
    print("""Usage:
$ cat CACertificate.pem userCertificate.pem | %s <ocsp-responder-url>""" % sys.argv[0])
    sys.exit(-1)
else:
    ocspUrl = sys.argv[1]

# Parse CA and user certificates
issuerCert, _ = decoder.decode(
    pem.readPemFromFile(sys.stdin)[1],
    asn1Spec=rfc2459.Certificate()
)
userCert, _ = decoder.decode(
    pem.readPemFromFile(sys.stdin)[1],
    asn1Spec=rfc2459.Certificate()
)

# Build OCSP request
ocspReq = mkOcspRequest(issuerCert, userCert)

# Use HTTP POST to get response (see Appendix A of RFC 2560)
# In case you need proxies, set the http_proxy env variable
httpReq = urllib2.Request(
    ocspUrl,
    encoder.encode(ocspReq),
    {'Content-Type': 'application/ocsp-request'}
)

# Close the HTTP response explicitly once the body has been read.
# try/finally is used instead of ``with`` because the object returned by
# Python 2's urllib2.urlopen is not a context manager.
httpConn = urllib2.urlopen(httpReq)
try:
    httpRsp = httpConn.read()
finally:
    httpConn.close()

# Process OCSP response
ocspRsp, _ = decoder.decode(httpRsp, asn1Spec=rfc2560.OCSPResponse())

producedAt, certId, certStatus, thisUpdate = parseOcspResponse(ocspRsp)

print('Certificate ID %s is %s at %s till %s\n' % (
    certId.getComponentByName('serialNumber'),
    certStatus,
    producedAt,
    thisUpdate
))
|