1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193
|
# frozen_string_literal: true
module Aws
# @api private
# a LRU cache caching endpoints data
class EndpointCache
# default cache entries limit
MAX_ENTRIES = 1000
# default max threads pool size
MAX_THREADS = 10
def initialize(options = {})
@max_entries = options[:max_entries] || MAX_ENTRIES
@entries = {} # store endpoints
@max_threads = options[:max_threads] || MAX_THREADS
@pool = {} # store polling threads
@mutex = Mutex.new
@require_identifier = nil # whether endpoint operation support identifier
end
# @return [Integer] Max size limit of cache
attr_reader :max_entries
# @return [Integer] Max count of polling threads
attr_reader :max_threads
# return [Hash] Polling threads pool
attr_reader :pool
# @param [String] key
# @return [Endpoint]
def [](key)
@mutex.synchronize do
# fetching an existing endpoint delete it and then append it
endpoint = @entries[key]
if endpoint
@entries.delete(key)
@entries[key] = endpoint
end
endpoint
end
end
# @param [String] key
# @param [Hash] value
def []=(key, value)
@mutex.synchronize do
# delete the least recent used endpoint when cache is full
unless @entries.size < @max_entries
old_key, = @entries.shift
delete_polling_thread(old_key)
end
# delete old value if exists
@entries.delete(key)
@entries[key] = Endpoint.new(value.to_hash)
end
end
# checking whether an unexpired endpoint key exists in cache
# @param [String] key
# @return [Boolean]
def key?(key)
@mutex.synchronize do
if @entries.key?(key) && (@entries[key].nil? || @entries[key].expired?)
@entries.delete(key)
end
@entries.key?(key)
end
end
# checking whether an polling thread exist for the key
# @param [String] key
# @return [Boolean]
def threads_key?(key)
@pool.key?(key)
end
# remove entry only
# @param [String] key
def delete(key)
@mutex.synchronize do
@entries.delete(key)
end
end
# kill the old polling thread and remove it from pool
# @param [String] key
def delete_polling_thread(key)
Thread.kill(@pool[key]) if threads_key?(key)
@pool.delete(key)
end
# update cache with requests (using service endpoint operation)
# to fetch endpoint list (with identifiers when available)
# @param [String] key
# @param [RequestContext] ctx
def update(key, ctx)
resp = _request_endpoint(ctx)
if resp && resp.endpoints
resp.endpoints.each { |e| self[key] = e }
end
end
# extract the key to be used in the cache from request context
# @param [RequestContext] ctx
# @return [String]
def extract_key(ctx)
parts = []
# fetching from cred provider directly gives warnings
parts << ctx.config.credentials.credentials.access_key_id
if _endpoint_operation_identifier(ctx)
parts << ctx.operation_name
ctx.operation.input.shape.members.inject(parts) do |p, (name, ref)|
p << ctx.params[name] if ref['endpointdiscoveryid']
p
end
end
parts.join('_')
end
# update polling threads pool
# param [String] key
# param [Thread] thread
def update_polling_pool(key, thread)
unless @pool.size < @max_threads
_, thread = @pool.shift
Thread.kill(thread)
end
@pool[key] = thread
end
# kill all polling threads
def stop_polling!
@pool.each { |_, t| Thread.kill(t) }
@pool = {}
end
private
def _request_endpoint(ctx)
params = {}
if _endpoint_operation_identifier(ctx)
# build identifier params when available
params[:operation] = ctx.operation.name
ctx.operation.input.shape.members.inject(params) do |p, (name, ref)|
if ref['endpointdiscoveryid']
p[:identifiers] ||= {}
p[:identifiers][ref.location_name] = ctx.params[name]
end
p
end
end
begin
endpoint_operation_name = ctx.config.api.endpoint_operation
ctx.client.send(endpoint_operation_name, params)
rescue Aws::Errors::ServiceError
nil
end
end
def _endpoint_operation_identifier(ctx)
return @require_identifier unless @require_identifier.nil?
operation_name = ctx.config.api.endpoint_operation
operation = ctx.config.api.operation(operation_name)
@require_identifier = operation.input.shape.members.any?
end
class Endpoint
# default endpoint cache time, 1 minute
CACHE_PERIOD = 1
def initialize(options)
@address = options.fetch(:address)
@cache_period = options[:cache_period_in_minutes] || CACHE_PERIOD
@created_time = Time.now
end
# [String] valid URI address (with path)
attr_reader :address
def expired?
Time.now - @created_time > @cache_period * 60
end
end
end
end
|