File: prototyping.py

package info (click to toggle)
pytango 10.1.4-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 8,304 kB
  • sloc: python: 27,795; cpp: 16,150; sql: 252; sh: 152; makefile: 43
file content (316 lines) | stat: -rw-r--r-- 11,418 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
# SPDX-FileCopyrightText: All Contributors to the PyTango project
# SPDX-License-Identifier: LGPL-3.0-or-later

import threading
import time

from tango import (
    AttrWriteType,
    AttributeProxy,
    Database,
    DeviceProxy,
    EnsureOmniThread,
    EventType,
    InfoIt,
    Group,
)
from tango.test_context import MultiDeviceTestContext
from tango.server import Device, attribute, command, device_property
from tango.utils import EventCallback

# <<<<<< If user wants to trace their own things -------
# Dependencies:
# pip or conda install the following libraries:
#   opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp-proto-grpc

# API
from opentelemetry import trace as trace_api

# SDK
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import (
    SERVICE_NAME,
    SERVICE_NAMESPACE,
    Resource,
)
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

resource = Resource.create({SERVICE_NAMESPACE: "org.institute", SERVICE_NAME: "my.app"})

provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(OTLPSpanExporter())
provider.add_span_processor(processor)

# Sets the global default tracer provider
trace_api.set_tracer_provider(provider)

# Creates a tracer from the global tracer provider
tracer = trace_api.get_tracer("user.tracer")
# ---- end of user's own tracing. >>>>>>

# import to pass context to a user thread:
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator


class Leader(Device):
    FollowerTRLs = device_property(dtype=(str,))

    @command(dtype_in=int)
    @InfoIt(show_args=True)
    def TurnFollowerOn(self, follower_id):
        self.debug_stream(f"Turning follower {follower_id} on...")
        follower_trl = self.FollowerTRLs[follower_id - 1]
        follower_device = DeviceProxy(follower_trl)
        follower_device.isOn = True

    @command(dtype_in=int)
    def TurnFollowerOff(self, follower_id):
        device_tracer = self.get_telemetry_tracer()
        with device_tracer.start_as_current_span("test.leader.span") as span:
            span.set_attribute("follower_id", follower_id)
            follower_trl = self.FollowerTRLs[follower_id - 1]
            follower_device = DeviceProxy(follower_trl)
            follower_device.isOn = False

    @command(dtype_in=bool, dtype_out=bool)
    def SetDeviceTracing(self, enable):
        self.set_telemetry_enabled(enable)
        return self.is_telemetry_enabled()

    @command(dtype_in=bool, dtype_out=bool)
    def SetKernelTracing(self, enable):
        self.set_kernel_tracing_enabled(enable)
        return self.is_kernel_tracing_enabled()

    @command(dtype_in=int)
    @InfoIt(show_args=True)
    def PollFollowerOnOff(self, follower_id):
        follower_trl = self.FollowerTRLs[follower_id - 1]
        follower_device = DeviceProxy(follower_trl)
        user_test_polling_and_events(follower_device)


class Follower(Device):
    def init_device(self):
        super().init_device()
        self._is_on = False

        attr = attribute(
            name="dynamicAttribute",
            access=AttrWriteType.READ,
            fget=self.read_dyn_attr,
        )
        self.add_attribute(attr)
        cmd = command(
            dtype_in=int,
            f=self.DynamicCommand,
        )
        self.add_command(cmd)

    isOn = attribute(access=AttrWriteType.READ_WRITE)

    @InfoIt(show_ret=True)
    def read_isOn(self) -> bool:
        return self._is_on

    @InfoIt(show_args=True)
    def write_isOn(self, value: bool) -> None:
        self._is_on = value

    def read_dyn_attr(self, arg) -> int:
        print(f"Follower.dynamicAttribute")
        return 123

    def DynamicCommand(self, arg):
        print(f"Follower.DynamicCommand({arg})")

    def delete_device(self):
        # just to show trace emitted for user code
        pass

    def dev_state(self):
        # just to show trace emitted for user code
        return super().dev_state()


devices_info = [
    {
        "class": Leader,
        "devices": (
            {
                "name": "device/leader/1",
                "properties": {
                    "FollowerTRLs": ["device/follower/1", "device/follower/2"],
                },
            },
        ),
    },
    {
        "class": Follower,
        "devices": [
            {"name": "device/follower/1", "properties": {}},
            {"name": "device/follower/2", "properties": {}},
        ],
    },
]


def call_from_thread_with_context(follower: DeviceProxy):
    carrier = {}
    TraceContextTextMapPropagator().inject(carrier)
    print(f"call_from_thread_with_context {carrier=}")
    thread = threading.Thread(target=thread_worker, args=(carrier, follower))
    thread.start()
    time.sleep(0.001)


def thread_worker(carrier, follower):
    with EnsureOmniThread():
        print(f"worker started for: {follower}")
        ctx = TraceContextTextMapPropagator().extract(carrier=carrier)
        with tracer.start_as_current_span("thread.worker.call", context=ctx):
            user_turn_follower_off_directly(follower)


def user_turn_follower_off_directly(follower: DeviceProxy):
    print(f"user_turn_follower_off_directly: {follower}")
    follower.isOn = False


def user_test_polling_and_events(follower: DeviceProxy):
    print(f"user_toggle_polling {follower.is_attribute_polled('isOn')=}")
    follower.poll_attribute("isOn", 10)
    print(f"user_toggle_polling {follower.is_attribute_polled('isOn')=}")
    user_test_events(follower)
    follower.stop_poll_attribute("isOn")
    print(f"user_toggle_polling {follower.is_attribute_polled('isOn')=}")


