1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316
|
# SPDX-FileCopyrightText: All Contributors to the PyTango project
# SPDX-License-Identifier: LGPL-3.0-or-later
import threading
import time
from tango import (
AttrWriteType,
AttributeProxy,
Database,
DeviceProxy,
EnsureOmniThread,
EventType,
InfoIt,
Group,
)
from tango.test_context import MultiDeviceTestContext
from tango.server import Device, attribute, command, device_property
from tango.utils import EventCallback
# <<<<<< If user wants to trace their own things -------
# Dependencies:
# pip or conda install the following libraries:
# opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp-proto-grpc
# API
from opentelemetry import trace as trace_api
# SDK
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import (
SERVICE_NAME,
SERVICE_NAMESPACE,
Resource,
)
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
# Resource attached to every span produced through this provider: it labels
# the application with service namespace "org.institute" and name "my.app".
resource = Resource.create({SERVICE_NAMESPACE: "org.institute", SERVICE_NAME: "my.app"})
provider = TracerProvider(resource=resource)
# Spans are handed to an OTLP (gRPC) exporter built with default arguments,
# wrapped in a batching span processor.
processor = BatchSpanProcessor(OTLPSpanExporter())
provider.add_span_processor(processor)
# Sets the global default tracer provider
# (this must happen before get_tracer() below, which reads the global provider)
trace_api.set_tracer_provider(provider)
# Creates a tracer from the global tracer provider
# "user.tracer" is the name passed to get_tracer(); this module-level tracer is
# used by main() and thread_worker() for the application's own spans.
tracer = trace_api.get_tracer("user.tracer")
# ---- end of user's own tracing. >>>>>>
# import to pass context to a user thread:
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
class Leader(Device):
    # Example device that acts as a client of Follower devices: each command
    # opens a DeviceProxy to one follower, so calls made from inside a device
    # can be observed in the telemetry traces.

    # Addresses (TRLs) of the followers; commands select one by 1-based ID.
    FollowerTRLs = device_property(dtype=(str,))

    def _follower_proxy(self, follower_id):
        """Return a DeviceProxy for the follower with the given 1-based ID.

        Raises:
            IndexError: if follower_id is not in 1..len(FollowerTRLs).  Without
                this check an ID of 0 (or a negative ID) would be turned into a
                negative list index and silently address a follower counted
                from the end of the list.
        """
        follower_count = len(self.FollowerTRLs)
        if not 1 <= follower_id <= follower_count:
            raise IndexError(
                f"follower_id {follower_id} is out of range 1..{follower_count}"
            )
        return DeviceProxy(self.FollowerTRLs[follower_id - 1])

    # Write isOn = True on the selected follower.
    @command(dtype_in=int)
    @InfoIt(show_args=True)
    def TurnFollowerOn(self, follower_id):
        self.debug_stream(f"Turning follower {follower_id} on...")
        self._follower_proxy(follower_id).isOn = True

    # Write isOn = False on the selected follower, inside a custom span
    # created from the device's own telemetry tracer.
    @command(dtype_in=int)
    def TurnFollowerOff(self, follower_id):
        device_tracer = self.get_telemetry_tracer()
        with device_tracer.start_as_current_span("test.leader.span") as span:
            span.set_attribute("follower_id", follower_id)
            self._follower_proxy(follower_id).isOn = False

    # Switch telemetry for this device on/off; returns the resulting state.
    @command(dtype_in=bool, dtype_out=bool)
    def SetDeviceTracing(self, enable):
        self.set_telemetry_enabled(enable)
        return self.is_telemetry_enabled()

    # Switch kernel tracing for this device on/off; returns the resulting state.
    @command(dtype_in=bool, dtype_out=bool)
    def SetKernelTracing(self, enable):
        self.set_kernel_tracing_enabled(enable)
        return self.is_kernel_tracing_enabled()

    # Run the polling/event round trip against the selected follower from
    # within the device.
    @command(dtype_in=int)
    @InfoIt(show_args=True)
    def PollFollowerOnOff(self, follower_id):
        user_test_polling_and_events(self._follower_proxy(follower_id))
class Follower(Device):
    # Example device exposing one static read/write attribute (isOn), plus an
    # attribute and a command that are registered at runtime in init_device.

    def init_device(self):
        super().init_device()
        self._is_on = False
        # register the dynamic attribute and command on this device instance
        self.add_attribute(
            attribute(
                name="dynamicAttribute",
                access=AttrWriteType.READ,
                fget=self.read_dyn_attr,
            )
        )
        self.add_command(command(dtype_in=int, f=self.DynamicCommand))

    isOn = attribute(access=AttrWriteType.READ_WRITE)

    # read handler for isOn: reports the stored flag
    @InfoIt(show_ret=True)
    def read_isOn(self) -> bool:
        return self._is_on

    # write handler for isOn: stores the new flag
    @InfoIt(show_args=True)
    def write_isOn(self, value: bool) -> None:
        self._is_on = value

    # read handler for dynamicAttribute: always reports the constant 123
    def read_dyn_attr(self, arg) -> int:
        print("Follower.dynamicAttribute")
        return 123

    # handler for the dynamic command: only echoes its argument to stdout
    def DynamicCommand(self, arg):
        print(f"Follower.DynamicCommand({arg})")

    def delete_device(self):
        # just to show trace emitted for user code
        return

    def dev_state(self):
        # just to show trace emitted for user code
        state = super().dev_state()
        return state
