File: run_bliss_scans.py

package info (click to toggle)
python-ewoksdata 0.6.0~rc0-4
  • links: PTS, VCS
  • area: main
  • in suites: sid, trixie
  • size: 292 kB
  • sloc: python: 1,661; makefile: 3
file content (186 lines) | stat: -rw-r--r-- 5,906 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
"""
Bliss repository (used in production on the publishing side):

* Bliss branch master: blissdata version 2.0.0 (python_requires = >=3.9)
* Bliss branch  2.1.x: blissdata version 1.1.2 (python_requires = >=3.8)
* Bliss branch  2.0.x: blissdata version 1.0.3 (python_requires = >=3.8, <3.10)
* Bliss branch 1.11.x: blissdata version 0.3.4 (python_requires = >=3.7, <3.10)
"""

import bliss  # noqa F401 patch environment the way Bliss wants it (not standard)

import os
import sys
from pprint import pprint

if sys.version_info >= (3, 8):
    from importlib.metadata import version
else:
    from importlib_metadata import version
from packaging.specifiers import SpecifierSet

import gevent
import blissdemo
from bliss.config import static
from bliss.shell import standard

from ewoksdata.data.blissdata import last_lima_image
from ewoksdata.data.blissdata import iter_bliss_scan_data_from_memory
from ewoksdata.data.blissdata import iter_bliss_scan_data_from_memory_slice


_BLISS_VERSION = version("bliss")
_BLISSDATA_VERSION = version("blissdata")

if _BLISS_VERSION in SpecifierSet("<2", prereleases=True):
    from bliss.data.node import get_session_node
else:
    from blissdata.beacon.data import BeaconData
    from blissdata.redis_engine.store import DataStore
    from blissdata.redis_engine.exceptions import NoScanAvailable


def test_iter_memory(scan_key) -> None:
    lima_names = ["difflab6"]
    counter_names = ["diode1"]
    print(f"Iterate scan {scan_key} ...")
    n = 0
    for data in iter_bliss_scan_data_from_memory(scan_key, lima_names, counter_names):
        pprint({k: v.shape for k, v in data.items()})
        n += 1
    assert n == 10
    print(f"Received all data from {scan_key}.")


def test_iter_slice_memory(scan_key) -> None:
    lima_names = ["difflab6"]
    counter_names = ["diode1"]
    print(f"Iterate scan slice {scan_key} ...")
    n = 0
    for data in iter_bliss_scan_data_from_memory_slice(
        scan_key, lima_names, counter_names, slice_range=(3, 5)
    ):
        pprint({k: v.shape for k, v in data.items()})
        n += 1
    assert n == 2
    print(f"Received all data from {scan_key}.")


if _BLISSDATA_VERSION in SpecifierSet("<1", prereleases=True):

    def test_last_lima_image(scan_key) -> None:
        print(f"Get last 'difflab6' image from {scan_key} ...")
        gevent.sleep(5)

        db_name = f"{scan_key}:timer:difflab6:image"

        image = last_lima_image(db_name)
        pprint({"difflab6": image.shape})
        assert image.ndim == 2

elif _BLISSDATA_VERSION in SpecifierSet("<2", prereleases=True):

    def test_last_lima_image(scan_key) -> None:
        print(f"Get last 'difflab6' image from {scan_key} ...")
        gevent.sleep(5)

        redis_url = BeaconData().get_redis_data_db()
        data_store = DataStore(redis_url)
        scan = data_store.load_scan(scan_key)
        channel_info = scan.streams["difflab6:image"].info

        image = last_lima_image(channel_info)
        pprint({"difflab6": image.shape})
        assert image.ndim == 2

else:

    def test_last_lima_image(scan_key) -> None:
        print(f"Get last 'difflab6' image from {scan_key} ...")
        gevent.sleep(5)

        image = last_lima_image(scan_key, "difflab6")
        pprint({"difflab6": image.shape})
        assert image.ndim == 2


if _BLISSDATA_VERSION in SpecifierSet("<1", prereleases=True):
    _TESTS = test_iter_memory, test_last_lima_image
else:
    _TESTS = test_iter_memory, test_last_lima_image, test_iter_slice_memory


if _BLISS_VERSION in SpecifierSet("<2", prereleases=True):

    def init_execute_tests() -> None:
        session = get_session_node("demo_session")
        scan_types = ("scan", "scan_group")
        event_iterator = session.walk_on_new_events(exclude_children=scan_types)
        return (event_iterator,)

    def execute_tests(event_iterator) -> None:
        it_tests = iter(_TESTS)
        run_test = next(it_tests)

        for ev in event_iterator:
            if ev.type == ev.type.NEW_NODE and ev.node.type == "scan":
                db_name = ev.node.db_name
                run_test(db_name)
                try:
                    run_test = next(it_tests)
                except StopIteration:
                    break

else:

    def init_execute_tests() -> None:
        redis_url = BeaconData().get_redis_data_db()
        data_store = DataStore(redis_url)
        since = data_store.get_last_scan_timetag()
        return data_store, since

    def execute_tests(data_store, since) -> None:
        for run_test in _TESTS:
            while True:
                try:
                    since, scan_key = data_store.get_next_scan(since=since, timeout=1)
                    run_test(scan_key)
                    break
                except NoScanAvailable:
                    pass


def run_scans(session, nscans):
    loopscan = session.env_dict["loopscan"]
    detectors = session.env_dict["difflab6"], session.env_dict["diode1"]
    for _ in range(nscans):
        print("Scan starts ...")
        loopscan(10, 0.1, *detectors)
        print("Scan finished.")


def start_bliss_session():
    config = static.get_config()
    bliss_session = config.get("demo_session")
    if _BLISS_VERSION in SpecifierSet(">=2.1.0dev0", prereleases=True):
        bliss_session.active_session()

    env_dict = dict()
    env_dict.update(standard.__dict__)

    assert bliss_session.setup(env_dict=env_dict), "Session setup failed"
    return bliss_session


if __name__ == "__main__":
    os.environ.setdefault("BEACON_HOST", "localhost:10001")
    os.environ.setdefault("TANGO_HOST", "localhost:10000")
    os.environ.setdefault("DEMO_ROOT", blissdemo.__path__[0])

    session = start_bliss_session()
    args = init_execute_tests()
    tests = gevent.spawn(execute_tests, *args)

    nscans = len(_TESTS)
    run_scans(session, nscans)
    tests.get(timeout=nscans * 60 + 30)