// Copyright 2019 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
	"errors"
	"fmt"
	"log/slog"
	"net/http"
	"runtime"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
	"github.com/prometheus/common/promslog"
	"github.com/prometheus/common/route"

	apiv2 "github.com/prometheus/alertmanager/api/v2"
	"github.com/prometheus/alertmanager/cluster"
	"github.com/prometheus/alertmanager/config"
	"github.com/prometheus/alertmanager/dispatch"
	"github.com/prometheus/alertmanager/provider"
	"github.com/prometheus/alertmanager/silence"
	"github.com/prometheus/alertmanager/types"
)

// API represents all APIs of Alertmanager.
type API struct {
	v2                       *apiv2.API
	deprecationRouter        *V1DeprecationRouter
	requestsInFlight         prometheus.Gauge
	concurrencyLimitExceeded prometheus.Counter
	timeout                  time.Duration

	inFlightSem chan struct{}
}

// Options for the creation of an API object. Alerts, Silences, AlertStatusFunc,
// GroupMutedFunc and GroupFunc are mandatory. The zero value for everything
// else is a safe default.
type Options struct {
	// Alerts to be used by the API. Mandatory.
	Alerts provider.Alerts
	// Silences to be used by the API. Mandatory.
	Silences *silence.Silences
	// AlertStatusFunc is used by the API to retrieve the AlertStatus of an
	// alert. Mandatory.
	AlertStatusFunc func(model.Fingerprint) types.AlertStatus
	// GroupMutedFunc is used by the API to know if an alert is muted.
	// Mandatory.
	GroupMutedFunc func(routeID, groupKey string) ([]string, bool)
	// Peer from the gossip cluster. If nil, no clustering will be used.
	Peer cluster.ClusterPeer
	// Timeout for all HTTP connections. A zero or negative value results
	// in no timeout.
	Timeout time.Duration
	// Concurrency limit for GET requests. A zero or negative value results
	// in a limit of GOMAXPROCS or 8, whichever is larger. Status code 503
	// is served for GET requests that would exceed the concurrency limit.
	Concurrency int
	// Logger is used for logging. If nil, no logging will happen.
	Logger *slog.Logger
	// Registry is used to register Prometheus metrics. If nil, no metrics
	// registration will happen.
	Registry prometheus.Registerer
	// GroupFunc returns a list of alert groups. The alerts are grouped
	// according to the current active configuration. Alerts returned are
	// filtered by the arguments provided to the function. Mandatory.
	GroupFunc func(func(*dispatch.Route) bool, func(*types.Alert, time.Time) bool) (dispatch.AlertGroups, map[model.Fingerprint][]string)
}

// validate returns an error naming the first mandatory field of Options that
// is not set, or nil if all mandatory fields are set.
func (o Options) validate() error {
	if o.Alerts == nil {
		return errors.New("mandatory field Alerts not set")
	}
	if o.Silences == nil {
		return errors.New("mandatory field Silences not set")
	}
	if o.AlertStatusFunc == nil {
		return errors.New("mandatory field AlertStatusFunc not set")
	}
	if o.GroupMutedFunc == nil {
		return errors.New("mandatory field GroupMutedFunc not set")
	}
	if o.GroupFunc == nil {
		return errors.New("mandatory field GroupFunc not set")
	}
	return nil
}

// New creates a new API object combining all API versions. Note that an Update
// call is also needed to get the APIs into an operational state.
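//
// A minimal usage sketch. The names alerts, silences, statusFn, mutedFn,
// groupFn, cfg and setAlertStatus are hypothetical values supplied by the
// caller; they are not defined in this package:
//
//	api, err := New(Options{
//		Alerts:          alerts,
//		Silences:        silences,
//		AlertStatusFunc: statusFn,
//		GroupMutedFunc:  mutedFn,
//		GroupFunc:       groupFn,
//	})
//	if err != nil {
//		return err
//	}
//	// The API is not operational until Update has been called.
//	api.Update(cfg, setAlertStatus)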
func New(opts Options) (*API, error) {
	if err := opts.validate(); err != nil {
		return nil, fmt.Errorf("invalid API options: %w", err)
	}
	l := opts.Logger
	if l == nil {
		l = promslog.NewNopLogger()
	}
	concurrency := opts.Concurrency
	if concurrency < 1 {
		concurrency = runtime.GOMAXPROCS(0)
		if concurrency < 8 {
			concurrency = 8
		}
	}

	v2, err := apiv2.NewAPI(
		opts.Alerts,
		opts.GroupFunc,
		opts.AlertStatusFunc,
		opts.GroupMutedFunc,
		opts.Silences,
		opts.Peer,
		l.With("version", "v2"),
		opts.Registry,
	)
	if err != nil {
		return nil, err
	}

	// TODO(beorn7): For now, this hardcodes the method="get" label. Other
	// methods should get the same instrumentation.
	requestsInFlight := prometheus.NewGauge(prometheus.GaugeOpts{
		Name:        "alertmanager_http_requests_in_flight",
		Help:        "Current number of HTTP requests being processed.",
		ConstLabels: prometheus.Labels{"method": "get"},
	})
	concurrencyLimitExceeded := prometheus.NewCounter(prometheus.CounterOpts{
		Name:        "alertmanager_http_concurrency_limit_exceeded_total",
		Help:        "Total number of times an HTTP request failed because the concurrency limit was reached.",
		ConstLabels: prometheus.Labels{"method": "get"},
	})
	if opts.Registry != nil {
		if err := opts.Registry.Register(requestsInFlight); err != nil {
			return nil, err
		}
		if err := opts.Registry.Register(concurrencyLimitExceeded); err != nil {
			return nil, err
		}
	}

	return &API{
		deprecationRouter:        NewV1DeprecationRouter(l.With("version", "v1")),
		v2:                       v2,
		requestsInFlight:         requestsInFlight,
		concurrencyLimitExceeded: concurrencyLimitExceeded,
		timeout:                  opts.Timeout,
		inFlightSem:              make(chan struct{}, concurrency),
	}, nil
}

// Register API. As APIv2 works on the http.Handler level, this method also creates a new
// http.ServeMux and then uses it to register both the provided router (to
// handle "/") and APIv2 (to handle "<routePrefix>/api/v2"). The method returns
// the newly created http.ServeMux. If a timeout has been set on construction of
// API, it is enforced for all HTTP requests going through this mux. The same is
// true for the concurrency limit, with the exception that it is only applied to
// GET requests.
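//
// A minimal usage sketch, assuming router is a *route.Router created by the
// caller and ":9093" is an illustrative listen address:
//
//	mux := api.Register(router, "/")
//	srv := &http.Server{Addr: ":9093", Handler: mux}
//	if err := srv.ListenAndServe(); err != nil {
//		return err
//	}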
func (api *API) Register(r *route.Router, routePrefix string) *http.ServeMux {
	// TODO(gotjosh): API v1 was removed as of version 0.27. When we reach
	// 1.0.0, we should remove these deprecation warnings.
	api.deprecationRouter.Register(r.WithPrefix("/api/v1"))

	mux := http.NewServeMux()
	mux.Handle("/", api.limitHandler(r))

	apiPrefix := ""
	if routePrefix != "/" {
		apiPrefix = routePrefix
	}
	// TODO(beorn7): HTTP instrumentation is only in place for Router. Since
	// /api/v2 works on the Handler level, it is currently not instrumented
	// at all (with the exception of requestsInFlight, which is handled in
	// limitHandler below).
	mux.Handle(
		apiPrefix+"/api/v2/",
		api.limitHandler(http.StripPrefix(apiPrefix, api.v2.Handler)),
	)

	return mux
}

// Update config and resolve timeout of each API. APIv2 also needs
// setAlertStatus to be updated.
func (api *API) Update(cfg *config.Config, setAlertStatus func(model.LabelSet)) {
	api.v2.Update(cfg, setAlertStatus)
}

// limitHandler wraps h so that the number of concurrently served GET requests
// is capped at cap(api.inFlightSem); GET requests beyond that limit are
// answered with status code 503. If a positive timeout has been configured,
// the returned handler also enforces it for every request.
func (api *API) limitHandler(h http.Handler) http.Handler {
	concLimiter := http.HandlerFunc(func(rsp http.ResponseWriter, req *http.Request) {
		if req.Method == http.MethodGet { // Only limit concurrency of GETs.
			select {
			case api.inFlightSem <- struct{}{}: // All good, carry on.
				api.requestsInFlight.Inc()
				defer func() {
					<-api.inFlightSem
					api.requestsInFlight.Dec()
				}()
			default:
				api.concurrencyLimitExceeded.Inc()
				http.Error(rsp, fmt.Sprintf(
					"Limit of concurrent GET requests reached (%d), try again later.\n", cap(api.inFlightSem),
				), http.StatusServiceUnavailable)
				return
			}
		}
		h.ServeHTTP(rsp, req)
	})
	if api.timeout <= 0 {
		return concLimiter
	}
	return http.TimeoutHandler(concLimiter, api.timeout, fmt.Sprintf(
		"Exceeded configured timeout of %v.\n", api.timeout,
	))
}