# Device layout handed to MultiDeviceTestContext in main(): one Leader that
# knows the addresses of two Followers.
devices_info = [
    {
        "class": Leader,
        "devices": (
            {
                "name": "device/leader/1",
                "properties": {
                    # order matters: Leader commands use 1-based positions
                    "FollowerTRLs": ["device/follower/1", "device/follower/2"],
                },
            },
        ),
    },
    {
        "class": Follower,
        "devices": [
            {"name": "device/follower/1", "properties": {}},
            {"name": "device/follower/2", "properties": {}},
        ],
    },
]
def call_from_thread_with_context(follower: DeviceProxy):
    """Start thread_worker in a new thread, handing it the trace context.

    The propagator's inject() fills a plain dict carrier, which is passed to
    the worker together with the follower proxy.  The thread is started but
    not joined; the function only sleeps for 1 ms before returning.
    """
    carrier = {}
    propagator = TraceContextTextMapPropagator()
    propagator.inject(carrier)
    print(f"call_from_thread_with_context {carrier=}")
    threading.Thread(target=thread_worker, args=(carrier, follower)).start()
    time.sleep(0.001)
def thread_worker(carrier, follower):
    """Thread target: turn the follower off under the caller's trace context.

    Runs inside EnsureOmniThread, extracts a context from the carrier dict and
    uses it as the context of a "thread.worker.call" span around the write.
    """
    with EnsureOmniThread():
        print(f"worker started for: {follower}")
        parent_ctx = TraceContextTextMapPropagator().extract(carrier=carrier)
        worker_span = tracer.start_as_current_span(
            "thread.worker.call", context=parent_ctx
        )
        with worker_span:
            user_turn_follower_off_directly(follower)
def user_turn_follower_off_directly(follower: DeviceProxy):
    """Print the proxy, then set its isOn attribute to False."""
    print(f"user_turn_follower_off_directly: {follower}")
    setattr(follower, "isOn", False)
def user_test_polling_and_events(follower: DeviceProxy):
    """Poll the follower's isOn attribute while running the event test.

    Prints the polling status of isOn before polling is enabled, after it is
    enabled (poll_attribute is called with a period argument of 10), and again
    after it has been stopped.

    Polling is stopped in a ``finally`` clause so that a failure inside
    user_test_events() does not leave the attribute polled on the device.
    """
    print(f"user_toggle_polling {follower.is_attribute_polled('isOn')=}")
    follower.poll_attribute("isOn", 10)
    try:
        print(f"user_toggle_polling {follower.is_attribute_polled('isOn')=}")
        user_test_events(follower)
    finally:
        # always undo the polling configuration, even if the event test failed
        follower.stop_poll_attribute("isOn")
    print(f"user_toggle_polling {follower.is_attribute_polled('isOn')=}")
def user_test_events(follower: DeviceProxy):
    """Subscribe to isOn change events and toggle the attribute twice.

    Writes False and then True to isOn, sleeping 10 ms after each write,
    while an EventCallback is subscribed to CHANGE_EVENT on the attribute.

    The subscription is released in a ``finally`` clause so that a failing
    write does not leak it.
    """
    eid = follower.subscribe_event("isOn", EventType.CHANGE_EVENT, EventCallback())
    try:
        follower.isOn = False
        time.sleep(0.01)
        follower.isOn = True
        time.sleep(0.01)
    finally:
        follower.unsubscribe_event(eid)
