1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
|
// Copyright 2016 Google LLC. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package fixchain
import (
"context"
"log"
"net/http"
"sync"
"sync/atomic"
"time"
"github.com/google/certificate-transparency-go/client"
"github.com/google/certificate-transparency-go/x509"
)
// FixAndLog contains a Fixer and a Logger, for all your fix-then-log-chain needs!
type FixAndLog struct {
fixer *Fixer
chains chan []*x509.Certificate
logger *Logger
wg sync.WaitGroup
// Number of whole chains queued - before checking cache & adding chains for intermediate certs.
queued uint32
// Cache of chains that QueueAllCertsInChain() has already been called on.
done *lockedMap
// Number of whole chains submitted to the FixAndLog using QueueAllCertsInChain() (before adding chains for intermediate certs) that had previously been processed.
alreadyDone uint32
// Number of chains queued total, including chains from intermediate certs.
// Note that in each chain there is len(chain) certs. So when calling QueueAllCertsInChain(chain), len(chain) chains will actually be added to the queue.
chainsQueued uint32
// Number of chains whose leaf cert has already been posted to the log with a valid chain.
alreadyPosted uint32
// Number of chains sent on to the Fixer to begin fixing & logging!
chainsSent uint32
}
// QueueAllCertsInChain adds every cert in the chain and the chain to the queue
// to be fixed and logged.
func (fl *FixAndLog) QueueAllCertsInChain(chain []*x509.Certificate) {
if chain != nil {
atomic.AddUint32(&fl.queued, 1)
atomic.AddUint32(&fl.chainsQueued, uint32(len(chain)))
dchain := newDedupedChain(chain)
// Caching check
h := hashBag(dchain.certs)
if fl.done.get(h) {
atomic.AddUint32(&fl.alreadyDone, 1)
return
}
fl.done.set(h, true)
for _, cert := range dchain.certs {
if fl.logger.IsPosted(cert) {
atomic.AddUint32(&fl.alreadyPosted, 1)
continue
}
fl.fixer.QueueChain(cert, dchain.certs, fl.logger.RootCerts())
atomic.AddUint32(&fl.chainsSent, 1)
}
}
}
// QueueChain queues the given chain to be fixed wrt the roots of the logger
// contained in fl, and then logged to the Certificate Transparency log
// represented by the logger. Note: chain is expected to be in the order of
// cert --> root.
func (fl *FixAndLog) QueueChain(chain []*x509.Certificate) {
if chain != nil {
if fl.logger.IsPosted(chain[0]) {
atomic.AddUint32(&fl.alreadyPosted, 1)
return
}
fl.fixer.QueueChain(chain[0], chain, fl.logger.RootCerts())
atomic.AddUint32(&fl.chainsSent, 1)
}
}
// Wait waits for the all of the queued chains to complete being fixed and
// logged.
func (fl *FixAndLog) Wait() {
fl.fixer.Wait()
close(fl.chains)
fl.wg.Wait()
fl.logger.Wait()
}
// NewFixAndLog creates an object that will asynchronously fix any chains that
// are added to its queue, and then log them to the Certificate Transparency log
// found at the given url. Any errors encountered along the way are pushed to
// the given errors channel.
func NewFixAndLog(ctx context.Context, fixerWorkerCount int, loggerWorkerCount int, errors chan<- *FixError, client *http.Client, logClient client.AddLogClient, limiter Limiter, logStats bool) *FixAndLog {
chains := make(chan []*x509.Certificate)
fl := &FixAndLog{
fixer: NewFixer(fixerWorkerCount, chains, errors, client, logStats),
chains: chains,
logger: NewLogger(ctx, loggerWorkerCount, errors, logClient, limiter, logStats),
done: newLockedMap(),
}
fl.wg.Add(1)
go func() {
for chain := range chains {
fl.logger.QueueChain(chain)
}
fl.wg.Done()
}()
if logStats {
t := time.NewTicker(time.Second)
go func() {
for range t.C {
log.Printf("fix-then-log: %d whole chains queued, %d whole chains already done, %d total chains queued, %d chains don't need posting (cache hits), %d chains sent to fixer", fl.queued, fl.alreadyDone, fl.chainsQueued, fl.alreadyPosted, fl.chainsSent)
}
}()
}
return fl
}
|