1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
|
package httprc
import (
"context"
"fmt"
"sync"
)
// worker bundles the channels and sinks used by a single worker loop (see
// the Run method) that syncs resources on request.
type worker struct {
// httpcl is the HTTP client that Run attaches to the context handed to
// Resource.Sync.
httpcl HTTPClient
// incoming is the channel on which the worker sends adjustIntervalRequest
// values after a resource has been synced.
// NOTE(review): presumably consumed by the controller loop; confirm
// against the code that creates the worker.
incoming chan any
// next delivers resources to be synced without a reply; sync errors from
// this path are reported to errSink.
next <-chan Resource
// nextsync delivers synchronous requests; the outcome of the sync is sent
// back on the request's reply channel.
nextsync <-chan synchronousRequest
// errSink receives errors returned by Sync for resources read from next.
errSink ErrorSink
// traceSink receives human-readable trace messages describing what the
// worker is doing.
traceSink TraceSink
}
// Run executes the worker loop until ctx is done.
//
// readywg.Done is called once the context has been prepared and the worker is
// about to start serving requests; donewg.Done is called when Run returns.
//
// The loop serves two kinds of work:
//
//   - Resources received on w.next are synced; a sync error is reported to
//     w.errSink. In either case the resource's busy flag is cleared and an
//     interval adjustment request is sent.
//   - Requests received on w.nextsync are synced and the outcome (nil or the
//     sync error) is delivered on the request's reply channel. An interval
//     adjustment request is sent only after a successful sync.
//
// A failed sync never terminates the worker: the only way out of the loop is
// cancellation of ctx.
func (w worker) Run(ctx context.Context, readywg *sync.WaitGroup, donewg *sync.WaitGroup) {
	w.traceSink.Put(ctx, "httprc worker: START worker loop")
	// Deferred calls run in LIFO order: donewg is released first, then the
	// END trace is emitted.
	defer w.traceSink.Put(ctx, "httprc worker: END worker loop")
	defer donewg.Done()

	// Make the trace sink and HTTP client available to Resource.Sync through
	// the context.
	ctx = withTraceSink(ctx, w.traceSink)
	ctx = withHTTPClient(ctx, w.httpcl)

	readywg.Done()
	for {
		select {
		case <-ctx.Done():
			w.traceSink.Put(ctx, "httprc worker: stopping worker loop")
			return
		case r := <-w.next:
			w.traceSink.Put(ctx, fmt.Sprintf("httprc worker: syncing %q (async)", r.URL()))
			if err := r.Sync(ctx); err != nil {
				// Nobody is waiting for this result, so the error goes to
				// the error sink.
				w.errSink.Put(ctx, err)
			}
			r.SetBusy(false)
			w.sendAdjustIntervalRequest(ctx, r)
		case sr := <-w.nextsync:
			w.traceSink.Put(ctx, fmt.Sprintf("httprc worker: syncing %q (synchronous)", sr.resource.URL()))
			if err := sr.resource.Sync(ctx); err != nil {
				w.traceSink.Put(ctx, fmt.Sprintf("httprc worker: FAILED to sync %q (synchronous): %s", sr.resource.URL(), err))
				sendReply(ctx, sr.reply, struct{}{}, err)
				sr.resource.SetBusy(false)
				// Keep serving requests. Returning here would shut this
				// worker down for good because of a single failed fetch.
				continue
			}
			w.traceSink.Put(ctx, fmt.Sprintf("httprc worker: SUCCESS syncing %q (synchronous)", sr.resource.URL()))
			sr.resource.SetBusy(false)
			sendReply(ctx, sr.reply, struct{}{}, nil)
			w.sendAdjustIntervalRequest(ctx, sr.resource)
		}
	}
}
// sendAdjustIntervalRequest sends an adjustIntervalRequest for r on
// w.incoming, blocking until the request is accepted or ctx is done.
//
// Trace messages are emitted before the attempt and after its outcome. The
// "Sent" message is only emitted when the request was actually delivered; if
// ctx is done first, an "Aborted" message is emitted instead and the request
// is dropped.
func (w worker) sendAdjustIntervalRequest(ctx context.Context, r Resource) {
	u := r.URL()
	w.traceSink.Put(ctx, "httprc worker: Sending interval adjustment request for "+u)
	select {
	case <-ctx.Done():
		// Nothing was sent, so do not claim otherwise in the trace.
		w.traceSink.Put(ctx, "httprc worker: Aborted interval adjustment request for "+u+" (context done)")
		return
	case w.incoming <- adjustIntervalRequest{resource: r}:
	}
	w.traceSink.Put(ctx, "httprc worker: Sent interval adjustment request for "+u)
}
|