```{eval-rst}
.. currentmodule:: tango
```
(multiprocessing)=
# Multiprocessing/Multithreading
## Using clients with multiprocessing
Since version 9.3.0, PyTango provides {meth}`tango.ApiUtil.cleanup`,
which resets the CORBA connection.
This static method is needed when you want to use {mod}`tango` with
{mod}`multiprocessing` in your client code.
When both the parent process and a child process create
{class}`~tango.DeviceProxy`, {class}`~tango.Database`, {class}`~tango.Group`,
and/or {class}`~tango.AttributeProxy` objects,
the child process inherits the context of the parent process,
i.e. open file descriptors and the TANGO and CORBA state.
Sharing these objects between processes may cause unpredictable
errors, e.g., *TRANSIENT_CallTimedout* or *unidentifiable C++ exception*.
Therefore, when you start a new process, you must reset the CORBA connection:
```python
import time
import tango
from multiprocessing import Process


class Worker(Process):

    def run(self):
        # reset CORBA connection
        tango.ApiUtil.cleanup()

        proxy = tango.DeviceProxy("sys/tg_test/1")

        stime = time.time()
        etime = stime
        while etime - stime < 1.:
            try:
                proxy.read_attribute("double_scalar")
            except Exception as e:
                print(str(e))
            etime = time.time()


def run_workers():
    workers = [Worker() for _ in range(6)]
    for wk in workers:
        wk.start()
    for wk in workers:
        wk.join()


db = tango.Database()
dp = tango.DeviceProxy("sys/tg_test/1")

if __name__ == '__main__':
    for i in range(4):
        run_workers()
```
After `cleanup()` all references to {class}`~tango.DeviceProxy`,
{class}`~tango.AttributeProxy`, {class}`~tango.Group` or {class}`~tango.Database` objects
in the current process become invalid
and these objects need to be reconstructed.
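
One way to keep track of which objects must be rebuilt is to hand them out through a per-process cache. The block below is a minimal sketch, not part of PyTango: `PerProcessObjects`, its `reset` argument and the factory names are hypothetical. It assumes the child was created by forking, that `reset` is a callable such as `tango.ApiUtil.cleanup`, and that each factory is a zero-argument callable that builds a fresh object.

```python
import os


class PerProcessObjects:
    """Hypothetical helper: cache objects per process, rebuild them in a child."""

    def __init__(self, reset, **factories):
        self._reset = reset          # assumed: e.g. tango.ApiUtil.cleanup
        self._factories = factories  # name -> zero-argument callable
        self._pid = os.getpid()      # process that owns the cached objects
        self._cache = {}

    def get(self, name):
        pid = os.getpid()
        if pid != self._pid:
            # We are in a different (child) process: reset once, then drop
            # every object inherited from the parent so it is rebuilt.
            self._reset()
            self._cache.clear()
            self._pid = pid
        if name not in self._cache:
            self._cache[name] = self._factories[name]()
        return self._cache[name]


# Assumed usage with PyTango installed:
#   objects = PerProcessObjects(
#       tango.ApiUtil.cleanup,
#       dp=lambda: tango.DeviceProxy("sys/tg_test/1"),
#   )
#   objects.get("dp").read_attribute("double_scalar")
```

The reset is triggered once per process rather than once per object, because `cleanup()` invalidates every Tango object in the current process, including any that were already rebuilt.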
## Multithreading - clients and servers
Performing Tango I/O from user-created threads can cause problems.
They are often more noticeable with event subscription/unsubscription and
when pushing events, but any Tango I/O could be affected.
A client subscribing to and unsubscribing from events via a user thread may see
a crash, a deadlock, or `Event channel is not responding anymore` errors.
A device server pushing events from a user-created thread (including asyncio
callbacks) might see `Not able to acquire serialization (dev, class or process) monitor`
errors, if it is using the default [green mode](#green-modes-overview) {obj}`tango.GreenMode.Synchronous`.
If the device server is using an asynchronous green mode, i.e., {obj}`tango.GreenMode.Gevent` or
{obj}`tango.GreenMode.Asyncio`, then Tango's [device server serialisation](https://tango-controls.readthedocs.io/en/latest/development/advanced/threading.html#serialization-model-within-a-device-server)
is disabled - see the [green mode warning](#green-modes-no-sync-warning). This means you are
likely to see a crash when pushing events from a user thread, especially if an attribute
is read around the same time. The method described below **WILL NOT** help
with this. There is no solution (at least with cppTango 9.5.0 and PyTango 9.5.0, and earlier).
As PyTango wraps the cppTango library, we need to consider how cppTango's threads work.
cppTango was originally developed at a time when C++ didn't have standard
threads. All the threads currently created in cppTango are "omni threads",
since this is what the omniORB library uses to create threads and since
this implementation is available for free with omniORB.
In the past, C++ users created omni threads themselves, so there was no issue.
Since C++11, C++ comes with an implementation of standard threads.
cppTango is currently (version 9.4.1) not directly thread safe when
a user is using C++11 standard threads or any threads other than omni threads.
This lack of thread safety includes threads created from Python's
{mod}`threading` module.
In an ideal future cppTango should protect itself, regardless
of what type of threads are used. In the meantime, we need a work-around.
The work-around when using threads which are not omni threads is to create an
object of the C++ class `omni_thread::ensure_self` in the user thread, just
after the thread creation, and to delete this object only when the thread
has finished its job. This `omni_thread::ensure_self` object provides a
dummy omniORB ID for the thread. This ID is used when accessing thread
locks within cppTango, so the ID must remain the same for the lifetime
of the thread. Also note that this object MUST be released before the
thread has exited, otherwise omniORB will throw an exception.
A Pythonic way to implement this work-around for multithreaded
applications is available via the {class}`~tango.EnsureOmniThread` class.
It was added in PyTango version 9.3.2. This class is best used as a
context manager to wrap the target method of the user thread. An example
is shown below:
```python
import tango
from threading import Thread
from time import sleep


def thread_task():
    with tango.EnsureOmniThread():
        eid = dp.subscribe_event(
            "double_scalar", tango.EventType.PERIODIC_EVENT, cb)
        while running:
            print(f"num events stored {len(cb.get_events())}")
            sleep(1)
        dp.unsubscribe_event(eid)


cb = tango.utils.EventCallback()  # print events to stdout
dp = tango.DeviceProxy("sys/tg_test/1")
dp.poll_attribute("double_scalar", 1000)
thread = Thread(target=thread_task)
running = True
thread.start()
sleep(5)
running = False
thread.join()
```
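The same wrapping can be packaged into a small {class}`threading.Thread` subclass, so that the context is entered first thing in the new thread and left only when the target returns. The following is a minimal sketch, not part of PyTango: `ContextThread` and its `context_factory` parameter are hypothetical names, and `contextlib.nullcontext` is only a stand-in default so that the block runs without a Tango installation.

```python
import threading
from contextlib import nullcontext


class ContextThread(threading.Thread):
    """Hypothetical helper: run the whole thread body inside a context manager."""

    def __init__(self, *args, context_factory=nullcontext, **kwargs):
        super().__init__(*args, **kwargs)
        # Assumption: context_factory is a zero-argument callable returning
        # a context manager, e.g. tango.EnsureOmniThread.
        self._context_factory = context_factory

    def run(self):
        # Entered inside the new thread, before the target starts, and
        # exited only after the target has returned or raised.
        with self._context_factory():
            super().run()


# Assumed usage with PyTango installed (the target then needs no "with" block):
#   thread = ContextThread(target=thread_task,
#                          context_factory=tango.EnsureOmniThread)
#   thread.start()
```

Because `run()` executes in the new thread, the context manager is created and released by the thread it protects, which matches the lifetime requirement described earlier.
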
Another way to create threads in Python is the
{class}`concurrent.futures.ThreadPoolExecutor`. The problem with this is that
the API does not provide an easy way for the context manager to cover the
lifetime of the threads, which are created as daemons. There are two options here:
1. PyTango has its own {class}`~tango.utils.PyTangoThreadPoolExecutor` (in `tango.utils`).
   It is based on the standard {class}`concurrent.futures.ThreadPoolExecutor`
   and wraps each thread with {class}`~tango.EnsureOmniThread` at startup.
2. A second option is to at least use the context manager in the functions that are submitted to the
   executor, e.g., `executor.submit(thread_task)`. This is not guaranteed to work.
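
The second option can be sketched as a decorator that enters a context manager around every submitted call. This is an illustration only: `run_in_context` is a hypothetical name, not a PyTango function, and `contextlib.nullcontext` stands in for `tango.EnsureOmniThread` so that the block runs without a Tango installation. Note that the context is entered and left once per task rather than once per thread, so the caveat above still applies.

```python
from concurrent.futures import ThreadPoolExecutor
from contextlib import nullcontext
from functools import wraps


def run_in_context(context_factory):
    """Hypothetical decorator: run each call inside a fresh context manager."""

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # A new context is created for every call, in whichever worker
            # thread the executor picks for the task.
            with context_factory():
                return func(*args, **kwargs)
        return wrapper

    return decorator


# Stand-in task; with PyTango the decorator argument would be
# tango.EnsureOmniThread and the body would perform Tango I/O.
@run_in_context(nullcontext)
def thread_task(value):
    return value * 2


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=2) as executor:
        future = executor.submit(thread_task, 21)
        print(future.result())
```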