// Package cache implements client side caching for client.Client. This is useful for burst-happy applications that
// query a lot of the same information in small chunks.
//
// For example, an application could check the state of nodes using one request per node. This is obviously not ideal
// in larger clusters, where it would be more efficient to request the state of all nodes at once. Depending on the
// application, this may not be possible, however.
//
// This package contains ready-to-use client side caches with a configurable duration and automatic invalidation,
// under the assumption that all modifications are made through the same client.
package cache

import (
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/LINBIT/golinstor/client"
	log "github.com/sirupsen/logrus"
)
var (
	yes = true
	// cacheOpt requests cached data from the LINSTOR API.
	cacheOpt = &client.ListOpts{Cached: &yes}
)
// Cache is the common interface of the client side caches provided by this package.
// A Cache is attached to a client.Client using WithCaches.
type Cache interface {
	apply(c *client.Client)
}
// WithCaches sets up the given caches on the client.Client.
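//
// A minimal setup sketch, assuming a NodeCache type with a Timeout field (provided elsewhere in this package) and an
// already parsed controller URL u:
//
//	cl, err := client.NewClient(
//		client.BaseURL(u),
//		cache.WithCaches(&cache.NodeCache{Timeout: 1 * time.Minute}),
//	)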
func WithCaches(caches ...Cache) client.Option {
return func(cl *client.Client) error {
for _, ca := range caches {
ca.apply(cl)
}
return nil
}
}
// cache holds a single cached response together with the time it was last updated.
type cache struct {
	mu         sync.Mutex
	lastUpdate time.Time
	cache      any
}
// Invalidate forcefully resets the cache.
// The next call to Get will always invoke the provided function.
func (c *cache) Invalidate() {
c.mu.Lock()
c.lastUpdate = time.Time{}
c.cache = nil
c.mu.Unlock()
}
// Get returns a cached response or the result of the provided update function.
//
// If the cache is current, it will return the last successful cached response.
// If the cache is outdated, it will run the provided function to retrieve a result. A successful response
// is cached for later use.
//
// A timeout of 0 means a cached response never expires on its own; it is only refreshed after Invalidate.
func (c *cache) Get(timeout time.Duration, updateFunc func() (any, error)) (any, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	now := time.Now()
	// Refresh if the cache was never populated (or was invalidated), or if the configured timeout has expired.
	if c.lastUpdate.IsZero() || (timeout != 0 && c.lastUpdate.Add(timeout).Before(now)) {
result, err := updateFunc()
if err != nil {
return nil, err
}
c.cache = result
c.lastUpdate = now
}
return c.cache, nil
}
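
// A concrete cache in this package would typically wrap a list call in Get, using the configured timeout, and call
// Invalidate from the corresponding mutating calls. The nodeCache type below is a hypothetical sketch (the exact
// client call may differ), not the actual implementation:
//
//	type nodeCache struct {
//		nodes   cache
//		timeout time.Duration
//	}
//
//	func (n *nodeCache) allNodes(ctx context.Context, cl *client.Client) ([]client.Node, error) {
//		result, err := n.nodes.Get(n.timeout, func() (any, error) {
//			return cl.Nodes.GetAll(ctx, cacheOpt)
//		})
//		if err != nil {
//			return nil, err
//		}
//		return result.([]client.Node), nil
//	}
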
// filterNodeAndPoolOpts filters generic items based on the provided client.ListOpts.
// It mimics the behaviour of LINSTOR when using the node, storage pool and property query parameters.
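//
// For example, selecting the storage pools named "thinpool" on node "node-a" that have the property "Aux/foo" set to
// "bar" (names and property are purely illustrative) would look like:
//
//	filtered := filterNodeAndPoolOpts(allPools, &client.ListOpts{
//		Node:        []string{"node-a"},
//		StoragePool: []string{"thinpool"},
//		Prop:        []string{"Aux/foo=bar"},
//	})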
func filterNodeAndPoolOpts[T Filterable](items []T, opts ...*client.ListOpts) []T {
filterNodes := make(map[string]struct{})
filterPools := make(map[string]struct{})
var filterProps []string
if hasUnsupportedOpts(opts...) {
log.WithField("opts", opts).Warn("unsupported filter opts in cache")
}
for _, o := range opts {
for _, n := range o.Node {
filterNodes[n] = struct{}{}
}
for _, sp := range o.StoragePool {
filterPools[sp] = struct{}{}
}
filterProps = append(filterProps, o.Prop...)
}
var result []T
outer:
for i := range items {
if len(filterNodes) > 0 {
if !anyMatches(filterNodes, nodes(&items[i])) {
continue
}
}
if len(filterPools) > 0 {
if !anyMatches(filterPools, pools(&items[i])) {
continue
}
}
itemProps := props(&items[i])
for _, filterProp := range filterProps {
key, val, found := strings.Cut(filterProp, "=")
itemVal, ok := itemProps[key]
if !ok {
continue outer
}
if found && val != itemVal {
continue outer
}
}
result = append(result, items[i])
}
return result
}
// hasUnsupportedOpts reports whether any of the given ListOpts use filters that this cache cannot apply locally.
func hasUnsupportedOpts(opts ...*client.ListOpts) bool {
for _, opt := range opts {
if len(opt.Snapshots) > 0 || len(opt.Resource) > 0 || opt.Limit != 0 || opt.Offset != 0 {
return true
}
}
return false
}
// Filterable is the set of item types that filterNodeAndPoolOpts can filter.
type Filterable interface {
	client.Node | client.StoragePool | client.ResourceWithVolumes | client.Snapshot | client.PhysicalStorageViewItem
}
// anyMatches reports whether at least one of items is present in haystack.
func anyMatches(haystack map[string]struct{}, items []string) bool {
for _, item := range items {
if _, ok := haystack[item]; ok {
return true
}
}
return false
}
// nodes returns the node names associated with an item, used when filtering by node.
func nodes(item any) []string {
	switch item := item.(type) {
	case *client.Node:
		return []string{item.Name}
	case *client.StoragePool:
		return []string{item.NodeName}
	case *client.ResourceWithVolumes:
		return []string{item.NodeName}
	case *client.Snapshot:
		return item.Nodes
	case *client.PhysicalStorageViewItem:
		var result []string
		for k := range item.Nodes {
			result = append(result, k)
		}
		return result
	default:
		panic(fmt.Sprintf("unsupported item type: %T", item))
	}
}
// pools returns the storage pool names associated with an item, used when filtering by storage pool.
func pools(item any) []string {
	switch item := item.(type) {
	case *client.Node:
		return nil
	case *client.StoragePool:
		return []string{item.StoragePoolName}
	case *client.ResourceWithVolumes:
		var result []string
		for _, vol := range item.Volumes {
			result = append(result, vol.StoragePoolName)
		}
		return result
	case *client.Snapshot:
		return nil
	case *client.PhysicalStorageViewItem:
		return nil
	default:
		panic(fmt.Sprintf("unsupported item type: %T", item))
	}
}
// props returns the property map of an item, used when filtering by property.
func props(item any) map[string]string {
	switch item := item.(type) {
	case *client.Node:
		return item.Props
	case *client.StoragePool:
		return item.Props
	case *client.ResourceWithVolumes:
		return item.Props
	case *client.Snapshot:
		return item.Props
	case *client.PhysicalStorageViewItem:
		return nil
	default:
		panic(fmt.Sprintf("unsupported item type: %T", item))
	}
}