File: digest.rb

package info (click to toggle)
ruby-httpx 1.7.2-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,816 kB
  • sloc: ruby: 12,209; makefile: 4
file content (142 lines) | stat: -rw-r--r-- 3,982 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# frozen_string_literal: true

require "time"
require "securerandom"
require "digest"

module HTTPX
  module Plugins
    module Authentication
      class Digest
        Error = Class.new(Error)

        def initialize(user, password, hashed: false, **)
          @user = user
          @password = password
          @nonce = 0
          @hashed = hashed
        end

        def can_authenticate?(authenticate)
          authenticate && /Digest .*/.match?(authenticate)
        end

        def authenticate(request, authenticate)
          "Digest #{generate_header(request.verb, request.path, authenticate)}"
        end

        private

        def generate_header(meth, uri, authenticate)
          # discard first token, it's Digest
          auth_info = authenticate[/^(\w+) (.*)/, 2]

          raise_format_error unless auth_info

          s = StringScanner.new(auth_info)

          params = {}
          until s.eos?
            k = s.scan_until(/=/)
            raise_format_error unless k&.end_with?("=")

            if s.peek(1) == "\""
              s.skip("\"")
              v = s.scan_until(/"/)
              raise_format_error unless v&.end_with?("\"")

              v = v[0..-2]
              s.skip_until(/,/)
            else
              v = s.scan_until(/,|$/)

              if v&.end_with?(",")
                v = v[0..-2]
              else
                raise_format_error unless s.eos?
              end

              v = v[0..-2] if v&.end_with?(",")
            end
            params[k[0..-2]] = v
            s.skip(/\s/)
          end

          nonce = params["nonce"]
          nc = next_nonce

          # verify qop
          qop = params["qop"]

          if qop
            # some servers send multiple values wrapped in parentheses (i.e. "(qauth,)")
            qop = qop[/\(?([^)]+)\)?/, 1]
            qop = qop.split(",").map { |s| s.delete_prefix("'").delete_suffix("'") }.delete_if(&:empty?).map.first
          end

          if params["algorithm"] =~ /(.*?)(-sess)?$/
            alg = Regexp.last_match(1)
            algorithm = ::Digest.const_get(alg)
            raise Error, "unknown algorithm \"#{alg}\"" unless algorithm

            sess = Regexp.last_match(2)
          else
            algorithm = ::Digest::MD5
          end

          if qop || sess
            cnonce = make_cnonce
            nc = format("%<nonce>08x", nonce: nc)
          end

          a1 = if sess
            [
              (@hashed ? @password : algorithm.hexdigest("#{@user}:#{params["realm"]}:#{@password}")),
              nonce,
              cnonce,
            ].join ":"
          else
            @hashed ? @password : "#{@user}:#{params["realm"]}:#{@password}"
          end

          ha1 = algorithm.hexdigest(a1)
          ha2 = algorithm.hexdigest("#{meth}:#{uri}")
          request_digest = [ha1, nonce]
          request_digest.push(nc, cnonce, qop) if qop
          request_digest << ha2
          request_digest = request_digest.join(":")

          header = [
            %(username="#{@user}"),
            %(nonce="#{nonce}"),
            %(uri="#{uri}"),
            %(response="#{algorithm.hexdigest(request_digest)}"),
          ]
          header << %(realm="#{params["realm"]}") if params.key?("realm")
          header << %(algorithm=#{params["algorithm"]}) if params.key?("algorithm")
          header << %(cnonce="#{cnonce}") if cnonce
          header << %(nc=#{nc})
          header << %(qop=#{qop}) if qop
          header << %(opaque="#{params["opaque"]}") if params.key?("opaque")
          header.join ", "
        end

        def make_cnonce
          ::Digest::MD5.hexdigest [
            Time.now.to_i,
            Process.pid,
            SecureRandom.random_number(2**32),
          ].join ":"
        end

        def next_nonce
          @nonce += 1
        end

        def raise_format_error
          raise Error, "unsupported digest header format"
        end
      end
    end
  end
end