File: whisper-fill.py

package info (click to toggle)
python-whisper 1.0.2-1~bpo9%2B2
  • links: PTS, VCS
  • area: main
  • in suites: stretch-backports
  • size: 224 kB
  • sloc: python: 1,932; makefile: 6
file content (145 lines) | stat: -rwxr-xr-x 4,567 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#!/usr/bin/env python

# whisper-fill: unlike whisper-merge, don't overwrite data that's
# already present in the target file, but instead, only add the missing
# data (e.g. where the gaps in the target file are).  Because no values
# are overwritten, no data or precision gets lost.  Also, unlike
# whisper-merge, try to take the highest-precision archive to provide
# the data, instead of the one with the largest retention.
# Using this script, reconciliation between two replica instances can be
# performed by whisper-fill-ing the data of the other replica with the
# data that exists locally, without introducing the quite remarkable
# gaps that whisper-merge leaves behind (filling a higher precision
# archive with data from a lower precision one)

# Work performed by Fabian Groffen @grobian while working at Booking.com.
# additional patches are from https://github.com/jssjr/carbonate/

import whisper

try:
    from whisper import operator
    HAS_OPERATOR = True
except ImportError:
    HAS_OPERATOR = False

import itertools
import time
import sys
import optparse

# Python 3 has no xrange builtin; alias it to range so the code below can
# keep using the name xrange on either major version.
if sys.version_info >= (3, 0):
    xrange = range

def itemgetter(*items):
    """Return a callable that looks up ``items`` on the object it is given.

    When the ``operator`` module was importable (``HAS_OPERATOR`` is true)
    this simply hands back ``operator.itemgetter(*items)``.  Otherwise an
    equivalent closure is built: a single key yields the bare value, any
    other number of keys yields a tuple of values in the order requested.
    """
    if HAS_OPERATOR:
        return operator.itemgetter(*items)

    if len(items) == 1:
        (key,) = items
        return lambda obj: obj[key]

    return lambda obj: tuple(obj[key] for key in items)


def fill(src, dst, tstart, tstop):
    """Copy the points src holds between tstart and tstop into dst.

    The range is fetched from src piecewise, walking the src archives from
    the shortest retention to the longest (presumably highest precision
    first -- verify against the Whisper archive layout) and moving ``tstop``
    back to the start of each piece once it has been written.  Only points
    whose value is not None are written to dst.

    Args:
        src: path of the whisper file to read from.
        dst: path of the whisper file to write to.
        tstart: start of the range to copy (epoch seconds).
        tstop: end of the range to copy (epoch seconds).

    Returns:
        None.  Returns early when the whole range is older than what src
        retains, or once the range has been consumed down to ``tstart``.
    """
    srcHeader = whisper.info(src)

    srcArchives = srcHeader['archives']
    srcArchives.sort(key=itemgetter('retention'))

    # oldest point in time that src still retains
    srcTime = int(time.time()) - srcHeader['maxRetention']

    if tstart < srcTime and tstop < srcTime:
        return

    # we want to retain as much precision as we can, hence we walk
    # backwards in time, one archive's worth of retention at a time
    for archive in srcArchives:
        # skip over archives whose retention window ends before tstop,
        # they cannot contribute any point to the remaining range
        rtime = time.time() - archive['retention']
        if tstop <= rtime:
            continue

        untilTime = tstop
        fromTime = rtime if rtime > tstart else tstart

        (timeInfo, values) = whisper.fetch(src, fromTime, untilTime)
        (start, end, archive_step) = timeInfo
        # Pair every value with its timestamp and drop the missing (None)
        # ones.  The builtin zip and a comprehension are used instead of
        # itertools.izip/ifilter, which do not exist on Python 3.
        timestamps = xrange(start, end, archive_step)
        pointsToWrite = [
            (timestamp, value)
            for timestamp, value in zip(timestamps, values)
            if value is not None
        ]
        # order points by timestamp, newest first
        pointsToWrite.sort(key=lambda p: p[0], reverse=True)
        whisper.update_many(dst, pointsToWrite)

        # everything from fromTime onwards is done; continue before it
        tstop = fromTime

        # can stop when there's nothing to fetch any more
        if tstart == tstop:
            return


def fill_archives(src, dst, startFrom):
    """Find the gaps in dst and fill each of them with data from src.

    The archives of dst are visited from the shortest retention to the
    longest.  For each one, the window between the start of its retention
    and ``startFrom`` is fetched from dst and scanned for runs of missing
    points; every run longer than a single point is handed to ``fill``.
    ``startFrom`` is then moved back to the start of that window so the next
    archive only covers older data.

    A point counts as missing only when its value is None.  A stored value
    of 0 is real data and must not be mistaken for a gap, otherwise it could
    be overwritten with whatever src holds for that timestamp.

    Args:
        src: path of the whisper file providing the data.
        dst: path of the whisper file whose gaps are filled.
        startFrom: newest point in time to consider (epoch seconds).
    """
    header = whisper.info(dst)
    archives = sorted(header['archives'], key=lambda t: t['retention'])

    for archive in archives:
        fromTime = time.time() - archive['retention']
        if fromTime >= startFrom:
            continue

        (timeInfo, values) = whisper.fetch(dst, fromTime, startFrom)
        (start, end, step) = timeInfo
        # timestamp of the first missing point of the gap currently being
        # tracked, or None while no gap is open
        gapstart = None
        for v in values:
            missing = v is None
            if missing and gapstart is None:
                gapstart = start
            elif not missing and gapstart is not None:
                # the gap ended just before this point;
                # ignore single units lost
                if (start - gapstart) > archive['secondsPerPoint']:
                    fill(src, dst, gapstart - step, start)
                gapstart = None
            elif gapstart is not None and start == end - step:
                # still inside a gap at the last point of the window
                fill(src, dst, gapstart - step, start)

            start += step

        startFrom = fromTime


def main():
    """Command-line entry point: ``whisper-fill [--lock] src dst``.

    Parses the arguments, prints the help text and exits with status 1
    unless exactly two positional arguments are given, turns on whisper
    file locking when ``--lock`` was passed and whisper reports it can
    lock, then fills dst from src starting at the current time.
    """
    parser = optparse.OptionParser(
        usage='%prog [--lock] src dst',
        description='copies data from src in dst, if missing')
    parser.add_option(
        '--lock',
        help='Lock whisper files',
        default=False,
        action='store_true')
    opts, positional = parser.parse_args()

    if len(positional) != 2:
        parser.print_help()
        sys.exit(1)

    if opts.lock is True and whisper.CAN_LOCK:
        whisper.LOCK = True

    src, dst = positional
    now = time.time()

    fill_archives(src, dst, now)


# Run main() only when this file is executed as a script, not on import.
if __name__ == "__main__":
        main()