1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280
|
package bootstrap
import (
"fmt"
"net"
"os"
"os/signal"
"syscall"
"github.com/cloudflare/tableflip"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper"
"gitlab.com/gitlab-org/gitaly/v16/internal/helper/env"
"golang.org/x/sys/unix"
)
const (
// EnvPidFile is the name of the environment variable containing the pid file path.
// New reads it and hands the value to tableflip as the PID file location.
EnvPidFile = "GITALY_PID_FILE"
// EnvUpgradesEnabled is the name of the environment variable that, when it holds a true
// value, makes gitaly perform a graceful upgrade on SIGHUP. It defaults to false when unset.
EnvUpgradesEnabled = "GITALY_UPGRADES_ENABLED"
// socketReusePortWarning is the message logged when SO_REUSEPORT cannot be set on a socket.
socketReusePortWarning = "Unable to set SO_REUSEPORT: zero downtime upgrades will not work"
)
// Listener is the interface of a bootstrap manager: it collects starters, runs them and
// then blocks until an exit condition occurs.
type Listener interface {
// RegisterStarter adds starter to the pool.
RegisterStarter(starter Starter)
// Start starts all registered starters to accept connections.
// It returns the first setup error reported by a starter.
Start() error
// Wait blocks until an exit condition occurs and reports it through the returned error.
// stopAction is the callback used to stop the servers; whether gracePeriodTicker is used
// depends on the implementation.
Wait(gracePeriodTicker helper.Ticker, stopAction func()) error
}
// Bootstrap handles graceful upgrades. It runs the registered starters and then waits for
// an upgrade, a termination signal or a runtime error.
type Bootstrap struct {
// upgrader provides readiness signalling, upgrade triggering and the exit notification.
upgrader upgrader
// listenFunc creates the listeners handed out to the starters (see listen).
listenFunc ListenFunc
// errChan receives runtime errors reported by the starters; it is created by Start.
errChan chan error
// starters holds the registered Starter functions in registration order.
starters []Starter
// connTotal is passed through unchanged to every starter.
connTotal *prometheus.CounterVec
}
// upgrader is the set of upgrade operations Bootstrap relies on. New passes the value
// returned by tableflip.New; _new accepts any implementation.
type upgrader interface {
// Exit returns the channel Wait blocks on to learn that a graceful stop must begin.
Exit() <-chan struct{}
// HasParent is used by isFirstBoot to tell a first boot from an upgraded process.
HasParent() bool
// Ready is called by Wait to signal that this process is ready.
Ready() error
// Upgrade is invoked on every SIGHUP when upgrades are enabled.
Upgrade() error
// Stop is called by Wait when SIGTERM or SIGINT is received.
Stop()
}
// New performs the tableflip initialization and returns a ready-to-use Bootstrap.
//
// Configuration comes from the environment:
//   - EnvPidFile is optional; if provided, the pid file will always contain the current process PID
//   - EnvUpgradesEnabled controls the upgrade process on the SIGHUP signal
//
// First boot:
//   - gitaly starts as usual, we will refer to it as p1
//   - New builds a tableflip.Upgrader, we will refer to it as upg
//   - sockets and files must be opened with upg.Fds
//   - p1 traps SIGHUP and invokes upg.Upgrade()
//   - when ready to accept incoming connections, p1 calls upg.Ready()
//   - the upg.Exit() channel is closed when an upgrade completed successfully and the process must terminate
//
// Graceful upgrade:
//   - the user replaces the gitaly binary and/or the config file
//   - the user sends SIGHUP to p1
//   - p1 forks and execs the new gitaly, we will refer to it as p2
//   - from now on p1 ignores further SIGHUPs
//   - if p2 terminates with a non-zero exit code, SIGHUP handling is restored
//   - p2 follows the "first boot" sequence, but upg.Fds provides sockets and files from p1, when available
//   - when p2 invokes upg.Ready(), all the shared file descriptors not claimed by p2 are closed
//   - the upg.Exit() channel in p1 is closed now and p1 can gracefully terminate already accepted connections
//   - an upgrade cannot start again while p1 and p2 are both running; a hard termination should be
//     scheduled to overcome freezes during a graceful shutdown
//
// gitaly-wrapper is supposed to set EnvUpgradesEnabled in order to enable graceful upgrades.
func New(totalConn *prometheus.CounterVec) (*Bootstrap, error) {
	// Only the boolean is used; the second return value of env.GetBool is discarded.
	upgradesEnabled, _ := env.GetBool(EnvUpgradesEnabled, false)

	// enableReusePort tries to set SO_REUSEPORT on the socket being created. Both the
	// RawConn.Control error and the setsockopt error are only logged: the callback
	// always returns nil, so the listener is still created when the option cannot be set.
	enableReusePort := func(network, address string, conn syscall.RawConn) error {
		var sockoptErr error
		controlErr := conn.Control(func(fd uintptr) {
			sockoptErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEPORT, 1)
		})

		for _, failure := range []error{controlErr, sockoptErr} {
			if failure != nil {
				log.WithError(failure).Warn(socketReusePortWarning)
			}
		}

		return nil
	}

	// PIDFile is optional, if provided tableflip will keep it updated.
	upg, err := tableflip.New(tableflip.Options{
		PIDFile:      os.Getenv(EnvPidFile),
		ListenConfig: &net.ListenConfig{Control: enableReusePort},
	})
	if err != nil {
		return nil, err
	}

	return _new(upg, upg.Fds.Listen, upgradesEnabled, totalConn)
}
// _new assembles a Bootstrap from an already-built upgrader and listener factory.
//
// When upgradesEnabled is true, SIGHUP is trapped and every delivery triggers
// upg.Upgrade(). A failed upgrade is logged and the handler keeps waiting for the next
// SIGHUP. The handling goroutine runs for the remaining lifetime of the process.
//
// The returned error is always nil.
func _new(upg upgrader, listenFunc ListenFunc, upgradesEnabled bool, totalConn *prometheus.CounterVec) (*Bootstrap, error) {
	if upgradesEnabled {
		// Register for SIGHUP before returning instead of inside the goroutine.
		// Until signal.Notify has run, SIGHUP keeps its default behaviour, which
		// makes a Go program exit; registering here closes the window in which a
		// SIGHUP sent right after start-up would terminate the process.
		sig := make(chan os.Signal, 1)
		signal.Notify(sig, syscall.SIGHUP)

		go func() {
			for range sig {
				if err := upg.Upgrade(); err != nil {
					log.WithError(err).Error("Upgrade failed")
					continue
				}

				log.Info("Upgrade succeeded")
			}
		}()
	}

	return &Bootstrap{
		upgrader:   upg,
		listenFunc: listenFunc,
		connTotal:  totalConn,
	}, nil
}
// ListenFunc is a net.Listener factory: it receives the network and the address to listen
// on and returns the listener or an error.
type ListenFunc func(net, addr string) (net.Listener, error)
// Starter is a function that initializes a net.Listener.
// It receives a ListenFunc to be used for the net.Listener creation, a chan<- error to
// signal runtime errors and the *prometheus.CounterVec held by the bootstrapper.
// It must serve incoming connections asynchronously and signal errors on the channel;
// the return value is for setup errors.
type Starter func(ListenFunc, chan<- error, *prometheus.CounterVec) error
// isFirstBoot reports whether the upgrader has no parent, i.e. this process was not
// started as part of an upgrade.
func (b *Bootstrap) isFirstBoot() bool {
	hasParent := b.upgrader.HasParent()
	return !hasParent
}
// RegisterStarter appends starter to the list of starters that Start will invoke.
// Starters are kept in registration order.
func (b *Bootstrap) RegisterStarter(starter Starter) {
b.starters = append(b.starters, starter)
}
// Start invokes every registered starter in registration order and returns without
// waiting for runtime errors; those are delivered asynchronously on the error channel
// that Wait reads from.
//
// The error channel is created here with room for one error per starter. If a starter
// fails during setup, its error is returned and the remaining starters are not invoked.
func (b *Bootstrap) Start() error {
	registered := b.starters
	b.errChan = make(chan error, len(registered))

	for i := range registered {
		err := registered[i](b.listen, b.errChan, b.connTotal)
		if err != nil {
			return err
		}
	}

	return nil
}
// Wait signals process readiness to the parent and then waits for an exit condition.
// SIGTERM, SIGINT and a runtime error trigger an immediate shutdown;
// in case of an upgrade there is a grace period to complete the ongoing requests.
// stopAction is invoked during a graceful stop. It must wait until the shutdown is completed.
//
// The returned error is the failure of the readiness call, the runtime error received
// from a starter, or an error describing the signal or the outcome of the grace period.
func (b *Bootstrap) Wait(gracePeriodTicker helper.Ticker, stopAction func()) error {
	shutdownSignals := []os.Signal{syscall.SIGTERM, syscall.SIGINT}
	immediateShutdown := make(chan os.Signal, len(shutdownSignals))
	signal.Notify(immediateShutdown, shutdownSignals...)

	readyErr := b.upgrader.Ready()
	if readyErr != nil {
		return readyErr
	}

	select {
	case <-b.upgrader.Exit():
		// This is the old process and a graceful upgrade is in progress: the new
		// process signaled its readiness and we started a graceful stop. No further
		// upgrades can be started while this process is running, so we set a grace
		// period and then force a termination.
		outcome := b.waitGracePeriod(gracePeriodTicker, immediateShutdown, stopAction)
		return fmt.Errorf("graceful upgrade: %w", outcome)

	case received := <-immediateShutdown:
		signalErr := fmt.Errorf("received signal %q", received)
		b.upgrader.Stop()
		return signalErr

	case runtimeErr := <-b.errChan:
		return runtimeErr
	}
}
// waitGracePeriod runs stopAction in a separate goroutine and blocks until the first of
// three events: the grace period ticker fires, a signal arrives on kill, or stopAction
// returns.
//
// The returned error is never nil; its message names the event that ended the wait
// ("grace period expired", "force shutdown" or "completed"). A nil stopAction is treated
// as an action that completes immediately.
func (b *Bootstrap) waitGracePeriod(gracePeriodTicker helper.Ticker, kill <-chan os.Signal, stopAction func()) error {
	log.Warn("starting grace period")

	// Substitute a no-op so the goroutine below does not need a nil check.
	stop := stopAction
	if stop == nil {
		stop = func() {}
	}

	stopped := make(chan struct{})
	go func() {
		stop()
		close(stopped)
	}()

	// The grace period is counted from this point on.
	gracePeriodTicker.Reset()

	select {
	case <-gracePeriodTicker.C():
		return fmt.Errorf("grace period expired")
	case <-kill:
		return fmt.Errorf("force shutdown")
	case <-stopped:
		return fmt.Errorf("completed")
	}
}
// listen is the ListenFunc handed to the starters. It delegates to b.listenFunc, but on
// first boot it removes whatever exists at path before creating a unix socket listener
// (presumably a socket file left behind by a previous run).
//
// A failure to remove the path is returned and no listener is created.
func (b *Bootstrap) listen(network, path string) (net.Listener, error) {
	mustClearPath := network == "unix" && b.isFirstBoot()
	if !mustClearPath {
		return b.listenFunc(network, path)
	}

	if err := os.RemoveAll(path); err != nil {
		return nil, err
	}

	return b.listenFunc(network, path)
}
// Noop is a bootstrapper that does no additional configurations: starters are given plain
// net.Listen and there is no upgrade or signal handling.
type Noop struct {
// starters holds the registered Starter functions in registration order.
starters []Starter
// shutdown is closed by Terminate to unblock Wait.
shutdown chan struct{}
// errChan receives runtime errors reported by the starters; it is created by Start.
errChan chan error
// connTotal is passed through unchanged to every starter.
connTotal *prometheus.CounterVec
}
// NewNoop returns an initialized *Noop: its shutdown channel is open and connTotal is
// stored to be handed to the starters. No starters are registered yet.
func NewNoop(connTotal *prometheus.CounterVec) *Noop {
	noop := &Noop{connTotal: connTotal}
	noop.shutdown = make(chan struct{})
	return noop
}
// RegisterStarter adds starter to the pool of starters that Start will invoke.
// Starters are kept in registration order.
func (n *Noop) RegisterStarter(starter Starter) {
n.starters = append(n.starters, starter)
}
// Start starts all registered starters to accept connections, in registration order.
// Each starter receives net.Listen as its listener factory.
//
// The error channel is created here with room for one error per starter. If a starter
// fails during setup, its error is returned and the remaining starters are not invoked.
func (n *Noop) Start() error {
	registered := n.starters
	n.errChan = make(chan error, len(registered))

	for i := range registered {
		err := registered[i](net.Listen, n.errChan, n.connTotal)
		if err != nil {
			return err
		}
	}

	return nil
}
// Wait blocks until Terminate is called or a starter reports a runtime error.
//
// After Terminate, stopAction is invoked (when it is not nil) and nil is returned.
// A runtime error is returned as is, without invoking stopAction. The ticker argument
// is ignored.
func (n *Noop) Wait(_ helper.Ticker, stopAction func()) error {
	select {
	case runtimeErr := <-n.errChan:
		return runtimeErr
	case <-n.shutdown:
	}

	if stopAction != nil {
		stopAction()
	}

	return nil
}
// Terminate unblocks the Wait method, which then executes the stopAction call-back passed
// into it.
//
// Repeated calls are no-ops: the shutdown channel is closed only while it is still open,
// so a second Terminate does not panic with "close of closed channel". The check and the
// close are not atomic, so Terminate must not be called from several goroutines at once.
func (n *Noop) Terminate() {
	select {
	case <-n.shutdown:
		// Nothing is ever sent on shutdown, so a successful receive means it has
		// already been closed by an earlier Terminate.
	default:
		close(n.shutdown)
	}
}
|