1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
|
package icingaredis
import (
"context"
"github.com/icinga/icinga-go-library/com"
"github.com/icinga/icinga-go-library/database"
"github.com/icinga/icinga-go-library/redis"
"github.com/icinga/icinga-go-library/strcase"
"github.com/icinga/icinga-go-library/types"
"github.com/icinga/icingadb/pkg/common"
"github.com/icinga/icingadb/pkg/contracts"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"runtime"
)
// CreateEntities converts Redis field-value pairs into entities and
// streams the results on the returned entity channel.
//
// It starts `concurrent` workers that all read from pairs. For each pair a
// worker decodes the field into a types.Binary ID, obtains a fresh entity from
// factoryFunc, unmarshals the value into it as JSON, sets the ID and sends the
// entity on. The entity channel is closed once every worker has returned.
// The second return value is whatever com.WaitAsync yields for the group
// that supervises the workers.
func CreateEntities(ctx context.Context, factoryFunc database.EntityFactoryFunc, pairs <-chan redis.HPair, concurrent int) (<-chan database.Entity, <-chan error) {
	out := make(chan database.Entity)
	supervisor, supervisorCtx := errgroup.WithContext(ctx)

	supervisor.Go(func() error {
		// Closing here (after Wait below) guarantees no worker can still be sending.
		defer close(out)

		workers, workerCtx := errgroup.WithContext(supervisorCtx)

		// worker consumes pairs until the channel is closed, the context is done,
		// or a pair cannot be decoded.
		worker := func() error {
			for {
				var (
					pair redis.HPair
					open bool
				)

				select {
				case <-workerCtx.Done():
					return workerCtx.Err()
				case pair, open = <-pairs:
				}

				if !open {
					return nil
				}

				var id types.Binary
				if err := id.UnmarshalText([]byte(pair.Field)); err != nil {
					return errors.Wrapf(err, "can't create ID from value %#v", pair.Field)
				}

				entity := factoryFunc()
				if err := types.UnmarshalJSON([]byte(pair.Value), entity); err != nil {
					return err
				}
				entity.SetID(id)

				// Do not block forever on a consumer that went away.
				select {
				case <-workerCtx.Done():
					return workerCtx.Err()
				case out <- entity:
				}
			}
		}

		for range concurrent {
			workers.Go(worker)
		}

		return workers.Wait()
	})

	return out, com.WaitAsync(supervisor)
}
// SetChecksums reads entities from the given channel with `concurrent`
// workers, copies the matching checksum onto each one and streams the
// entities on the returned channel.
//
// The checksum source for an entity is looked up in checksums under the
// string form of the entity's ID. A worker fails if there is no entry for
// that key, or if either the entity or the looked-up value does not implement
// contracts.Checksumer. The output channel is closed once every worker has
// returned. The second return value is whatever com.WaitAsync yields for the
// group that supervises the workers.
func SetChecksums(ctx context.Context, entities <-chan database.Entity, checksums map[string]database.Entity, concurrent int) (<-chan database.Entity, <-chan error) {
	out := make(chan database.Entity)
	supervisor, supervisorCtx := errgroup.WithContext(ctx)

	supervisor.Go(func() error {
		// Closing here (after Wait below) guarantees no worker can still be sending.
		defer close(out)

		workers, workerCtx := errgroup.WithContext(supervisorCtx)

		// worker consumes entities until the channel is closed, the context is
		// done, or an entity cannot be given a checksum.
		worker := func() error {
			for {
				var (
					entity database.Entity
					open   bool
				)

				select {
				case <-workerCtx.Done():
					return workerCtx.Err()
				case entity, open = <-entities:
				}

				if !open {
					return nil
				}

				source, found := checksums[entity.ID().String()]
				if !found {
					return errors.Errorf("no checksum for %#v", entity)
				}

				target, targetOk := entity.(contracts.Checksumer)
				if !targetOk {
					return errors.New("entity does not implement contracts.Checksumer")
				}

				provider, providerOk := source.(contracts.Checksumer)
				if !providerOk {
					return errors.New("checksumer does not implement contracts.Checksumer")
				}

				target.SetChecksum(provider.Checksum())

				// Do not block forever on a consumer that went away.
				select {
				case <-workerCtx.Done():
					return workerCtx.Err()
				case out <- entity:
				}
			}
		}

		for range concurrent {
			workers.Go(worker)
		}

		return workers.Wait()
	})

	return out, com.WaitAsync(supervisor)
}
// YieldAll streams the entities stored in Redis for the specified SyncSubject.
//
// The Redis key is the subject's entity type name, delimited by ':', prefixed
// with "icinga:checksum:" when the subject is synced with checksums and with
// "icinga:" otherwise. The pairs yielded for that key are turned into entities
// by CreateEntities using runtime.NumCPU() workers. Errors from both stages
// are fed into one errgroup, and the returned error channel is whatever
// com.WaitAsync yields for that group.
func YieldAll(ctx context.Context, c *redis.Client, subject *common.SyncSubject) (<-chan database.Entity, <-chan error) {
	name := strcase.Delimited(types.Name(subject.Entity()), ':')

	prefix := "icinga:"
	if subject.WithChecksum() {
		prefix = "icinga:checksum:"
	}
	key := prefix + name

	// HYield runs on the caller's context; only the later stage uses the group's.
	pairs, yieldErrs := c.HYield(ctx, key)

	g, groupCtx := errgroup.WithContext(ctx)

	// Let errors from HYield cancel the group.
	com.ErrgroupReceive(g, yieldErrs)

	desired, createErrs := CreateEntities(groupCtx, subject.FactoryForDelta(), pairs, runtime.NumCPU())

	// Let errors from CreateEntities cancel the group.
	com.ErrgroupReceive(g, createErrs)

	return desired, com.WaitAsync(g)
}
|