File: alarm.py

package info (click to toggle)
python-pyepics 3.4.1%2Bds-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 2,080 kB
  • sloc: python: 11,184; makefile: 106; javascript: 104; sh: 1
file content (154 lines) | stat: -rw-r--r-- 5,817 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
 #!/usr/bin/env python
#  M Newville <newville@cars.uchicago.edu>
#  The University of Chicago, 2010
#  Epics Open License
"""
alarm module -- Alarm class
"""
import sys
import time
import operator
from . import pv
from .utils import is_string

class Alarm(object):
    """ alarm class for a PV:
    run a user-supplied callback when a PV's value goes out of range

    quick synopsis:
       The supplied callback will be run when a comparison of the PV's
       value and a trip point is True.  An optional alert delay can be
       set to limit how frequently the callback is run

    arguments:
       pvname         name of PV for which to set alarm
       trip_point     value of trip point
       comparison     a string for the comparison operation: one of
                          'eq', 'ne', 'le', 'lt', 'ge', 'gt'
                          '==', '!=', '<=', '<' , '>=', '>'
       callback       function to run when comparison(value,trip_point) is True
       alert_delay    time (in seconds) to stay quiet after executing a callback.
                      this is a minimum time, as it is checked only when a PVs
                      value actually changes.  See note below.

    example:
       >>> from epics import alarm, poll
       >>> def alarmHandler(pvname=None, value=None, **kw):
       >>>     print 'Alarm!! ', pvname, value
       >>> alarm(pvname = 'XX.VAL',
       >>>       comparison='gt',
       >>>       callback = alarmHandler,
       >>>       trip_point=2.0,
       >>>       alert_delay=600)
       >>> while True:
       >>>     poll()

    when 'XX.VAL' exceeds (is 'gt') 2.0, the alarmHandler will be called.

    notes:
      alarm_delay:  The alarm delay avoids over-notification by specifying a

                    time period to NOT send messages after a message has been
                    sent, even if a value is changing and out-of-range.  Since
                    Epics callback are used to process events, the alarm state
                    will only be checked when a PV's value changes.

      callback function:  the user-supplied callback function should be prepared
                    for a large number of keyword arguments: use **kw!!!
                    For further explanation, see notes in pv.py.

                    These keyword arguments will always be included:

                    pvname      name of PV
                    value       current value of PV
                    char_value  text string for PV
                    trip_point  will hold the trip point used to define 'out of range'
                    comparison  string
                    self.user_callback(pvname=pvname, value=value,
                                   char_value=char_value,
                                   trip_point=self.trip_point,
                                   comparison=self.cmp.__name__, **kw)

    """
    ops = {'eq': operator.__eq__,
           '==': operator.__eq__,
           'ne': operator.__ne__,
           '!=': operator.__ne__,
           'le': operator.__le__,
           '<=': operator.__le__,
           'lt': operator.__lt__,
           '<' : operator.__lt__,
           'ge': operator.__ge__,
           '>=': operator.__ge__,
           'gt': operator.__gt__,
           '>' : operator.__gt__ }

    def __init__(self, pvname, comparison=None, trip_point=None,
                 callback=None, alert_delay=10):

        if isinstance(pvname, pv.PV):
            self.pv = pvname
        elif is_string(pvname):
            self.pv = pv.get_pv(pvname)
            self.pv.connect()

        if self.pv is None or comparison is None or trip_point is None:
            msg = 'alarm requires valid PV, comparison, and trip_point'
            raise UserWarning(msg)


        self.trip_point = trip_point

        self.last_alert  = 0
        self.alert_delay = alert_delay
        self.user_callback = callback

        self.cmp = None
        self.comp_name = 'Not Defined'
        if callable(comparison):
            self.comp_name  = comparison.__name__
            self.cmp = comparison
        elif comparison is not None:
            self.cmp   = self.ops.get(comparison.replace('_', ''), None)
            if self.cmp is not None:
                self.comp_name  = comparison

        self.alarm_state = False
        self.pv.add_callback(self.check_alarm)
        self.check_alarm()

    def __repr__(self):
        return "<Alarm '%s', comp=%s, trip_point=%s >" % (self.pv.name,
                                                          self.comp_name,
                                                          self.trip_point)
    def reset(self):
        "resets the alarm state"
        self.last_alert = 0
        self.alarm_state = False

    def check_alarm(self, pvname=None, value=None, char_value=None, **kw):
        """checks alarm status, act if needed.
        """
        if (pvname is None or value is None or
            self.cmp is None or self.trip_point is None): return

        val = value
        if char_value is None:
            char_value = value
        old_alarm_state  = self.alarm_state
        self.alarm_state =  self.cmp(val, self.trip_point)

        now = time.time()

        if (self.alarm_state and not old_alarm_state and
            ((now - self.last_alert) > self.alert_delay)) :
            self.last_alert = now
            if callable(self.user_callback):
                self.user_callback(pvname=pvname, value=value,
                                   char_value=char_value,
                                   trip_point=self.trip_point,
                                   comparison=self.comp_name, **kw)

            else:
                sys.stdout.write('Alarm: %s=%s (%s)\n' % (pvname, char_value,
                                                          time.ctime()))