File: bundlesource.go

package info (click to toggle)
golang-github-spiffe-go-spiffe 2.5.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,116 kB
  • sloc: makefile: 157
file content (188 lines) | stat: -rw-r--r-- 6,132 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
package workloadapi

import (
	"context"
	"crypto"
	"crypto/x509"
	"sync"

	"github.com/spiffe/go-spiffe/v2/bundle/jwtbundle"
	"github.com/spiffe/go-spiffe/v2/bundle/spiffebundle"
	"github.com/spiffe/go-spiffe/v2/bundle/x509bundle"
	"github.com/spiffe/go-spiffe/v2/spiffeid"
	"github.com/zeebo/errs"
)

// bundlesourceErr is the error class used to construct the errors returned by
// BundleSource methods in this file.
var bundlesourceErr = errs.Class("bundlesource")

// BundleSource is a source of SPIFFE bundles maintained via the Workload API.
//
// The authority maps are refreshed by the setX509Context and setJWTBundles
// callbacks handed to the watcher in NewBundleSource, and are read by the
// Get*BundleForTrustDomain methods.
type BundleSource struct {
	// watcher is created in NewBundleSource; Close, WaitUntilUpdated and
	// Updated delegate to it.
	watcher *watcher

	// mtx guards x509Authorities and jwtAuthorities.
	mtx             sync.RWMutex
	x509Authorities map[spiffeid.TrustDomain][]*x509.Certificate
	jwtAuthorities  map[spiffeid.TrustDomain]map[string]crypto.PublicKey

	// closeMtx guards closed, which is set by Close and consulted by
	// checkClosed.
	closeMtx sync.RWMutex
	closed   bool
}

// NewBundleSource builds a BundleSource backed by a Workload API watcher. The
// call blocks until the first update arrives from the Workload API. Callers
// should Close the returned source once it is no longer needed so that the
// underlying resources are released.
func NewBundleSource(ctx context.Context, options ...BundleSourceOption) (_ *BundleSource, err error) {
	// Fold every option into a single configuration value.
	var cfg bundleSourceConfig
	for _, opt := range options {
		opt.configureBundleSource(&cfg)
	}

	// Both authority maps start out empty; the watcher callbacks fill them in.
	source := &BundleSource{
		x509Authorities: map[spiffeid.TrustDomain][]*x509.Certificate{},
		jwtAuthorities:  map[spiffeid.TrustDomain]map[string]crypto.PublicKey{},
	}

	if source.watcher, err = newWatcher(ctx, cfg.watcher, source.setX509Context, source.setJWTBundles); err != nil {
		return nil, err
	}
	return source, nil
}

// Close shuts the source down and drops the connection to the Workload API.
// After Close has been called the other source methods return an error. When
// the Workload API client is owned by the BundleSource (i.e. it was not
// supplied through the WithClient option) it is closed as well.
func (s *BundleSource) Close() error {
	// Flip the closed flag in its own scope so the lock is released before
	// the watcher is closed.
	func() {
		s.closeMtx.Lock()
		defer s.closeMtx.Unlock()
		s.closed = true
	}()

	return s.watcher.Close()
}

// GetBundleForTrustDomain looks up the SPIFFE bundle for trustDomain,
// combining whatever X.509 and JWT authorities are currently held for it. An
// error is returned when the source is closed or when neither kind of
// authority is known for the trust domain. It implements the
// spiffebundle.Source interface.
func (s *BundleSource) GetBundleForTrustDomain(trustDomain spiffeid.TrustDomain) (*spiffebundle.Bundle, error) {
	if closedErr := s.checkClosed(); closedErr != nil {
		return nil, closedErr
	}

	s.mtx.RLock()
	defer s.mtx.RUnlock()

	certs, haveCerts := s.x509Authorities[trustDomain]
	keys, haveKeys := s.jwtAuthorities[trustDomain]
	if !(haveCerts || haveKeys) {
		return nil, bundlesourceErr.New("no SPIFFE bundle for trust domain %q", trustDomain)
	}

	// Only populate the halves of the bundle that actually have an entry.
	result := spiffebundle.New(trustDomain)
	if haveCerts {
		result.SetX509Authorities(certs)
	}
	if haveKeys {
		result.SetJWTAuthorities(keys)
	}
	return result, nil
}

