1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
|
package endpointdiscovery
import (
"sync"
"sync/atomic"
)
// EndpointCache is an LRU cache that holds a series of endpoints
// based on some key. The data structure makes use of a read write
// mutex to enable asynchronous use.
type EndpointCache struct {
endpoints sync.Map
endpointLimit int64
// size is used to count the number elements in the cache.
// The atomic package is used to ensure this size is accurate when
// using multiple goroutines.
size int64
}
// NewEndpointCache will return a newly initialized cache with a limit
// of endpointLimit entries.
func NewEndpointCache(endpointLimit int64) *EndpointCache {
return &EndpointCache{
endpointLimit: endpointLimit,
endpoints: sync.Map{},
}
}
// Get is a concurrent safe get operation that will retrieve an endpoint
// based on endpointKey. A boolean will also be returned to illustrate whether
// or not the endpoint had been found.
func (c *EndpointCache) get(endpointKey string) (Endpoint, bool) {
endpoint, ok := c.endpoints.Load(endpointKey)
if !ok {
return Endpoint{}, false
}
ev := endpoint.(Endpoint)
ev.Prune()
c.endpoints.Store(endpointKey, ev)
return endpoint.(Endpoint), true
}
// Has returns if the enpoint cache contains a valid entry for the endpoint key
// provided.
func (c *EndpointCache) Has(endpointKey string) bool {
_, found := c.Get(endpointKey)
return found
}
// Get will retrieve a weighted address based off of the endpoint key. If an endpoint
// should be retrieved, due to not existing or the current endpoint has expired
// the Discoverer object that was passed in will attempt to discover a new endpoint
// and add that to the cache.
func (c *EndpointCache) Get(endpointKey string) (WeightedAddress, bool) {
endpoint, ok := c.get(endpointKey)
if !ok {
return WeightedAddress{}, false
}
return endpoint.GetValidAddress()
}
// Add is a concurrent safe operation that will allow new endpoints to be added
// to the cache. If the cache is full, the number of endpoints equal endpointLimit,
// then this will remove the oldest entry before adding the new endpoint.
func (c *EndpointCache) Add(endpoint Endpoint) {
// de-dups multiple adds of an endpoint with a pre-existing key
if iface, ok := c.endpoints.Load(endpoint.Key); ok {
e := iface.(Endpoint)
if e.Len() > 0 {
return
}
}
size := atomic.AddInt64(&c.size, 1)
if size > 0 && size > c.endpointLimit {
c.deleteRandomKey()
}
c.endpoints.Store(endpoint.Key, endpoint)
}
// deleteRandomKey will delete a random key from the cache. If
// no key was deleted false will be returned.
func (c *EndpointCache) deleteRandomKey() bool {
atomic.AddInt64(&c.size, -1)
found := false
c.endpoints.Range(func(key, value interface{}) bool {
found = true
c.endpoints.Delete(key)
return false
})
return found
}
|