1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
|
package httprc
import (
"context"
"fmt"
"time"
)
type Controller interface {
// Add adds a new `http.Resource` to the controller. If the resource already exists,
// it will return an error.
Add(context.Context, Resource, ...AddOption) error
// Lookup a `httprc.Resource` by its URL. If the resource does not exist, it
// will return an error.
Lookup(context.Context, string) (Resource, error)
// Remove a `httprc.Resource` from the controller by its URL. If the resource does
// not exist, it will return an error.
Remove(context.Context, string) error
// Refresh forces a resource to be refreshed immediately. If the resource does
// not exist, or if the refresh fails, it will return an error.
Refresh(context.Context, string) error
ShutdownContext(context.Context) error
Shutdown(time.Duration) error
}
type controller struct {
cancel context.CancelFunc
incoming chan any // incoming requests to the controller
shutdown chan struct{}
traceSink TraceSink
wl Whitelist
}
// Shutdown is a convenience function that calls ShutdownContext with a
// context that has a timeout of `timeout`.
func (c *controller) Shutdown(timeout time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
return c.ShutdownContext(ctx)
}
// ShutdownContext stops the client and all associated goroutines, and waits for them
// to finish. If the context is canceled, the function will return immediately:
// there fore you should not use the context you used to start the client (because
// presumably it's already canceled).
//
// Waiting for the client shutdown will also ensure that all sinks are properly
// flushed.
func (c *controller) ShutdownContext(ctx context.Context) error {
c.cancel()
select {
case <-ctx.Done():
return ctx.Err()
case <-c.shutdown:
return nil
}
}
type ctrlRequest[T any] struct {
reply chan T
resource Resource
u string
}
type addRequest ctrlRequest[backendResponse[struct{}]]
type rmRequest ctrlRequest[backendResponse[struct{}]]
type refreshRequest ctrlRequest[backendResponse[struct{}]]
type lookupRequest ctrlRequest[backendResponse[Resource]]
type synchronousRequest ctrlRequest[backendResponse[struct{}]]
type adjustIntervalRequest struct {
resource Resource
}
type backendResponse[T any] struct {
payload T
err error
}
func sendBackend[TReq any, TB any](ctx context.Context, backendCh chan any, v TReq, replyCh chan backendResponse[TB]) (TB, error) {
select {
case <-ctx.Done():
case backendCh <- v:
}
select {
case <-ctx.Done():
var zero TB
return zero, ctx.Err()
case res := <-replyCh:
return res.payload, res.err
}
}
// Lookup returns a resource by its URL. If the resource does not exist, it
// will return an error.
//
// Unfortunately, due to the way typed parameters are handled in Go, we can only
// return a Resource object (and not a ResourceBase[T] object). This means that
// you will either need to use the `Resource.Get()` method or use a type
// assertion to obtain a `ResourceBase[T]` to get to the actual object you are
// looking for
func (c *controller) Lookup(ctx context.Context, u string) (Resource, error) {
reply := make(chan backendResponse[Resource], 1)
req := lookupRequest{
reply: reply,
u: u,
}
return sendBackend[lookupRequest, Resource](ctx, c.incoming, req, reply)
}
// Add adds a new resource to the controller. If the resource already
// exists, it will return an error.
//
// By default this function will automatically wait for the resource to be
// fetched once (by calling `r.Ready()`). Note that the `r.Ready()` call will NOT
// timeout unless you configure your context object with `context.WithTimeout`.
// To disable waiting, you can specify the `WithWaitReady(false)` option.
func (c *controller) Add(ctx context.Context, r Resource, options ...AddOption) error {
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: START Add(%q)", r.URL()))
defer c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: END Add(%q)", r.URL()))
waitReady := true
for _, option := range options {
switch option.Ident() {
case identWaitReady{}:
if err := option.Value(&waitReady); err != nil {
return fmt.Errorf(`httprc.Controller.Add: failed to parse WaitReady option: %w`, err)
}
}
}
if !c.wl.IsAllowed(r.URL()) {
return fmt.Errorf(`httprc.Controller.AddResource: cannot add %q: %w`, r.URL(), errBlockedByWhitelist)
}
reply := make(chan backendResponse[struct{}], 1)
req := addRequest{
reply: reply,
resource: r,
}
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: sending add request for %q to backend", r.URL()))
if _, err := sendBackend[addRequest, struct{}](ctx, c.incoming, req, reply); err != nil {
return err
}
if waitReady {
c.traceSink.Put(ctx, fmt.Sprintf("httprc controller: waiting for resource %q to be ready", r.URL()))
if err := r.Ready(ctx); err != nil {
return err
}
}
return nil
}
// Remove removes a resource from the controller. If the resource does
// not exist, it will return an error.
func (c *controller) Remove(ctx context.Context, u string) error {
reply := make(chan backendResponse[struct{}], 1)
req := rmRequest{
reply: reply,
u: u,
}
if _, err := sendBackend[rmRequest, struct{}](ctx, c.incoming, req, reply); err != nil {
return err
}
return nil
}
// Refresh forces a resource to be refreshed immediately. If the resource does
// not exist, or if the refresh fails, it will return an error.
//
// This function is synchronous, and will block until the resource has been refreshed.
func (c *controller) Refresh(ctx context.Context, u string) error {
reply := make(chan backendResponse[struct{}], 1)
req := refreshRequest{
reply: reply,
u: u,
}
if _, err := sendBackend[refreshRequest, struct{}](ctx, c.incoming, req, reply); err != nil {
return err
}
return nil
}
|