1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
|
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from ..models import LogEvent, LogGroup, LogStream
from .query_parser import ParsedQuery, parse_query
class ParsedEvent:
def __init__(
self,
event: "LogEvent",
query: ParsedQuery,
log_stream: "LogStream",
log_group: "LogGroup",
):
self.event = event
self.query = query
self.log_stream = log_stream
self.log_group = log_group
self.fields = self._create_fields()
def _create_fields(self) -> dict[str, Any]:
fields: dict[str, Any] = {"@ptr": self.event.event_id}
if "@timestamp" in self.query.fields:
fields["@timestamp"] = self.event.timestamp
if "@message" in self.query.fields:
fields["@message"] = self.event.message
if "@logStream" in self.query.fields:
fields["@logStream"] = self.log_stream.log_stream_name # type: ignore[has-type]
if "@log" in self.query.fields:
fields["@log"] = self.log_group.name
return fields
def __eq__(self, other: "ParsedEvent") -> bool: # type: ignore[override]
return self.event.timestamp == other.event.timestamp
def __lt__(self, other: "ParsedEvent") -> bool:
return self.event.timestamp < other.event.timestamp
def __le__(self, other: "ParsedEvent") -> bool:
return self.event.timestamp <= other.event.timestamp
def __gt__(self, other: "ParsedEvent") -> bool:
return self.event.timestamp > other.event.timestamp
def __ge__(self, other: "ParsedEvent") -> bool:
return self.event.timestamp >= other.event.timestamp
def execute_query(
log_groups: list["LogGroup"], query: str, start_time: int, end_time: int
) -> list[dict[str, str]]:
parsed = parse_query(query)
all_events = _create_parsed_events(log_groups, parsed, start_time, end_time)
sorted_events = sorted(all_events, reverse=parsed.sort_reversed())
sorted_fields = [event.fields for event in sorted_events]
if parsed.limit:
return sorted_fields[0 : parsed.limit]
return sorted_fields
def _create_parsed_events(
log_groups: list["LogGroup"], query: ParsedQuery, start_time: int, end_time: int
) -> list["ParsedEvent"]:
def filter_func(event: "LogEvent") -> bool:
# Start/End time is in epoch seconds
# Event timestamp is in epoch milliseconds
if start_time and event.timestamp < (start_time * 1000):
return False
if end_time and event.timestamp > (end_time * 1000):
return False
return True
events: list[ParsedEvent] = []
for group in log_groups:
for stream in group.streams.values():
events.extend(
[
ParsedEvent(
event=event, query=query, log_stream=stream, log_group=group
)
for event in filter(filter_func, stream.events)
]
)
return events
|