File: filter.py

package info (click to toggle)
snapd 2.71-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 79,536 kB
  • sloc: ansic: 16,114; sh: 16,105; python: 9,941; makefile: 1,890; exp: 190; awk: 40; xml: 22
file content (379 lines) | stat: -rwxr-xr-x 11,790 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
#!/usr/bin/env python3

from __future__ import annotations

import argparse
import re
import sys
from typing import IO

from common import LockOpTrace, get_next_match


class LockOp:
    """
    LockOp: Represents a lock operation in the log file, including its header
    and times (held and wait times). It is tied to the LockOpTrace class.
    """

    def __init__(self, lines: list[str]):
        self.trace = LockOpTrace(lines[1:])
        self.header = lines[0]
        self.held_time = 0
        self.wait_time = 0
        self._calc_held_ms(self.header)
        self._calc_wait_ms(self.header)

    def _calc_held_ms(self, line: str) -> int:
        match = re.search(".*held: (.+?) ms.*", line)
        if match:
            self.held_time = int(match.group(1))
        else:
            raise ValueError("No held time in line: {}".format(line))

    def _calc_wait_ms(self, line: str) -> int:
        match = re.search(".*wait (.+?) ms.*", line)
        if match:
            self.wait_time = int(match.group(1))
        else:
            raise ValueError("No wait time in line: {}".format(line))

    def get_held_time(self) -> int:
        return self.held_time

    def get_wait_time(self) -> int:
        return self.wait_time

    def get_trace(self) -> LockOpTrace:
        return self.trace


class LocksGroup:
    """
    LocksGroup: Represents a group of locks, which could correspond to a test
    case or a project setup. It holds a header (name) and a list of Lock
    objects.
    """

    LOCK_PREFIX = "### "

    locks: list[LockOp]

    def __init__(self, lines: list[str]):
        self.lines = lines
        self.header = self.lines[0]
        self.locks = []

        self._read()

    def _read(self) -> None:
        current_line = 1
        while current_line < len(self.lines):
            lock_lines = self._current_lock(current_line)
            if len(lock_lines) == 0:
                raise RuntimeError("Error parsing lock")

            self.locks.append(LockOp(lock_lines))
            current_line = current_line + len(lock_lines)

    def __str__(self) -> str:
        return "".join(self.lines)

    # This function works to detect the current test or project
    def _current_lock(self, start_line: int) -> list[str]:
        next_match = get_next_match(self.lines, start_line, self.LOCK_PREFIX)
        if next_match == -1:
            return self.lines[start_line:len(self.lines)]
        return self.lines[start_line:next_match]

    def get_name(self) -> str:
        return self.header

    def get_locks(self) -> list[LockOp]:
        return self.locks

    def get_traces(self) -> list[LockOpTrace]:
        traces = []
        for lock in self.locks:
            traces.append(lock.get_trace())
        return traces

    def get_lock_held_time(self, trace: LockOpTrace) -> int:
        for lock in self.locks:
            if lock.get_trace() == trace:
                return lock.get_held_time()

        return 0

    def get_lock_wait_time(self, trace: LockOpTrace) -> int:
        for lock in self.locks:
            if lock.get_trace() == trace:
                return lock.get_wait_time()

        return 0


class GroupTimes:
    """
    GroupTimes: A tuple-like class that associates the group name with the
    held and wait times for each lock trace.
    """

    def __init__(self, group_name: str, held_time: int, wait_time: int):
        self.group_name = group_name
        self.held_time = held_time
        self.wait_time = wait_time

    def get_group_name(self) -> str:
        return self.group_name

    def get_held_time(self) -> int:
        return self.held_time

    def get_wait_time(self) -> int:
        return self.wait_time


class LockTraceManager:
    """
    LockTraceManager: Handles filtering and managing the lock traces
    and associated group times. It provides methods to filter the traces
    by time and to print the results in a sorted manner.
    """

    traces: dict[LockOpTrace, list[GroupTimes]]

    def __init__(self, traces: dict[LockOpTrace, list[GroupTimes]]):
        self.traces = traces

    # Filter the times for each trace
    def filter(self, held_time: int, wait_time: int) -> None:
        filtered_traces = dict[LockOpTrace, list[GroupTimes]]()
        for trace, times in self.traces.items():
            filtered_times = [
                time
                for time in times
                if time.get_held_time() >= held_time
                and time.get_wait_time() >= wait_time
            ]
            if len(filtered_times) > 0:
                filtered_traces[trace] = filtered_times

        self.traces = filtered_traces

    # Keep the traces that match with the params
    def match(self, match_names: list[str]) -> None:
        filtered_traces = dict[LockOpTrace, list[GroupTimes]]()
        for trace, times in self.traces.items():
            for match_name in match_names:
                if trace.match(match_name):
                    filtered_traces[trace] = times

        self.traces = filtered_traces

    # print the traces with their times for each test
    def print(
        self, sort_held_time: bool, sort_wait_time: bool, list_traces: bool
    ) -> None:
        if sort_held_time:
            for trace, times in self.traces.items():
                self.traces[trace] = sorted(
                    times, key=lambda x: x.get_held_time(), reverse=True
                )
        if sort_wait_time:
            for trace, times in self.traces.items():
                self.traces[trace] = sorted(
                    times, key=lambda x: x.get_held_time(), reverse=True
                )

        for trace, times in self.traces.items():
            trace.print()

            if not list_traces:
                for time in times:
                    print(
                        "{} held: {} ms, wait: {} ms".format(
                            time.get_group_name(),
                            time.get_held_time(),
                            time.get_wait_time(),
                        )
                    )
                print("")


