File: jwe.rb

package info (click to toggle)
ruby-json-jwt 1.16.7-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 324 kB
  • sloc: ruby: 2,858; makefile: 4
file content (300 lines) | stat: -rw-r--r-- 9,694 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
require 'securerandom'
require 'bindata'
require 'aes_key_wrap'

module JSON
  class JWE
    # Raised by decode_compact_serialized when the input does not contain
    # exactly NUM_OF_SEGMENTS dot-separated segments.
    class InvalidFormat < JWT::InvalidFormat; end
    # Raised when a segment is not valid Base64, when the authentication tag
    # does not verify, or when OpenSSL fails while decrypting the cipher text.
    class DecryptionFailed < JWT::VerificationFailed; end
    # Raised when the "alg"/"enc" header is not in the caller's allow-list or
    # is not one of the values this class knows how to handle.
    class UnexpectedAlgorithm < JWT::UnexpectedAlgorithm; end

    # Segment count of a compact-serialized JWE:
    # header, encrypted key, IV, cipher text, authentication tag.
    NUM_OF_SEGMENTS = 5

    # NOTE(review): JOSE is defined outside this file; presumably it supplies
    # header, alg, with_jwk_support, secure_compare and register_header_keys,
    # all of which are used below -- confirm against the JOSE module.
    include JOSE

    # Plain state shared between the encryption and decryption paths.
    attr_accessor(
      :public_key_or_secret, :private_key_or_secret,
      :plain_text, :cipher_text, :iv, :auth_data,
      :content_encryption_key, :encryption_key, :mac_key
    )
    # Writers only: the matching readers are the private, lazily computing
    # jwe_encrypted_key and authentication_tag methods further down.
    attr_writer :jwe_encrypted_key, :authentication_tag

    # JWE-specific header parameters; `enc` must be defined by this call since
    # it is aliased right below.
    register_header_keys :enc, :epk, :zip, :apu, :apv
    alias_method :encryption_method, :enc

    # Creates a JWE whose plain text is the string form of +input+.
    #
    # @param input [#to_s, nil] payload to encrypt later; +nil+ becomes ''
    def initialize(input = nil)
      payload = input.to_s
      self.plain_text = payload
    end

    # Encrypts +plain_text+ in place and stores the resulting key material,
    # IV, additional authenticated data and cipher text on this object.
    #
    # @param public_key_or_secret key (or JWK, converted via with_jwk_support)
    #   used to protect or act as the content encryption key
    # @return [self]
    def encrypt!(public_key_or_secret)
      self.public_key_or_secret = with_jwk_support(public_key_or_secret)
      cipher.encrypt

      self.content_encryption_key = generate_content_encryption_key
      derived_mac_key, derived_encryption_key = derive_encryption_and_mac_keys
      self.mac_key = derived_mac_key
      self.encryption_key = derived_encryption_key

      cipher.key = encryption_key
      # NOTE: for GCM the IV has to be assigned only after the key.
      self.iv = cipher.random_iv
      # The Base64url-encoded header doubles as additional authenticated data.
      self.auth_data = Base64.urlsafe_encode64(header.to_json, padding: false)
      cipher.auth_data = auth_data if gcm?

      encrypted = cipher.update(plain_text)
      self.cipher_text = encrypted + cipher.final
      self
    end

    # Decrypts +cipher_text+ in place and stores the result in +plain_text+.
    #
    # @param private_key_or_secret key (or JWK, converted via with_jwk_support)
    #   used to recover the content encryption key
    # @param algorithms [Array, Object, nil] allowed "alg" values; blank
    #   disables the check
    # @param encryption_methods [Array, Object, nil] allowed "enc" values;
    #   blank disables the check
    # @return [self]
    # @raise [UnexpectedAlgorithm] if alg/enc is not in the given allow-list
    # @raise [DecryptionFailed] if the authentication tag is invalid or
    #   OpenSSL fails while decrypting
    def decrypt!(private_key_or_secret, algorithms = nil, encryption_methods = nil)
      raise UnexpectedAlgorithm.new('Unexpected alg header') unless algorithms.blank? || Array(algorithms).include?(alg)
      raise UnexpectedAlgorithm.new('Unexpected enc header') unless encryption_methods.blank? || Array(encryption_methods).include?(enc)
      self.private_key_or_secret = with_jwk_support private_key_or_secret
      self.content_encryption_key = decrypt_content_encryption_key
      self.mac_key, self.encryption_key = derive_encryption_and_mac_keys

      # For CBC-HMAC the tag is checked before any cipher text is decrypted.
      verify_cbc_authentication_tag! if cbc?

      cipher.decrypt
      cipher.key = encryption_key
      cipher.iv = iv # NOTE: 'iv' has to be set after 'key' for GCM
      if gcm?
        # The JWE AES-GCM tag is exactly 128 bits. Truncated tags must be
        # rejected explicitly (https://github.com/ruby/openssl/issues/63), and
        # over-long tags are rejected here as well so they fail with
        # DecryptionFailed rather than a raw OpenSSL error from #auth_tag=.
        raise DecryptionFailed.new('Invalid authentication tag') unless authentication_tag.bytesize == 16
        cipher.auth_tag = authentication_tag
        cipher.auth_data = auth_data
      end

      begin
        self.plain_text = cipher.update(cipher_text) + cipher.final
      rescue OpenSSL::OpenSSLError
        # Ensure that the same error is raised for invalid PKCS7 padding
        # as for invalid signatures. This prevents padding-oracle attacks.
        raise DecryptionFailed
      end

      self
    end

    def to_s
      [
        header.to_json,
        jwe_encrypted_key,
        iv,
        cipher_text,
        authentication_tag
      ].collect do |segment|
        Base64.urlsafe_encode64 segment.to_s, padding: false
      end.join('.')
    end

    # JSON serialization of this JWE as a Hash of Base64url-encoded parts.
    #
    # @param options [Hash] pass +syntax: :general+ to nest the encrypted key
    #   under a single-element +recipients+ array; anything else yields the
    #   flattened form with a top-level +encrypted_key+
    # @return [Hash{Symbol => Object}]
    def as_json(options = {})
      general_syntax = :general == options[:syntax]
      encode = ->(value) { Base64.urlsafe_encode64(value, padding: false) }

      json = { protected: encode.call(header.to_json) }
      encoded_key = encode.call(jwe_encrypted_key)
      if general_syntax
        json[:recipients] = [{ encrypted_key: encoded_key }]
      else
        json[:encrypted_key] = encoded_key
      end
      json.merge!(
        iv:         encode.call(iv),
        ciphertext: encode.call(cipher_text),
        tag:        encode.call(authentication_tag)
      )
    end

    private

    # common

    # @return [Boolean] true when "enc" selects one of the AES-GCM methods
    def gcm?
      case encryption_method&.to_sym
      when :A128GCM, :A256GCM then true
      else false
      end
    end

    # @return [Boolean] true when "enc" selects one of the AES-CBC + HMAC methods
    def cbc?
      case encryption_method&.to_sym
      when :'A128CBC-HS256', :'A256CBC-HS512' then true
      else false
      end
    end

    # @return [Boolean] true when "alg" is "dir", i.e. the shared secret is
    #   used directly as the content encryption key
    def dir?
      algorithm = alg&.to_sym
      algorithm == :dir
    end

    # Memoized OpenSSL cipher for the current "enc" value.
    #
    # @return [OpenSSL::Cipher]
    # @raise [RuntimeError] if the local OpenSSL does not list the cipher
    def cipher
      name = cipher_name
      supported = OpenSSL::Cipher.ciphers.include?(name)
      raise "#{name} isn't supported" unless supported
      @cipher ||= OpenSSL::Cipher.new(name)
    end

    # Translates the "enc" header value into the OpenSSL cipher name. The
    # CBC-HMAC methods map to plain AES-CBC; their HMAC is computed separately.
    #
    # @return [String]
    # @raise [UnexpectedAlgorithm] for any other (or missing) "enc" value
    def cipher_name
      openssl_names = {
        A128GCM:         'aes-128-gcm',
        A256GCM:         'aes-256-gcm',
        'A128CBC-HS256': 'aes-128-cbc',
        'A256CBC-HS512': 'aes-256-cbc'
      }
      openssl_names.fetch(encryption_method&.to_sym) do
        raise UnexpectedAlgorithm.new('Unknown Encryption Algorithm')
      end
    end

    # Bit size of the SHA-2 digest paired with the current CBC-HMAC method.
    #
    # @return [Integer] 256 or 512
    # @raise [UnexpectedAlgorithm] when "enc" is not a CBC-HMAC method
    def sha_size
      hash_bits = { 'A128CBC-HS256': 256, 'A256CBC-HS512': 512 }
      hash_bits.fetch(encryption_method&.to_sym) do
        raise UnexpectedAlgorithm.new('Unknown Hash Size')
      end
    end

    # @return [OpenSSL::Digest] a new SHA digest of +sha_size+ bits
    def sha_digest
      digest_name = 'SHA' + sha_size.to_s
      OpenSSL::Digest.new(digest_name)
    end

    # Splits the content encryption key into the pair [mac_key, encryption_key].
    #
    # GCM has no separate MAC key, so a placeholder symbol is returned in its
    # slot. For CBC-HMAC the first half of the key is the MAC key and the
    # second half the encryption key.
    #
    # @return [Array(Object, String), nil] nil when "enc" is neither GCM nor CBC
    def derive_encryption_and_mac_keys
      if gcm?
        [:wont_be_used, content_encryption_key]
      elsif cbc?
        half = content_encryption_key.length / 2
        content_encryption_key.unpack("a#{half}a#{half}")
      end
    end

    # encryption

    # Memoized encrypted form of the content encryption key, produced
    # according to the "alg" header (or the value assigned via the writer).
    #
    # @return [String] empty for "dir", where no key is transported
    # @raise [NotImplementedError] for the ECDH-ES family
    # @raise [UnexpectedAlgorithm] for any other (or missing) "alg" value
    def jwe_encrypted_key
      return @jwe_encrypted_key if @jwe_encrypted_key

      algorithm = alg&.to_sym
      @jwe_encrypted_key =
        case algorithm
        when :RSA1_5
          public_key_or_secret.public_encrypt(content_encryption_key)
        when :'RSA-OAEP'
          public_key_or_secret.public_encrypt(content_encryption_key, OpenSSL::PKey::RSA::PKCS1_OAEP_PADDING)
        when :A128KW, :A256KW
          AESKeyWrap.wrap(content_encryption_key, public_key_or_secret)
        when :dir
          ''
        when :'ECDH-ES'        then raise NotImplementedError.new('ECDH-ES not supported yet')
        when :'ECDH-ES+A128KW' then raise NotImplementedError.new('ECDH-ES+A128KW not supported yet')
        when :'ECDH-ES+A256KW' then raise NotImplementedError.new('ECDH-ES+A256KW not supported yet')
        else
          raise UnexpectedAlgorithm.new('Unknown Encryption Algorithm')
        end
    end

    # Picks the content encryption key for the current alg/enc combination:
    # the caller's secret for "dir", otherwise fresh random bytes sized for
    # the cipher (GCM) or for the combined MAC and encryption key (CBC-HMAC).
    #
    # @return [Object, nil] nil when none of the cases applies
    def generate_content_encryption_key
      if dir?
        public_key_or_secret
      elsif gcm?
        cipher.random_key
      elsif cbc?
        SecureRandom.random_bytes(sha_size / 8)
      end
    end

    # Memoized authentication tag (or the value assigned via the writer).
    #
    # GCM: the tag reported by the cipher. CBC-HMAC: an HMAC over the AAD, IV,
    # cipher text and the AAD bit length as a 64-bit big-endian integer,
    # truncated to half of the digest size.
    #
    # @return [String, nil] nil when "enc" is neither GCM nor CBC
    def authentication_tag
      return @authentication_tag if @authentication_tag

      @authentication_tag =
        if gcm?
          cipher.auth_tag
        elsif cbc?
          hmac_input = [
            auth_data,
            iv,
            cipher_text,
            BinData::Uint64be.new(auth_data.length * 8).to_binary_s
          ].join
          tag_bytes = sha_size / 2 / 8
          OpenSSL::HMAC.digest(sha_digest, mac_key, hmac_input)[0, tag_bytes]
        end
    end

    # decryption

    # Recovers the content encryption key according to the "alg" header.
    #
    # When OpenSSL rejects the key operation, a freshly generated decoy key is
    # returned instead of raising.
    #
    # @return [Object] the recovered (or decoy) content encryption key
    # @raise [NotImplementedError] for the ECDH-ES family
    # @raise [UnexpectedAlgorithm] for any other (or missing) "alg" value
    def decrypt_content_encryption_key
      # NOTE: generated unconditionally, before any decryption attempt, so
      # that the failure path does not introduce a timing difference.
      decoy_key = generate_content_encryption_key
      algorithm = alg&.to_sym
      case algorithm
      when :RSA1_5
        private_key_or_secret.private_decrypt(jwe_encrypted_key)
      when :'RSA-OAEP'
        private_key_or_secret.private_decrypt(jwe_encrypted_key, OpenSSL::PKey::RSA::PKCS1_OAEP_PADDING)
      when :A128KW, :A256KW
        AESKeyWrap.unwrap(jwe_encrypted_key, private_key_or_secret)
      when :dir
        private_key_or_secret
      when :'ECDH-ES'        then raise NotImplementedError.new('ECDH-ES not supported yet')
      when :'ECDH-ES+A128KW' then raise NotImplementedError.new('ECDH-ES+A128KW not supported yet')
      when :'ECDH-ES+A256KW' then raise NotImplementedError.new('ECDH-ES+A256KW not supported yet')
      else
        raise UnexpectedAlgorithm.new('Unknown Encryption Algorithm')
      end
    rescue OpenSSL::PKey::PKeyError
      decoy_key
    end

    # Recomputes the CBC-HMAC authentication tag from the AAD, IV, cipher text
    # and AAD bit length, and compares it with the received tag through
    # secure_compare.
    #
    # NOTE(review): the received tag is read through the memoizing
    # authentication_tag reader, which computes a tag itself when none has
    # been assigned; decode_compact_serialized always assigns one, but
    # confirm other callers do too.
    #
    # @return [nil]
    # @raise [DecryptionFailed] when the tags differ
    def verify_cbc_authentication_tag!
      hmac_input = [
        auth_data,
        iv,
        cipher_text,
        BinData::Uint64be.new(auth_data.length * 8).to_binary_s
      ].join
      tag_bytes = sha_size / 2 / 8
      expected_tag = OpenSSL::HMAC.digest(sha_digest, mac_key, hmac_input)[0, tag_bytes]
      tags_match = secure_compare(authentication_tag, expected_tag)
      raise DecryptionFailed unless tags_match
    end

    class << self
      # Parses a compact-serialized JWE and, unless told to skip, decrypts it.
      #
      # @param input [String] five Base64url segments joined by dots
      # @param private_key_or_secret key material, or +:skip_decryption+ to
      #   only parse the segments
      # @param algorithms allowed "alg" values, forwarded to decrypt!
      # @param encryption_methods allowed "enc" values, forwarded to decrypt!
      # @param _allow_blank_payload unused by this method
      # @return [JWE]
      # @raise [InvalidFormat] when the segment count is wrong
      # @raise [DecryptionFailed] when a segment is not valid Base64
      def decode_compact_serialized(input, private_key_or_secret, algorithms = nil, encryption_methods = nil, _allow_blank_payload = false)
        if input.count('.') + 1 != NUM_OF_SEGMENTS
          raise InvalidFormat.new("Invalid JWE Format. JWE should include #{NUM_OF_SEGMENTS} segments.")
        end

        jwe = new
        decoded_segments = input.split('.', NUM_OF_SEGMENTS).map do |segment|
          begin
            Base64.urlsafe_decode64(segment)
          rescue ArgumentError
            raise DecryptionFailed
          end
        end
        header_json, encrypted_key, init_vector, encrypted_payload, tag = decoded_segments
        jwe.jwe_encrypted_key = encrypted_key
        jwe.iv = init_vector
        jwe.cipher_text = encrypted_payload
        jwe.authentication_tag = tag
        # The AAD is the header segment exactly as received, still encoded.
        jwe.auth_data = input.split('.').first
        jwe.header = JSON.parse(header_json).with_indifferent_access

        skip = private_key_or_secret == :skip_decryption
        jwe.decrypt!(private_key_or_secret, algorithms, encryption_methods) unless skip
        jwe
      end

      # Decodes a JSON-serialized JWE by rebuilding the compact form and
      # delegating to decode_compact_serialized.
      #
      # When +recipients+ is present, only the first recipient's encrypted key
      # is used; otherwise the top-level +encrypted_key+ is used.
      #
      # @param input [Hash] JSON serialization with already-encoded members
      # @param private_key_or_secret forwarded to decode_compact_serialized
      # @param algorithms forwarded to decode_compact_serialized
      # @param encryption_methods forwarded to decode_compact_serialized
      # @param _allow_blank_payload unused by this method
      # @return [JWE]
      def decode_json_serialized(input, private_key_or_secret, algorithms = nil, encryption_methods = nil, _allow_blank_payload = false)
        json = input.with_indifferent_access
        recipients = json[:recipients]
        encrypted_key = recipients.present? ? recipients.first[:encrypted_key] : json[:encrypted_key]
        segments = [json[:protected], encrypted_key, json[:iv], json[:ciphertext], json[:tag]]
        decode_compact_serialized(segments.join('.'), private_key_or_secret, algorithms, encryption_methods)
      end
    end
  end
end