File: monitoring_and_alerting.py

package info (click to toggle)
python-influxdb-client 1.40.0-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 7,216 kB
  • sloc: python: 60,236; sh: 64; makefile: 53
file content (121 lines) | stat: -rw-r--r-- 5,275 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
"""
How to create a check with Slack notification.
"""
import datetime

from influxdb_client.service.notification_rules_service import NotificationRulesService

from influxdb_client.domain.rule_status_level import RuleStatusLevel

from influxdb_client.domain.status_rule import StatusRule

from influxdb_client.domain.slack_notification_rule import SlackNotificationRule

from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.domain.check_status_level import CheckStatusLevel
from influxdb_client.domain.dashboard_query import DashboardQuery
from influxdb_client.domain.lesser_threshold import LesserThreshold
from influxdb_client.domain.query_edit_mode import QueryEditMode
from influxdb_client.domain.slack_notification_endpoint import SlackNotificationEndpoint
from influxdb_client.domain.task_status_type import TaskStatusType
from influxdb_client.domain.threshold_check import ThresholdCheck
from influxdb_client.service.checks_service import ChecksService
from influxdb_client.service.notification_endpoints_service import NotificationEndpointsService

"""
Define credentials
"""
url = "http://localhost:8086"
token = "my-token"
org_name = "my-org"
bucket_name = "my-bucket"


with InfluxDBClient(url=url, token=token, org=org_name, debug=False) as client:
    uniqueId = str(datetime.datetime.now())

    """
    Find Organization ID by Organization API.
    """
    org = client.organizations_api().find_organizations(org=org_name)[0]

    """
    Prepare data
    """
    client.write_api(write_options=SYNCHRONOUS).write(record="mem,production=true free=40", bucket=bucket_name)

    """
    Create Threshold Check - set status to `Critical` if the current value is lesser than `35`.
    """
    threshold = LesserThreshold(value=35.0,
                                level=CheckStatusLevel.CRIT)
    query = f'''
            from(bucket:"{bucket_name}") 
                |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
                |> filter(fn: (r) => r["_measurement"] == "mem")
                |> filter(fn: (r) => r["_field"] == "free")
                |> aggregateWindow(every: 1m, fn: mean, createEmpty: false)        
                |> yield(name: "mean")
            '''

    check = ThresholdCheck(name=f"Check created by Remote API_{uniqueId}",
                           status_message_template="The value is on: ${ r._level } level!",
                           every="5s",
                           offset="0s",
                           query=DashboardQuery(edit_mode=QueryEditMode.ADVANCED, text=query),
                           thresholds=[threshold],
                           org_id=org.id,
                           status=TaskStatusType.ACTIVE)

    checks_service = ChecksService(api_client=client.api_client)
    checks_service.create_check(check)

    """
    Create Slack Notification endpoint
    """
    notification_endpoint = SlackNotificationEndpoint(name=f"Slack Dev Channel_{uniqueId}",
                                                      url="https://hooks.slack.com/services/x/y/z",
                                                      org_id=org.id)
    notification_endpoint_service = NotificationEndpointsService(api_client=client.api_client)
    notification_endpoint = notification_endpoint_service.create_notification_endpoint(notification_endpoint)

    """
    Create Notification Rule to notify critical value to Slack Channel
    """
    notification_rule = SlackNotificationRule(name=f"Critical status to Slack_{uniqueId}",
                                              every="10s",
                                              offset="0s",
                                              message_template="${ r._message }",
                                              status_rules=[StatusRule(current_level=RuleStatusLevel.CRIT)],
                                              tag_rules=[],
                                              endpoint_id=notification_endpoint.id,
                                              org_id=org.id,
                                              status=TaskStatusType.ACTIVE)

    notification_rules_service = NotificationRulesService(api_client=client.api_client)
    notification_rules_service.create_notification_rule(notification_rule)

    """
    List all Checks
    """
    print(f"\n------- Checks: -------\n")
    checks = checks_service.get_checks(org_id=org.id).checks
    print("\n".join([f" ---\n ID: {it.id}\n Name: {it.name}\n Type: {type(it)}" for it in checks]))
    print("---")

    """
    List all Endpoints
    """
    print(f"\n------- Notification Endpoints: -------\n")
    notification_endpoints = notification_endpoint_service.get_notification_endpoints(org_id=org.id).notification_endpoints
    print("\n".join([f" ---\n ID: {it.id}\n Name: {it.name}\n Type: {type(it)}" for it in notification_endpoints]))
    print("---")

    """
    List all Notification Rules
    """
    print(f"\n------- Notification Rules: -------\n")
    notification_rules = notification_rules_service.get_notification_rules(org_id=org.id).notification_rules
    print("\n".join([f" ---\n ID: {it.id}\n Name: {it.name}\n Type: {type(it)}" for it in notification_rules]))
    print("---")