File: test_wipe.py

package info (click to toggle)
fdb 5.20.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 89,268 kB
  • sloc: cpp: 40,830; python: 5,079; sh: 4,996; makefile: 32; ansic: 8
file content (124 lines) | stat: -rw-r--r-- 3,965 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
from pyfdb import FDB
from pyfdb.pyfdb_iterator import WipeElementType, _WipeElementType


def test_wipe_enum_mapping():
    assert WipeElementType.ERROR.value == _WipeElementType.ERROR.value
    assert WipeElementType.CATALOGUE_INFO.value == _WipeElementType.CATALOGUE_INFO.value
    assert WipeElementType.CATALOGUE.value == _WipeElementType.CATALOGUE.value
    assert WipeElementType.CATALOGUE_INDEX.value == _WipeElementType.CATALOGUE_INDEX.value
    assert WipeElementType.CATALOGUE_SAFE.value == _WipeElementType.CATALOGUE_SAFE.value
    assert WipeElementType.CATALOGUE_CONTROL.value == _WipeElementType.CATALOGUE_CONTROL.value
    assert WipeElementType.STORE.value == _WipeElementType.STORE.value
    assert WipeElementType.STORE_AUX.value == _WipeElementType.STORE_AUX.value
    assert WipeElementType.STORE_SAFE.value == _WipeElementType.STORE_SAFE.value
    assert WipeElementType.UNKNOWN.value == _WipeElementType.UNKNOWN.value


def test_wipe_dryrun(read_write_fdb_setup):
    fdb = FDB(read_write_fdb_setup)

    elements = list(fdb.list({"class": "ea"}))
    assert len(elements) > 0

    wipe_iterator = fdb.wipe({"class": "ea"})
    wiped_elements = list(wipe_iterator)
    assert len(wiped_elements) > 0

    for el in wiped_elements:
        print(el)

    elements_after_wipe = list(fdb.list({"class": "ea"}))
    assert len(elements) == len(elements_after_wipe)


def test_wipe_all_doit(read_write_fdb_setup):
    fdb = FDB(read_write_fdb_setup)

    elements = list(fdb.list({"class": "ea"}))
    assert len(elements) > 0

    wipe_iterator = fdb.wipe({"class": "ea"}, doit=True)
    wiped_elements = list(wipe_iterator)
    assert len(wiped_elements) > 0

    elements_after_wipe = list(fdb.list({"class": "ea"}))
    print(f"#Elements before: {len(elements)}, Elements after: {len(elements_after_wipe)}")
    assert len(elements) > len(elements_after_wipe)


def test_wipe_single_date_doit(read_write_fdb_setup):
    fdb = FDB(read_write_fdb_setup)

    elements = list(fdb.list({"class": "ea"}))
    assert len(elements) > 0

    wipe_iterator = fdb.wipe({"class": "ea", "date": "20200101"}, doit=True)
    wiped_elements = list(wipe_iterator)
    assert len(wiped_elements) > 0

    elements_after_wipe = list(fdb.list({"class": "ea"}))
    print(f"#Elements before: {len(elements)}, Elements after: {len(elements_after_wipe)}")
    assert len(elements) > len(elements_after_wipe)
    assert len(elements) == 96
    assert len(elements_after_wipe) == 72


def populate_fdb(fdb: FDB):
    selection = {
        "class": "rd",
        "expver": "xxxx",
        "stream": "oper",
        "type": "fc",
        "date": "20000101",
        "time": "0000",
        "domain": "g",
        "levtype": "pl",
        "levelist": "300",
        "param": "138",
        "step": "0",
    }

    # Write 4 fields to the FDB based on BASE_REQUEST
    requests = [selection.copy() for _ in range(4)]

    # Modify on each of the 3 levels of the schema
    requests[1]["step"] = "1"
    requests[2]["date"] = "20000102"
    requests[3]["levtype"] = "sfc"
    del requests[3]["levelist"]

    number_of_fields = 4

    data = b"-1 Kelvin"
    for i in range(number_of_fields):
        key = requests[i]
        key = [(k, v) for k, v in key.items()]
        fdb.archive(identifier=key, data=data)
    fdb.flush()

    return number_of_fields


def test_wipe_list(empty_fdb_setup):
    fdb = FDB(empty_fdb_setup)

    number_of_fields = populate_fdb(fdb)
    assert len(list(fdb.list({}))) == number_of_fields

    # Wipe without doit: Do not actually delete anything.
    wipe_iterator = fdb.wipe({"class": "rd"})

    assert len(list(fdb.list({}))) == number_of_fields

    # Wipe, do it
    wipe_iterator = fdb.wipe({"class": "rd"}, doit=True)

    # Consume all wipe iterator elements
    for el in wipe_iterator:
        print(el.msg())
        print(el.type())
        print(el.uris())

    elements = list(fdb.list({}))
    assert len(elements) == 0