package ca
import (
"bytes"
"context"
"fmt"
"reflect"
"sync"
"time"
"github.com/cloudflare/cfssl/helpers"
"github.com/moby/swarmkit/v2/api"
"github.com/moby/swarmkit/v2/api/equality"
"github.com/moby/swarmkit/v2/log"
"github.com/moby/swarmkit/v2/manager/state/store"
"github.com/pkg/errors"
)
// IssuanceStateRotateMaxBatchSize is the maximum number of nodes we'll tell to rotate their certificates in any given update
const IssuanceStateRotateMaxBatchSize = 30
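
// hasIssuer returns true if the node's most recently reported TLS info names
// the given issuer: that is, if both the issuer subject and the issuer public
// key in the node's TLS info match the given IssuerInfo.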
func hasIssuer(n *api.Node, info *IssuerInfo) bool {
if n.Description == nil || n.Description.TLSInfo == nil {
return false
}
return bytes.Equal(info.Subject, n.Description.TLSInfo.CertIssuerSubject) && bytes.Equal(info.PublicKey, n.Description.TLSInfo.CertIssuerPublicKey)
}
var errRootRotationChanged = errors.New("target root rotation has changed")
// rootRotationReconciler keeps track of all the nodes in the store so that we can determine which ones need reconciliation when nodes are updated
// or the root CA is updated. This is meant to be used with watches on nodes and the cluster, and provides functions to be called when the
// cluster's RootCA has changed and when a node is added, updated, or removed.
type rootRotationReconciler struct {
mu sync.Mutex
clusterID string
batchUpdateInterval time.Duration
ctx context.Context
store *store.MemoryStore
currentRootCA *api.RootCA
currentIssuer IssuerInfo
unconvergedNodes map[string]*api.Node
wg sync.WaitGroup
cancel func()
}
// IssuerFromAPIRootCA returns the desired issuer given an API root CA object
func IssuerFromAPIRootCA(rootCA *api.RootCA) (*IssuerInfo, error) {
wantedIssuer := rootCA.CACert
if rootCA.RootRotation != nil {
wantedIssuer = rootCA.RootRotation.CACert
}
issuerCerts, err := helpers.ParseCertificatesPEM(wantedIssuer)
if err != nil {
return nil, errors.Wrap(err, "invalid certificate in cluster root CA object")
}
if len(issuerCerts) == 0 {
		return nil, errors.New("no certificates found in cluster root CA object")
}
return &IssuerInfo{
Subject: issuerCerts[0].RawSubject,
PublicKey: issuerCerts[0].RawSubjectPublicKeyInfo,
}, nil
}
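
// As an illustrative sketch (not code from this package's callers), a holder
// of a cluster object could derive the desired issuer and check whether a
// node has converged on it like so, where `cluster` and `node` are assumed
// *api.Cluster and *api.Node values:
//
//	info, err := IssuerFromAPIRootCA(&cluster.RootCA)
//	if err != nil {
//		return err
//	}
//	converged := hasIssuer(node, info)
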
// UpdateRootCA is called whenever the cluster's root CA changes. Assumption:
// it will never be called with a `nil` root CA, because the caller will be
// acting in response to a store update event.
func (r *rootRotationReconciler) UpdateRootCA(newRootCA *api.RootCA) {
issuerInfo, err := IssuerFromAPIRootCA(newRootCA)
if err != nil {
log.G(r.ctx).WithError(err).Error("unable to update process the current root CA")
return
}
var (
shouldStartNewLoop, waitForPrevLoop bool
loopCtx context.Context
)
r.mu.Lock()
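	// note: if a new reconciliation loop needs to be started, this is done in
	// the deferred function below, after the mutex has been released. The
	// previous loop may need to acquire the mutex in order to observe the
	// cancellation and exit, so waiting on it while holding the lock could
	// deadlock.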
defer func() {
r.mu.Unlock()
if shouldStartNewLoop {
if waitForPrevLoop {
r.wg.Wait()
}
r.wg.Add(1)
go r.runReconcilerLoop(loopCtx, newRootCA)
}
}()
	// first, check whether the issuer has changed
if reflect.DeepEqual(&r.currentIssuer, issuerInfo) {
r.currentRootCA = newRootCA
return
}
// If the issuer has changed, iterate through all the nodes to figure out which ones need rotation
if newRootCA.RootRotation != nil {
var nodes []*api.Node
r.store.View(func(tx store.ReadTx) {
nodes, err = store.FindNodes(tx, store.All)
})
if err != nil {
log.G(r.ctx).WithError(err).Error("unable to list nodes, so unable to process the current root CA")
return
}
// from here on out, there will be no more errors that cause us to have to abandon updating the Root CA,
// so we can start making changes to r's fields
r.unconvergedNodes = make(map[string]*api.Node)
for _, n := range nodes {
if !hasIssuer(n, issuerInfo) {
r.unconvergedNodes[n.ID] = n
}
}
shouldStartNewLoop = true
if r.cancel != nil { // there's already a loop going, so cancel it
r.cancel()
waitForPrevLoop = true
}
loopCtx, r.cancel = context.WithCancel(r.ctx)
} else {
r.unconvergedNodes = nil
}
r.currentRootCA = newRootCA
r.currentIssuer = *issuerInfo
}
// UpdateNode is called whenever a node is created or updated. Assumption: it
// will never be called with a `nil` node, because the caller will be acting in
// response to a store update event.
func (r *rootRotationReconciler) UpdateNode(node *api.Node) {
r.mu.Lock()
defer r.mu.Unlock()
	// if we're not in the middle of a root rotation, ignore the update
if r.currentRootCA == nil || r.currentRootCA.RootRotation == nil {
return
}
if hasIssuer(node, &r.currentIssuer) {
delete(r.unconvergedNodes, node.ID)
} else {
r.unconvergedNodes[node.ID] = node
}
}
// DeleteNode is called whenever a node is removed. Assumption: it will never
// be called with a `nil` node, because the caller will be acting in response
// to a store update event.
func (r *rootRotationReconciler) DeleteNode(node *api.Node) {
r.mu.Lock()
delete(r.unconvergedNodes, node.ID)
r.mu.Unlock()
}
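
// The three handlers above are meant to be driven by store watch events. A
// hypothetical wiring (the real event plumbing lives in this reconciler's
// caller; `watchCh` below is assumed, not part of this package) might look
// like:
//
//	for ev := range watchCh {
//		switch v := ev.(type) {
//		case api.EventUpdateCluster:
//			r.UpdateRootCA(&v.Cluster.RootCA)
//		case api.EventCreateNode:
//			r.UpdateNode(v.Node)
//		case api.EventUpdateNode:
//			r.UpdateNode(v.Node)
//		case api.EventDeleteNode:
//			r.DeleteNode(v.Node)
//		}
//	}

// runReconcilerLoop repeatedly batch-updates any nodes that have not yet
// converged on the new issuer, telling them to rotate their certificates,
// until no unconverged nodes remain, at which point it finishes the root
// rotation and exits. It also exits when its context is cancelled (for
// example, because the target root rotation changed).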
func (r *rootRotationReconciler) runReconcilerLoop(ctx context.Context, loopRootCA *api.RootCA) {
defer r.wg.Done()
for {
r.mu.Lock()
if len(r.unconvergedNodes) == 0 {
r.mu.Unlock()
err := r.store.Update(func(tx store.Tx) error {
return r.finishRootRotation(tx, loopRootCA)
})
if err == nil {
log.G(r.ctx).Info("completed root rotation")
return
}
log.G(r.ctx).WithError(err).Error("could not complete root rotation")
if err == errRootRotationChanged {
// if the root rotation has changed, this loop will be cancelled anyway, so may as well abort early
return
}
} else {
var toUpdate []*api.Node
for _, n := range r.unconvergedNodes {
iState := n.Certificate.Status.State
if iState != api.IssuanceStateRenew && iState != api.IssuanceStatePending && iState != api.IssuanceStateRotate {
n = n.Copy()
n.Certificate.Status.State = api.IssuanceStateRotate
toUpdate = append(toUpdate, n)
if len(toUpdate) >= IssuanceStateRotateMaxBatchSize {
break
}
}
}
r.mu.Unlock()
if err := r.batchUpdateNodes(toUpdate); err != nil {
log.G(r.ctx).WithError(err).Errorf("store error when trying to batch update %d nodes to request certificate rotation", len(toUpdate))
}
}
select {
case <-ctx.Done():
return
case <-time.After(r.batchUpdateInterval):
}
}
}
// finishRootRotation assumes that the expected root CA has a root rotation.
// It is intended to be called from `runReconcilerLoop`, which is only started
// by `UpdateRootCA` when the cluster root CA has a root rotation, and which
// passes that root CA in as the expected one.
func (r *rootRotationReconciler) finishRootRotation(tx store.Tx, expectedRootCA *api.RootCA) error {
cluster := store.GetCluster(tx, r.clusterID)
if cluster == nil {
return fmt.Errorf("unable to get cluster %s", r.clusterID)
}
// If the RootCA object has changed (because another root rotation was started or because some other node
// had finished the root rotation), we cannot finish the root rotation that we were working on.
if !equality.RootCAEqualStable(expectedRootCA, &cluster.RootCA) {
return errRootRotationChanged
}
var signerCert []byte
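	// the rotation CA may not have a signing key (for example, when an
	// external CA does the signing), in which case the new root CA object is
	// created without a local signer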
if len(cluster.RootCA.RootRotation.CAKey) > 0 {
signerCert = cluster.RootCA.RootRotation.CACert
}
// we don't actually have to parse out the default node expiration from the cluster - we are just using
// the ca.RootCA object to generate new tokens and the digest
updatedRootCA, err := NewRootCA(cluster.RootCA.RootRotation.CACert, signerCert, cluster.RootCA.RootRotation.CAKey,
DefaultNodeCertExpiration, nil)
if err != nil {
return errors.Wrap(err, "invalid cluster root rotation object")
}
cluster.RootCA = api.RootCA{
CACert: cluster.RootCA.RootRotation.CACert,
CAKey: cluster.RootCA.RootRotation.CAKey,
CACertHash: updatedRootCA.Digest.String(),
JoinTokens: api.JoinTokens{
Worker: GenerateJoinToken(&updatedRootCA, cluster.FIPS),
Manager: GenerateJoinToken(&updatedRootCA, cluster.FIPS),
},
LastForcedRotation: cluster.RootCA.LastForcedRotation,
}
return store.UpdateCluster(tx, cluster)
}
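
// batchUpdateNodes sets the certificate issuance state of each of the given
// nodes to IssuanceStateRotate in a single store batch, telling each node to
// request a newly issued certificate.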
func (r *rootRotationReconciler) batchUpdateNodes(toUpdate []*api.Node) error {
if len(toUpdate) == 0 {
return nil
}
err := r.store.Batch(func(batch *store.Batch) error {
// Directly update the nodes rather than get + update, and ignore version errors. Since
// `rootRotationReconciler` should be hooked up to all node update/delete/create events, we should have
		// close to the latest versions of all the nodes. If not, the node will be updated later and the
// next batch of updates should catch it.
for _, n := range toUpdate {
if err := batch.Update(func(tx store.Tx) error {
return store.UpdateNode(tx, n)
}); err != nil && err != store.ErrSequenceConflict {
log.G(r.ctx).WithError(err).Errorf("unable to update node %s to request a certificate rotation", n.ID)
}
}
return nil
})
return err
}