class LocksFileReader:
    """
    LocksFileReader: Reads the lock file and parses the different test cases
    and groups. It extracts the relevant trace data and stores it in
    LocksGroup instances. It can also return a dictionary of traces and
    associated group times.
    """

    PROJECT_PREFIX = "###START: SNAPD PROJECT"
    TEST_PREFIX = "###START:"

    lines: list[str]
    groups: list[LocksGroup]

    def __init__(self, locks_file: IO[str]):
        self.lines = []
        self.groups = []

        self._read(locks_file)

    def _read(self, locks_file: IO[str]) -> None:
        self.lines = locks_file.readlines()

        current_line = 0
        if not self._is_project(self.lines[current_line]):
            print("First time expected to be the project start.")
            sys.exit(1)

        # Read the tests
        while current_line < len(self.lines):
            group_lines = self._current_group(current_line)
            if len(group_lines) == 0:
                raise RuntimeError("Error parsing test.")

            self.groups.append(LocksGroup(group_lines))
            current_line = current_line + len(group_lines)

    # Indicates if the line is the project declaration
    def _is_project(self, line: str) -> bool:
        return line.startswith(self.PROJECT_PREFIX)

    # This function works to detect the current test or project
    def _current_group(self, start_line: int) -> list[str]:
        next_match = get_next_match(self.lines, start_line, self.TEST_PREFIX)
        if next_match == -1:
            return self.lines[start_line:len(self.lines)]
        return self.lines[start_line:next_match]

    # Retrieve the test lines
    def get_test(self, test: str) -> str:
        for group in self.groups:
            if test in group.get_name():
                return str(group)

        return ""

    # Retrieve the times for each trace in the file
    # For each trace, there is a list with the times for each test where
    # the trace appears
    def get_traces_times(self) -> dict[LockOpTrace, list[GroupTimes]]:
        traces = dict[LockOpTrace, list[GroupTimes]]()
        for group in self.groups:
            group_traces = group.get_traces()
            for trace in group_traces:
                group_time = GroupTimes(
                    group.get_name(),
                    group.get_lock_held_time(trace),
                    group.get_lock_wait_time(trace),
                )
                if trace not in traces.keys():
                    traces[trace] = list[GroupTimes]()
                try:
                    traces[trace].append(group_time)
                except KeyError:
                    traces[trace] = list[GroupTimes]([group_time])

        return traces


"""
A locks file must:
- begin with "###START: SNAPD PROJECT"
- contain at least one group, indicated by the key string "###START"

It parses the lock information by:
- grouping its contents based on consecutive lines that begin with "###START"
- creating subgroups for each group by searching for the key string "###" and
  creating a subgroup for each instance found of consecutive lines between
  instances of that key string

One may filter data based on:
- matching string found in a sequence of sub-group lock information. 
  If a match is found, then only sub-groups that contain that match will be shown
- only showing held or wait times above a certain threshold
- a singular test
"""


def _make_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        prog="state-locks-filter",
        description="Reads a locks file and extracts lock information.",
    )
    parser.add_argument(
        "-f",
        "--locks-file",
        type=argparse.FileType("r"),
        required=True,
        help="Locks file",
    )
    parser.add_argument(
        "--test",
        default="",
        help="Show results for this test, when test is "
             "selected no other filters are applied.",
    )
    parser.add_argument(
        "--match",
        action="append",
        default=[],
        help="Show traces that match this specific word",
    )
    parser.add_argument(
        "--held-time", type=int, default=0,
        help="Include locks with longer held time"
    )
    parser.add_argument(
        "--wait-time", type=int, default=0,
        help="Include locks with longer wait time"
    )
    parser.add_argument(
        "--sort-held-time",
        action="store_true",
        help="Sort higher times first by held time",
    )
    parser.add_argument(
        "--sort-wait-time",
        action="store_true",
        help="Sort higher times first by wait time",
    )
    parser.add_argument(
        "--list-traces",
        action="store_true",
        help="Just print the resulting traces",
    )

    return parser


if __name__ == "__main__":
    parser = _make_parser()
    args = parser.parse_args()

    if args.sort_held_time and args.sort_wait_time:
        print("state-lock-filter: define just 1 sorting (by held/wait times)")
        sys.exit(1)

    locks_reader = LocksFileReader(args.locks_file)
    if args.test:
        print(locks_reader.get_test(args.test))
        sys.exit()

    trace_manager = LockTraceManager(locks_reader.get_traces_times())

    # Then keep traces with matches
    if args.match:
        trace_manager.match(args.match)

    # First filter by time
    if args.held_time > 0 or args.wait_time > 0:
        trace_manager.filter(args.held_time, args.wait_time)

    # And finally print the sorted results (if required)
    trace_manager.print(args.sort_held_time,
                        args.sort_wait_time,
                        args.list_traces)