File: utils.go

package info (click to toggle)
icingadb 1.5.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 59,960 kB
  • sloc: ansic: 170,157; asm: 7,097; sql: 4,098; sh: 1,614; cpp: 1,132; makefile: 438; xml: 160
file content (140 lines) | stat: -rw-r--r-- 3,816 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package icingaredis

import (
	"context"
	"github.com/icinga/icinga-go-library/com"
	"github.com/icinga/icinga-go-library/database"
	"github.com/icinga/icinga-go-library/redis"
	"github.com/icinga/icinga-go-library/strcase"
	"github.com/icinga/icinga-go-library/types"
	"github.com/icinga/icingadb/pkg/common"
	"github.com/icinga/icingadb/pkg/contracts"
	"github.com/pkg/errors"
	"golang.org/x/sync/errgroup"
	"runtime"
)

// CreateEntities streams and creates entities from the
// given Redis field value pairs using the specified factory function,
// and streams them on a returned channel.
func CreateEntities(ctx context.Context, factoryFunc database.EntityFactoryFunc, pairs <-chan redis.HPair, concurrent int) (<-chan database.Entity, <-chan error) {
	entities := make(chan database.Entity)
	g, ctx := errgroup.WithContext(ctx)

	g.Go(func() error {
		defer close(entities)

		g, ctx := errgroup.WithContext(ctx)

		for range concurrent {
			g.Go(func() error {
				for {
					select {
					case pair, ok := <-pairs:
						if !ok {
							return nil
						}

						var id types.Binary

						if err := id.UnmarshalText([]byte(pair.Field)); err != nil {
							return errors.Wrapf(err, "can't create ID from value %#v", pair.Field)
						}

						e := factoryFunc()
						if err := types.UnmarshalJSON([]byte(pair.Value), e); err != nil {
							return err
						}
						e.SetID(id)

						select {
						case entities <- e:
						case <-ctx.Done():
							return ctx.Err()
						}
					case <-ctx.Done():
						return ctx.Err()
					}
				}
			})
		}

		return g.Wait()
	})

	return entities, com.WaitAsync(g)
}

// SetChecksums concurrently streams from the given entities and
// sets their checksums using the specified map and
// streams the results on a returned channel.
func SetChecksums(ctx context.Context, entities <-chan database.Entity, checksums map[string]database.Entity, concurrent int) (<-chan database.Entity, <-chan error) {
	entitiesWithChecksum := make(chan database.Entity)
	g, ctx := errgroup.WithContext(ctx)

	g.Go(func() error {
		defer close(entitiesWithChecksum)

		g, ctx := errgroup.WithContext(ctx)

		for range concurrent {
			g.Go(func() error {
				for {
					select {
					case entity, ok := <-entities:
						if !ok {
							return nil
						}

						if checksumer, ok := checksums[entity.ID().String()]; ok {
							entity, entityOk := entity.(contracts.Checksumer)
							if !entityOk {
								return errors.New("entity does not implement contracts.Checksumer")
							}
							checksumer, checksumerOk := checksumer.(contracts.Checksumer)
							if !checksumerOk {
								return errors.New("checksumer does not implement contracts.Checksumer")
							}
							entity.SetChecksum(checksumer.Checksum())
						} else {
							return errors.Errorf("no checksum for %#v", entity)
						}

						select {
						case entitiesWithChecksum <- entity:
						case <-ctx.Done():
							return ctx.Err()
						}
					case <-ctx.Done():
						return ctx.Err()
					}
				}
			})
		}

		return g.Wait()
	})

	return entitiesWithChecksum, com.WaitAsync(g)
}

// YieldAll yields all entities from Redis that belong to the specified SyncSubject.
func YieldAll(ctx context.Context, c *redis.Client, subject *common.SyncSubject) (<-chan database.Entity, <-chan error) {
	key := strcase.Delimited(types.Name(subject.Entity()), ':')
	if subject.WithChecksum() {
		key = "icinga:checksum:" + key
	} else {
		key = "icinga:" + key
	}

	pairs, errs := c.HYield(ctx, key)
	g, ctx := errgroup.WithContext(ctx)
	// Let errors from HYield cancel the group.
	com.ErrgroupReceive(g, errs)

	desired, errs := CreateEntities(ctx, subject.FactoryForDelta(), pairs, runtime.NumCPU())
	// Let errors from CreateEntities cancel the group.
	com.ErrgroupReceive(g, errs)

	return desired, com.WaitAsync(g)
}