1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168
|
# frozen_string_literal: true
module Aws
module Plugins
# @api private
class EndpointDiscovery < Seahorse::Client::Plugin
# Declares the :endpoint_discovery configuration option.
# The documented default mirrors the :require_endpoint_discovery entry of the
# options hash handed to the doc_default proc.
# The trailing block receives the configuration object and delegates to
# resolve_endpoint_discovery, which combines the
# AWS_ENABLE_ENDPOINT_DISCOVERY environment variable, the shared config
# value for the profile, and the API's require_endpoint_discovery flag.
# NOTE(review): presumably Seahorse invokes this block lazily to compute the
# option's default value -- confirm against Seahorse::Client::Plugin.option.
option(:endpoint_discovery,
doc_default: Proc.new { |options| options[:require_endpoint_discovery] },
doc_type: 'Boolean',
docstring: <<-DOCS) do |cfg|
When set to `true`, endpoint discovery will be enabled for operations when available.
DOCS
resolve_endpoint_discovery(cfg)
end
# Declares :endpoint_cache_max_entries (default 1000). The value is passed
# as `max_entries:` when the :endpoint_cache option builds its
# Aws::EndpointCache.
option(:endpoint_cache_max_entries,
default: 1000,
doc_type: Integer,
docstring: <<-DOCS
Used for the maximum size limit of the LRU cache storing endpoints data
for endpoint discovery enabled operations. Defaults to 1000.
DOCS
)
# Declares :endpoint_cache_max_threads (default 10). The value is passed as
# `max_threads:` when the :endpoint_cache option builds its
# Aws::EndpointCache.
option(:endpoint_cache_max_threads,
default: 10,
doc_type: Integer,
docstring: <<-DOCS
Used for the maximum threads in use for polling endpoints to be cached, defaults to 10.
DOCS
)
# Declares :endpoint_cache_poll_interval (default 60). The handler reads it
# as the `sleep` duration, in seconds, between cache updates performed by
# the background polling thread.
option(:endpoint_cache_poll_interval,
default: 60,
doc_type: Integer,
docstring: <<-DOCS
When :endpoint_discovery and :active_endpoint_cache is enabled,
Use this option to config the time interval in seconds for making
requests fetching endpoints information. Defaults to 60 sec.
DOCS
)
# Declares the :endpoint_cache option. The block builds an
# Aws::EndpointCache sized from the :endpoint_cache_max_entries and
# :endpoint_cache_max_threads values of the given configuration.
# No docstring is supplied for this option.
option(:endpoint_cache) do |cfg|
Aws::EndpointCache.new(
max_entries: cfg.endpoint_cache_max_entries,
max_threads: cfg.endpoint_cache_max_threads
)
end
# Declares :active_endpoint_cache (default false). When it is truthy and
# endpoint discovery is enabled but not required for an operation, the
# handler starts a background thread on a cache miss that keeps updating the
# cache, sleeping :endpoint_cache_poll_interval seconds between attempts,
# until the key is present.
option(:active_endpoint_cache,
default: false,
doc_type: 'Boolean',
docstring: <<-DOCS
When set to `true`, a thread polling for endpoints will be running in
the background every 60 secs (default). Defaults to `false`.
DOCS
)
# Registers the endpoint discovery Handler (priority 90) on the handler
# list, but only for configurations that report a regional endpoint.
#
# @param handlers the handler list to register on (must respond to `add`)
# @param config the client configuration (must respond to `regional_endpoint`)
# @return the result of `handlers.add`, or nil when nothing was registered
def add_handlers(handlers, config)
  return unless config.regional_endpoint

  handlers.add(Handler, priority: 90)
end
class Handler < Seahorse::Client::Handler
# Prepares the request for endpoint discovery, then passes the context on
# to the next handler.
#
# * For the endpoint (discovery) operation itself, the API version header
#   is set and the user agent is tagged.
# * For operations that declare endpoint discovery, an endpoint is looked
#   up; when one is found the request endpoint is replaced with it. The
#   user agent is tagged when an endpoint was found or when discovery is
#   enabled in the configuration.
# * All other operations pass through untouched.
#
# @param context the request context
# @return whatever the next handler returns
def call(context)
  if context.operation.endpoint_operation
    context.http_request.headers['x-amz-api-version'] = context.config.api.version
    _apply_endpoint_discovery_user_agent(context)
  else
    discovery_cfg = context.operation.endpoint_discovery
    if discovery_cfg
      required = Aws::Util.str_2_bool(discovery_cfg["required"])
      endpoint = _discover_endpoint(context, required)
      if endpoint
        context.http_request.endpoint = _valid_uri(endpoint.address)
        _apply_endpoint_discovery_user_agent(context)
      elsif context.config.endpoint_discovery
        _apply_endpoint_discovery_user_agent(context)
      end
    end
  end

  @handler.call(context)
