1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
|
package eureka
import (
"fmt"
"net/http"
"sync"
"time"
"github.com/hudl/fargo"
"github.com/go-kit/kit/sd"
"github.com/go-kit/log"
)
// Matches official Netflix Java client default.
const defaultRenewalInterval = 30 * time.Second
// The methods of fargo.Connection used in this package.
type fargoConnection interface {
RegisterInstance(instance *fargo.Instance) error
DeregisterInstance(instance *fargo.Instance) error
ReregisterInstance(instance *fargo.Instance) error
HeartBeatInstance(instance *fargo.Instance) error
ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan fargo.AppUpdate
GetApp(name string) (*fargo.Application, error)
}
type fargoUnsuccessfulHTTPResponse struct {
statusCode int
messagePrefix string
}
func (u *fargoUnsuccessfulHTTPResponse) Error() string {
return fmt.Sprintf("err=%s code=%d", u.messagePrefix, u.statusCode)
}
// Registrar maintains service instance liveness information in Eureka.
type Registrar struct {
conn fargoConnection
instance *fargo.Instance
logger log.Logger
quitc chan chan struct{}
sync.Mutex
}
var _ sd.Registrar = (*Registrar)(nil)
// NewRegistrar returns an Eureka Registrar acting on behalf of the provided
// Fargo connection and instance. See the integration test for usage examples.
func NewRegistrar(conn fargoConnection, instance *fargo.Instance, logger log.Logger) *Registrar {
return &Registrar{
conn: conn,
instance: instance,
logger: log.With(logger, "service", instance.App, "address", fmt.Sprintf("%s:%d", instance.IPAddr, instance.Port)),
}
}
// Register implements sd.Registrar.
func (r *Registrar) Register() {
r.Lock()
defer r.Unlock()
if r.quitc != nil {
return // Already in the registration loop.
}
if err := r.conn.RegisterInstance(r.instance); err != nil {
r.logger.Log("during", "Register", "err", err)
}
r.quitc = make(chan chan struct{})
go r.loop()
}
// Deregister implements sd.Registrar.
func (r *Registrar) Deregister() {
r.Lock()
defer r.Unlock()
if r.quitc == nil {
return // Already deregistered.
}
q := make(chan struct{})
r.quitc <- q
<-q
r.quitc = nil
}
func (r *Registrar) loop() {
var renewalInterval time.Duration
if r.instance.LeaseInfo.RenewalIntervalInSecs > 0 {
renewalInterval = time.Duration(r.instance.LeaseInfo.RenewalIntervalInSecs) * time.Second
} else {
renewalInterval = defaultRenewalInterval
}
ticker := time.NewTicker(renewalInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := r.heartbeat(); err != nil {
r.logger.Log("during", "heartbeat", "err", err)
}
case q := <-r.quitc:
if err := r.conn.DeregisterInstance(r.instance); err != nil {
r.logger.Log("during", "Deregister", "err", err)
}
close(q)
return
}
}
}
func httpResponseStatusCode(err error) (code int, present bool) {
if code, ok := fargo.HTTPResponseStatusCode(err); ok {
return code, true
}
// Allow injection of errors for testing.
if u, ok := err.(*fargoUnsuccessfulHTTPResponse); ok {
return u.statusCode, true
}
return 0, false
}
func isNotFound(err error) bool {
code, ok := httpResponseStatusCode(err)
return ok && code == http.StatusNotFound
}
func (r *Registrar) heartbeat() error {
err := r.conn.HeartBeatInstance(r.instance)
if err == nil {
return nil
}
if isNotFound(err) {
// Instance expired (e.g. network partition). Re-register.
return r.conn.ReregisterInstance(r.instance)
}
return err
}
|