1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282
|
package main
import (
"fmt"
"net/url"
"regexp"
"strings"
"time"
"github.com/cenkalti/backoff"
_ "github.com/denisenkom/go-mssqldb" // register the MS-SQL driver
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/go-sql-driver/mysql" // register the MySQL driver
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq" // register the PostgreSQL driver
"github.com/prometheus/client_golang/prometheus"
)
// MetricNameRE matches every run of characters that may not appear in a
// metric name, i.e. anything other than ASCII letters, digits, '_' and ':'.
// Compare github.com/prometheus/common/model.MetricNameRE.
var MetricNameRE = regexp.MustCompile(`[^a-zA-Z0-9_:]+`)
// Init attaches a job-scoped logger, builds a metric descriptor for every
// usable query of the job and finally sets up the job's connection objects.
//
// queries maps reference names to SQL text; it is consulted for a query that
// has no inline SQL but carries a QueryRef. Nil queries and queries that end
// up without SQL are skipped with a warning. Init always returns nil.
func (j *Job) Init(logger log.Logger, queries map[string]string) error {
	j.log = log.With(logger, "job", j.Name)

	// every usable query becomes one metric descriptor
	for _, query := range j.Queries {
		if query == nil {
			level.Warn(j.log).Log("msg", "Skipping invalid query")
			continue
		}
		query.log = log.With(j.log, "query", query.Name)
		query.jobName = j.Name

		// fall back to the referenced query only when no inline SQL is given
		if query.Query == "" && query.QueryRef != "" {
			sqlText, known := queries[query.QueryRef]
			if known {
				query.Query = sqlText
			}
		}
		if query.Query == "" {
			level.Warn(query.log).Log("msg", "Skipping empty query")
			continue
		}

		if query.metrics == nil {
			// the number of metrics a query yields is unknown up front, so the
			// query count merely serves as an initial size hint for the map
			query.metrics = make(map[*connection][]prometheus.Metric, len(j.Queries))
		}

		// strip characters prometheus does not accept in a metric name
		metricName := MetricNameRE.ReplaceAllString("sql_"+query.Name, "")

		// NOTE: the order of the variable labels below has to match the order
		// in which label values are supplied when the metrics are created
		variableLabels := append(query.Labels, "driver", "host", "database", "user", "col")
		constLabels := prometheus.Labels{"sql_job": j.Name}

		query.desc = prometheus.NewDesc(metricName, query.Help, variableLabels, constLabels)
	}

	j.updateConnections()
	return nil
}
// updateConnections turns the connection URLs in j.Connections into
// connection objects stored in j.conns. The database handle of each object is
// left nil here; it is opened later by connection.connect.
//
// URLs that cannot be parsed are logged and skipped. Nothing is done when
// j.conns already holds at least as many entries as j.Connections.
func (j *Job) updateConnections() {
	// if there are no connection URLs for this job it can't be run
	if j.Connections == nil {
		level.Error(j.log).Log("msg", "No connections for job", "job", j.Name)
		return
	}
	// make space for the connection objects
	if j.conns == nil {
		j.conns = make([]*connection, 0, len(j.Connections))
	}
	// all connection objects have been created already
	if len(j.conns) >= len(j.Connections) {
		return
	}
	// parse the connection URLs and create a connection object for each
	for _, conn := range j.Connections {
		// MySQL DSNs do not parse cleanly as URLs as of Go 1.12.8+
		if strings.HasPrefix(conn, "mysql://") {
			config, err := mysql.ParseDSN(strings.TrimPrefix(conn, "mysql://"))
			if err != nil {
				level.Error(j.log).Log("msg", "Failed to parse MySQL DSN", "url", conn, "err", err)
				// config is nil when parsing fails; skip this DSN instead of
				// dereferencing it below
				continue
			}
			j.conns = append(j.conns, &connection{
				conn:     nil,
				url:      conn,
				driver:   "mysql",
				host:     config.Addr,
				database: config.DBName,
				user:     config.User,
			})
			continue
		}
		u, err := url.Parse(conn)
		if err != nil {
			level.Error(j.log).Log("msg", "Failed to parse URL", "url", conn, "err", err)
			continue
		}
		user := ""
		if u.User != nil {
			user = u.User.Username()
		}
		// we expose some of the connection variables as labels, so we need to
		// remember them
		newConn := &connection{
			conn:     nil,
			url:      conn,
			driver:   u.Scheme,
			host:     u.Host,
			database: strings.TrimPrefix(u.Path, "/"),
			user:     user,
		}
		/*
			if newConn.driver == "athena" {
				// call go-athena's Open() to ensure conn.db is set,
				// otherwise API calls will complain about an empty database field:
				// "InvalidParameter: 1 validation error(s) found. - minimum field size of 1, StartQueryExecutionInput.QueryExecutionContext.Database."
				newConn.conn, err = sqlx.Open("athena", u.String())
				if err != nil {
					level.Error(j.log).Log("msg", "Failed to open Athena connection", "connection", conn, "err", err)
					continue
				}
			}
		*/
		/*
			if newConn.driver == "snowflake" {
				cfg := &gosnowflake.Config{
					Account: u.Host,
					User:    u.User.Username(),
				}
				pw, set := u.User.Password()
				if set {
					cfg.Password = pw
				}
				if u.Port() != "" {
					portStr, err := strconv.Atoi(u.Port())
					if err != nil {
						level.Error(j.log).Log("msg", "Failed to parse Snowflake port", "connection", conn, "err", err)
						continue
					}
					cfg.Port = portStr
				}
				dsn, err := gosnowflake.DSN(cfg)
				if err != nil {
					level.Error(j.log).Log("msg", "Failed to create Snowflake DSN", "connection", conn, "err", err)
					continue
				}
				newConn.conn, err = sqlx.Open("snowflake", dsn)
				if err != nil {
					level.Error(j.log).Log("msg", "Failed to open Snowflake connection", "connection", conn, "err", err)
					continue
				}
			}
		*/
		j.conns = append(j.conns, newConn)
	}
}
// ExecutePeriodically runs the job in an endless loop, sleeping for
// j.Interval after every run. It never returns.
func (j *Job) ExecutePeriodically() {
	level.Debug(j.log).Log("msg", "Starting")
	// the sleep is the loop's post statement, so it happens after each run
	for ; ; time.Sleep(j.Interval) {
		j.Run()
		level.Debug(j.log).Log("msg", "Sleeping until next run", "sleep", j.Interval.String())
	}
}
// runOnceConnection runs the job's queries against conn and reports on done
// how many of them finished without error. Exactly one value is sent on done
// per call, also when connecting fails (in which case the value is 0 and the
// connection is marked as failed).
func (j *Job) runOnceConnection(conn *connection, done chan int) {
	succeeded := 0
	// the deferred send makes sure the receiver gets a result on every path
	defer func() { done <- succeeded }()

	// establish the DB connection unless it is open already
	err := conn.connect(j)
	if err != nil {
		level.Warn(j.log).Log("msg", "Failed to connect", "err", err)
		j.markFailed(conn)
		return
	}

	for _, query := range j.Queries {
		switch {
		case query == nil:
			continue
		case query.desc == nil:
			// no descriptor exists, e.g. because Init skipped the query
			level.Warn(query.log).Log("msg", "Skipping query. Collector is nil")
			continue
		}

		level.Debug(query.log).Log("msg", "Running Query")
		// a failing query is logged but does not stop the remaining ones
		if runErr := query.Run(conn); runErr != nil {
			level.Warn(query.log).Log("msg", "Failed to run query", "err", runErr)
			continue
		}
		level.Debug(query.log).Log("msg", "Query finished")
		succeeded++
	}
}
// markFailed sets the failedScrapes gauge to 1 for every query of the job,
// labelled with the properties of conn. It is called when the connection
// could not be established.
func (j *Job) markFailed(conn *connection) {
	for _, q := range j.Queries {
		// nil entries are tolerated elsewhere (Init, runOnceConnection), so
		// they must not cause a nil pointer dereference here either
		if q == nil {
			continue
		}
		failedScrapes.WithLabelValues(conn.driver, conn.host, conn.database, conn.user, q.jobName, q.Name).Set(1.0)
	}
}
// Run executes the job's queries, retrying runOnce with exponential backoff.
// The retry budget is the job interval, or one minute when the interval is
// zero. An error returned by the retry loop is logged, not returned.
//
// NOTE(review): the original comment states that this method implements the
// cron.Job interface; confirm against the scheduler in use.
func (j *Job) Run() {
	// decide on the time budget first, falling back to a minute
	budget := j.Interval
	if budget == 0 {
		budget = time.Minute
	}

	policy := backoff.NewExponentialBackOff()
	policy.MaxElapsedTime = budget

	err := backoff.Retry(j.runOnce, policy)
	if err != nil {
		level.Error(j.log).Log("msg", "Failed to run", "err", err)
	}
}
// runOnce runs the queries on all connections of the job concurrently and
// waits for every connection to report back. It returns an error when not a
// single query completed successfully across all connections.
func (j *Job) runOnce() error {
	// buffered with one slot per connection, so no sender ever blocks
	results := make(chan int, len(j.conns))

	// one goroutine per connection
	for i := range j.conns {
		go j.runOnceConnection(j.conns[i], results)
	}

	// collect exactly one result per started goroutine
	total := 0
	for pending := len(j.conns); pending > 0; pending-- {
		total += <-results
	}

	if total >= 1 {
		return nil
	}
	return fmt.Errorf("zero queries ran")
}
// connect opens the database connection for c unless it is open already,
// applies the connection pool limits and executes the job's StartupSQL
// statements on it.
//
// It returns an error when connecting fails or when a StartupSQL statement
// fails. In the latter case the freshly opened handle is closed again and
// c.conn stays nil, so a later call starts over.
func (c *connection) connect(job *Job) error {
	// already connected
	if c.conn != nil {
		return nil
	}

	dsn := c.url
	// the MySQL driver expects the DSN without the URL scheme
	if c.driver == "mysql" {
		dsn = strings.TrimPrefix(dsn, "mysql://")
	}

	conn, err := sqlx.Connect(c.driver, dsn)
	if err != nil {
		return err
	}

	// be nice and don't use up too many connections for mere metrics
	conn.SetMaxOpenConns(1)
	conn.SetMaxIdleConns(1)
	// Disable SetConnMaxLifetime if MSSQL as it is causing issues with the MSSQL driver we are using. See #60
	if c.driver != "sqlserver" {
		conn.SetConnMaxLifetime(job.Interval * 2)
	}

	// execute StartupSQL
	for _, query := range job.StartupSQL {
		level.Debug(job.log).Log("msg", "StartupSQL", "Query:", query)
		// Exec instead of MustExec: a failing statement must surface as an
		// error to the caller rather than panic inside a worker goroutine
		if _, err := conn.Exec(query); err != nil {
			// best effort: the handle is discarded, its close error adds nothing
			_ = conn.Close()
			return fmt.Errorf("executing startup SQL %q: %w", query, err)
		}
	}

	c.conn = conn
	return nil
}
|