1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
|
package workloadapi
import (
"context"
"sync"
"github.com/spiffe/go-spiffe/v2/bundle/jwtbundle"
"github.com/spiffe/go-spiffe/v2/spiffeid"
"github.com/spiffe/go-spiffe/v2/svid/jwtsvid"
"github.com/zeebo/errs"
)
// jwtsourceErr is the error class used for errors created by JWTSource
// itself, such as the "source is closed" error returned by checkClosed.
var jwtsourceErr = errs.Class("jwtsource")
// JWTSource is a source of JWT-SVID and JWT bundles maintained via the
// Workload API.
//
// A JWTSource is created with NewJWTSource and should be released with Close.
// It contains mutexes and therefore must not be copied after creation.
type JWTSource struct {
// watcher is created by NewJWTSource via newWatcher. Its client is used to
// fetch JWT-SVIDs, and it backs Close, WaitUntilUpdated and Updated.
watcher *watcher
// picker is optional. When non-nil, FetchJWTSVID fetches all JWT-SVIDs and
// returns the one chosen by picker instead of asking the client for one.
picker func([]*jwtsvid.SVID) *jwtsvid.SVID
// mtx is held by setJWTBundles while bundles is replaced.
mtx sync.RWMutex
// bundles is the most recent bundle set handed to setJWTBundles.
bundles *jwtbundle.Set
// closeMtx guards closed.
closeMtx sync.RWMutex
// closed is set to true by Close; checkClosed reports an error once it is set.
closed bool
}
// NewJWTSource builds a JWTSource configured by the supplied options.
//
// Each option is applied to a fresh jwtSourceConfig; the resulting picker is
// stored on the source and the watcher configuration is handed to newWatcher
// together with the source's bundle-update callback. The call blocks until
// the initial update has been received from the Workload API. If the watcher
// cannot be created, the error from newWatcher is returned and no source is
// produced. Close the returned source when it is no longer needed so that
// underlying resources are freed.
func NewJWTSource(ctx context.Context, options ...JWTSourceOption) (_ *JWTSource, err error) {
	cfg := new(jwtSourceConfig)
	for i := range options {
		options[i].configureJWTSource(cfg)
	}

	source := &JWTSource{picker: cfg.picker}
	if source.watcher, err = newWatcher(ctx, cfg.watcher, nil, source.setJWTBundles); err != nil {
		return nil, err
	}
	return source, nil
}
// Close marks the source as closed and then closes its watcher, dropping the
// connection to the Workload API. Methods that call checkClosed return an
// error once Close has been called. The underlying Workload API client will
// also be closed if it is owned by the JWTSource (i.e. not provided via the
// WithClient option).
//
// The returned error is whatever the watcher's Close reports.
func (s *JWTSource) Close() error {
	// Flip the flag in its own scope so the lock is released before the
	// watcher is shut down.
	func() {
		s.closeMtx.Lock()
		defer s.closeMtx.Unlock()
		s.closed = true
	}()

	return s.watcher.Close()
}
// FetchJWTSVID fetches a JWT-SVID from the source with the given parameters.
// It implements the jwtsvid.Source interface.
//
// An error is returned if the source has been closed. When no picker is
// configured, the request is delegated to the Workload API client's
// FetchJWTSVID. Otherwise all JWT-SVIDs are fetched with FetchJWTSVIDs and
// the picker selects the one to return.
func (s *JWTSource) FetchJWTSVID(ctx context.Context, params jwtsvid.Params) (*jwtsvid.SVID, error) {
	if err := s.checkClosed(); err != nil {
		return nil, err
	}

	// No picker: let the client decide which SVID to hand back.
	if s.picker == nil {
		return s.watcher.client.FetchJWTSVID(ctx, params)
	}

	svids, err := s.watcher.client.FetchJWTSVIDs(ctx, params)
	if err != nil {
		return nil, err
	}

	// The picker's choice is returned unchanged; a picker that returns nil
	// therefore yields a nil SVID together with a nil error.
	return s.picker(svids), nil
}
// FetchJWTSVIDs fetches all JWT-SVIDs from the source with the given
// parameters. It implements the jwtsvid.Source interface.
//
// An error is returned if the source has been closed; otherwise the call is
// delegated to the Workload API client held by the watcher.
func (s *JWTSource) FetchJWTSVIDs(ctx context.Context, params jwtsvid.Params) ([]*jwtsvid.SVID, error) {
	closedErr := s.checkClosed()
	if closedErr != nil {
		return nil, closedErr
	}

	client := s.watcher.client
	return client.FetchJWTSVIDs(ctx, params)
}
// GetJWTBundleForTrustDomain returns the JWT bundle for the given trust
// domain. It implements the jwtbundle.Source interface.
//
// An error is returned if the source has been closed. Otherwise the lookup
// is performed against the most recently stored bundle set.
func (s *JWTSource) GetJWTBundleForTrustDomain(trustDomain spiffeid.TrustDomain) (*jwtbundle.Bundle, error) {
	if err := s.checkClosed(); err != nil {
		return nil, err
	}

	// setJWTBundles replaces s.bundles while holding s.mtx, so the pointer
	// must be read under the same mutex to avoid a data race. Only the
	// pointer snapshot is taken under the lock; the lookup runs after the
	// lock is released.
	s.mtx.RLock()
	bundles := s.bundles
	s.mtx.RUnlock()

	return bundles.GetJWTBundleForTrustDomain(trustDomain)
}
// WaitUntilUpdated waits until the source is updated or the context is done,
// in which case ctx.Err() is returned.
//
// The wait is delegated to the underlying watcher. This method does not
// call checkClosed.
func (s *JWTSource) WaitUntilUpdated(ctx context.Context) error {
	w := s.watcher
	return w.WaitUntilUpdated(ctx)
}
// Updated returns a channel that is sent on whenever the source is updated.
//
// The channel is the one exposed by the underlying watcher's Updated method.
func (s *JWTSource) Updated() <-chan struct{} {
	updates := s.watcher.Updated()
	return updates
}
// setJWTBundles replaces the stored JWT bundle set with bundles while holding
// the write lock on s.mtx. NewJWTSource passes it to newWatcher as the
// bundle-update callback.
func (s *JWTSource) setJWTBundles(bundles *jwtbundle.Set) {
	s.mtx.Lock()
	s.bundles = bundles
	s.mtx.Unlock()
}
// checkClosed reports whether the source has been closed. It returns nil
// while the source is open and a jwtsourceErr error once Close has set the
// closed flag. The flag is read under the closeMtx read lock.
func (s *JWTSource) checkClosed() error {
	s.closeMtx.RLock()
	isClosed := s.closed
	s.closeMtx.RUnlock()

	if !isClosed {
		return nil
	}
	return jwtsourceErr.New("source is closed")
}
|