File: rrd2whisper.py

package info (click to toggle)
python-whisper 1.0.2-1~bpo9%2B2
  • links: PTS, VCS
  • area: main
  • in suites: stretch-backports
  • size: 224 kB
  • sloc: python: 1,932; makefile: 6
file content (156 lines) | stat: -rwxr-xr-x 4,738 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#!/usr/bin/env python

import errno
import optparse
import os
import signal
import sys
import time

try:
  import rrdtool
except ImportError as exc:
  raise SystemExit('[ERROR] Missing dependency: %s' % str(exc))

try:
  import whisper
except ImportError:
  raise SystemExit('[ERROR] Please make sure whisper is installed properly')

# Restore the default SIGPIPE disposition (this does NOT ignore the signal).
# Python itself ignores SIGPIPE at startup and reports a closed pipe as an
# exception; with SIG_DFL the process is terminated by the signal instead,
# e.g. when the output is piped into `head`.
signal.signal(signal.SIGPIPE, signal.SIG_DFL)

# Aggregation methods offered on the command line.
# RRD doesn't have a 'sum' or 'total' type, so 'sum' is left out.
# A filtered copy is built rather than calling .remove() on
# whisper.aggregationMethods: that would mutate the library's own sequence,
# fail if it is not a list (e.g. a dict view), and raise if 'sum' is absent.
aggregationMethods = [
  method for method in whisper.aggregationMethods if method != 'sum'
]

# Command-line interface: one positional RRD path plus three optional
# overrides. Help texts are assembled first so each add_option call stays
# on a single logical line.
option_parser = optparse.OptionParser(usage='%prog rrd_path')

xff_help = (
  "The xFilesFactor to use in the output file. "
  "Defaults to the input RRD's xFilesFactor")
aggregation_help = (
  "The consolidation function to fetch from on input and "
  "aggregationMethod to set on output. One of: %s"
) % ', '.join(aggregationMethods)
destination_help = (
  "Path to place created whisper file. Defaults to the "
  "RRD file's source path.")

option_parser.add_option(
  '--xFilesFactor', type='float', default=None, help=xff_help)
option_parser.add_option(
  '--aggregationMethod', type='string', default='average',
  help=aggregation_help)
option_parser.add_option(
  '--destinationPath', type='string', default=None, help=destination_help)

# Parse the command line. The first positional argument is the RRD file;
# without it the usage text is printed and the script exits with status 1.
options, args = option_parser.parse_args()

try:
  rrd_path = args[0]
except IndexError:
  option_parser.print_help()
  sys.exit(1)

# Read the RRD metadata; an rrdtool failure becomes a one-line error exit
# instead of a traceback.
try:
  rrd_info = rrdtool.info(rrd_path)
except rrdtool.error as exc:
  sys.exit('[ERROR] %s' % str(exc))

# The RRD 'step'; multiplied by pdp_per_row further down to obtain the
# per-archive precision.
seconds_per_pdp = rrd_info['step']

# Reconcile old vs new python-rrdtool APIs: one flavour exposes nested
# 'rra' / 'ds' entries, the other flat keys such as 'rra[0].cf' and
# 'ds[name]...'. Either way we end up with a consistent 'rras' list of
# dicts and a 'datasources' collection of names.
if 'rra' not in rrd_info:
  # Flat layout: recover the RRA indices from the key names.
  rra_indices = [
    int(key.split('[')[1].split(']')[0])
    for key in rrd_info
    if key.startswith('rra[')
  ]
  rra_fields = ('pdp_per_row', 'rows', 'cf', 'xff')
  rras = [
    dict((field, rrd_info['rra[%d].%s' % (i, field)]) for field in rra_fields)
    for i in range(max(rra_indices) + 1)
  ]
else:
  rras = rrd_info['rra']

if 'ds' not in rrd_info:
  # Flat layout: the datasource name is the text between 'ds[' and ']'.
  ds_names = set(
    key[3:].split(']')[0] for key in rrd_info if key.startswith('ds['))
  datasources = list(ds_names)
