1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293
|
// Package cbreaker implements circuit breaker similar to https://github.com/Netflix/Hystrix/wiki/How-it-Works
//
// The Vulcan circuit breaker watches for an error condition to match,
// after which it activates the fallback scenario, e.g. returns a response code
// or redirects the request to another location.
//
// Circuit breakers start in the Standby state first, observing responses and watching location metrics.
//
// Once the circuit breaker condition is met, it enters the "Tripped" state, where it activates the fallback scenario
// for all requests during the FallbackDuration time period and resets the stats for the location.
//
// After the FallbackDuration time period passes, the circuit breaker enters the "Recovering" state, during which it
// starts passing some traffic back to the endpoints, increasing the share of passed requests using a linear function:
//
// allowedRequestsRatio = 0.5 * (Now() - StartRecovery())/RecoveryDuration
//
// Two scenarios are possible in the "Recovering" state:
// 1. The condition matches again: the state is reset to "Tripped" and the timer is restarted.
// 2. The condition does not match before RecoveryDuration elapses: the circuit breaker enters the "Standby" state.
//
// It is possible to define actions (e.g. webhooks) for transitions between states:
//
// * OnTripped action is called on transitions into the tripped state (Standby -> Tripped, Recovering -> Tripped)
// * OnStandby action is called on transition (Recovering -> Standby)
package cbreaker
import (
"fmt"
"net/http"
"sync"
"time"
"github.com/vulcand/oxy/v2/internal/holsterv4/clock"
"github.com/vulcand/oxy/v2/memmetrics"
"github.com/vulcand/oxy/v2/utils"
)
// CircuitBreaker is an http.Handler that implements the circuit breaker pattern.
// While in standby it forwards requests to next; while tripped it serves them
// with fallback; while recovering it lets a growing share of requests through.
type CircuitBreaker struct {
	// m guards the state machine fields (state, until, rc, lastCheck).
	// NOTE(review): fallback and next are written by Fallback/Wrap without it.
	m *sync.RWMutex

	// metrics records status codes and latencies of requests forwarded to
	// next; it is reset every time the breaker trips.
	metrics *memmetrics.RTMetrics
	// condition is evaluated against the breaker at most once per checkPeriod;
	// a true result trips the breaker.
	condition hpredicate

	// fallbackDuration is how long the breaker stays tripped before recovering.
	fallbackDuration time.Duration
	// recoveryDuration is the length of the recovering phase; it is also handed
	// to the ratio controller.
	recoveryDuration time.Duration

	// onTripped is run asynchronously each time the breaker enters the tripped state.
	onTripped SideEffect
	// onStandby is run asynchronously each time the breaker enters the standby state.
	onStandby SideEffect

	// state is the current state of the breaker.
	state cbState
	// until is the deadline of the current tripped or recovering phase.
	until clock.Time
	// rc is created on entering the recovering state and decides which
	// requests are let through during recovery.
	rc *ratioController

	// checkPeriod is the minimum interval between two evaluations of condition.
	checkPeriod time.Duration
	// lastCheck, despite its name, holds the time at which the next condition
	// check becomes due (time of the last check + checkPeriod).
	lastCheck clock.Time

	// fallback serves the requests rejected by the breaker.
	fallback http.Handler
	// next serves the requests the breaker lets through.
	next http.Handler

	// verbose enables debug logging of a dump of every incoming request.
	verbose bool
	// log receives state-transition, warning and side-effect failure messages.
	log utils.Logger
}
// New builds a CircuitBreaker that wraps next and trips whenever expression
// evaluates to true.
//
// Package defaults are installed first, so any of the supplied options may
// override them. An error is returned if an option fails, if expression cannot
// be parsed, or if the round-trip metrics collector cannot be created.
func New(next http.Handler, expression string, options ...Option) (*CircuitBreaker, error) {
	breaker := &CircuitBreaker{
		m:                new(sync.RWMutex),
		next:             next,
		checkPeriod:      defaultCheckPeriod,
		fallbackDuration: defaultFallbackDuration,
		recoveryDuration: defaultRecoveryDuration,
		fallback:         defaultFallback,
		log:              &utils.NoopLogger{},
	}

	for _, apply := range options {
		if err := apply(breaker); err != nil {
			return nil, err
		}
	}

	cond, err := parseExpression(expression)
	if err != nil {
		return nil, err
	}
	breaker.condition = cond

	rt, err := memmetrics.NewRTMetrics()
	if err != nil {
		return nil, err
	}
	breaker.metrics = rt

	return breaker, nil
}
// ServeHTTP implements http.Handler. Requests the breaker currently rejects
// are answered by the fallback handler; all others are forwarded to the
// wrapped handler, with their outcome recorded in the breaker's metrics.
func (c *CircuitBreaker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	if c.verbose {
		dump := utils.DumpHTTPRequest(req)
		c.log.Debug("vulcand/oxy/circuitbreaker: begin ServeHttp on request: %s", dump)
		defer c.log.Debug("vulcand/oxy/circuitbreaker: completed ServeHttp on request: %s", dump)
	}

	if !c.activateFallback(w, req) {
		c.serve(w, req)
		return
	}
	c.fallback.ServeHTTP(w, req)
}
// Fallback replaces the handler that answers requests while the breaker is
// rejecting traffic.
// NOTE(review): this write is not synchronized with ServeHTTP; presumably it
// is meant to be called before the breaker starts serving.
func (c *CircuitBreaker) Fallback(f http.Handler) {
	c.fallback = f
}