// GetX509BundleForTrustDomain looks up the X.509 bundle for trustDomain. An
// error is returned when the source is closed or when no X.509 authorities
// are held for the trust domain. It implements the x509bundle.Source
// interface.
func (s *BundleSource) GetX509BundleForTrustDomain(trustDomain spiffeid.TrustDomain) (*x509bundle.Bundle, error) {
	if closedErr := s.checkClosed(); closedErr != nil {
		return nil, closedErr
	}

	s.mtx.RLock()
	defer s.mtx.RUnlock()

	if certs, found := s.x509Authorities[trustDomain]; found {
		return x509bundle.FromX509Authorities(trustDomain, certs), nil
	}
	return nil, bundlesourceErr.New("no X.509 bundle for trust domain %q", trustDomain)
}

// GetJWTBundleForTrustDomain looks up the JWT bundle for trustDomain. An
// error is returned when the source is closed or when no JWT authorities are
// held for the trust domain. It implements the jwtbundle.Source interface.
func (s *BundleSource) GetJWTBundleForTrustDomain(trustDomain spiffeid.TrustDomain) (*jwtbundle.Bundle, error) {
	if closedErr := s.checkClosed(); closedErr != nil {
		return nil, closedErr
	}

	s.mtx.RLock()
	defer s.mtx.RUnlock()

	if keys, found := s.jwtAuthorities[trustDomain]; found {
		return jwtbundle.FromJWTAuthorities(trustDomain, keys), nil
	}
	return nil, bundlesourceErr.New("no JWT bundle for trust domain %q", trustDomain)
}

// WaitUntilUpdated blocks until the source receives an update or ctx is done;
// in the latter case ctx.Err() is returned. The wait is delegated to the
// underlying watcher.
func (s *BundleSource) WaitUntilUpdated(ctx context.Context) error {
	waitErr := s.watcher.WaitUntilUpdated(ctx)
	return waitErr
}

// Updated exposes the watcher's notification channel, which is sent on each
// time the source is updated.
func (s *BundleSource) Updated() <-chan struct{} {
	updates := s.watcher.Updated()
	return updates
}

// setX509Context synchronizes the stored X.509 authorities with the bundles
// carried by x509Context: trust domains present in the context are added or
// overwritten, and trust domains absent from it are dropped. It is passed to
// newWatcher as a callback by NewBundleSource.
func (s *BundleSource) setX509Context(x509Context *X509Context) {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	incoming := x509Context.Bundles.Bundles()

	// First pass: upsert each incoming bundle and remember which trust
	// domains were seen.
	seen := make(map[spiffeid.TrustDomain]struct{}, len(incoming))
	for _, b := range incoming {
		td := b.TrustDomain()
		seen[td] = struct{}{}
		s.x509Authorities[td] = b.X509Authorities()
	}

	// Second pass: evict entries whose trust domain is not in the new
	// context.
	for td := range s.x509Authorities {
		if _, present := seen[td]; !present {
			delete(s.x509Authorities, td)
		}
	}
}

// setJWTBundles synchronizes the stored JWT authorities with the given bundle
// set: trust domains present in the set are added or overwritten, and trust
// domains absent from it are dropped. It is passed to newWatcher as a
// callback by NewBundleSource.
func (s *BundleSource) setJWTBundles(bundles *jwtbundle.Set) {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	incoming := bundles.Bundles()

	// First pass: upsert each incoming bundle and remember which trust
	// domains were seen.
	seen := make(map[spiffeid.TrustDomain]struct{}, len(incoming))
	for _, b := range incoming {
		td := b.TrustDomain()
		seen[td] = struct{}{}
		s.jwtAuthorities[td] = b.JWTAuthorities()
	}

	// Second pass: evict entries whose trust domain is not in the new set.
	for td := range s.jwtAuthorities {
		if _, present := seen[td]; !present {
			delete(s.jwtAuthorities, td)
		}
	}
}

// checkClosed reports whether Close has been called: it returns a
// "source is closed" error if so, and nil otherwise.
func (s *BundleSource) checkClosed() error {
	s.closeMtx.RLock()
	isClosed := s.closed
	s.closeMtx.RUnlock()

	if !isClosed {
		return nil
	}
	return bundlesourceErr.New("source is closed")
}