File: whisper-resize.py

package info (click to toggle)
python-whisper 1.0.2-1~bpo9%2B2
  • links: PTS, VCS
  • area: main
  • in suites: stretch-backports
  • size: 224 kB
  • sloc: python: 1,932; makefile: 6
file content (184 lines) | stat: -rwxr-xr-x 6,010 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
#!/usr/bin/env python

import os
import sys
import math
import time
import bisect
import signal
import optparse
import traceback

try:
  import whisper
except ImportError:
  raise SystemExit('[ERROR] Please make sure whisper is installed properly')

# Restore the default SIGPIPE disposition. Python ignores SIGPIPE at startup,
# which turns a closed output pipe (e.g. `whisper-resize.py ... | head`) into
# a broken-pipe exception with a traceback; with SIG_DFL the process is simply
# terminated instead. SIGPIPE is not defined on every platform (e.g. Windows),
# so only touch it where it exists.
if hasattr(signal, 'SIGPIPE'):
  signal.signal(signal.SIGPIPE, signal.SIG_DFL)

# Reference timestamp (epoch seconds), taken once so that every time window
# computed by this script is relative to the same instant.
now = int(time.time())

# Command-line interface. The usage text also documents the retention
# definition syntax accepted for the positional arguments.
usage_text = '''%prog path timePerPoint:timeToStore [timePerPoint:timeToStore]*

timePerPoint and timeToStore specify lengths of time, for example:

60:1440      60 seconds per datapoint, 1440 datapoints = 1 day of retention
15m:8        15 minutes per datapoint, 8 datapoints = 2 hours of retention
1h:7d        1 hour per datapoint, 7 days of retention
12h:2y       12 hours per datapoint, 2 years of retention
'''
option_parser = optparse.OptionParser(usage=usage_text)

# (flag, add_option keyword arguments) pairs, registered in the order in
# which they are listed here.
option_specs = (
  ('--xFilesFactor', dict(
    default=None,
    type='float',
    help="Change the xFilesFactor")),
  ('--aggregationMethod', dict(
    default=None,
    type='string',
    help="Change the aggregation function (%s)" %
         ', '.join(whisper.aggregationMethods))),
  ('--force', dict(
    default=False,
    action='store_true',
    help="Perform a destructive change")),
  ('--newfile', dict(
    default=None,
    action='store',
    help="Create a new database file without removing the existing one")),
  ('--nobackup', dict(
    action='store_true',
    help='Delete the .bak file after successful execution')),
  ('--aggregate', dict(
    action='store_true',
    help='Try to aggregate the values to fit the new archive better.'
         ' Note that this will make things slower and use more memory.')),
)
for flag, spec in option_specs:
  option_parser.add_option(flag, **spec)

options, args = option_parser.parse_args()

# The database path plus at least one retention definition are mandatory.
if len(args) < 2:
  option_parser.print_help()
  sys.exit(1)

path = args[0]

if not os.path.exists(path):
  sys.stderr.write("[ERROR] File '%s' does not exist!\n\n" % path)
  option_parser.print_help()
  sys.exit(1)

# Header of the existing database: archive layout, xFilesFactor and
# aggregation method are read from it below.
info = whisper.info(path)

# Every remaining positional argument is a retention definition.
new_archives = list(map(whisper.parseRetentionDef, args[1:]))

# Existing archives ordered from lowest precision (largest secondsPerPoint)
# to highest precision; the list is sorted in place.
old_archives = info['archives']
old_archives.sort(key=lambda entry: entry['secondsPerPoint'], reverse=True)

# Command-line overrides win; otherwise keep what the existing file uses.
xff = (info['xFilesFactor']
       if options.xFilesFactor is None
       else options.xFilesFactor)

aggregationMethod = (info['aggregationMethod']
                     if options.aggregationMethod is None
                     else options.aggregationMethod)

print('Retrieving all data from the archives')
for archive in old_archives:
  # Request the span from one step after (now - retention) up to now and
  # keep the result on the archive entry for the migration step.
  window_start = now - archive['retention'] + archive['secondsPerPoint']
  timeinfo, values = whisper.fetch(path, window_start, now)
  archive['data'] = (timeinfo, values)

# Decide where the resized database is written: either the file requested
# with --newfile, or a temporary file next to the original.
if options.newfile is not None:
  newfile = options.newfile
else:
  tmpfile = path + '.tmp'
  if os.path.exists(tmpfile):
    # Left over from an earlier run; remove it before creating a new one.
    print('Removing previous temporary database file: %s' % tmpfile)
    os.unlink(tmpfile)
  newfile = tmpfile