# Demo entry point: starts the Leader and both Followers in a
# MultiDeviceTestContext (process=False, timeout=600, debug=3), then exercises
# DeviceProxy, AttributeProxy, Group and Database calls inside user-created
# spans so that the client-side calls appear in the exported traces.
# NOTE(review): leading indentation was lost in this copy of the file, so the
# nesting of the with/for bodies below (and therefore which span is the parent
# of which call) cannot be read from here -- restore it from the upstream
# example before running.
def main():
with MultiDeviceTestContext(devices_info, process=False, timeout=600, debug=3):
# client proxies for the three devices declared in devices_info
leader = DeviceProxy("device/leader/1")
follower_1 = DeviceProxy("device/follower/1")
follower_2 = DeviceProxy("device/follower/2")
# user could create their own span, e.g.:
with tracer.start_as_current_span(
"my.app.main", kind=trace_api.SpanKind.CLIENT
):
for loop in range(2):
with tracer.start_as_current_span(
"my.app.main.inner-loop", kind=trace_api.SpanKind.CLIENT
) as span:
# record the loop counter on the span
span.set_attribute("operation.value", loop)
# tell leader to enable both followers
leader.command_inout("TurnFollowerOn", 1)
leader.command_inout("TurnFollowerOn", 2)
# check initial state: both followers are on
_ = follower_1.read_attribute("isOn").value
_ = follower_2.isOn
# turn off, using low-level and high-level API
follower_1.write_attribute("isOn", 0)
user_turn_follower_off_directly(follower_2)
# same write again, but from a separate thread that receives the trace context
call_from_thread_with_context(follower_2)
leader.TurnFollowerOff(1) # FIXME
with tracer.start_as_current_span(
"test.deviceproxy.other", kind=trace_api.SpanKind.CLIENT
):
# read multiple attributes, and check config
_ = follower_2.read_attributes(["isOn", "state"])
_ = follower_1.get_attribute_config("isOn")
# members that Follower registers dynamically in init_device
_ = follower_1.read_attribute("dynamicAttribute")
_ = follower_1.command_inout("DynamicCommand", 22)
with tracer.start_as_current_span(
"test.polling-and-events", kind=trace_api.SpanKind.CLIENT
):
# polling and events driven from inside the Leader device
leader.PollFollowerOnOff(1)
# polling changes not within device context aren't traced yet
follower_1.poll_attribute("isOn", 1000)
follower_1.stop_poll_attribute("isOn")
with tracer.start_as_current_span(
"test.attributeproxy", kind=trace_api.SpanKind.CLIENT
):
follower_1_is_on = AttributeProxy("device/follower/1/isOn")
_ = follower_1_is_on.ping()
reading = follower_1_is_on.read()
print(f"AttributeProxy reading: {reading.value}")
with tracer.start_as_current_span(
"test.group", kind=trace_api.SpanKind.CLIENT
):
# group containing both followers, added one device at a time
group = Group("add-one-at-a-time")
group.add("device/follower/1")
group.add("device/follower/2")
group.ping()
reply = group.read_attribute("isOn")
print(
f"Group reading: 1:{reply[0].get_data().value}, 2:{reply[1].get_data().value}"
)
with tracer.start_as_current_span(
"test.database", kind=trace_api.SpanKind.CLIENT
):
# put, read back and delete a throw-away property "foo" on sys/tg_test/1
db = Database()
_ = db.get_server_list()
_ = db.put_device_property("sys/tg_test/1", {"foo": "bar"})
prop_val = db.get_device_property("sys/tg_test/1", "foo")
print(f"got property foo: {prop_val}")
_ = db.delete_device_property("sys/tg_test/1", "foo")
# Script entry point: by default run the self-contained demo in main().
if __name__ == "__main__":
    main()
    # instead of running main, can run as Tango server providing the two classes:
    # tango.server.run([Leader, Follower])
"""
Examples:
*** Change code above to use main() instead of tango.server.run(...)
# Run example with telemetry on, traces go to stdout
$ TANGO_TELEMETRY_ENABLE=on python prototyping.py
# Run example with telemetry on, traces go to local collector via gRPC
$ TANGO_TELEMETRY_ENABLE=on TANGO_TELEMETRY_TRACES_EXPORTER=grpc python prototyping.py
*** Change code above to use tango.server.run(...) instead of main()
# Python client with telemetry on, traces go to stdout
# (create your own DeviceProxy)
$ TANGO_TELEMETRY_ENABLE=on python
# Python client with telemetry on, traces go to local collector via gRPC
# (create your own DeviceProxy)
$ TANGO_TELEMETRY_ENABLE=on TANGO_TELEMETRY_TRACES_EXPORTER=grpc python
# Run Leader device server, telemetry disabled
$ python prototyping.py Leader --host=127.0.0.1 -v3
# Run Leader device server, telemetry on, traces and logs to local collector via gRPC
$ TANGO_TELEMETRY_ENABLE=on TANGO_TELEMETRY_TRACES_EXPORTER=grpc TANGO_TELEMETRY_LOGS_EXPORTER=grpc python prototyping.py Leader --host=127.0.0.1 -v3
# Run Follower device server, telemetry on, traces and logs to local collector via gRPC
# Also include additional process info in the traces using experimental OpenTelemetry
# resource detector.
$ TANGO_TELEMETRY_ENABLE=on TANGO_TELEMETRY_TRACES_EXPORTER=grpc TANGO_TELEMETRY_LOGS_EXPORTER=grpc OTEL_EXPERIMENTAL_RESOURCE_DETECTORS=process python prototyping.py Follower --host=127.0.0.1 -v3
"""
|