def user_test_events(follower: DeviceProxy):
    eid = follower.subscribe_event("isOn", EventType.CHANGE_EVENT, EventCallback())
    follower.isOn = False
    time.sleep(0.01)
    follower.isOn = True
    time.sleep(0.01)
    follower.unsubscribe_event(eid)


def main():
    with MultiDeviceTestContext(devices_info, process=False, timeout=600, debug=3):
        leader = DeviceProxy("device/leader/1")
        follower_1 = DeviceProxy("device/follower/1")
        follower_2 = DeviceProxy("device/follower/2")
        # user could create their own span, e.g.:
        with tracer.start_as_current_span(
            "my.app.main", kind=trace_api.SpanKind.CLIENT
        ):
            for loop in range(2):
                with tracer.start_as_current_span(
                    "my.app.main.inner-loop", kind=trace_api.SpanKind.CLIENT
                ) as span:
                    span.set_attribute("operation.value", loop)

                    # tell leader to enable both followers
                    leader.command_inout("TurnFollowerOn", 1)
                    leader.command_inout("TurnFollowerOn", 2)

                    # check initial state: both followers are on
                    _ = follower_1.read_attribute("isOn").value
                    _ = follower_2.isOn

                    # turn off, using low-level and high-level API
                    follower_1.write_attribute("isOn", 0)
                    user_turn_follower_off_directly(follower_2)

                    call_from_thread_with_context(follower_2)

                    leader.TurnFollowerOff(1)  # FIXME

                    with tracer.start_as_current_span(
                        "test.deviceproxy.other", kind=trace_api.SpanKind.CLIENT
                    ):
                        # read multiple attributes, and check config
                        _ = follower_2.read_attributes(["isOn", "state"])
                        _ = follower_1.get_attribute_config("isOn")
                        _ = follower_1.read_attribute("dynamicAttribute")
                        _ = follower_1.command_inout("DynamicCommand", 22)

                    with tracer.start_as_current_span(
                        "test.polling-and-events", kind=trace_api.SpanKind.CLIENT
                    ):
                        leader.PollFollowerOnOff(1)
                        # polling changes not within device context aren't traced yet
                        follower_1.poll_attribute("isOn", 1000)
                        follower_1.stop_poll_attribute("isOn")

                    with tracer.start_as_current_span(
                        "test.attributeproxy", kind=trace_api.SpanKind.CLIENT
                    ):
                        follower_1_is_on = AttributeProxy("device/follower/1/isOn")
                        _ = follower_1_is_on.ping()
                        reading = follower_1_is_on.read()
                        print(f"AttributeProxy reading: {reading.value}")

                    with tracer.start_as_current_span(
                        "test.group", kind=trace_api.SpanKind.CLIENT
                    ):
                        group = Group("add-one-at-a-time")
                        group.add("device/follower/1")
                        group.add("device/follower/2")
                        group.ping()
                        reply = group.read_attribute("isOn")
                        print(
                            f"Group reading: 1:{reply[0].get_data().value}, 2:{reply[1].get_data().value}"
                        )

                    with tracer.start_as_current_span(
                        "test.database", kind=trace_api.SpanKind.CLIENT
                    ):
                        db = Database()
                        _ = db.get_server_list()
                        _ = db.put_device_property("sys/tg_test/1", {"foo": "bar"})
                        prop_val = db.get_device_property("sys/tg_test/1", "foo")
                        print(f"got property foo: {prop_val}")
                        _ = db.delete_device_property("sys/tg_test/1", "foo")


if __name__ == "__main__":
    main()

    # instead of running main, can run as Tango server providing the two classes:
    # tango.server.run([Leader, Follower])

"""
Examples:

*** Change code above to use main() instead of tango.server.run(...)

# Run example with telemetry on, traces go to stdout
$ TANGO_TELEMETRY_ENABLE=on python prototyping.py

# Run example with telemetry on, traces to to local collector via gRPC
$ TANGO_TELEMETRY_ENABLE=on TANGO_TELEMETRY_TRACES_EXPORTER=grpc python

*** Change code above to use tango.server.run(...) instead of main()

# Python client with telemetry on, traces go to stdout
#  (create your own DeviceProxy)
$ TANGO_TELEMETRY_ENABLE=on python

# Python client with telemetry on, traces to to local collector via gRPC
# (create your own DeviceProxy)
$ TANGO_TELEMETRY_ENABLE=on TANGO_TELEMETRY_TRACES_EXPORTER=grpc python

# Run Leader device server, telemetry disabled
$ python prototyping.py Leader --host=127.0.0.1 -v3

# Run Leader device server, telemetry on, traces and logs to local collector via gRPC
$ TANGO_TELEMETRY_ENABLE=on TANGO_TELEMETRY_TRACES_EXPORTER=grpc TANGO_TELEMETRY_LOGS_EXPORTER=grpc python prototyping.py Leader --host=127.0.0.1 -v3

# Run Follower device server, telemetry on, traces and logs to local collector via gRPC
# Also include additional process info in the traces using experimental OpenTelemetry
# resource detector.
$ TANGO_TELEMETRY_ENABLE=on TANGO_TELEMETRY_TRACES_EXPORTER=grpc TANGO_TELEMETRY_LOGS_EXPORTER=grpc OTEL_EXPERIMENTAL_RESOURCE_DETECTORS=process python prototyping.py Follower --host=127.0.0.1 -v3

"""