print('Creating new whisper database: %s' % newfile)
whisper.create(
    newfile,
    new_archives,
    xFilesFactor=xff,
    aggregationMethod=aggregationMethod)
print('Created: %s (%d bytes)' % (newfile, os.path.getsize(newfile)))

# Copy the data fetched from the old archives into the new database, either
# re-aggregated to the new archive steps (--aggregate) or as-is.
if options.aggregate:
  # This is where data will be interpolated (best effort)
  print('Migrating data with aggregation...')

  # Merge every old archive into one timeline sorted by ascending timestamp.
  # Archives are visited from highest to lowest precision so that, wherever
  # archives overlap, the finest-grained data is kept; a coarser archive only
  # contributes the points that are older than everything collected so far.
  all_datapoints = []
  for archive in sorted(old_archives, key=lambda a: a['secondsPerPoint']):
    # Loading all datapoints into memory for fast querying.
    # timeinfo is unpacked into range(), i.e. it is (start, stop, step).
    timeinfo, values = archive['data']
    new_datapoints = list(zip(range(*timeinfo), values))
    if all_datapoints:
      oldest_timestamp = all_datapoints[0][0]
      older_datapoints = [point for point in new_datapoints
                          if point[0] < oldest_timestamp]
      all_datapoints = older_datapoints + all_datapoints
    else:
      all_datapoints = new_datapoints

  # Real lists are needed here: they are bisected and sliced below, which
  # the lazy map()/zip() objects of Python 3 do not support.
  oldtimestamps = [point[0] for point in all_datapoints]
  oldvalues = [point[1] for point in all_datapoints]

  print("oldtimestamps: %s" % oldtimestamps)
  # Simply cleaning up some used memory
  del all_datapoints

  new_info = whisper.info(newfile)
  new_archives = new_info['archives']

  for archive in new_archives:
    step = archive['secondsPerPoint']
    # NOTE(review): these bounds are offset by `now % step` rather than
    # rounded down to a multiple of `step`; presumably whisper.update_many
    # aligns timestamps to the archive step on write - confirm.
    fromTime = now - archive['retention'] + now % step
    untilTime = now + now % step + step
    print("(%s,%s,%s)" % (fromTime, untilTime, step))
    # list() keeps the printed form and the slicing identical on Python 2
    # and Python 3.
    timepoints_to_update = list(range(fromTime, untilTime, step))
    print("timepoints_to_update: %s" % timepoints_to_update)
    newdatapoints = []
    # Walk consecutive [start, end) intervals of the new archive.
    for tinterval in zip(timepoints_to_update[:-1], timepoints_to_update[1:]):
      # TODO: Setting lo= parameter for 'lefti' based on righti from previous
      #       iteration. Obviously, this can only be done if
      #       timepoints_to_update is always updated. Is it?
      lefti = bisect.bisect_left(oldtimestamps, tinterval[0])
      righti = bisect.bisect_left(oldtimestamps, tinterval[1], lo=lefti)
      newvalues = oldvalues[lefti:righti]
      if newvalues:
        non_none = [value for value in newvalues if value is not None]
        # Skip intervals without any known value (otherwise an xFilesFactor
        # of 0 would try to aggregate an empty list), and intervals whose
        # share of known values is below the xFilesFactor.
        if non_none and 1.0 * len(non_none) / len(newvalues) >= xff:
          newdatapoints.append([tinterval[0],
                                whisper.aggregate(aggregationMethod,
                                                  non_none, newvalues)])
    whisper.update_many(newfile, newdatapoints)
else:
  print('Migrating data without aggregation...')
  for archive in old_archives:
    timeinfo, values = archive['data']
    # Only known values are written; a concrete list is passed so the result
    # does not depend on how update_many treats lazy iterators.
    datapoints = [point for point in zip(range(*timeinfo), values)
                  if point[1] is not None]
    whisper.update_many(newfile, datapoints)

# With --newfile the original database is left in place, so there is nothing
# to swap and the script is done.
if options.newfile is not None:
  sys.exit(0)

# Swap the databases: the original is kept as <path>.bak and the temporary
# file takes over the original name.
backup = path + '.bak'
print('Renaming old database to: %s' % backup)
os.rename(path, backup)

try:
  print('Renaming new database to: %s' % path)
  os.rename(tmpfile, path)
except:
  # Bare except: the rollback below runs for any failure, including
  # KeyboardInterrupt, before the script exits with an error status.
  traceback.print_exc()
  print('\nOperation failed, restoring backup')
  os.rename(backup, path)
  sys.exit(1)
else:
  # The swap succeeded; drop the backup only when explicitly requested.
  if options.nobackup:
    print("Unlinking backup: %s" % backup)
    os.unlink(backup)