File: endpoint_discovery.rb

package info (click to toggle)
ruby-aws-sdk-core 3.104.3-3%2Bdeb11u2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 1,444 kB
  • sloc: ruby: 11,201; makefile: 4
file content (168 lines) | stat: -rw-r--r-- 5,544 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# frozen_string_literal: true

module Aws
  module Plugins
    # @api private
    class EndpointDiscovery < Seahorse::Client::Plugin

      option(:endpoint_discovery,
        doc_default: Proc.new { |options| options[:require_endpoint_discovery] },
        doc_type: 'Boolean',
        docstring: <<-DOCS) do |cfg|
When set to `true`, endpoint discovery will be enabled for operations when available.
        DOCS
        resolve_endpoint_discovery(cfg)
      end

      option(:endpoint_cache_max_entries,
        default: 1000,
        doc_type: Integer,
        docstring: <<-DOCS
Used for the maximum size limit of the LRU cache storing endpoints data
for endpoint discovery enabled operations. Defaults to 1000.
        DOCS
      )

      option(:endpoint_cache_max_threads,
        default: 10,
        doc_type: Integer,
        docstring: <<-DOCS
Used for the maximum threads in use for polling endpoints to be cached, defaults to 10.
        DOCS
      )

      option(:endpoint_cache_poll_interval,
        default: 60,
        doc_type: Integer,
        docstring: <<-DOCS
When :endpoint_discovery and :active_endpoint_cache is enabled,
Use this option to config the time interval in seconds for making
requests fetching endpoints information. Defaults to 60 sec.
        DOCS
      )

      option(:endpoint_cache) do |cfg|
        Aws::EndpointCache.new(
          max_entries: cfg.endpoint_cache_max_entries,
          max_threads: cfg.endpoint_cache_max_threads
        )
      end

      option(:active_endpoint_cache,
        default: false,
        doc_type: 'Boolean',
        docstring: <<-DOCS
When set to `true`, a thread polling for endpoints will be running in
the background every 60 secs (default). Defaults to `false`.
        DOCS
      )

      def add_handlers(handlers, config)
        handlers.add(Handler, priority: 90) if config.regional_endpoint
      end

      class Handler < Seahorse::Client::Handler

        def call(context)
          if context.operation.endpoint_operation
            context.http_request.headers['x-amz-api-version'] = context.config.api.version
            _apply_endpoint_discovery_user_agent(context)
          elsif discovery_cfg = context.operation.endpoint_discovery
            endpoint = _discover_endpoint(
              context,
              Aws::Util.str_2_bool(discovery_cfg["required"])
            )
            context.http_request.endpoint = _valid_uri(endpoint.address) if endpoint
            if endpoint || context.config.endpoint_discovery
              _apply_endpoint_discovery_user_agent(context)
            end
          end
          @handler.call(context)
        end

        private

        def _valid_uri(address)
          # returned address can be missing scheme
          if address.start_with?('http')
            URI.parse(address)
          else
            URI.parse("https://" + address)
          end
        end

        def _apply_endpoint_discovery_user_agent(ctx)
          if ctx.config.user_agent_suffix.nil?
            ctx.config.user_agent_suffix = "endpoint-discovery"
          elsif !ctx.config.user_agent_suffix.include? "endpoint-discovery"
            ctx.config.user_agent_suffix += "endpoint-discovery"
          end
        end

        def _discover_endpoint(ctx, required)
          cache = ctx.config.endpoint_cache 
          key = cache.extract_key(ctx)

          if required
            unless ctx.config.endpoint_discovery
              raise ArgumentError, "Operation #{ctx.operation.name} requires "\
                'endpoint_discovery to be enabled.'
            end
            # required for the operation
            unless cache.key?(key)
              cache.update(key, ctx)
            end
            endpoint = cache[key]
            # hard fail if endpoint is not discovered
            raise Aws::Errors::EndpointDiscoveryError.new unless endpoint
            endpoint
          elsif ctx.config.endpoint_discovery
            # not required for the operation
            # but enabled
            if cache.key?(key)
              cache[key]
            elsif ctx.config.active_endpoint_cache
              # enabled active cache pull
              interval = ctx.config.endpoint_cache_poll_interval
              if key.include?('_')
                # identifier related, kill the previous polling thread by key
                # because endpoint req params might be changed
                cache.delete_polling_thread(key)
              end

              # start a thread for polling endpoints when non-exist
              unless cache.threads_key?(key)
                thread = Thread.new do
                  while !cache.key?(key) do
                    cache.update(key, ctx)
                    sleep(interval)
                  end
                end
                cache.update_polling_pool(key, thread)
              end

              cache[key]
            else
              # disabled active cache pull
              # attempt, buit fail soft
              cache.update(key, ctx)
              cache[key]
            end
          end
        end

      end

      private

      def self.resolve_endpoint_discovery(cfg)
        env = ENV['AWS_ENABLE_ENDPOINT_DISCOVERY']
        default = cfg.api.require_endpoint_discovery
        shared_cfg = Aws.shared_config.endpoint_discovery_enabled(profile: cfg.profile)
        resolved = Aws::Util.str_2_bool(env) || Aws::Util.str_2_bool(shared_cfg)
        env.nil? && shared_cfg.nil? ? default : !!resolved
      end

    end
  end
end