File: auth_token_service.rb

package info (click to toggle)
gitlab 17.6.5-19
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 629,368 kB
  • sloc: ruby: 1,915,304; javascript: 557,307; sql: 60,639; xml: 6,509; sh: 4,567; makefile: 1,239; python: 406
file content (60 lines) | stat: -rw-r--r-- 2,017 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# frozen_string_literal: true

module DependencyProxy
  # Decodes dependency proxy JWTs and resolves the decoded payload to the
  # record it references: a user, a personal (or group) access token, or a
  # deploy token.
  class AuthTokenService < DependencyProxy::BaseService
    attr_reader :token

    # @param token the raw JWT that #execute hands to the decoder
    def initialize(token)
      @token = token
    end

    # Decodes the token using the dependency proxy authentication secret.
    #
    # Returns the first element of the decoder's result, which the class
    # methods below treat as the payload and index with string keys.
    # Errors raised by the decoder are not rescued here; the class-level
    # entry points rescue the JWT errors themselves.
    def execute
      JSONWebToken::HMACToken.decode(token, ::Auth::DependencyProxyAuthenticationService.secret).first
    end

    # TODO: Rename to make it obvious how it's used in Gitlab::Auth::RequestAuthenticator
    # which is to return an <object>.<id> that is used as a rack-attack discriminator
    # that way it cannot be confused with `.user_or_token_from_jwt`
    # https://gitlab.com/gitlab-org/gitlab/-/issues/454518
    #
    # Resolves a raw JWT to the user named by its 'user_id' claim or,
    # failing that, to the active deploy token named by its 'deploy_token'
    # claim. 'user_id' takes precedence when both are present.
    #
    # Returns nil when neither claim is present or when decoding raises one
    # of the rescued JWT errors.
    #
    # NOTE(review): the user lookup goes through `User.find`, which presumably
    # raises when no record matches (ActiveRecord semantics) and is not
    # rescued here - confirm callers expect that.
    def self.user_or_deploy_token_from_jwt(raw_jwt)
      token_payload = new(raw_jwt).execute

      # Use the same lookup helpers as `.user_or_token_from_jwt` so both
      # entry points stay in agreement on how records are resolved.
      if token_payload['user_id']
        get_user(token_payload['user_id'])
      elsif token_payload['deploy_token']
        get_deploy_token(token_payload['deploy_token'])
      end
    rescue JWT::DecodeError, JWT::ExpiredSignature, JWT::ImmatureSignature
      nil
    end

    # Resolves a raw JWT to the record referenced by the first claim found,
    # checked in this order: 'personal_access_token', 'group_access_token',
    # 'user_id', 'deploy_token'.
    #
    # Returns nil when none of those claims is present or when decoding
    # raises one of the rescued JWT errors.
    def self.user_or_token_from_jwt(raw_jwt)
      token_payload = new(raw_jwt).execute

      if token_payload['personal_access_token']
        get_personal_access_token(token_payload['personal_access_token'])
      elsif token_payload['group_access_token']
        # a group access token is a personal access token in disguise
        get_personal_access_token(token_payload['group_access_token'])
      elsif token_payload['user_id']
        get_user(token_payload['user_id'])
      elsif token_payload['deploy_token']
        get_deploy_token(token_payload['deploy_token'])
      end
    rescue JWT::DecodeError, JWT::ExpiredSignature, JWT::ImmatureSignature
      nil
    end

    # Looks up a user by id via `User.find`.
    def self.get_user(user_id)
      User.find(user_id)
    end

    # Looks up a personal access token by its raw value, restricted to
    # tokens matched by the finder's `state: 'active'` filter.
    def self.get_personal_access_token(raw_token)
      PersonalAccessTokensFinder.new(state: 'active').find_by_token(raw_token)
    end

    # Looks up a deploy token by its raw value within the `active` scope.
    def self.get_deploy_token(raw_token)
      DeployToken.active.find_by_token(raw_token)
    end
  end
end