1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
|
pypushflow |version|
====================
*pypushflow* is a task scheduler for cyclic and acyclic graphs.
*pypushflow* has been developed by the `Software group <http://www.esrf.eu/Instrumentation/software>`_ of the `European Synchrotron <https://www.esrf.eu/>`_.
Getting started
---------------
Install requirements
.. code:: bash
pip install pypushflow[mx]
Use the `mx` option for installation at MX beamlines.
Create and execute a workflow
.. code:: python
import logging
from pypushflow.Workflow import Workflow
from pypushflow.StopActor import StopActor
from pypushflow.StartActor import StartActor
from pypushflow.PythonActor import PythonActor
from pypushflow.ThreadCounter import ThreadCounter
class MyWorkflow(Workflow):
def __init__(self, name):
super().__init__(name, level=logging.DEBUG)
ctr = ThreadCounter(parent=self)
self.startActor = StartActor(parent=self, thread_counter=ctr)
self.pythonActor = PythonActor(
parent=self,
script="pypushflow.tests.tasks.pythonActorTest.py",
name="Python Actor Test",
thread_counter=ctr,
)
self.stopActor = StopActor(parent=self, thread_counter=ctr)
self.startActor.connect(self.pythonActor)
self.pythonActor.connect(self.stopActor)
testMyWorkflow = MyWorkflow("Test workflow")
inData = {"name": "World"}
outData = testMyWorkflow.run(inData, timeout=15, pool_type="process")
assert outData["reply"] == "Hello World!"
Run the tests
.. code:: bash
pip install pypushflow[test]
pytest --pyargs pypushflow.tests
Documentation
-------------
.. toctree::
:maxdepth: 2
concurrency
api
|