1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
|
# frozen_string_literal: true
require "time"
require "securerandom"
require "digest"
require "strscan"
module HTTPX
module Plugins
module Authentication
class Digest
# Error raised by this class for challenges it cannot process (malformed header,
# unknown algorithm). It subclasses the +Error+ constant resolved from the
# enclosing scope, which is defined outside this class.
Error = Class.new(Error)
# Stores the credentials and resets the request counter.
#
# user:: user name, interpolated verbatim into the generated header.
# password:: secret belonging to +user+.
# hashed:: kept in @hashed; generate_header reads it to decide whether
#          +password+ still has to be combined with user and realm.
#
# Any additional keyword arguments are accepted and ignored.
def initialize(user, password, hashed: false, **)
  @user, @password = user, password
  @hashed = hashed
  @nonce = 0
end
# Tells whether a challenge can be answered by this authenticator.
#
# authenticate:: the challenge string, or nil/false when there is none.
#
# Returns +authenticate+ itself when it is falsy; otherwise true or false
# depending on whether "Digest " occurs anywhere in the challenge.
def can_authenticate?(authenticate)
  return authenticate unless authenticate

  /Digest .*/.match?(authenticate)
end
# Answers a digest challenge for the given request.
#
# request:: object responding to +verb+ and +path+.
# authenticate:: the challenge string handed to generate_header.
#
# Returns "Digest " followed by the parameters built by generate_header.
def authenticate(request, authenticate)
  credentials = generate_header(request.verb, request.path, authenticate)
  "Digest #{credentials}"
end
private
# Builds the parameter list that follows "Digest " in the header answering a
# server challenge.
#
# meth:: request method; hashed together with +uri+.
# uri:: request path; echoed back as the uri parameter.
# authenticate:: raw challenge string, starting with the scheme token.
#
# Returns the parameters as a single ", "-joined String.
# Raises Error when the challenge cannot be parsed or asks for an algorithm
# that is not supported.
def generate_header(meth, uri, authenticate)
  # discard first token, it's Digest
  auth_info = authenticate[/^(\w+) (.*)/, 2]
  raise_format_error unless auth_info

  params = parse_challenge_params(auth_info)
  nonce = params["nonce"]
  nc = next_nonce
  qop = select_qop(params["qop"])
  algorithm, sess = resolve_digest_algorithm(params["algorithm"])

  if qop || sess
    cnonce = make_cnonce
    nc = format("%<nonce>08x", nonce: nc)
  end

  # H(user:realm:password). A password flagged as hashed already is this value,
  # so it must not be run through the digest a second time.
  secret = @hashed ? @password : algorithm.hexdigest("#{@user}:#{params["realm"]}:#{@password}")
  # "-sess" algorithms additionally bind the secret to the server and client nonces.
  ha1 = sess ? algorithm.hexdigest([secret, nonce, cnonce].join(":")) : secret
  ha2 = algorithm.hexdigest("#{meth}:#{uri}")

  request_digest = [ha1, nonce]
  request_digest.push(nc, cnonce, qop) if qop
  request_digest << ha2

  header = [
    %(username="#{@user}"),
    %(nonce="#{nonce}"),
    %(uri="#{uri}"),
    %(response="#{algorithm.hexdigest(request_digest.join(":"))}"),
  ]
  header << %(realm="#{params["realm"]}") if params.key?("realm")
  header << %(algorithm=#{params["algorithm"]}) if params.key?("algorithm")
  header << %(cnonce="#{cnonce}") if cnonce
  header << %(nc=#{nc})
  header << %(qop=#{qop}) if qop
  header << %(opaque="#{params["opaque"]}") if params.key?("opaque")
  header.join(", ")
end

# Splits the challenge parameters (key=value or key="value", comma separated)
# into a Hash of String keys and values.
def parse_challenge_params(auth_info)
  scanner = StringScanner.new(auth_info)
  params = {}

  until scanner.eos?
    key = scanner.scan_until(/=/)
    raise_format_error unless key

    if scanner.skip(/"/)
      value = scanner.scan_until(/"/)
      raise_format_error unless value

      value = value.chomp("\"")
      # move past the separator; a no-op when this was the last parameter
      scanner.skip_until(/,/)
    else
      value = scanner.scan_until(/,|$/)

      if value&.end_with?(",")
        value = value.chomp(",")
      else
        raise_format_error unless scanner.eos?
      end
    end

    params[key.chomp("=").strip] = value
    # parameters may be separated by any amount of whitespace
    scanner.skip(/\s+/)
  end

  params
end

# Picks the quality-of-protection value to answer with, or nil when the
# challenge offers none.
def select_qop(qop)
  return unless qop

  # some servers send multiple values wrapped in parentheses (i.e. "(qauth,)")
  offered = qop[/\(?([^)]+)\)?/, 1].to_s.split(",").map do |value|
    value.strip.delete_prefix("'").delete_suffix("'")
  end.reject(&:empty?)

  # the request digest is computed without an entity-body hash, which is only
  # valid for "auth"; favour it whenever the server offers it
  offered.include?("auth") ? "auth" : offered.first
end

# Maps the challenge's algorithm token to a digest class and a flag telling
# whether the "-sess" variant was requested. Defaults to MD5 when absent.
def resolve_digest_algorithm(name)
  return [::Digest::MD5, false] unless name

  match = /\A(.*?)(-sess)?\z/mi.match(name)
  alg = match[1]
  # RFC 7616 spells the tokens with hyphens ("SHA-256"); the Ruby classes do not
  const_name = alg.delete("-").upcase

  algorithm = nil
  # the token comes from the server: only resolve names known to be digests
  if %w[MD5 SHA1 SHA2 SHA256 SHA384 SHA512 RMD160].include?(const_name)
    begin
      algorithm = ::Digest.const_get(const_name, false)
    rescue NameError, LoadError
      algorithm = nil
    end
  end
  raise Error, "unknown algorithm \"#{alg}\"" unless algorithm

  [algorithm, !match[2].nil?]
end
# Produces a client nonce: the MD5 hex digest of the current epoch seconds,
# the process id and a random number below 2**32, joined with ":".
def make_cnonce
  seed = "#{Time.now.to_i}:#{Process.pid}:#{SecureRandom.random_number(2**32)}"
  ::Digest::MD5.hexdigest(seed)
end
# Advances the per-instance counter kept in @nonce and returns its new value.
def next_nonce
  @nonce = @nonce + 1
end
# Aborts header generation: always raises Error with a fixed message saying the
# digest header format is unsupported.
def raise_format_error
  raise(Error, "unsupported digest header format")
end
end
end
end
end
|