File: problem.py

package info (click to toggle)
zabbix-cli 3.5.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,892 kB
  • sloc: python: 18,211; makefile: 3
file content (283 lines) | stat: -rw-r--r-- 8,805 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
from __future__ import annotations

from typing import Optional

import typer

from zabbix_cli._v2_compat import ARGS_POSITIONAL
from zabbix_cli.app import Example
from zabbix_cli.app import app
from zabbix_cli.output.console import err_console
from zabbix_cli.output.console import exit_err
from zabbix_cli.output.render import render_result
from zabbix_cli.pyzabbix.enums import TriggerPriority
from zabbix_cli.utils.args import parse_bool_arg
from zabbix_cli.utils.args import parse_int_arg
from zabbix_cli.utils.args import parse_list_arg

HELP_PANEL = "Problem"


@app.command(name="acknowledge_event", rich_help_panel=HELP_PANEL)
def acknowledge_event(
    ctx: typer.Context,
    event_ids: str = typer.Argument(
        help="Comma-separated list of event ID(s)",
        show_default=False,
    ),
    message: str = typer.Option(
        "[Zabbix-CLI] Acknowledged via acknowledge_events",
        "--message",
        "-m",
        help="Message to add to the event",
    ),
    close: bool = typer.Option(
        False,
        "--close",
        "-c",
        help="Close the event after acknowledging it",
    ),
    # Legacy positional args
    args: Optional[list[str]] = ARGS_POSITIONAL,
) -> None:
    """Acknowledge events by ID."""
    from zabbix_cli.commands.results.problem import AcknowledgeEventResult
    from zabbix_cli.models import Result

    eids = parse_list_arg(event_ids)
    if not eids:
        exit_err("No event IDs specified.")

    if args:
        if len(args) != 2:
            exit_err("Invalid number of positional arguments.")
        message = args[0]
        close = parse_bool_arg(args[1])
    # Don't prompt for missing message. It's optional.
    acknowledged_ids = app.state.client.acknowledge_event(
        *eids, message=message, close=close
    )

    msg = "Event(s) acknowledged"
    if close:
        msg += " and closed"
    render_result(
        Result(
            message=msg,
            result=AcknowledgeEventResult(
                event_ids=acknowledged_ids, close=close, message=message
            ),
        )
    )


@app.command(
    name="acknowledge_trigger_last_event",
    rich_help_panel=HELP_PANEL,
    examples=[
        Example(
            "Acknowledge the last event for trigger 12345",
            "acknowledge_trigger_last_event 12345",
        ),
        Example(
            "Acknowledge the last event for trigger 12345 with a message and close it",
            "acknowledge_trigger_last_event 12345 --message 'Acked via CLI' --close",
        ),
        Example(
            "Acknowledge multiple triggers",
            "acknowledge_trigger_last_event 12345,12346",
        ),
    ],
)
def acknowledge_trigger_last_event(
    ctx: typer.Context,
    trigger_ids: str,
    message: Optional[str] = typer.Option(
        None,
        "--message",
        help="Acknowledgement message",
    ),
    close: bool = typer.Option(
        False,
        "--close",
        "-c",
        help="Close event",
    ),
    # Legacy positional args
    args: Optional[list[str]] = ARGS_POSITIONAL,
) -> None:
    """Acknowledge the the last event for the given triggers."""
    from zabbix_cli.commands.results.problem import AcknowledgeTriggerLastEventResult
    from zabbix_cli.models import Result

    tids = parse_list_arg(trigger_ids)
    if not tids:
        exit_err("No trigger IDs specified.")
    if args:
        if len(args) != 2:
            exit_err("Invalid number of positional arguments.")
        message = args[0]
        close = parse_bool_arg(args[1])

    # Message is optional, so we don't prompt for it if it's missing
    events = [app.state.client.get_event(object_id=tid) for tid in tids]
    event_ids = [e.eventid for e in events]
    acknowledged_ids = app.state.client.acknowledge_event(
        *event_ids, message=message, close=close
    )

    msg = "Event(s) acknowledged"
    if close:
        msg += " and closed"
    render_result(
        Result(
            message=msg,
            result=AcknowledgeTriggerLastEventResult(
                trigger_ids=tids,
                event_ids=acknowledged_ids,
                close=close,
                message=message,
            ),
        )
    )