// Wrap replaces the handler that receives the requests the breaker lets through.
// NOTE(review): this write is not synchronized with ServeHTTP; presumably it
// is meant to be called before the breaker starts serving.
func (c *CircuitBreaker) Wrap(next http.Handler) {
	c.next = next
}
// activateFallback reports whether the current request must be served by the
// fallback handler (true) or may be forwarded to the wrapped handler (false).
//
// As a side effect it advances the state machine: a tripped breaker whose
// fallback period has ended moves to recovering, and a recovering breaker
// whose recovery period has ended moves to standby. While recovering, the
// ratio controller decides request by request. The ResponseWriter and Request
// are currently unused.
func (c *CircuitBreaker) activateFallback(_ http.ResponseWriter, _ *http.Request) bool {
	// Fast path under the read lock: standby is the normal operating state.
	if c.isStandby() {
		return false
	}

	// Tripped or recovering: we may change state, so take the write lock.
	c.m.Lock()
	defer c.m.Unlock()

	c.log.Warn("%v is in error state", c)

	switch c.state {
	case stateStandby:
		// Another goroutine switched to standby between the two lock acquisitions.
		return false
	case stateTripped:
		if clock.Now().UTC().Before(c.until) {
			return true
		}
		// The fallback period is over: start recovering and let the
		// recovering branch below decide for this request.
		c.setRecovering()
		fallthrough
	case stateRecovering:
		if clock.Now().UTC().After(c.until) {
			// Recovery period elapsed without re-tripping: resume normal operation.
			c.setState(stateStandby, clock.Now().UTC())
			return false
		}
		// Only requests the ratio controller admits bypass the fallback.
		return !c.rc.allowRequest()
	}
	return false
}
// serve forwards the request to the wrapped handler through a proxy writer,
// records the response status code and latency in the breaker's metrics, and
// then gives checkAndSet a chance to trip the breaker.
func (c *CircuitBreaker) serve(w http.ResponseWriter, req *http.Request) {
	start := clock.Now().UTC()
	p := utils.NewProxyWriterWithLogger(w, c.log)

	c.next.ServeHTTP(p, req)

	latency := clock.Now().UTC().Sub(start)
	c.metrics.Record(p.StatusCode(), latency)

	// This call is cheaper than it looks: checkAndSet only evaluates the
	// tripping condition once per checkPeriod. That is why it can run after
	// every single response.
	c.checkAndSet()
}
// isStandby reports, under the read lock, whether the breaker is currently in
// the standby state.
func (c *CircuitBreaker) isStandby() bool {
	c.m.RLock()
	standby := c.state == stateStandby
	c.m.RUnlock()
	return standby
}
// String returns a log-friendly description of the breaker. The deadline is
// included only for the tripped and recovering states, where it is meaningful.
//
// String does not lock c.m itself. Callers such as setState already hold it,
// and sync.RWMutex is not reentrant.
func (c *CircuitBreaker) String() string {
	if c.state != stateTripped && c.state != stateRecovering {
		return fmt.Sprintf("CircuitBreaker(state=%v)", c.state)
	}
	return fmt.Sprintf("CircuitBreaker(state=%v, until=%v)", c.state, c.until)
}
// exec runs side effect s asynchronously in its own goroutine; a nil s is a
// no-op. Failures are logged, never returned.
//
// exec is reached only through setState, which runs with c.m held. The
// breaker's description is therefore captured here, synchronously. Formatting
// c from inside the goroutine would read c.state and c.until without the lock
// and race with later state transitions.
func (c *CircuitBreaker) exec(s SideEffect) {
	if s == nil {
		return
	}
	desc := c.String()
	go func() {
		if err := s.Exec(); err != nil {
			c.log.Error("%v side effect failure: %v", desc, err)
		}
	}()
}
// setState moves the breaker to state with deadline until, then fires the side
// effect bound to the new state. onTripped runs when entering tripped and
// onStandby when entering standby; entering recovering fires nothing.
// Every caller in this file holds c.m.
func (c *CircuitBreaker) setState(state cbState, until time.Time) {
	c.log.Debug("%v setting state to %v, until %v", c, state, until)
	c.state, c.until = state, until

	var effect SideEffect
	if state == stateTripped {
		effect = c.onTripped
	} else if state == stateStandby {
		effect = c.onStandby
	}
	// exec ignores a nil effect, which covers the recovering state.
	c.exec(effect)
}
// timeToCheck reports whether the next scheduled condition check, whose due
// time is stored in lastCheck, has been reached. Only the read lock is taken.
func (c *CircuitBreaker) timeToCheck() bool {
	c.m.RLock()
	due := c.lastCheck
	c.m.RUnlock()
	return clock.Now().UTC().After(due)
}
// checkAndSet evaluates the tripping condition at most once per checkPeriod.
// When the condition matches, it moves the breaker to the tripped state for
// fallbackDuration and resets the collected metrics. A breaker that is already
// tripped is left untouched.
func (c *CircuitBreaker) checkAndSet() {
	if !c.timeToCheck() {
		return
	}

	c.m.Lock()
	defer c.m.Unlock()

	now := clock.Now().UTC()
	// Another goroutine may have run the check, pushing lastCheck forward,
	// between timeToCheck and acquiring the write lock. Use the same "due"
	// predicate as timeToCheck so both checks agree at the boundary.
	if !now.After(c.lastCheck) {
		return
	}
	// lastCheck stores when the next check becomes due.
	c.lastCheck = now.Add(c.checkPeriod)

	if c.state == stateTripped {
		c.log.Debug("%v skip set tripped", c)
		return
	}

	if !c.condition(c) {
		return
	}

	c.setState(stateTripped, clock.Now().UTC().Add(c.fallbackDuration))
	c.metrics.Reset()
}
// setRecovering enters the recovering state for recoveryDuration. It also
// installs a fresh ratio controller, which decides which requests are let
// through during recovery. Callers hold c.m.
func (c *CircuitBreaker) setRecovering() {
	deadline := clock.Now().UTC().Add(c.recoveryDuration)
	c.setState(stateRecovering, deadline)
	c.rc = newRatioController(c.recoveryDuration, c.log)
}
// cbState enumerates the lifecycle states of a CircuitBreaker.
type cbState int

