File: test__init__.py

package info
anta 1.7.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,048 kB
  • sloc: python: 48,164; sh: 28; javascript: 9; makefile: 4
file content (624 lines)
# Copyright (c) 2023-2025 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the LICENSE file.
"""Test anta.result_manager.__init__.py."""

from __future__ import annotations

import json
import logging
import re
from contextlib import AbstractContextManager, nullcontext
from typing import TYPE_CHECKING

import pytest

from anta.result_manager import ResultManager, models
from anta.result_manager.models import AntaTestStatus

if TYPE_CHECKING:
    from collections.abc import Callable

    from anta.result_manager.models import TestResult


# pylint: disable=too-many-public-methods
class TestResultManager:
    """Test ResultManager class."""

    # TODO: test __init__() and reset()
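
    # A minimal sketch for the TODO above, not upstream code: it assumes a fresh
    # ResultManager starts empty with an "unset" status and that reset() restores
    # that state (attribute names taken from their usage elsewhere in this file).
    def test_init_and_reset_sketch(self, test_result_factory: Callable[..., TestResult]) -> None:
        """Sketch: a new ResultManager starts empty and reset() clears it."""
        result_manager = ResultManager()
        assert len(result_manager) == 0
        assert result_manager.status == AntaTestStatus.UNSET
        assert result_manager.error_status is False

        # Add one result, then reset and verify the initial state is restored
        test = test_result_factory()
        test.result = AntaTestStatus.SUCCESS
        result_manager.add(test)
        assert len(result_manager) == 1

        result_manager.reset()
        assert len(result_manager) == 0
        assert result_manager.status == AntaTestStatus.UNSET
        assert result_manager.error_status is False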

    def test__len__(self, result_manager_factory: Callable[[int], ResultManager]) -> None:
        """Test __len__."""
        for i in range(3):
            result_manager = result_manager_factory(i)
            assert len(result_manager) == i

    def test_results_getter(self, result_manager_factory: Callable[[int], ResultManager]) -> None:
        """Test ResultManager.results property getter."""
        result_manager = result_manager_factory(3)
        res = result_manager.results
        assert len(res) == 3
        assert isinstance(res, list)
        for e in res:
            assert isinstance(e, models.TestResult)

    def test_results_setter(self, test_result_factory: Callable[..., TestResult], result_manager_factory: Callable[[int], ResultManager]) -> None:
        """Test ResultManager.results property setter."""
        result_manager = result_manager_factory(3)
        assert len(result_manager) == 3
        tests = [test_result_factory(i) for i in range(5)]
        result_manager.results = tests
        assert len(result_manager) == 5

    def test_json(self, test_result_factory: Callable[..., TestResult]) -> None:
        """Test ResultManager.json property."""
        result_manager = ResultManager()

        success_list = [test_result_factory(i) for i in range(3)]
        for test in success_list:
            test.result = AntaTestStatus.SUCCESS
        result_manager.results = success_list

        json_res = result_manager.json
        assert isinstance(json_res, str)

        # Verify it can be deserialized back into a list of dicts with the correct value types
        res = json.loads(json_res)
        for test in res:
            assert isinstance(test, dict)
            assert isinstance(test.get("test"), str)
            assert isinstance(test.get("categories"), list)
            assert isinstance(test.get("description"), str)
            # @gmuloc: Adding this as part of #1364 to make sure we don't remove custom_field again
            # TODO: modify this if we add back exclude_none=True
            assert "custom_field" in test
            assert test.get("custom_field") is None
            assert test.get("result") == "success"

    def test_sorted_category_stats(self, test_result_factory: Callable[..., TestResult]) -> None:
        """Test ResultManager.sorted_category_stats."""
        result_manager = ResultManager()
        results = [test_result_factory(i) for i in range(4)]

        # Modify the categories to mix acronym categories (ospf, bgp, vxlan) with a regular word (system)
        results[0].categories = ["ospf"]
        results[1].categories = ["bgp"]
        results[2].categories = ["vxlan"]
        results[3].categories = ["system"]

        result_manager.results = results

        # Check that category_stats returns sorted order by default
        expected_order = ["bgp", "ospf", "system", "vxlan"]
        assert list(result_manager.category_stats.keys()) == expected_order

    @pytest.mark.parametrize(
        ("starting_status", "test_status", "expected_status", "expected_raise"),
        [
            pytest.param("unset", "unset", "unset", nullcontext(), id="unset->unset"),
            pytest.param("unset", "success", "success", nullcontext(), id="unset->success"),
            pytest.param("unset", "error", "unset", nullcontext(), id="set error"),
            pytest.param("skipped", "skipped", "skipped", nullcontext(), id="skipped->skipped"),
            pytest.param("skipped", "unset", "skipped", nullcontext(), id="skipped, add unset"),
            pytest.param("skipped", "success", "success", nullcontext(), id="skipped, add success"),
            pytest.param("skipped", "failure", "failure", nullcontext(), id="skipped, add failure"),
            pytest.param("success", "unset", "success", nullcontext(), id="success, add unset"),
            pytest.param("success", "skipped", "success", nullcontext(), id="success, add skipped"),
            pytest.param("success", "success", "success", nullcontext(), id="success->success"),
            pytest.param("success", "failure", "failure", nullcontext(), id="success->failure"),
            pytest.param("failure", "unset", "failure", nullcontext(), id="failure->failure"),
            pytest.param("failure", "skipped", "failure", nullcontext(), id="failure, add unset"),
            pytest.param(
                "failure",
                "success",
                "failure",
                nullcontext(),
                id="failure, add skipped",
            ),
            pytest.param(
                "failure",
                "failure",
                "failure",
                nullcontext(),
                id="failure, add success",
            ),
            pytest.param("unset", "unknown", None, pytest.raises(ValueError, match="'unknown' is not a valid AntaTestStatus"), id="wrong status"),
        ],
    )
    def test_add(
        self,
        test_result_factory: Callable[..., TestResult],
        starting_status: str,
        test_status: str,
        expected_status: str,
        expected_raise: AbstractContextManager[Exception],
    ) -> None:
        """Test ResultManager_update_status."""
        result_manager = ResultManager()
        result_manager.status = AntaTestStatus(starting_status)
        assert result_manager.error_status is False
        assert len(result_manager) == 0

        test = test_result_factory()
        with expected_raise:
            test.result = AntaTestStatus(test_status)
            result_manager.add(test)
            if test_status == "error":
                assert result_manager.error_status is True
            else:
                assert result_manager.status == expected_status
            assert len(result_manager) == 1

    def test_add_clear_cache(self, result_manager: ResultManager, test_result_factory: Callable[..., TestResult]) -> None:
        """Test ResultManager.add and make sure the cache is reset after adding a new test."""
        # Check the cache is empty
        assert "results_by_status" not in result_manager.__dict__

        # Access the cache
        assert result_manager.get_total_results() == 181

        # Check the cache is filled with the correct results count
        assert "results_by_status" in result_manager.__dict__
        assert sum(len(v) for v in result_manager.__dict__["results_by_status"].values()) == 181

        # Add a new test
        result_manager.add(result=test_result_factory())

        # Check the cache has been reset
        assert "results_by_status" not in result_manager.__dict__

        # Access the cache again
        assert result_manager.get_total_results() == 182

        # Check the cache is filled again with the correct results count
        assert "results_by_status" in result_manager.__dict__
        assert sum(len(v) for v in result_manager.__dict__["results_by_status"].values()) == 182

    def test_get_results(self, result_manager: ResultManager) -> None:
        """Test ResultManager.get_results."""
        # Check for single status
        success_results = result_manager.get_results(status={AntaTestStatus.SUCCESS})
        assert len(success_results) == 43
        assert all(r.result == "success" for r in success_results)

        # Check for multiple statuses
        failure_results = result_manager.get_results(status={AntaTestStatus.FAILURE, AntaTestStatus.ERROR})
        assert len(failure_results) == 104
        assert all(r.result in {"failure", "error"} for r in failure_results)

        # Check all results
        all_results = result_manager.get_results()
        assert len(all_results) == 181

    def test_get_results_sort_by(self, result_manager: ResultManager) -> None:
        """Test ResultManager.get_results with sort_by."""
        # Check all results with sort_by result
        all_results = result_manager.get_results(sort_by=["result"])
        assert len(all_results) == 181
        assert [r.result for r in all_results] == ["error"] * 1 + ["failure"] * 103 + ["skipped"] * 34 + ["success"] * 43

        # Check all results with sort_by device (name)
        all_results = result_manager.get_results(sort_by=["name"])
        assert len(all_results) == 181
        assert all_results[0].name == "s1-spine1"

        # Check multiple statuses with sort_by categories
        success_skipped_results = result_manager.get_results(status={AntaTestStatus.SUCCESS, AntaTestStatus.SKIPPED}, sort_by=["categories"])
        assert len(success_skipped_results) == 77
        assert success_skipped_results[0].categories == ["avt"]
        assert success_skipped_results[-1].categories == ["vxlan"]

        # Check multiple statuses with sort_by custom_field
        success_skipped_results = result_manager.get_results(status={AntaTestStatus.SUCCESS, AntaTestStatus.SKIPPED}, sort_by=["custom_field"])
        assert len(success_skipped_results) == 77
        assert success_skipped_results[-1].test == "VerifyReloadCause"

        # Check all results with bad sort_by
        with pytest.raises(
            ValueError,
            match=re.escape("Invalid sort_by fields: ['bad_field']."),
        ):
            all_results = result_manager.get_results(sort_by=["bad_field"])

    def test_get_total_results(self, result_manager: ResultManager) -> None:
        """Test ResultManager.get_total_results."""
        # Test all results
        assert result_manager.get_total_results() == 181

        # Test single status
        assert result_manager.get_total_results(status={AntaTestStatus.SUCCESS}) == 43
        assert result_manager.get_total_results(status={AntaTestStatus.FAILURE}) == 103
        assert result_manager.get_total_results(status={AntaTestStatus.ERROR}) == 1
        assert result_manager.get_total_results(status={AntaTestStatus.SKIPPED}) == 34
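        # Sanity check: the per-status counts sum to the fixture total (43 + 103 + 1 + 34 == 181)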

        # Test multiple statuses
        assert result_manager.get_total_results(status={AntaTestStatus.SUCCESS, AntaTestStatus.FAILURE}) == 146
        assert result_manager.get_total_results(status={AntaTestStatus.SUCCESS, AntaTestStatus.FAILURE, AntaTestStatus.ERROR}) == 147
        assert result_manager.get_total_results(status={AntaTestStatus.SUCCESS, AntaTestStatus.FAILURE, AntaTestStatus.ERROR, AntaTestStatus.SKIPPED}) == 181

    @pytest.mark.parametrize(
        ("status", "error_status", "ignore_error", "expected_status"),
        [
            pytest.param("success", False, True, "success", id="no error"),
            pytest.param("success", True, True, "success", id="error, ignore error"),
            pytest.param("success", True, False, "error", id="error, do not ignore error"),
        ],
    )
    def test_get_status(
        self,
        status: str,
        error_status: bool,
        ignore_error: bool,
        expected_status: str,
    ) -> None:
        """Test ResultManager.get_status."""
        result_manager = ResultManager()
        result_manager.status = AntaTestStatus(status)
        result_manager.error_status = error_status

        assert result_manager.get_status(ignore_error=ignore_error) == expected_status

    def test_filter(self, test_result_factory: Callable[..., TestResult]) -> None:
        """Test ResultManager.filter."""
        result_manager = ResultManager()

        success_list = [test_result_factory(i) for i in range(3)]
        for test in success_list:
            test.result = AntaTestStatus.SUCCESS
        result_manager.results = success_list

        test = test_result_factory()
        test.result = AntaTestStatus.FAILURE
        result_manager.add(test)

        test = test_result_factory()
        test.result = AntaTestStatus.ERROR
        result_manager.add(test)

        test = test_result_factory()
        test.result = AntaTestStatus.SKIPPED
        result_manager.add(test)

        assert len(result_manager) == 6
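        # filter() removes the given statuses, so each excluded status drops its single result here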
        assert len(result_manager.filter({AntaTestStatus.FAILURE})) == 5
        assert len(result_manager.filter({AntaTestStatus.ERROR})) == 5
        assert len(result_manager.filter({AntaTestStatus.SKIPPED})) == 5
        assert len(result_manager.filter({AntaTestStatus.FAILURE, AntaTestStatus.ERROR})) == 4
        assert len(result_manager.filter({AntaTestStatus.FAILURE, AntaTestStatus.ERROR, AntaTestStatus.SKIPPED})) == 3
        assert len(result_manager.filter({AntaTestStatus.SUCCESS, AntaTestStatus.FAILURE, AntaTestStatus.ERROR, AntaTestStatus.SKIPPED})) == 0

    def test_filter_by_tests(self, test_result_factory: Callable[..., TestResult], result_manager_factory: Callable[[int], ResultManager]) -> None:
        """Test ResultManager.filter_by_tests."""
        result_manager = result_manager_factory(3)

        test = test_result_factory()
        test.test = "Test1"
        result_manager.add(test)

        test = test_result_factory()
        test.test = "Test2"
        result_manager.add(test)

        test = test_result_factory()
        test.test = "Test2"
        result_manager.add(test)

        assert len(result_manager) == 6
        assert len(result_manager.filter_by_tests({"Test1"})) == 1
        rm = result_manager.filter_by_tests({"Test1", "Test2"})
        assert len(rm) == 3
        assert len(rm.filter_by_tests({"Test1"})) == 1

    def test_filter_by_devices(self, test_result_factory: Callable[..., TestResult], result_manager_factory: Callable[[int], ResultManager]) -> None:
        """Test ResultManager.filter_by_devices."""
        result_manager = result_manager_factory(3)

        test = test_result_factory()
        test.name = "Device1"
        result_manager.add(test)

        test = test_result_factory()
        test.name = "Device2"
        result_manager.add(test)

        test = test_result_factory()
        test.name = "Device2"
        result_manager.add(test)

        assert len(result_manager) == 6
        assert len(result_manager.filter_by_devices({"Device1"})) == 1
        rm = result_manager.filter_by_devices({"Device1", "Device2"})
        assert len(rm) == 3
        assert len(rm.filter_by_devices({"Device1"})) == 1

    def test_get_tests(self, test_result_factory: Callable[..., TestResult]) -> None:
        """Test ResultManager.get_tests."""
        result_manager = ResultManager()

        tests = [test_result_factory(i) for i in range(3)]
        for test in tests:
            test.test = "Test1"
        result_manager.results = tests

        test = test_result_factory()
        test.test = "Test2"
        result_manager.add(test)

        assert len(result_manager.get_tests()) == 2
        assert all(t in result_manager.get_tests() for t in ["Test1", "Test2"])

    def test_get_devices(self, test_result_factory: Callable[..., TestResult]) -> None:
        """Test ResultManager.get_tests."""
        result_manager = ResultManager()

        tests = [test_result_factory(i) for i in range(3)]
        for test in tests:
            test.name = "Device1"
        result_manager.results = tests

        test = test_result_factory()
        test.name = "Device2"
        result_manager.add(test)

        assert len(result_manager.get_devices()) == 2
        assert all(t in result_manager.get_devices() for t in ["Device1", "Device2"])

    def test_stats_computation_methods(self, test_result_factory: Callable[..., TestResult], caplog: pytest.LogCaptureFixture) -> None:
        """Test ResultManager internal stats computation methods."""
        result_manager = ResultManager()

        # Initially stats should be unsynced
        assert result_manager._stats_in_sync is False

        # Test _reset_stats
        result_manager._reset_stats()
        assert result_manager._stats_in_sync is False
        assert len(result_manager._device_stats) == 0
        assert len(result_manager._category_stats) == 0
        assert len(result_manager._test_stats) == 0

        # Add some test results
        test1 = test_result_factory()
        test1.name = "device1"
        test1.result = AntaTestStatus.SUCCESS
        test1.categories = ["system"]
        test1.test = "test1"

        test2 = test_result_factory()
        test2.name = "device2"
        test2.result = AntaTestStatus.FAILURE
        test2.categories = ["interfaces"]
        test2.test = "test2"

        result_manager.add(test1)
        result_manager.add(test2)

        # Stats should still be unsynced after adding results
        assert result_manager._stats_in_sync is False

        # Test _compute_stats directly
        with caplog.at_level(logging.INFO):
            result_manager._compute_stats()
        assert "Computing statistics for all results" in caplog.text
        assert result_manager._stats_in_sync is True

        # Verify stats content
        assert len(result_manager._device_stats) == 2
        assert len(result_manager._category_stats) == 2
        assert len(result_manager._test_stats) == 2
        assert result_manager._device_stats["device1"].tests_success_count == 1
        assert result_manager._device_stats["device2"].tests_failure_count == 1
        assert result_manager._category_stats["system"].tests_success_count == 1
        assert result_manager._category_stats["interfaces"].tests_failure_count == 1
        assert result_manager._test_stats["test1"].devices_success_count == 1
        assert result_manager._test_stats["test2"].devices_failure_count == 1

    def test_stats_property_computation(self, test_result_factory: Callable[..., TestResult], caplog: pytest.LogCaptureFixture) -> None:
        """Test that stats are computed only once when accessed via properties."""
        result_manager = ResultManager()

        # Add some test results
        test1 = test_result_factory()
        test1.name = "device1"
        test1.result = AntaTestStatus.SUCCESS
        test1.categories = ["system"]
        result_manager.add(test1)

        test2 = test_result_factory()
        test2.name = "device2"
        test2.result = AntaTestStatus.FAILURE
        test2.categories = ["interfaces"]
        result_manager.add(test2)

        # Stats should be unsynced after adding results
        assert result_manager._stats_in_sync is False
        assert "Computing statistics" not in caplog.text

        # Access device_stats property - should trigger computation
        with caplog.at_level(logging.INFO):
            _ = result_manager.device_stats
        assert "Computing statistics for all results" in caplog.text
        assert result_manager._stats_in_sync is True

        # Clear the log
        caplog.clear()

        # Access other stats properties - should not trigger computation again
        with caplog.at_level(logging.INFO):
            _ = result_manager.category_stats
            _ = result_manager.test_stats
        assert "Computing statistics" not in caplog.text

        # Add another result - should mark stats as unsynced
        test3 = test_result_factory()
        test3.name = "device3"
        test3.result = AntaTestStatus.ERROR
        result_manager.add(test3)
        assert result_manager._stats_in_sync is False

        # Access stats again - should trigger recomputation
        with caplog.at_level(logging.INFO):
            _ = result_manager.device_stats
        assert "Computing statistics for all results" in caplog.text
        assert result_manager._stats_in_sync is True

    def test_sort_by_result(self, test_result_factory: Callable[[], TestResult]) -> None:
        """Test sorting by result."""
        result_manager = ResultManager()
        test1 = test_result_factory()
        test1.result = AntaTestStatus.SUCCESS
        test2 = test_result_factory()
        test2.result = AntaTestStatus.FAILURE
        test3 = test_result_factory()
        test3.result = AntaTestStatus.ERROR

        result_manager.results = [test1, test2, test3]
        sorted_manager = result_manager.sort(["result"])
        assert [r.result for r in sorted_manager.results] == ["error", "failure", "success"]

    def test_sort_by_name(self, test_result_factory: Callable[[], TestResult]) -> None:
        """Test sorting by name."""
        result_manager = ResultManager()
        test1 = test_result_factory()
        test1.name = "Device3"
        test2 = test_result_factory()
        test2.name = "Device1"
        test3 = test_result_factory()
        test3.name = "Device2"

        result_manager.results = [test1, test2, test3]
        sorted_manager = result_manager.sort(["name"])
        assert [r.name for r in sorted_manager.results] == ["Device1", "Device2", "Device3"]

    def test_sort_by_categories(self, test_result_factory: Callable[[], TestResult]) -> None:
        """Test sorting by categories."""
        result_manager = ResultManager()
        test1 = test_result_factory()
        test1.categories = ["VXLAN", "networking"]
        test2 = test_result_factory()
        test2.categories = ["BGP", "routing"]
        test3 = test_result_factory()
        test3.categories = ["system", "hardware"]

        result_manager.results = [test1, test2, test3]
        sorted_manager = result_manager.sort(["categories"])
        results = sorted_manager.results
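        # The expected order matches a case-sensitive string sort: uppercase "BGP"
        # and "VXLAN" sort before lowercase "system"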

        assert results[0].categories == ["BGP", "routing"]
        assert results[1].categories == ["VXLAN", "networking"]
        assert results[2].categories == ["system", "hardware"]

    def test_sort_multiple_fields(self, test_result_factory: Callable[[], TestResult]) -> None:
        """Test sorting by multiple fields."""
        result_manager = ResultManager()
        test1 = test_result_factory()
        test1.result = AntaTestStatus.ERROR
        test1.test = "Test3"
        test2 = test_result_factory()
        test2.result = AntaTestStatus.ERROR
        test2.test = "Test1"
        test3 = test_result_factory()
        test3.result = AntaTestStatus.FAILURE
        test3.test = "Test2"

        result_manager.results = [test1, test2, test3]
        sorted_manager = result_manager.sort(["result", "test"])
        results = sorted_manager.results

        assert results[0].result == "error"
        assert results[0].test == "Test1"
        assert results[1].result == "error"
        assert results[1].test == "Test3"
        assert results[2].result == "failure"
        assert results[2].test == "Test2"

    def test_sort_fields_as_none(self, test_result_factory: Callable[[], TestResult]) -> None:
        """Test sorting by multiple fields."""
        result_manager = ResultManager()
        test1 = test_result_factory()
        test1.result = AntaTestStatus.ERROR
        test1.test = "Test3"
        test1.custom_field = "custom"
        test2 = test_result_factory()
        test2.result = AntaTestStatus.ERROR
        test2.test = "Test1"
        test3 = test_result_factory()
        test3.result = AntaTestStatus.FAILURE
        test3.test = "Test2"

        result_manager.results = [test1, test2, test3]
        sorted_manager = result_manager.sort(["custom_field"])
        results = sorted_manager.results
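        # Results whose custom_field is None sort before the one with a value,
        # keeping their relative insertion order (a stable sort)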

        assert results[0].result == "error"
        assert results[0].test == "Test1"
        assert results[1].result == "failure"
        assert results[1].test == "Test2"
        assert results[2].result == "error"
        assert results[2].test == "Test3"
        assert results[2].custom_field == "custom"

    def test_sort_invalid_field(self) -> None:
        """Test that sort method raises ValueError for invalid sort_by fields."""
        result_manager = ResultManager()
        expected_match = (
            r"Invalid sort_by fields: ['bad_field']. Accepted fields are: "
            r"['name', 'test', 'categories', 'description', 'result', 'messages', 'atomic_results', 'custom_field']"
        )
        with pytest.raises(ValueError, match=re.escape(expected_match)):
            _ = result_manager.sort(["bad_field"])

    def test_sort_is_chainable(self) -> None:
        """Test that the sort method is chainable."""
        result_manager = ResultManager()
        assert isinstance(result_manager.sort(["name"]), ResultManager)
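        # e.g. result_manager.sort(["name"]).sort(["result"]) is valid since each
        # call returns a ResultManager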

    def test_merge_result_manager(self) -> None:
        """Test the merge_results function."""
        result = ResultManager()
        final_merged_results = ResultManager.merge_results([result])
        assert isinstance(final_merged_results, ResultManager)

    def test_merge_two_result_managers(self, test_result_factory: Callable[[], TestResult]) -> None:
        """Test merging two non-empty ResultManager instances."""
        rm1 = ResultManager()
        test1_rm1 = test_result_factory()
        test1_rm1.name = "device1"
        rm1.add(test1_rm1)
        test2_rm1 = test_result_factory()
        test2_rm1.name = "device2"
        rm1.add(test2_rm1)

        rm2 = ResultManager()
        test1_rm2 = test_result_factory()
        test1_rm2.name = "device3"
        rm2.add(test1_rm2)

        merged_rm = ResultManager.merge_results([rm1, rm2])
        assert len(merged_rm) == 3
        assert {r.name for r in merged_rm.results} == {"device1", "device2", "device3"}

    def test_merge_empty_list(self) -> None:
        """Test merging an empty list of ResultManager instances."""
        merged_rm = ResultManager.merge_results([])
        assert isinstance(merged_rm, ResultManager)
        assert len(merged_rm) == 0