File: ruletypes.py

Package: elastalert 0.2.4-3 (Debian bookworm, area: main)
Size: 1,472 kB; SLOC: python 12,252, makefile 108, sh 2
# -*- coding: utf-8 -*-
import copy
import datetime
import sys

from sortedcontainers import SortedKeyList

from .util import add_raw_postfix
from .util import dt_to_ts
from .util import EAException
from .util import elastalert_logger
from .util import elasticsearch_client
from .util import format_index
from .util import hashable
from .util import lookup_es_key
from .util import new_get_event_ts
from .util import pretty_ts
from .util import total_seconds
from .util import ts_now
from .util import ts_to_dt


class RuleType(object):
    """ The base class for a rule type.
    The class must implement add_data and add any matches to self.matches.

    :param rules: A rule configuration.
    """
    required_options = frozenset()

    def __init__(self, rules, args=None):
        self.matches = []
        self.rules = rules
        self.occurrences = {}
        self.rules['category'] = self.rules.get('category', '')
        self.rules['description'] = self.rules.get('description', '')
        self.rules['owner'] = self.rules.get('owner', '')
        self.rules['priority'] = self.rules.get('priority', '2')

    def add_data(self, data):
        """ The function that the ElastAlert client calls with results from ES.
        Data is a list of dictionaries, from Elasticsearch.

        :param data: A list of events, each of which is a dictionary of terms.
        """
        raise NotImplementedError()

    def add_match(self, event):
        """ This function is called on all matching events. Rules use it to add
        extra information about the context of a match. Event is a dictionary
        containing terms directly from Elasticsearch and alerts will report
        all of the information.

        :param event: The matching event, a dictionary of terms.
        """
        # Convert datetime's back to timestamps
        ts = self.rules.get('timestamp_field')
        if ts in event:
            event[ts] = dt_to_ts(event[ts])

        self.matches.append(copy.deepcopy(event))

    def get_match_str(self, match):
        """ Returns a string that gives more context about a match.

        :param match: The matching event, a dictionary of terms.
        :return: A user facing string describing the match.
        """
        return ''

    def garbage_collect(self, timestamp):
        """ Gets called periodically to remove old data that is useless beyond given timestamp.
        May also be used to compute things in the absence of new data.

        :param timestamp: A timestamp indicating the rule has been run up to that point.
        """
        pass

    def add_count_data(self, counts):
        """ Gets called when a rule has use_count_query set to True. Called to add data from querying to the rule.

        :param counts: A dictionary mapping timestamps to hit counts.
        """
        raise NotImplementedError()

    def add_terms_data(self, terms):
        """ Gets called when a rule has use_terms_query set to True.

        :param terms: A list of buckets with a key, corresponding to query_key, and the count """
        raise NotImplementedError()

    def add_aggregation_data(self, payload):
        """ Gets called when a rule has use_terms_query set to True.
        :param terms: A list of buckets with a key, corresponding to query_key, and the count """
        raise NotImplementedError()
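
# Illustrative sketch (not part of the original module): the simplest RuleType
# subclass only needs add_data(), which records matches via add_match().
#
#     class EveryOtherEventRule(RuleType):
#         """ Toy example: match every second event seen. """
#         def add_data(self, data):
#             for i, event in enumerate(data):
#                 if i % 2 == 1:
#                     self.add_match(event)
#
#     rule = EveryOtherEventRule({'timestamp_field': '@timestamp'})
#     rule.add_data([{'@timestamp': ts_now()}, {'@timestamp': ts_now()}])
#     assert len(rule.matches) == 1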


class CompareRule(RuleType):
    """ A base class for matching a specific term by passing it to a compare function """
    required_options = frozenset(['compound_compare_key'])

    def expand_entries(self, list_type):
        """ Expand entries specified in files using the '!file' directive, if there are
        any, then add everything to a set.
        """
        entries_set = set()
        for entry in self.rules[list_type]:
            if entry.startswith("!file"):  # - "!file /path/to/list"
                filename = entry.split()[1]
                with open(filename, 'r') as f:
                    for line in f:
                        entries_set.add(line.rstrip())
            else:
                entries_set.add(entry)
        self.rules[list_type] = entries_set

    def compare(self, event):
        """ An event is a match if this returns true """
        raise NotImplementedError()

    def add_data(self, data):
        # If compare returns true, add it as a match
        for event in data:
            if self.compare(event):
                self.add_match(event)


class BlacklistRule(CompareRule):
    """ A CompareRule where the compare function checks a given key against a blacklist """
    required_options = frozenset(['compare_key', 'blacklist'])

    def __init__(self, rules, args=None):
        super(BlacklistRule, self).__init__(rules, args=None)
        self.expand_entries('blacklist')

    def compare(self, event):
        term = lookup_es_key(event, self.rules['compare_key'])
        if term in self.rules['blacklist']:
            return True
        return False
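
# Illustrative sketch (not part of the original module): a BlacklistRule is
# driven by a rule config containing 'compare_key' and 'blacklist'; entries of
# the form "!file /path/to/list" are expanded by CompareRule.expand_entries().
#
#     rule = BlacklistRule({'compare_key': 'username',
#                           'blacklist': ['root', 'admin'],
#                           'timestamp_field': '@timestamp'})
#     rule.add_data([{'username': 'root'}, {'username': 'alice'}])
#     assert len(rule.matches) == 1   # only the 'root' event is blacklisted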


class WhitelistRule(CompareRule):
    """ A CompareRule where the compare function checks a given term against a whitelist """
    required_options = frozenset(['compare_key', 'whitelist', 'ignore_null'])

    def __init__(self, rules, args=None):
        super(WhitelistRule, self).__init__(rules, args=None)
        self.expand_entries('whitelist')

    def compare(self, event):
        term = lookup_es_key(event, self.rules['compare_key'])
        if term is None:
            return not self.rules['ignore_null']
        if term not in self.rules['whitelist']:
            return True
        return False


class ChangeRule(CompareRule):
    """ A rule that will store values for a certain term and match if those values change """
    required_options = frozenset(['query_key', 'compound_compare_key', 'ignore_null'])
    change_map = {}
    occurrence_time = {}

    def compare(self, event):
        key = hashable(lookup_es_key(event, self.rules['query_key']))
        values = []
        elastalert_logger.debug(" Previous Values of compare keys  " + str(self.occurrences))
        for val in self.rules['compound_compare_key']:
            lookup_value = lookup_es_key(event, val)
            values.append(lookup_value)
        elastalert_logger.debug(" Current Values of compare keys   " + str(values))

        changed = False
        for val in values:
            if not isinstance(val, bool) and not val and self.rules['ignore_null']:
                return False
        # If we have seen this key before, compare it to the new value
        if key in self.occurrences:
            for idx, previous_values in enumerate(self.occurrences[key]):
                elastalert_logger.debug(" " + str(previous_values) + " " + str(values[idx]))
                changed = previous_values != values[idx]
                if changed:
                    break
            if changed:
                self.change_map[key] = (self.occurrences[key], values)
                # If using timeframe, only return true if the time delta is < timeframe
                if key in self.occurrence_time:
                    changed = event[self.rules['timestamp_field']] - self.occurrence_time[key] <= self.rules['timeframe']

        # Update the current value and time
        elastalert_logger.debug(" Setting current value of compare keys values " + str(values))
        self.occurrences[key] = values
        if 'timeframe' in self.rules:
            self.occurrence_time[key] = event[self.rules['timestamp_field']]
        elastalert_logger.debug("Final result of comparision between previous and current values " + str(changed))
        return changed

    def add_match(self, match):
        # TODO this is not technically correct
        # if the term changes multiple times before an alert is sent
        # this data will be overwritten with the most recent change
        change = self.change_map.get(hashable(lookup_es_key(match, self.rules['query_key'])))
        extra = {}
        if change:
            extra = {'old_value': change[0],
                     'new_value': change[1]}
            elastalert_logger.debug("Description of the changed records  " + str(dict(list(match.items()) + list(extra.items()))))
        super(ChangeRule, self).add_match(dict(list(match.items()) + list(extra.items())))
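
# Illustrative sketch (not part of the original module): ChangeRule remembers
# the last values of compound_compare_key per query_key and matches when any of
# them change, attaching old_value/new_value to the match.
#
#     rule = ChangeRule({'query_key': 'username',
#                        'compound_compare_key': ['country'],
#                        'ignore_null': True,
#                        'timestamp_field': '@timestamp'})
#     rule.add_data([{'username': 'alice', 'country': 'DE'},
#                    {'username': 'alice', 'country': 'FR'}])
#     assert rule.matches and rule.matches[0]['old_value'] == ['DE']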


class FrequencyRule(RuleType):
    """ A rule that matches if num_events number of events occur within a timeframe """
    required_options = frozenset(['num_events', 'timeframe'])

    def __init__(self, *args):
        super(FrequencyRule, self).__init__(*args)
        self.ts_field = self.rules.get('timestamp_field', '@timestamp')
        self.get_ts = new_get_event_ts(self.ts_field)
        self.attach_related = self.rules.get('attach_related', False)

    def add_count_data(self, data):
        """ Add count data to the rule. Data should be of the form {ts: count}. """
        if len(data) > 1:
            raise EAException('add_count_data can only accept one count at a time')

        (ts, count), = list(data.items())

        event = ({self.ts_field: ts}, count)
        self.occurrences.setdefault('all', EventWindow(self.rules['timeframe'], getTimestamp=self.get_ts)).append(event)
        self.check_for_match('all')

    def add_terms_data(self, terms):
        for timestamp, buckets in terms.items():
            for bucket in buckets:
                event = ({self.ts_field: timestamp,
                          self.rules['query_key']: bucket['key']}, bucket['doc_count'])
                self.occurrences.setdefault(bucket['key'], EventWindow(self.rules['timeframe'], getTimestamp=self.get_ts)).append(event)
                self.check_for_match(bucket['key'])

    def add_data(self, data):
        if 'query_key' in self.rules:
            qk = self.rules['query_key']
        else:
            qk = None

        for event in data:
            if qk:
                key = hashable(lookup_es_key(event, qk))
            else:
                # If no query_key, we use the key 'all' for all events
                key = 'all'

            # Store the timestamps of recent occurrences, per key
            self.occurrences.setdefault(key, EventWindow(self.rules['timeframe'], getTimestamp=self.get_ts)).append((event, 1))
            self.check_for_match(key, end=False)

        # We call this multiple times with the 'end' parameter because subclasses
        # may or may not want to check while only partial data has been added
        if key in self.occurrences:  # could have been emptied by previous check
            self.check_for_match(key, end=True)

    def check_for_match(self, key, end=False):
        # Match if, after removing old events, we hit num_events.
        # the 'end' parameter depends on whether this was called from the
        # middle or end of an add_data call and is used in subclasses
        if self.occurrences[key].count() >= self.rules['num_events']:
            event = self.occurrences[key].data[-1][0]
            if self.attach_related:
                event['related_events'] = [data[0] for data in self.occurrences[key].data[:-1]]
            self.add_match(event)
            self.occurrences.pop(key)

    def garbage_collect(self, timestamp):
        """ Remove all occurrence data that is beyond the timeframe away """
        stale_keys = []
        for key, window in self.occurrences.items():
            if timestamp - lookup_es_key(window.data[-1][0], self.ts_field) > self.rules['timeframe']:
                stale_keys.append(key)
        list(map(self.occurrences.pop, stale_keys))

    def get_match_str(self, match):
        lt = self.rules.get('use_local_time')
        match_ts = lookup_es_key(match, self.ts_field)
        starttime = pretty_ts(dt_to_ts(ts_to_dt(match_ts) - self.rules['timeframe']), lt)
        endtime = pretty_ts(match_ts, lt)
        message = 'At least %d events occurred between %s and %s\n\n' % (self.rules['num_events'],
                                                                         starttime,
                                                                         endtime)
        return message
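
# Illustrative sketch (not part of the original module): FrequencyRule matches
# once num_events events fall inside the timeframe (per query_key, if set).
#
#     rule = FrequencyRule({'num_events': 3,
#                           'timeframe': datetime.timedelta(minutes=5),
#                           'timestamp_field': '@timestamp'})
#     now = ts_now()
#     rule.add_data([{'@timestamp': now + datetime.timedelta(seconds=i)}
#                    for i in range(3)])
#     assert len(rule.matches) == 1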


class AnyRule(RuleType):
    """ A rule that will match on any input data """

    def add_data(self, data):
        for datum in data:
            self.add_match(datum)


class EventWindow(object):
    """ A container for hold event counts for rules which need a chronological ordered event window. """

    def __init__(self, timeframe, onRemoved=None, getTimestamp=new_get_event_ts('@timestamp')):
        self.timeframe = timeframe
        self.onRemoved = onRemoved
        self.get_ts = getTimestamp
        self.data = SortedKeyList(key=self.get_ts)
        self.running_count = 0

    def clear(self):
        self.data = SortedKeyList(key=self.get_ts)
        self.running_count = 0

    def append(self, event):
        """ Add an event to the window. Event should be of the form (dict, count).
        This will also pop the oldest events and call onRemoved on them until the
        window size is less than timeframe. """
        self.data.add(event)
        self.running_count += event[1]

        while self.duration() >= self.timeframe:
            oldest = self.data[0]
            self.data.remove(oldest)
            self.running_count -= oldest[1]
            self.onRemoved and self.onRemoved(oldest)

    def duration(self):
        """ Get the size in timedelta of the window. """
        if not self.data:
            return datetime.timedelta(0)
        return self.get_ts(self.data[-1]) - self.get_ts(self.data[0])

    def count(self):
        """ Count the number of events in the window. """
        return self.running_count

    def mean(self):
        """ Compute the mean of the value_field in the window. """
        if len(self.data) > 0:
            datasum = 0
            datalen = 0
            for dat in self.data:
                if "placeholder" not in dat[0]:
                    datasum += dat[1]
                    datalen += 1
            if datalen > 0:
                return datasum / float(datalen)
            return None
        else:
            return None

    def __iter__(self):
        return iter(self.data)
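
# Illustrative sketch (not part of the original module): an EventWindow stores
# (event, count) tuples sorted by timestamp and evicts the oldest entries once
# the window spans at least `timeframe`.
#
#     window = EventWindow(datetime.timedelta(minutes=5))
#     t0 = ts_now()
#     window.append(({'@timestamp': t0}, 1))
#     window.append(({'@timestamp': t0 + datetime.timedelta(minutes=1)}, 2))
#     assert window.count() == 3
#     assert window.duration() == datetime.timedelta(minutes=1)
#     window.append(({'@timestamp': t0 + datetime.timedelta(minutes=5, seconds=30)}, 1))
#     assert window.count() == 3   # the t0 event (count 1) has been evicted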


class SpikeRule(RuleType):
    """ A rule that uses two sliding windows to compare relative event frequency. """
    required_options = frozenset(['timeframe', 'spike_height', 'spike_type'])

    def __init__(self, *args):
        super(SpikeRule, self).__init__(*args)
        self.timeframe = self.rules['timeframe']

        self.ref_windows = {}
        self.cur_windows = {}

        self.ts_field = self.rules.get('timestamp_field', '@timestamp')
        self.get_ts = new_get_event_ts(self.ts_field)
        self.first_event = {}
        self.skip_checks = {}

        self.field_value = self.rules.get('field_value')

        self.ref_window_filled_once = False

    def add_count_data(self, data):
        """ Add count data to the rule. Data should be of the form {ts: count}. """
        if len(data) > 1:
            raise EAException('add_count_data can only accept one count at a time')
        for ts, count in data.items():
            self.handle_event({self.ts_field: ts}, count, 'all')

    def add_terms_data(self, terms):
        for timestamp, buckets in terms.items():
            for bucket in buckets:
                count = bucket['doc_count']
                event = {self.ts_field: timestamp,
                         self.rules['query_key']: bucket['key']}
                key = bucket['key']
                self.handle_event(event, count, key)

    def add_data(self, data):
        for event in data:
            qk = self.rules.get('query_key', 'all')
            if qk != 'all':
                qk = hashable(lookup_es_key(event, qk))
                if qk is None:
                    qk = 'other'
            if self.field_value is not None:
                count = lookup_es_key(event, self.field_value)
                if count is not None:
                    try:
                        count = int(count)
                    except ValueError:
                        elastalert_logger.warn('{} is not a number: {}'.format(self.field_value, count))
                    else:
                        self.handle_event(event, count, qk)
            else:
                self.handle_event(event, 1, qk)

    def clear_windows(self, qk, event):
        # Reset the state and prevent alerts until windows filled again
        self.ref_windows[qk].clear()
        self.first_event.pop(qk)
        self.skip_checks[qk] = lookup_es_key(event, self.ts_field) + self.rules['timeframe'] * 2

    def handle_event(self, event, count, qk='all'):
        self.first_event.setdefault(qk, event)

        self.ref_windows.setdefault(qk, EventWindow(self.timeframe, getTimestamp=self.get_ts))
        self.cur_windows.setdefault(qk, EventWindow(self.timeframe, self.ref_windows[qk].append, self.get_ts))

        self.cur_windows[qk].append((event, count))

        # Don't alert if ref window has not yet been filled for this key AND
        if lookup_es_key(event, self.ts_field) - self.first_event[qk][self.ts_field] < self.rules['timeframe'] * 2:
            # ElastAlert has not been running long enough for any alerts OR
            if not self.ref_window_filled_once:
                return
            # This rule is not using alert_on_new_data (with query_key) OR
            if not (self.rules.get('query_key') and self.rules.get('alert_on_new_data')):
                return
            # An alert for this qk has recently fired
            if qk in self.skip_checks and lookup_es_key(event, self.ts_field) < self.skip_checks[qk]:
                return
        else:
            self.ref_window_filled_once = True

        if self.field_value is not None:
            if self.find_matches(self.ref_windows[qk].mean(), self.cur_windows[qk].mean()):
                # skip over placeholder events
                for match, count in self.cur_windows[qk].data:
                    if "placeholder" not in match:
                        break
                self.add_match(match, qk)
                self.clear_windows(qk, match)
        else:
            if self.find_matches(self.ref_windows[qk].count(), self.cur_windows[qk].count()):
                # skip over placeholder events which have count=0
                for match, count in self.cur_windows[qk].data:
                    if count:
                        break

                self.add_match(match, qk)
                self.clear_windows(qk, match)

    def add_match(self, match, qk):
        extra_info = {}
        if self.field_value is None:
            spike_count = self.cur_windows[qk].count()
            reference_count = self.ref_windows[qk].count()
        else:
            spike_count = self.cur_windows[qk].mean()
            reference_count = self.ref_windows[qk].mean()
        extra_info = {'spike_count': spike_count,
                      'reference_count': reference_count}

        match = dict(list(match.items()) + list(extra_info.items()))

        super(SpikeRule, self).add_match(match)

    def find_matches(self, ref, cur):
        """ Determines if an event spike or dip happening. """
        # Apply threshold limits
        if self.field_value is None:
            if (cur < self.rules.get('threshold_cur', 0) or
                    ref < self.rules.get('threshold_ref', 0)):
                return False
        elif ref is None or ref == 0 or cur is None or cur == 0:
            return False

        spike_up, spike_down = False, False
        if cur <= ref / self.rules['spike_height']:
            spike_down = True
        if cur >= ref * self.rules['spike_height']:
            spike_up = True

        if (self.rules['spike_type'] in ['both', 'up'] and spike_up) or \
           (self.rules['spike_type'] in ['both', 'down'] and spike_down):
            return True
        return False

    def get_match_str(self, match):
        if self.field_value is None:
            message = 'An abnormal number (%d) of events occurred around %s.\n' % (
                match['spike_count'],
                pretty_ts(match[self.rules['timestamp_field']], self.rules.get('use_local_time'))
            )
            message += 'Preceding that time, there were only %d events within %s\n\n' % (match['reference_count'], self.rules['timeframe'])
        else:
            message = 'An abnormal average value (%.2f) of field \'%s\' occurred around %s.\n' % (
                match['spike_count'],
                self.field_value,
                pretty_ts(match[self.rules['timestamp_field']],
                          self.rules.get('use_local_time'))
            )
            message += 'Preceding that time, the field had an average value of (%.2f) within %s\n\n' % (
                match['reference_count'], self.rules['timeframe'])
        return message

    def garbage_collect(self, ts):
        # Windows are sized according to their newest event
        # This is a placeholder to accurately size windows in the absence of events
        for qk in list(self.cur_windows.keys()):
            # If we haven't seen this key in a long time, forget it
            if qk != 'all' and self.ref_windows[qk].count() == 0 and self.cur_windows[qk].count() == 0:
                self.cur_windows.pop(qk)
                self.ref_windows.pop(qk)
                continue
            placeholder = {self.ts_field: ts, "placeholder": True}
            # The placeholder may trigger an alert, in which case, qk will be expected
            if qk != 'all':
                placeholder.update({self.rules['query_key']: qk})
            self.handle_event(placeholder, 0, qk)
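
# Illustrative sketch (not part of the original module): find_matches() compares
# the current window against the reference window using spike_height. With
# spike_height = 3 and a reference count of 10, a spike up requires cur >= 30
# and a spike down (a dip) requires cur <= 10 / 3.
#
#     rule = SpikeRule({'timeframe': datetime.timedelta(hours=1),
#                       'spike_height': 3,
#                       'spike_type': 'both',
#                       'timestamp_field': '@timestamp'})
#     assert rule.find_matches(ref=10, cur=30)        # spike up
#     assert rule.find_matches(ref=10, cur=3)         # spike down
#     assert not rule.find_matches(ref=10, cur=15)    # within bounds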


class FlatlineRule(FrequencyRule):
    """ A rule that matches when there is a low number of events given a timeframe. """
    required_options = frozenset(['timeframe', 'threshold'])

    def __init__(self, *args):
        super(FlatlineRule, self).__init__(*args)
        self.threshold = self.rules['threshold']

        # Dictionary mapping query keys to the first events
        self.first_event = {}

    def check_for_match(self, key, end=True):
        # This function gets called after every added document, and with end=True after the last one.
        # We ignore the calls before the end because they may trigger false positives
        if not end:
            return

        most_recent_ts = self.get_ts(self.occurrences[key].data[-1])
        if self.first_event.get(key) is None:
            self.first_event[key] = most_recent_ts

        # Don't check for matches until timeframe has elapsed
        if most_recent_ts - self.first_event[key] < self.rules['timeframe']:
            return

        # Match if, after removing old events, the count is below the threshold
        count = self.occurrences[key].count()
        if count < self.rules['threshold']:
            # Do a deep-copy, otherwise we lose the datetime type in the timestamp field of the last event
            event = copy.deepcopy(self.occurrences[key].data[-1][0])
            event.update(key=key, count=count)
            self.add_match(event)

            if not self.rules.get('forget_keys'):
                # After adding this match, leave the occurrences windows alone since it will
                # be pruned in the next add_data or garbage_collect, but reset the first_event
                # so that alerts continue to fire until the threshold is passed again.
                least_recent_ts = self.get_ts(self.occurrences[key].data[0])
                timeframe_ago = most_recent_ts - self.rules['timeframe']
                self.first_event[key] = min(least_recent_ts, timeframe_ago)
            else:
                # Forget about this key until we see it again
                self.first_event.pop(key)
                self.occurrences.pop(key)

    def get_match_str(self, match):
        ts = match[self.rules['timestamp_field']]
        lt = self.rules.get('use_local_time')
        message = 'An abnormally low number of events occurred around %s.\n' % (pretty_ts(ts, lt))
        message += 'Between %s and %s, there were less than %s events.\n\n' % (
            pretty_ts(dt_to_ts(ts_to_dt(ts) - self.rules['timeframe']), lt),
            pretty_ts(ts, lt),
            self.rules['threshold']
        )
        return message

    def garbage_collect(self, ts):
        # We add an event with a count of zero to the EventWindow for each key. This will cause the EventWindow
        # to remove events that occurred more than one `timeframe` ago, and call onRemoved on them.
        default = ['all'] if 'query_key' not in self.rules else []
        for key in list(self.occurrences.keys()) or default:
            self.occurrences.setdefault(
                key,
                EventWindow(self.rules['timeframe'], getTimestamp=self.get_ts)
            ).append(
                ({self.ts_field: ts}, 0)
            )
            self.first_event.setdefault(key, ts)
            self.check_for_match(key)
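
# Illustrative sketch (not part of the original module): FlatlineRule only
# alerts once a full timeframe has elapsed with fewer than `threshold` events;
# garbage_collect() injects zero-count placeholders so the check still runs
# even when no new documents arrive at all.
#
#     rule = FlatlineRule({'threshold': 5,
#                          'timeframe': datetime.timedelta(minutes=10),
#                          'timestamp_field': '@timestamp'})
#     start = ts_now()
#     rule.add_data([{'@timestamp': start}])
#     rule.garbage_collect(start + datetime.timedelta(minutes=11))
#     assert len(rule.matches) == 1   # fewer than 5 events in the timeframe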


class NewTermsRule(RuleType):
    """ Alerts on a new value in a list of fields. """

    def __init__(self, rule, args=None):
        super(NewTermsRule, self).__init__(rule, args)
        self.seen_values = {}
        # Allow the use of query_key or fields
        if 'fields' not in self.rules:
            if 'query_key' not in self.rules:
                raise EAException("fields or query_key must be specified")
            self.fields = self.rules['query_key']
        else:
            self.fields = self.rules['fields']
        if not self.fields:
            raise EAException("fields must not be an empty list")
        if type(self.fields) != list:
            self.fields = [self.fields]
        if self.rules.get('use_terms_query') and \
                (len(self.fields) != 1 or (len(self.fields) == 1 and type(self.fields[0]) == list)):
            raise EAException("use_terms_query can only be used with a single non-composite field")
        if self.rules.get('use_terms_query'):
            if [self.rules['query_key']] != self.fields:
                raise EAException('If use_terms_query is specified, you cannot specify different query_key and fields')
            if not self.rules.get('query_key').endswith('.keyword') and not self.rules.get('query_key').endswith('.raw'):
                if self.rules.get('use_keyword_postfix', True):
                    elastalert_logger.warn('Warning: If query_key is a non-keyword field, you must set '
                                           'use_keyword_postfix to false, or add .keyword/.raw to your query_key.')
        try:
            self.get_all_terms(args)
        except Exception as e:
            # Refuse to start if we cannot get existing terms
            raise EAException('Error searching for existing terms: %s' % (repr(e))).with_traceback(sys.exc_info()[2])

    def get_all_terms(self, args):
        """ Performs a terms aggregation for each field to get every existing term. """
        self.es = elasticsearch_client(self.rules)
        window_size = datetime.timedelta(**self.rules.get('terms_window_size', {'days': 30}))
        field_name = {"field": "", "size": 2147483647}  # Integer.MAX_VALUE
        query_template = {"aggs": {"values": {"terms": field_name}}}
        if args and hasattr(args, 'start') and args.start:
            end = ts_to_dt(args.start)
        elif 'start_date' in self.rules:
            end = ts_to_dt(self.rules['start_date'])
        else:
            end = ts_now()
        start = end - window_size
        step = datetime.timedelta(**self.rules.get('window_step_size', {'days': 1}))

        for field in self.fields:
            tmp_start = start
            tmp_end = min(start + step, end)

            time_filter = {self.rules['timestamp_field']: {'lt': self.rules['dt_to_ts'](tmp_end), 'gte': self.rules['dt_to_ts'](tmp_start)}}
            query_template['filter'] = {'bool': {'must': [{'range': time_filter}]}}
            query = {'aggs': {'filtered': query_template}}

            if 'filter' in self.rules:
                for item in self.rules['filter']:
                    query_template['filter']['bool']['must'].append(item)

            # For composite keys, we will need to perform sub-aggregations
            if type(field) == list:
                self.seen_values.setdefault(tuple(field), [])
                level = query_template['aggs']
                # Iterate on each part of the composite key and add a sub aggs clause to the elastic search query
                for i, sub_field in enumerate(field):
                    if self.rules.get('use_keyword_postfix', True):
                        level['values']['terms']['field'] = add_raw_postfix(sub_field, self.is_five_or_above())
                    else:
                        level['values']['terms']['field'] = sub_field
                    if i < len(field) - 1:
                        # If we have more fields after the current one, then set up the next nested structure
                        level['values']['aggs'] = {'values': {'terms': copy.deepcopy(field_name)}}
                        level = level['values']['aggs']
            else:
                self.seen_values.setdefault(field, [])
                # For non-composite keys, only a single agg is needed
                if self.rules.get('use_keyword_postfix', True):
                    field_name['field'] = add_raw_postfix(field, self.is_five_or_above())
                else:
                    field_name['field'] = field

            # Query the entire time range in small chunks
            while tmp_start < end:
                if self.rules.get('use_strftime_index'):
                    index = format_index(self.rules['index'], tmp_start, tmp_end)
                else:
                    index = self.rules['index']
                res = self.es.search(body=query, index=index, ignore_unavailable=True, timeout='50s')
                if 'aggregations' in res:
                    buckets = res['aggregations']['filtered']['values']['buckets']
                    if type(field) == list:
                        # For composite keys, make the lookup based on all fields
                        # Make it a tuple since it can be hashed and used in dictionary lookups
                        for bucket in buckets:
                            # We need to walk down the hierarchy and obtain the value at each level
                            self.seen_values[tuple(field)] += self.flatten_aggregation_hierarchy(bucket)
                    else:
                        keys = [bucket['key'] for bucket in buckets]
                        self.seen_values[field] += keys
                else:
                    if type(field) == list:
                        self.seen_values.setdefault(tuple(field), [])
                    else:
                        self.seen_values.setdefault(field, [])
                if tmp_start == tmp_end:
                    break
                tmp_start = tmp_end
                tmp_end = min(tmp_start + step, end)
                time_filter[self.rules['timestamp_field']] = {'lt': self.rules['dt_to_ts'](tmp_end),
                                                              'gte': self.rules['dt_to_ts'](tmp_start)}

            for key, values in self.seen_values.items():
                if not values:
                    if type(key) == tuple:
                        # If we don't have any results, it could either be because of the absence of any baseline data
                        # OR it may be because the composite key contained a non-primitive type.  Either way, give the
                        # end-users a heads up to help them debug what might be going on.
                        elastalert_logger.warning((
                            'No results were found from all sub-aggregations.  This can either indicate that there is '
                            'no baseline data OR that a non-primitive field was used in a composite key.'
                        ))
                    else:
                        elastalert_logger.info('Found no values for %s' % (field))
                    continue
                self.seen_values[key] = list(set(values))
                elastalert_logger.info('Found %s unique values for %s' % (len(set(values)), key))

    def flatten_aggregation_hierarchy(self, root, hierarchy_tuple=()):
        """ For nested aggregations, the results come back in the following format:
            {
            "aggregations" : {
                "filtered" : {
                  "doc_count" : 37,
                  "values" : {
                    "doc_count_error_upper_bound" : 0,
                    "sum_other_doc_count" : 0,
                    "buckets" : [ {
                      "key" : "1.1.1.1", # IP address (root)
                      "doc_count" : 13,
                      "values" : {
                        "doc_count_error_upper_bound" : 0,
                        "sum_other_doc_count" : 0,
                        "buckets" : [ {
                          "key" : "80",    # Port (sub-aggregation)
                          "doc_count" : 3,
                          "values" : {
                            "doc_count_error_upper_bound" : 0,
                            "sum_other_doc_count" : 0,
                            "buckets" : [ {
                              "key" : "ack",  # Reason (sub-aggregation, leaf-node)
                              "doc_count" : 3
                            }, {
                              "key" : "syn",  # Reason (sub-aggregation, leaf-node)
                              "doc_count" : 1
                            } ]
                          }
                        }, {
                          "key" : "82",    # Port (sub-aggregation)
                          "doc_count" : 3,
                          "values" : {
                            "doc_count_error_upper_bound" : 0,
                            "sum_other_doc_count" : 0,
                            "buckets" : [ {
                              "key" : "ack",  # Reason (sub-aggregation, leaf-node)
                              "doc_count" : 3
                            }, {
                              "key" : "syn",  # Reason (sub-aggregation, leaf-node)
                              "doc_count" : 3
                            } ]
                          }
                        } ]
                      }
                    }, {
                      "key" : "2.2.2.2", # IP address (root)
                      "doc_count" : 4,
                      "values" : {
                        "doc_count_error_upper_bound" : 0,
                        "sum_other_doc_count" : 0,
                        "buckets" : [ {
                          "key" : "443",    # Port (sub-aggregation)
                          "doc_count" : 3,
                          "values" : {
                            "doc_count_error_upper_bound" : 0,
                            "sum_other_doc_count" : 0,
                            "buckets" : [ {
                              "key" : "ack",  # Reason (sub-aggregation, leaf-node)
                              "doc_count" : 3
                            }, {
                              "key" : "syn",  # Reason (sub-aggregation, leaf-node)
                              "doc_count" : 3
                            } ]
                          }
                        } ]
                      }
                    } ]
                  }
                }
              }
            }

            Each level will either have more values and buckets, or it will be a leaf node
            We'll ultimately return a flattened list with the hierarchies appended as strings,
            e.g the above snippet would yield a list with:

            [
             ('1.1.1.1', '80', 'ack'),
             ('1.1.1.1', '80', 'syn'),
             ('1.1.1.1', '82', 'ack'),
             ('1.1.1.1', '82', 'syn'),
             ('2.2.2.2', '443', 'ack'),
             ('2.2.2.2', '443', 'syn')
            ]

            A similar formatting will be performed in the add_data method and used as the basis for comparison

        """
        results = []
        # There are more aggregation hierarchies left.  Traverse them.
        if 'values' in root:
            results += self.flatten_aggregation_hierarchy(root['values']['buckets'], hierarchy_tuple + (root['key'],))
        else:
            # We've gotten to a sub-aggregation, which may have further sub-aggregations
            # See if we need to traverse further
            for node in root:
                if 'values' in node:
                    results += self.flatten_aggregation_hierarchy(node, hierarchy_tuple)
                else:
                    results.append(hierarchy_tuple + (node['key'],))
        return results

    def add_data(self, data):
        for document in data:
            for field in self.fields:
                value = ()
                lookup_field = field
                if type(field) == list:
                    # For composite keys, make the lookup based on all fields
                    # Make it a tuple since it can be hashed and used in dictionary lookups
                    lookup_field = tuple(field)
                    for sub_field in field:
                        lookup_result = lookup_es_key(document, sub_field)
                        if not lookup_result:
                            value = None
                            break
                        value += (lookup_result,)
                else:
                    value = lookup_es_key(document, field)
                if not value and self.rules.get('alert_on_missing_field'):
                    document['missing_field'] = lookup_field
                    self.add_match(copy.deepcopy(document))
                elif value:
                    if value not in self.seen_values[lookup_field]:
                        document['new_field'] = lookup_field
                        self.add_match(copy.deepcopy(document))
                        self.seen_values[lookup_field].append(value)

    def add_terms_data(self, terms):
        # With terms query, len(self.fields) is always 1 and the 0'th entry is always a string
        field = self.fields[0]
        for timestamp, buckets in terms.items():
            for bucket in buckets:
                if bucket['doc_count']:
                    if bucket['key'] not in self.seen_values[field]:
                        match = {field: bucket['key'],
                                 self.rules['timestamp_field']: timestamp,
                                 'new_field': field}
                        self.add_match(match)
                        self.seen_values[field].append(bucket['key'])

    def is_five_or_above(self):
        version = self.es.info()['version']['number']
        return int(version[0]) >= 5


class CardinalityRule(RuleType):
    """ A rule that matches if cardinality of a field is above or below a threshold within a timeframe """
    required_options = frozenset(['timeframe', 'cardinality_field'])

    def __init__(self, *args):
        super(CardinalityRule, self).__init__(*args)
        if 'max_cardinality' not in self.rules and 'min_cardinality' not in self.rules:
            raise EAException("CardinalityRule must have one of either max_cardinality or min_cardinality")
        self.ts_field = self.rules.get('timestamp_field', '@timestamp')
        self.cardinality_field = self.rules['cardinality_field']
        self.cardinality_cache = {}
        self.first_event = {}
        self.timeframe = self.rules['timeframe']

    def add_data(self, data):
        qk = self.rules.get('query_key')
        for event in data:
            if qk:
                key = hashable(lookup_es_key(event, qk))
            else:
                # If no query_key, we use the key 'all' for all events
                key = 'all'
            self.cardinality_cache.setdefault(key, {})
            self.first_event.setdefault(key, lookup_es_key(event, self.ts_field))
            value = hashable(lookup_es_key(event, self.cardinality_field))
            if value is not None:
                # Store this timestamp as the most recent occurrence of the term
                self.cardinality_cache[key][value] = lookup_es_key(event, self.ts_field)
                self.check_for_match(key, event)

    def check_for_match(self, key, event, gc=True):
        # Check to see if we are past max/min_cardinality for a given key
        time_elapsed = lookup_es_key(event, self.ts_field) - self.first_event.get(key, lookup_es_key(event, self.ts_field))
        timeframe_elapsed = time_elapsed > self.timeframe
        if (len(self.cardinality_cache[key]) > self.rules.get('max_cardinality', float('inf')) or
                (len(self.cardinality_cache[key]) < self.rules.get('min_cardinality', float('-inf')) and timeframe_elapsed)):
            # If there might be a match, run garbage collect first, as outdated terms are only removed in GC
            # Only run it if there might be a match so it doesn't impact performance
            if gc:
                self.garbage_collect(lookup_es_key(event, self.ts_field))
                self.check_for_match(key, event, False)
            else:
                self.first_event.pop(key, None)
                self.add_match(event)

    def garbage_collect(self, timestamp):
        """ Remove all occurrence data that is beyond the timeframe away """
        for qk, terms in list(self.cardinality_cache.items()):
            for term, last_occurrence in list(terms.items()):
                if timestamp - last_occurrence > self.rules['timeframe']:
                    self.cardinality_cache[qk].pop(term)

            # Create a placeholder event in case a min_cardinality match occurred
            if 'min_cardinality' in self.rules:
                event = {self.ts_field: timestamp}
                if 'query_key' in self.rules:
                    event.update({self.rules['query_key']: qk})
                self.check_for_match(qk, event, False)

    def get_match_str(self, match):
        lt = self.rules.get('use_local_time')
        starttime = pretty_ts(dt_to_ts(ts_to_dt(lookup_es_key(match, self.ts_field)) - self.rules['timeframe']), lt)
        endtime = pretty_ts(lookup_es_key(match, self.ts_field), lt)
        if 'max_cardinality' in self.rules:
            message = ('A maximum of %d unique %s(s) occurred since last alert or between %s and %s\n\n' % (self.rules['max_cardinality'],
                                                                                                            self.rules['cardinality_field'],
                                                                                                            starttime, endtime))
        else:
            message = ('Less than %d unique %s(s) occurred since last alert or between %s and %s\n\n' % (self.rules['min_cardinality'],
                                                                                                         self.rules['cardinality_field'],
                                                                                                         starttime, endtime))
        return message
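
# Illustrative sketch (not part of the original module): CardinalityRule tracks
# the distinct values seen for cardinality_field within the timeframe and
# matches as soon as max_cardinality is exceeded (or, with min_cardinality,
# once the timeframe has elapsed with too few distinct values).
#
#     rule = CardinalityRule({'cardinality_field': 'src_ip',
#                             'max_cardinality': 2,
#                             'timeframe': datetime.timedelta(minutes=10),
#                             'timestamp_field': '@timestamp'})
#     now = ts_now()
#     rule.add_data([{'@timestamp': now, 'src_ip': ip}
#                    for ip in ('10.0.0.1', '10.0.0.2', '10.0.0.3')])
#     assert len(rule.matches) == 1   # the third distinct value crosses the limit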


class BaseAggregationRule(RuleType):
    def __init__(self, *args):
        super(BaseAggregationRule, self).__init__(*args)
        bucket_interval = self.rules.get('bucket_interval')
        if bucket_interval:
            if 'seconds' in bucket_interval:
                self.rules['bucket_interval_period'] = str(bucket_interval['seconds']) + 's'
            elif 'minutes' in bucket_interval:
                self.rules['bucket_interval_period'] = str(bucket_interval['minutes']) + 'm'
            elif 'hours' in bucket_interval:
                self.rules['bucket_interval_period'] = str(bucket_interval['hours']) + 'h'
            elif 'days' in bucket_interval:
                self.rules['bucket_interval_period'] = str(bucket_interval['days']) + 'd'
            elif 'weeks' in bucket_interval:
                self.rules['bucket_interval_period'] = str(bucket_interval['weeks']) + 'w'
            else:
                raise EAException("Unsupported window size")

            if self.rules.get('use_run_every_query_size'):
                if total_seconds(self.rules['run_every']) % total_seconds(self.rules['bucket_interval_timedelta']) != 0:
                    raise EAException("run_every must be evenly divisible by bucket_interval if specified")
            else:
                if total_seconds(self.rules['buffer_time']) % total_seconds(self.rules['bucket_interval_timedelta']) != 0:
                    raise EAException("Buffer_time must be evenly divisible by bucket_interval if specified")

    def generate_aggregation_query(self):
        raise NotImplementedError()

    def add_aggregation_data(self, payload):
        for timestamp, payload_data in payload.items():
            if 'interval_aggs' in payload_data:
                self.unwrap_interval_buckets(timestamp, None, payload_data['interval_aggs']['buckets'])
            elif 'bucket_aggs' in payload_data:
                self.unwrap_term_buckets(timestamp, payload_data['bucket_aggs']['buckets'])
            else:
                self.check_matches(timestamp, None, payload_data)

    def unwrap_interval_buckets(self, timestamp, query_key, interval_buckets):
        for interval_data in interval_buckets:
            # Use bucket key here instead of start_time for more accurate match timestamp
            self.check_matches(ts_to_dt(interval_data['key_as_string']), query_key, interval_data)

    def unwrap_term_buckets(self, timestamp, term_buckets):
        for term_data in term_buckets:
            if 'interval_aggs' in term_data:
                self.unwrap_interval_buckets(timestamp, term_data['key'], term_data['interval_aggs']['buckets'])
            else:
                self.check_matches(timestamp, term_data['key'], term_data)

    def check_matches(self, timestamp, query_key, aggregation_data):
        raise NotImplementedError()


class MetricAggregationRule(BaseAggregationRule):
    """ A rule that matches when there is a low number of events given a timeframe. """
    required_options = frozenset(['metric_agg_key', 'metric_agg_type'])
    allowed_aggregations = frozenset(['min', 'max', 'avg', 'sum', 'cardinality', 'value_count'])

    def __init__(self, *args):
        super(MetricAggregationRule, self).__init__(*args)
        self.ts_field = self.rules.get('timestamp_field', '@timestamp')
        if 'max_threshold' not in self.rules and 'min_threshold' not in self.rules:
            raise EAException("MetricAggregationRule must have at least one of either max_threshold or min_threshold")

        self.metric_key = 'metric_' + self.rules['metric_agg_key'] + '_' + self.rules['metric_agg_type']

        if not self.rules['metric_agg_type'] in self.allowed_aggregations:
            raise EAException("metric_agg_type must be one of %s" % (str(self.allowed_aggregations)))

        self.rules['aggregation_query_element'] = self.generate_aggregation_query()

    def get_match_str(self, match):
        message = 'Threshold violation, %s:%s %s (min: %s max : %s) \n\n' % (
            self.rules['metric_agg_type'],
            self.rules['metric_agg_key'],
            match[self.metric_key],
            self.rules.get('min_threshold'),
            self.rules.get('max_threshold')
        )
        return message

    def generate_aggregation_query(self):
        return {self.metric_key: {self.rules['metric_agg_type']: {'field': self.rules['metric_agg_key']}}}
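
    # Illustrative sketch (not part of the original module): with metric_agg_key
    # 'bytes' and metric_agg_type 'avg', metric_key is 'metric_bytes_avg' and the
    # generated aggregation element is {'metric_bytes_avg': {'avg': {'field': 'bytes'}}}.
    # check_matches() then reads aggregation_data['metric_bytes_avg']['value'] and
    # alerts when it crosses min_threshold or max_threshold:
    #
    #     rule = MetricAggregationRule({'metric_agg_key': 'bytes',
    #                                   'metric_agg_type': 'avg',
    #                                   'max_threshold': 1000000,
    #                                   'timestamp_field': '@timestamp'})
    #     rule.check_matches(ts_now(), None, {'metric_bytes_avg': {'value': 2000000}})
    #     assert rule.matches   # 2000000 exceeds max_threshold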

    def check_matches(self, timestamp, query_key, aggregation_data):
        if "compound_query_key" in self.rules:
            self.check_matches_recursive(timestamp, query_key, aggregation_data, self.rules['compound_query_key'], dict())

        else:
            metric_val = aggregation_data[self.metric_key]['value']
            if self.crossed_thresholds(metric_val):
                match = {self.rules['timestamp_field']: timestamp,
                         self.metric_key: metric_val}
                if query_key is not None:
                    match[self.rules['query_key']] = query_key
                self.add_match(match)

    def check_matches_recursive(self, timestamp, query_key, aggregation_data, compound_keys, match_data):
        if len(compound_keys) < 1:
            # shouldn't get to this point, but checking for safety
            return

        match_data[compound_keys[0]] = aggregation_data['key']
        if 'bucket_aggs' in aggregation_data:
            for result in aggregation_data['bucket_aggs']['buckets']:
                self.check_matches_recursive(timestamp,
                                             query_key,
                                             result,
                                             compound_keys[1:],
                                             match_data)

        else:
            metric_val = aggregation_data[self.metric_key]['value']
            if self.crossed_thresholds(metric_val):
                match_data[self.rules['timestamp_field']] = timestamp
                match_data[self.metric_key] = metric_val

                # add compound key to payload to allow alerts to trigger for every unique occurrence
                compound_value = [match_data[key] for key in self.rules['compound_query_key']]
                match_data[self.rules['query_key']] = ",".join([str(value) for value in compound_value])

                self.add_match(match_data)

    def crossed_thresholds(self, metric_value):
        if metric_value is None:
            return False
        if 'max_threshold' in self.rules and metric_value > self.rules['max_threshold']:
            return True
        if 'min_threshold' in self.rules and metric_value < self.rules['min_threshold']:
            return True
        return False


class SpikeMetricAggregationRule(BaseAggregationRule, SpikeRule):
    """ A rule that matches when there is a spike in an aggregated event compared to its reference point """
    required_options = frozenset(['metric_agg_key', 'metric_agg_type', 'spike_height', 'spike_type'])
    allowed_aggregations = frozenset(['min', 'max', 'avg', 'sum', 'cardinality', 'value_count'])

    def __init__(self, *args):
        # We inherit everything from BaseAggregation and Spike, overwrite only what we need in functions below
        super(SpikeMetricAggregationRule, self).__init__(*args)

        # MetricAgg alert things
        self.metric_key = 'metric_' + self.rules['metric_agg_key'] + '_' + self.rules['metric_agg_type']
        if not self.rules['metric_agg_type'] in self.allowed_aggregations:
            raise EAException("metric_agg_type must be one of %s" % (str(self.allowed_aggregations)))

        # Disabling bucket intervals (doesn't make sense in context of spike to split up your time period)
        if self.rules.get('bucket_interval'):
            raise EAException("bucket intervals are not supported for spike aggregation alerts")

        self.rules['aggregation_query_element'] = self.generate_aggregation_query()

    def generate_aggregation_query(self):
        """Lifted from MetricAggregationRule, added support for scripted fields"""
        if self.rules.get('metric_agg_script'):
            return {self.metric_key: {self.rules['metric_agg_type']: self.rules['metric_agg_script']}}
        return {self.metric_key: {self.rules['metric_agg_type']: {'field': self.rules['metric_agg_key']}}}

    def add_aggregation_data(self, payload):
        """
        BaseAggregationRule.add_aggregation_data unpacks our results and runs checks directly against hardcoded cutoffs.
        We instead want to use all of our SpikeRule.handle_event inherited logic (current/reference) from
        the aggregation's "value" key to determine spikes from aggregations
        """
        for timestamp, payload_data in payload.items():
            if 'bucket_aggs' in payload_data:
                self.unwrap_term_buckets(timestamp, payload_data['bucket_aggs'])
            else:
                # no time / term split, just focus on the agg
                event = {self.ts_field: timestamp}
                agg_value = payload_data[self.metric_key]['value']
                self.handle_event(event, agg_value, 'all')
        return

    def unwrap_term_buckets(self, timestamp, term_buckets, qk=[]):
        """
        create separate spike event trackers for each term,
        handle compound query keys
        """
        for term_data in term_buckets['buckets']:
            qk.append(term_data['key'])

            # handle compound query keys (nested aggregations)
            if term_data.get('bucket_aggs'):
                self.unwrap_term_buckets(timestamp, term_data['bucket_aggs'], qk)
                # reset the query key to consider the proper depth for N > 2
                del qk[-1]
                continue

            qk_str = ','.join(qk)
            agg_value = term_data[self.metric_key]['value']
            event = {self.ts_field: timestamp,
                     self.rules['query_key']: qk_str}
            # pass to SpikeRule's tracker
            self.handle_event(event, agg_value, qk_str)

            # handle unpack of lowest level
            del qk[-1]
        return

    def get_match_str(self, match):
        """
        Overwrite SpikeRule's message to relate to the aggregation type & field instead of count
        """
        message = 'An abnormal {0} of {1} ({2}) occurred around {3}.\n'.format(
            self.rules['metric_agg_type'], self.rules['metric_agg_key'], round(match['spike_count'], 2),
            pretty_ts(match[self.rules['timestamp_field']], self.rules.get('use_local_time'))
        )
        message += 'Preceding that time, there was a {0} of {1} of ({2}) within {3}\n\n'.format(
            self.rules['metric_agg_type'], self.rules['metric_agg_key'],
            round(match['reference_count'], 2), self.rules['timeframe'])
        return message


class PercentageMatchRule(BaseAggregationRule):
    required_options = frozenset(['match_bucket_filter'])

    def __init__(self, *args):
        super(PercentageMatchRule, self).__init__(*args)
        self.ts_field = self.rules.get('timestamp_field', '@timestamp')
        if 'max_percentage' not in self.rules and 'min_percentage' not in self.rules:
            raise EAException("PercentageMatchRule must have at least one of either min_percentage or max_percentage")

        self.min_denominator = self.rules.get('min_denominator', 0)
        self.match_bucket_filter = self.rules['match_bucket_filter']
        self.rules['aggregation_query_element'] = self.generate_aggregation_query()

    def get_match_str(self, match):
        percentage_format_string = self.rules.get('percentage_format_string', None)
        message = 'Percentage violation, value: %s (min: %s max : %s) of %s items\n\n' % (
            percentage_format_string % (match['percentage']) if percentage_format_string else match['percentage'],
            self.rules.get('min_percentage'),
            self.rules.get('max_percentage'),
            match['denominator']
        )
        return message

    def generate_aggregation_query(self):
        return {
            'percentage_match_aggs': {
                'filters': {
                    'other_bucket': True,
                    'filters': {
                        'match_bucket': {
                            'bool': {
                                'must': self.match_bucket_filter
                            }
                        }
                    }
                }
            }
        }

    def check_matches(self, timestamp, query_key, aggregation_data):
        match_bucket_count = aggregation_data['percentage_match_aggs']['buckets']['match_bucket']['doc_count']
        other_bucket_count = aggregation_data['percentage_match_aggs']['buckets']['_other_']['doc_count']

        if match_bucket_count is None or other_bucket_count is None:
            return
        else:
            total_count = other_bucket_count + match_bucket_count
            if total_count == 0 or total_count < self.min_denominator:
                return
            else:
                match_percentage = (match_bucket_count * 1.0) / (total_count * 1.0) * 100
                if self.percentage_violation(match_percentage):
                    match = {self.rules['timestamp_field']: timestamp, 'percentage': match_percentage, 'denominator': total_count}
                    if query_key is not None:
                        match[self.rules['query_key']] = query_key
                    self.add_match(match)

    def percentage_violation(self, match_percentage):
        if 'max_percentage' in self.rules and match_percentage > self.rules['max_percentage']:
            return True
        if 'min_percentage' in self.rules and match_percentage < self.rules['min_percentage']:
            return True
        return False
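
# Illustrative sketch (not part of the original module): check_matches() turns
# the filters aggregation into a percentage. If the match_bucket holds 25 docs
# and the _other_ bucket holds 75, the denominator is 100 and the percentage is
# 25 * 1.0 / 100 * 100 = 25.0, which violates a min_percentage of 30.
#
#     rule = PercentageMatchRule({'match_bucket_filter': [{'term': {'status': 'error'}}],
#                                 'min_percentage': 30,
#                                 'timestamp_field': '@timestamp'})
#     rule.check_matches(ts_now(), None,
#                        {'percentage_match_aggs': {'buckets': {
#                            'match_bucket': {'doc_count': 25},
#                            '_other_': {'doc_count': 75}}}})
#     assert rule.matches and rule.matches[0]['percentage'] == 25.0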