@app.command(name="show_alarms", rich_help_panel=HELP_PANEL)
def show_alarms(
    ctx: typer.Context,
    description: Optional[str] = typer.Option(
        None,
        "--description",
        help="Description of alarm(s) to show.",
    ),
    # Could this be a list of priorities in V2?
    priority: Optional[TriggerPriority] = typer.Option(
        None,
        "--priority",
        help="Priority of alarm(s) to show.",
    ),
    hostgroups: Optional[str] = typer.Option(
        None,
        "--hostgroup",
        help="Host group(s) to show alarms for. Comma-separated.",
    ),
    unacknowledged: bool = typer.Option(
        True,
        "--unack/--ack",
        help="Show only alarms whose last event is unacknowledged.",
    ),
    args: Optional[list[str]] = ARGS_POSITIONAL,
) -> None:
    """Show the latest events for the given triggers, hosts, and/or host groups.

    At least one trigger ID, host or host group must be specified.
    """
    from zabbix_cli.models import AggregateResult

    if args:
        if len(args) != 4:
            exit_err("Invalid number of positional arguments.")
        description = args[0]
        priority = TriggerPriority(args[1])
        hostgroups = args[2]
        # in V2, "*" was used to indicate "false"
        if args[3] == "*":
            unacknowledged = False
        else:
            unacknowledged = parse_bool_arg(args[3])

    hostgroups_args = parse_list_arg(hostgroups)
    hgs = [app.state.client.get_hostgroup(hg) for hg in hostgroups_args]
    with app.status("Fetching triggers..."):
        triggers = app.state.client.get_triggers(
            hostgroups=hgs,
            description=description,
            priority=priority,
            unacknowledged=unacknowledged,
            select_hosts=True,
            skip_dependent=True,
            monitored=True,
            active=True,
            expand_description=True,
            filter={"value": 1},  # why?
        )
    render_result(AggregateResult(result=triggers))


@app.command(
    name="show_trigger_events",
    rich_help_panel=HELP_PANEL,
    examples=[
        Example(
            "Show recent events for host foo.example.com",
            "show_trigger_events --host foo.example.com",
        ),
        Example(
            "Show recent events for hosts in host group 'Linux servers'",
            "show_trigger_events --hostgroup 'Linux servers'",
        ),
        Example(
            "Show 20 most recent events for triggers 12345 & 12346",
            "show_trigger_events --trigger-id 12345,12346 --limit 20",
        ),
    ],
)
def show_trigger_events(
    ctx: typer.Context,
    trigger_id: Optional[str] = typer.Option(
        None,
        "--trigger-id",
        help="ID of trigger(s) to show events for.",
    ),
    hostgroups: Optional[str] = typer.Option(
        None,
        "--hostgroup",
        help="Host group(s) to show events for.",
    ),
    hosts: Optional[str] = typer.Option(
        None,
        "--host",
        help="Host(s) to show events for.",
    ),
    limit: int = typer.Option(
        10,
        "--limit",
        "-l",
        help="Maximum number of events to show.",
    ),
    args: Optional[list[str]] = ARGS_POSITIONAL,
) -> None:
    """Show the latest events for the given triggers, hosts, and/or host groups.

    At least one trigger ID, host or host group must be specified.
    """
    from zabbix_cli.models import AggregateResult

    if args:
        if len(args) != 2:
            exit_err("Invalid number of positional arguments.")
        trigger_id = args[0]
        limit = parse_int_arg(args[1])

    # Parse commma-separated args
    trigger_ids = parse_list_arg(trigger_id)
    hostgroups_args = parse_list_arg(hostgroups)
    hosts_args = parse_list_arg(hosts)
    if not trigger_ids and not hostgroups_args and not hosts_args:
        err_console.print(ctx.get_help())
        exit_err("At least one trigger ID, host or host group must be specified.")

    # Fetch the host(group)s if specified
    hostgroups_list = [app.state.client.get_hostgroup(hg) for hg in hostgroups_args]
    hosts_list = [app.state.client.get_host(host) for host in hosts_args]

    with app.status("Fetching events..."):
        events = app.state.client.get_events(
            object_ids=trigger_ids,
            group_ids=[hg.groupid for hg in hostgroups_list],
            host_ids=[host.hostid for host in hosts_list],
            sort_field="clock",
            sort_order="DESC",
            limit=limit,
        )
    render_result(AggregateResult(result=events))