// String implements fmt.Stringer. It returns the lower-case name of a known
// state, or "undefined" for any other value.
func (s cbState) String() string {
	names := [...]string{
		stateStandby:    "standby",
		stateTripped:    "tripped",
		stateRecovering: "recovering",
	}
	if s < 0 || int(s) >= len(names) {
		return "undefined"
	}
	return names[s]
}

const (
	// stateStandby: the breaker passes every request and watches the stats.
	stateStandby = iota
	// stateTripped: the breaker serves every request with the fallback.
	stateTripped
	// stateRecovering: the breaker lets some requests through and rejects the rest.
	stateRecovering
)
// Defaults installed by New before any Option is applied.
const (
	// defaultFallbackDuration is how long a tripped breaker serves the fallback.
	defaultFallbackDuration = 10 * clock.Second
	// defaultRecoveryDuration is the length of the recovering phase.
	defaultRecoveryDuration = 10 * clock.Second
	// defaultCheckPeriod is the minimum interval between condition checks.
	defaultCheckPeriod = 100 * clock.Millisecond
)
// defaultFallback is the handler New installs when no custom fallback is given.
var defaultFallback = &fallback{}

// fallback is the built-in fallback handler; it always answers with
// 503 Service Unavailable.
type fallback struct{}

// ServeHTTP writes a 503 status line with the standard status text as the body.
func (f *fallback) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
	const code = http.StatusServiceUnavailable
	w.WriteHeader(code)
	body := []byte(http.StatusText(code))
	// Best effort: a failed write of the fallback body has nowhere to be reported.
	_, _ = w.Write(body)
}
|