else:
  datasources = rrd_info['ds'].keys()

# Grab the archive configuration: keep only the RRAs whose consolidation
# function matches the requested aggregation method (compared upper-case).
wanted_cf = options.aggregationMethod.upper()
relevant_rras = [rra for rra in rras if rra['cf'] == wanted_cf]

# Nothing to migrate if the RRD has no archive for that function.
if not relevant_rras:
  raise SystemExit(
    "[ERROR] Unable to find any RRAs with consolidation function: %s"
    % wanted_cf)

# Translate each selected RRA into a whisper archive definition:
# (seconds per point, number of points).
archives = [
  (rra['pdp_per_row'] * seconds_per_pdp, rra['rows'])
  for rra in relevant_rras
]

# An explicit --xFilesFactor always wins, including 0 (hence the 'is None'
# test rather than a truthiness test). Otherwise the first selected RRA's
# xff is used, even if that value is 0.0.
xFilesFactor = options.xFilesFactor
if xFilesFactor is None and relevant_rras:
  xFilesFactor = relevant_rras[0]['xff']

# Create one whisper file per datasource and copy the RRD data into it.
for datasource in datasources:
  now = int(time.time())
  # Only tag the file name with the datasource when there are several.
  suffix = '_%s' % datasource if len(datasources) > 1 else ''

  # Swap a trailing '.rrd' extension for '.wsp'. Only the extension is
  # touched, so a '.rrd' elsewhere in the path (e.g. a directory name) is
  # left alone; a file without the '.rrd' extension just gets '.wsp' appended.
  rrd_root, rrd_ext = os.path.splitext(rrd_path)
  if rrd_ext != '.rrd':
    rrd_root = rrd_path
  path = '%s%s.wsp' % (rrd_root, suffix)

  if options.destinationPath:
    destination_path = options.destinationPath
    if not os.path.isdir(destination_path):
      try:
        os.makedirs(destination_path)
      except OSError as exc:
        # Tolerate the directory appearing between the isdir() check and
        # makedirs(); every other failure is re-raised.
        if exc.errno != errno.EEXIST or not os.path.isdir(destination_path):
          raise
    path = os.path.join(destination_path, os.path.basename(path))

  try:
    # The --aggregationMethod help promises the method is also set on the
    # output file, so pass it on (whisper method names are lower-case).
    whisper.create(
      path,
      archives,
      xFilesFactor=xFilesFactor,
      aggregationMethod=options.aggregationMethod.lower())
  except whisper.InvalidConfiguration as e:
    raise SystemExit('[ERROR] %s' % str(e))
  size = os.stat(path).st_size
  archiveConfig = ','.join(["%d:%d" % ar for ar in archives])
  print("Created: %s (%d bytes) with archives: %s" % (path, size, archiveConfig))

  print("Migrating data")
  # Archives are processed last-to-first, keeping their original index for
  # the progress message.
  for archiveNumber, (precision, points) in reversed(list(enumerate(archives))):
    retention = precision * points
    # Align the fetch window to the archive's precision.
    endTime = now - now % precision
    startTime = endTime - retention
    (time_info, columns, rows) = rrdtool.fetch(
      rrd_path,
      options.aggregationMethod.upper(),
      '-r', str(precision),
      '-s', str(startTime),
      '-e', str(endTime))
    column_index = list(columns).index(datasource)

    # Skip the last datapoint because RRD sometimes gives funky values.
    # Slicing (unlike pop()) also copes with an empty result.
    values = [row[column_index] for row in rows[:-1]]
    # time_info is passed straight to range(), i.e. (start, end, step).
    timestamps = range(*time_info)
    # Build a real list: it is measured with len() below, which a lazy
    # filter/zip object does not support on Python 3.
    datapoints = [
      (timestamp, value)
      for timestamp, value in zip(timestamps, values)
      if value is not None
    ]
    print(' migrating %d datapoints from archive %d' % (len(datapoints), archiveNumber))
    whisper.update_many(path, datapoints)