end
private
# Converts a discovered endpoint address into a URI, defaulting to HTTPS
# when the address carries no scheme.
#
# Only an explicit `http://` or `https://` prefix (in any letter case)
# counts as a scheme. A bare host that merely begins with "http", such as
# "httpbin.example.com", still gets "https://" prepended instead of being
# parsed as a scheme-less path.
#
# @param address [String] endpoint address returned by the discovery call
# @return [URI::Generic] the parsed endpoint URI
def _valid_uri(address)
  if address =~ %r{\Ahttps?://}i
    URI.parse(address)
  else
    URI.parse("https://#{address}")
  end
end
# Tags the configured user agent suffix with the "endpoint-discovery"
# marker so that requests advertise the feature.
#
# * A missing or empty suffix is replaced by the marker.
# * A suffix that already contains the marker is left alone, which keeps
#   repeated calls from stacking duplicates.
# * Otherwise the marker is appended after a single space, so it remains a
#   separate user agent token instead of being fused onto the caller's own
#   suffix.
#
# Note that this writes to `ctx.config`, so the change is visible to every
# later reader of that configuration object.
#
# @param ctx the request context (must expose `config.user_agent_suffix`)
# @return [String, nil] the new suffix, or nil when nothing changed
def _apply_endpoint_discovery_user_agent(ctx)
  marker = 'endpoint-discovery'
  suffix = ctx.config.user_agent_suffix
  if suffix.nil? || suffix.empty?
    ctx.config.user_agent_suffix = marker
  elsif !suffix.include?(marker)
    ctx.config.user_agent_suffix = "#{suffix} #{marker}"
  end
end
# Looks up the endpoint for the current request in the endpoint cache,
# refreshing the cache when necessary.
#
# @param ctx the request context
# @param required whether the operation requires endpoint discovery
# @return the cached endpoint entry, or nil when none is available and
#   discovery is optional (or disabled)
# @raise [ArgumentError] when discovery is required by the operation but
#   disabled in the configuration
# @raise [Aws::Errors::EndpointDiscoveryError] when discovery is required
#   and no endpoint could be obtained
def _discover_endpoint(ctx, required)
  endpoint_cache = ctx.config.endpoint_cache
  cache_key = endpoint_cache.extract_key(ctx)
  discovery_enabled = ctx.config.endpoint_discovery

  if required
    unless discovery_enabled
      raise ArgumentError,
            "Operation #{ctx.operation.name} requires endpoint_discovery to be enabled."
    end

    # Mandatory discovery: fetch synchronously on a miss and fail hard
    # when there is still nothing to use.
    endpoint_cache.update(cache_key, ctx) unless endpoint_cache.key?(cache_key)
    discovered = endpoint_cache[cache_key]
    raise Aws::Errors::EndpointDiscoveryError unless discovered

    return discovered
  end

  # Optional discovery is a no-op unless the feature has been switched on.
  return nil unless discovery_enabled
  return endpoint_cache[cache_key] if endpoint_cache.key?(cache_key)

  if ctx.config.active_endpoint_cache
    poll_interval = ctx.config.endpoint_cache_poll_interval

    # Keys containing an underscore are identifier related; drop any
    # existing poller for them because the request params may have changed.
    endpoint_cache.delete_polling_thread(cache_key) if cache_key.include?('_')

    # Start a background poller for this key when none is registered yet.
    unless endpoint_cache.threads_key?(cache_key)
      poller = Thread.new do
        until endpoint_cache.key?(cache_key)
          endpoint_cache.update(cache_key, ctx)
          sleep(poll_interval)
        end
      end
      endpoint_cache.update_polling_pool(cache_key, poller)
    end
  else
    # No active polling: make a single attempt and fail soft.
    endpoint_cache.update(cache_key, ctx)
  end

  endpoint_cache[cache_key]
end
end
# NOTE: a bare `private` does not change the visibility of methods defined
# with `def self.`, so the resolver below remains callable on the class.
private
# Decides whether endpoint discovery is enabled for a configuration.
#
# The AWS_ENABLE_ENDPOINT_DISCOVERY environment variable and the shared
# config value for the profile are both converted with
# Aws::Util.str_2_bool (presumably mapping "true"/"false" strings to
# booleans -- confirm against Aws::Util). When neither source is set, the
# API's require_endpoint_discovery flag is returned as-is; otherwise the
# result is true when either converted value is truthy and false if not.
#
# @param cfg the client configuration (must expose `api` and `profile`)
# @return [Boolean, Object] a strict boolean when an explicit setting
#   exists, else the API default
def self.resolve_endpoint_discovery(cfg)
  env_value = ENV['AWS_ENABLE_ENDPOINT_DISCOVERY']
  api_default = cfg.api.require_endpoint_discovery
  shared_value = Aws.shared_config.endpoint_discovery_enabled(profile: cfg.profile)
  enabled = Aws::Util.str_2_bool(env_value) || Aws::Util.str_2_bool(shared_value)

  if env_value.nil? && shared_value.nil?
    api_default
  else
    enabled ? true : false
  end
end
end
end
end
|