File: agnostic_token_revocation_service.rb

package info (click to toggle)
gitlab 17.6.5-19
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 629,368 kB
  • sloc: ruby: 1,915,304; javascript: 557,307; sql: 60,639; xml: 6,509; sh: 4,567; makefile: 1,239; python: 406
file content (137 lines) | stat: -rw-r--r-- 4,802 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# frozen_string_literal: true

# This Service takes a plaintext authentication token, which may be of
# several types, and revokes it if the token has access to the group or
# any of the group's descendants. If revocation is not possible, the
# token will be rotated or otherwise made unusable.
#
# If the token provided has access to the group and is revoked, it will
# be returned by the service with a :success status.
# If the token type is not supported, if the token doesn't have access
# to the group, or if any error occurs, an error response (built with
# ServiceResponse.error and carrying a short message) is returned.
#
# This Service does not create logs or Audit events. Those can be found
# at the API layer or in specific revocation services.
#
# This Service returns a ServiceResponse object.
module Groups # rubocop:disable Gitlab/BoundedContexts -- This service is strictly related to groups
  # Identifies a plaintext token and dispatches to a type-specific handler
  # that revokes (or, for feed tokens, rotates) it when the token is tied
  # to the given group or one of its descendants.
  class AgnosticTokenRevocationService < Groups::BaseService
    # Source label handed to the token identifier when resolving the
    # plaintext value.
    AUDIT_SOURCE = :group_token_revocation_service

    # group        - the group the revocation is scoped to
    # current_user - the user requesting the revocation
    # plaintext    - the raw token value; coerced with #to_s, so nil
    #                becomes an empty string
    def initialize(group, current_user, plaintext)
      @plaintext = plaintext.to_s
      @current_user = current_user
      @group = group
    end

    # Runs the precondition checks, resolves the plaintext into a token
    # object and hands over to the handler matching the token's class.
    #
    # Returns the response built by #success or #error.
    def execute
      feature_enabled = Feature.enabled?(:group_agnostic_token_revocation, group)
      return error("Feature not enabled") unless feature_enabled
      return error("Group cannot be a subgroup") if group.subgroup?

      authorized = can?(current_user, :admin_group, group)
      return error("Unauthorized") unless authorized

      @token = ::Authn::AgnosticTokenIdentifier.token_for(plaintext, AUDIT_SOURCE)
      # Leave @revocable unset (nil) when no token could be identified.
      @revocable = token.revocable unless token.blank?

      # Each handler applies the checks relevant to its token type and
      # to the group scope.
      case token
      when ::Authn::Tokens::PersonalAccessToken then handle_personal_access_token
      when ::Authn::Tokens::DeployToken then handle_deploy_token
      when ::Authn::Tokens::FeedToken then handle_feed_token
      else error('Unsupported token type')
      end
    end

    private

    attr_reader :plaintext, :group, :current_user, :token, :revocable

    # Builds the success response for a revoked token.
    #
    # revocable  - the object the token resolved to
    # type       - token type name, also used in the message
    # api_entity - entity name for the payload; falls back to `type`
    def success(revocable, type, api_entity: nil)
      payload = {
        revocable: revocable,
        type: type,
        api_entity: api_entity || type
      }

      ServiceResponse.success(message: "#{type} is revoked", payload: payload)
    end

    # Builds an error response carrying only the given message.
    def error(message)
      ServiceResponse.error(message: message)
    end

    # Revokes a personal access token whose owner is a member of the
    # group hierarchy.
    def handle_personal_access_token
      return error('PAT not found') unless revocable
      return error('PAT revocation failed') unless user_has_group_membership?(revocable.user)

      # Inactive tokens (e.g. expired ones) are not sent for revocation.
      token.revoke!(current_user) if revocable.active?

      # Token details are only returned once the record itself reports
      # being revoked. Otherwise the token was inactive but not revoked,
      # or the revocation did not take effect.
      if revocable.reset.revoked?
        success(revocable, 'PersonalAccessToken')
      else
        error('PAT revocation failed')
      end
    end

    # Tells whether the user holds a group membership or a project
    # membership anywhere within the group and its descendants. As the
    # queries are not filtered by membership state, memberships that are
    # not currently active (e.g. bans) and memberships of non-human users
    # count as well.
    def user_has_group_membership?(user)
      return true if ::GroupMember.with_user(user).with_source_id(group.self_and_descendants).any?

      ::ProjectMember.with_user(user).in_namespaces(group.self_and_descendants).any?
    end

    # Revokes a group-type deploy token whose group lies within the
    # hierarchy of the service's group.
    def handle_deploy_token
      return error('DeployToken not found') unless revocable && revocable.group_type?

      within_hierarchy = group.self_and_descendants.include?(revocable.group)
      return error('DeployToken revocation failed') unless within_hierarchy

      token.revoke!(current_user) if revocable.active?

      # Same rule as for personal access tokens: report success only when
      # the record confirms it is revoked.
      if revocable.reset.revoked?
        success(revocable, 'DeployToken')
      else
        error('DeployToken revocation failed')
      end
    end

    # Rotates a feed token. Here `revocable` is handed straight to the
    # membership check and asked for its #feed_token, i.e. it is treated
    # as the user (presumably the token's owner).
    def handle_feed_token
      return error('Feed Token not found') unless revocable
      return error('Feed token revocation failed') unless user_has_group_membership?(revocable)

      previous_value = revocable.feed_token
      response = token.revoke!(current_user)

      # Feed tokens can only be rotated, so besides a successful response
      # the stored value must differ from the one seen before the call.
      # The record is only re-read when the response was successful.
      rotated = response.success? &&
        !ActiveSupport::SecurityUtils.secure_compare(previous_value, revocable.reset.feed_token)
      return success(revocable, 'FeedToken', api_entity: 'UserSafe') if rotated

      # The user is a member, but rotation failed for some reason.
      error('Feed token revocation failed')
    end
  end
end