1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
|
package dnsresolver
import (
"context"
"net"
"sync"
"time"
"github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/v16/internal/backoff"
"gitlab.com/gitlab-org/gitaly/v16/internal/structerr"
"google.golang.org/grpc/resolver"
)
// dnsResolver periodically resolves a host name through DNS and pushes the resulting address
// list to a gRPC client connection. The resolution loop lives in watch; ResolveNow and Close
// are the entry points used to nudge and to stop it.
type dnsResolver struct {
	// logger receives the resolver's lifecycle, state-update and failure messages.
	logger *logrus.Entry
	// retry computes how long watch waits before trying again after a failed attempt.
	retry backoff.Strategy

	// ctx is the parent of every lookup context and its cancellation stops watch.
	ctx context.Context
	// cancel is invoked by Close.
	cancel context.CancelFunc
	// cc is the client connection that receives state updates and error reports.
	cc resolver.ClientConn
	// host is the name handed to the lookuper.
	host string
	// port is passed to tryParseIP together with every address the lookup returns.
	port string
	// refreshRate is how long watch waits before the next lookup after a successful update.
	refreshRate time.Duration
	// lookup performs the actual host lookup.
	lookup dnsLookuper

	// reqs carries ResolveNow signals to watch.
	reqs chan struct{}
	// wg is released by watch when it exits and awaited by Close.
	// NOTE(review): the matching wg.Add is not in this part of the file — confirm it happens
	// before watch is started.
	wg sync.WaitGroup
}
// dnsLookupTimeout bounds a single DNS lookup issued by resolve.
// NOTE(review): declared as a variable rather than a constant, presumably so that it can be
// overridden (e.g. in tests) — confirm.
var dnsLookupTimeout = 15 * time.Second
// dnsLookuper is the lookup dependency of dnsResolver. Keeping it behind an interface lets the
// resolver work with any implementation that can look up a host.
// NOTE(review): the signature matches (*net.Resolver).LookupHost — presumably that is the
// production implementation; confirm at the construction site.
type dnsLookuper interface {
	// LookupHost looks up the given host and returns its addresses as strings. resolve feeds
	// every returned string through tryParseIP.
	LookupHost(context.Context, string) ([]string, error)
}
// ResolveNow asks the resolver to perform a DNS resolution as soon as possible. It never
// blocks and never waits for the outcome: the request is a hint, not a command. When the
// watch loop picks the signal up it abandons its current wait and resolves again; the client
// connection then learns about the result asynchronously through its UpdateState method.
//
// If the signal cannot be handed over immediately, it is dropped silently.
func (d *dnsResolver) ResolveNow(resolver.ResolveNowOptions) {
	var signal struct{}

	select {
	case d.reqs <- signal:
		// Handed over; watch takes it from here.
	default:
		// Nobody can accept the signal right now. Dropping it keeps the caller from blocking.
	}
}
// Close stops the resolver. It cancels the resolver's context, which makes the watch goroutine
// leave its loop, and then blocks until that goroutine has released the wait group.
func (d *dnsResolver) Close() {
	// Cancel first: waiting before cancelling would block for as long as watch keeps running.
	d.cancel()
	d.wg.Wait()
}
// watch is the resolver's main loop. Every iteration resolves the host and, on success, hands
// the new state to the client connection. It then sleeps until one of three things happens:
//
//   - the resolver's context is cancelled, which ends the loop;
//   - the wait period elapses, which triggers a regular refresh;
//   - ResolveNow sends a signal, which triggers an immediate refresh.
//
// The wait period is refreshRate after a fully successful iteration. After a failed lookup or
// a rejected state update it is taken from the retry strategy, with the attempt counter
// growing on each consecutive failure and resetting on the first success.
//
// The wait group is released when the loop exits so that Close can return.
func (d *dnsResolver) watch() {
	defer d.wg.Done()

	d.logger.Info("dns resolver: started")
	defer d.logger.Info("dns resolver: stopped")

	// Number of consecutive failed iterations; drives the retry backoff.
	var attempt uint
	for {
		state, err := d.resolve()
		if err == nil {
			err = d.updateState(state)
		} else {
			// Only lookup failures are logged and reported here; a failed state update
			// merely influences the wait period below.
			d.logger.WithField("dns.retries", attempt).WithField("dns.error", err).Error(
				"dns resolver: fail to lookup dns")
			d.cc.ReportError(err)
		}

		var wait time.Duration
		if err != nil {
			wait = d.retry.Backoff(attempt)
			attempt++
		} else {
			wait = d.refreshRate
			attempt = 0
		}
		timer := time.NewTimer(wait)

		select {
		case <-d.ctx.Done():
			timer.Stop()
			return
		case <-d.reqs:
			// The client connection asked for a fresh resolution, typically because one of
			// its subchannels changed connectivity state. Skip the rest of the wait.
			timer.Stop()
			d.logger.Debug("dns resolver: handle ResolveNow request")
		case <-timer.C:
			// The wait period is over; go around for another lookup.
			d.logger.Debug("dns resolver: refreshing")
		}
	}
}
// updateState logs the given state and forwards a copy of it to the client connection,
// returning whatever the client connection answers. The state must not be nil because it is
// dereferenced unconditionally.
func (d *dnsResolver) updateState(state *resolver.State) error {
	entry := d.logger.WithField("dns.state", state)
	entry.Info("dns resolver: updating state")

	return d.cc.UpdateState(*state)
}
// resolve performs a single lookup of d.host, bounded by dnsLookupTimeout, and converts the
// outcome into a resolver.State.
//
// Possible outcomes:
//   - The lookup fails: a state with an empty, non-nil address list is returned together with
//     whatever handleDNSError makes of the failure, which may be nil.
//   - tryParseIP rejects one of the returned records: a nil state and an error are returned.
//   - Otherwise: a state holding one address per record, in lookup order, and a nil error.
func (d *dnsResolver) resolve() (*resolver.State, error) {
	lookupCtx, release := context.WithTimeout(d.ctx, dnsLookupTimeout)
	defer release()

	records, lookupErr := d.lookup.LookupHost(lookupCtx, d.host)
	if lookupErr != nil {
		return &resolver.State{Addresses: []resolver.Address{}}, handleDNSError(lookupErr)
	}

	resolved := make([]resolver.Address, len(records))
	for i, record := range records {
		target, ok := tryParseIP(record, d.port)
		if !ok {
			return nil, structerr.New("dns: error parsing dns record IP address %v", record)
		}
		resolved[i] = resolver.Address{Addr: target}
	}

	return &resolver.State{Addresses: resolved}, nil
}
// handleDNSError translates a lookup failure into what the resolution loop should act on:
//   - A *net.DNSError that is neither a timeout nor temporary is suppressed and nil is
//     returned. Such failures are treated as a definitive answer, for example a missing
//     record, rather than something worth retrying.
//   - Everything else (timeouts, temporary DNS errors, and errors of any other type) is wrapped
//     and returned, so that the caller attempts another query with backoff.
//
// Only a direct *net.DNSError is recognised; the error chain is not unwrapped.
func handleDNSError(err error) error {
	dnsErr, isDNSErr := err.(*net.DNSError)
	if !isDNSErr || dnsErr.IsTimeout || dnsErr.IsTemporary {
		return structerr.New("dns: record resolve error: %w", err)
	}
	return nil
}
|