1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
|
# frozen_string_literal: true
require "cose/algorithm"
require "cose/error"
require "cose/rsapkcs1_algorithm"
require "openssl"
require "webauthn/authenticator_data/attested_credential_data"
require "webauthn/error"
module WebAuthn
module AttestationStatement
class UnsupportedAlgorithm < Error; end
ATTESTATION_TYPE_NONE = "None"
ATTESTATION_TYPE_BASIC = "Basic"
ATTESTATION_TYPE_SELF = "Self"
ATTESTATION_TYPE_ATTCA = "AttCA"
ATTESTATION_TYPE_BASIC_OR_ATTCA = "Basic_or_AttCA"
ATTESTATION_TYPE_ANONCA = "AnonCA"
ATTESTATION_TYPES_WITH_ROOT = [
ATTESTATION_TYPE_BASIC,
ATTESTATION_TYPE_BASIC_OR_ATTCA,
ATTESTATION_TYPE_ATTCA,
ATTESTATION_TYPE_ANONCA
].freeze
class Base
AAGUID_EXTENSION_OID = "1.3.6.1.4.1.45724.1.1.4"
def initialize(statement)
@statement = statement
end
def valid?(_authenticator_data, _client_data_hash)
raise NotImplementedError
end
def format
WebAuthn::AttestationStatement::FORMAT_TO_CLASS.key(self.class)
end
def attestation_certificate
certificates&.first
end
def attestation_certificate_key_id
attestation_certificate.subject_key_identifier&.unpack("H*")&.[](0)
end
private
attr_reader :statement
def matching_aaguid?(attested_credential_data_aaguid)
extension = attestation_certificate&.find_extension(AAGUID_EXTENSION_OID)
if extension
aaguid_value = OpenSSL::ASN1.decode(extension.value_der).value
aaguid_value == attested_credential_data_aaguid
else
true
end
end
def matching_public_key?(authenticator_data)
attestation_certificate.public_key.to_der == authenticator_data.credential.public_key_object.to_der
end
def certificates
@certificates ||=
raw_certificates&.map do |raw_certificate|
OpenSSL::X509::Certificate.new(raw_certificate)
end
end
def algorithm
statement["alg"]
end
def raw_certificates
statement["x5c"]
end
def signature
statement["sig"]
end
def attestation_trust_path
if certificates&.any?
certificates
end
end
def trustworthy?(aaguid: nil, attestation_certificate_key_id: nil)
if ATTESTATION_TYPES_WITH_ROOT.include?(attestation_type)
configuration.acceptable_attestation_types.include?(attestation_type) &&
valid_certificate_chain?(aaguid: aaguid, attestation_certificate_key_id: attestation_certificate_key_id)
else
configuration.acceptable_attestation_types.include?(attestation_type)
end
end
def valid_certificate_chain?(aaguid: nil, attestation_certificate_key_id: nil)
attestation_root_certificates_store(
aaguid: aaguid,
attestation_certificate_key_id: attestation_certificate_key_id
).verify(attestation_certificate, attestation_trust_path)
end
def attestation_root_certificates_store(aaguid: nil, attestation_certificate_key_id: nil)
OpenSSL::X509::Store.new.tap do |store|
root_certificates(
aaguid: aaguid,
attestation_certificate_key_id: attestation_certificate_key_id
).each do |cert|
store.add_cert(cert)
end
end
end
def root_certificates(aaguid: nil, attestation_certificate_key_id: nil)
root_certificates =
configuration.attestation_root_certificates_finders.reduce([]) do |certs, finder|
if certs.empty?
finder.find(
attestation_format: format,
aaguid: aaguid,
attestation_certificate_key_id: attestation_certificate_key_id
) || []
else
certs
end
end
if root_certificates.empty? && respond_to?(:default_root_certificates, true)
default_root_certificates
else
root_certificates
end
end
def valid_signature?(authenticator_data, client_data_hash, public_key = attestation_certificate.public_key)
raise("Incompatible algorithm and key") unless cose_algorithm.compatible_key?(public_key)
cose_algorithm.verify(
public_key,
signature,
verification_data(authenticator_data, client_data_hash)
)
rescue COSE::Error
false
end
def verification_data(authenticator_data, client_data_hash)
authenticator_data.data + client_data_hash
end
def cose_algorithm
@cose_algorithm ||=
COSE::Algorithm.find(algorithm).tap do |alg|
alg && configuration.algorithms.include?(alg.name) ||
raise(UnsupportedAlgorithm, "Unsupported algorithm #{algorithm}")
end
end
def configuration
WebAuthn.configuration
end
end
end
end
|