File: save_restore.py

package info (click to toggle)
python-pyepics 3.4.1%2Bds-3
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 2,080 kB
  • sloc: python: 11,184; makefile: 106; javascript: 104; sh: 1
file content (206 lines) | stat: -rwxr-xr-x 7,593 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
#!/usr/bin/env python
"""
A python module that uses pyepics to save/restore sets of pvs from files.

Copyright 2011 Angus Gratton <angus.gratton@anu.edu.au>
Australian National University
EPICS Open License

The module is intended to be compatible with the 'autosave' module format used in synApps.

Files -

xxx.req - A request file with a list of pvs to save. Format is the same as autosave request format,
          including being able to have "file yyy.req VAR=A,OTHER=B" style macro expansions.

xxx.sav - A saved file with the current PV values, to save/restore. Standalone file, this is a
          compatible format to the .sav files which are used by autosave.

This module requires/uses pyparsing parser framework. Debian/Ubuntu package is "python-pyparsing"
Web site is http://pyparsing.wikispaces.com/

"""

from pyparsing import (Literal, Optional, Word, Combine, Regex, Group,
                       ZeroOrMore, OneOrMore, LineEnd, LineStart, StringEnd,
                       alphanums, alphas, nums, printables)

import sys
import os
import datetime
import json
from epics.pv import get_pv

def restore_pvs(filepath, debug=False):
    """
    Restore pvs from a save file via Channel Access

    debug - Set to True if you want a line printed for each value set

    Returns True if all pvs were restored successfully.
    """
    pv_vals = []
    failures = []
    # preload PV names and values, hoping PV connections happen in background
    with open(filepath, 'r') as fh:
        for line in fh.readlines():
            if len(line) < 2 or  line.startswith('<END') or line.startswith('#'):
                continue
            pvname, value = [w.strip() for w in line[:-1].split(' ', 1)]
            if value.startswith('@array@'):
                value = value.replace('@array@', '').strip()
                if value.startswith('{') and value.endswith('}'):
                    value = value[1:-1]
                value = json.loads(value)
            thispv = get_pv(pvname, connect=False)
            pv_vals.append((thispv, value))

    for thispv, value in pv_vals:
        thispv.connect()
        pvname = thispv.pvname
        if not thispv.connected:
            print("Cannot connect to %s" % (pvname))
        elif not thispv.write_access:
            print("No write access to %s" % (pvname))
        else:
            if debug:
                print("Setting %s to %s" % (pvname, value))
            try:
                thispv.put(value, wait=False)
            except:
                exctype, excvalue, exctrace = sys.exc_info()
                print("Error restoring %s to %s : %s" % (pvname, value,
                                                         exctype, excvalue))
                failues.append(pvname)
    return len(failures) == 0

def save_pvs(request_file, save_file, debug=False):
    """
    Save pvs from a request file to a save file, via Channel Access

    Set debug=True to print a line for each PV saved.

    Will print a warning if a PV cannot connect.
    """
    saver = AutoSaver(request_file)
    saver.save(save_file, verbose=debug)

class AutoSaver(object):
    """Autosave class"""
    def __init__(self, request_file=None):
        self.request_file = request_file
        self.pvs = []
        if request_file is not None:
            self.read_request_file(request_file)

    def read_request_file(self, request_file=None):
        if request_file is not None:
            self.request_file = request_file
        self.pvs = []
        for pvname in _parse_request_file(request_file):
            self.pvs.append(get_pv(pvname, connect=False))

    def save(self, save_file=None, verbose=False):
        """save PVs to save_file"""
        now = datetime.datetime.now()
        if save_file is None:
            sfile = self.request_file
            if sfile.endswith('.req'):
                sfile = sfile[:-4]
            tstamp = now.strftime("%Y%b%d_%H%M%S")
            save_file = "%s_%s.sav" % (sfile, tstamp)

        buff = ["# File saved by pyepics AutoSaver.save() on %s" % now,
                "# Edit with extreme care."]

        for thispv in self.pvs:
            pvname = thispv.pvname
            thispv.wait_for_connection()
            if thispv.connected:
                if thispv.count == 1:
                    value = str(thispv.get())
                elif thispv.count > 1 and 'char' in thispv.type:
                    value = thispv.get(as_string=True)
                elif thispv.count > 1 and 'char' not in thispv.type:
                    value = '@array@ %s' % json.dumps(thispv.get().tolist())
                buff.append("%s %s" % (pvname, value))
                if verbose:
                    print( "PV %s = %s" % (pvname, value))
            elif verbose:
                print("PV %s not connected" % (pvname))


        buff.append("<END>\n")
        with open(save_file, 'w') as fh:
            fh.write("\n".join(buff))
        print("wrote %s"% save_file)

def _parse_request_file(request_file, macro_values={}):
    """
    Internal function to parse a request file.

    Parse happens in two stages, first build an AST then walk it and do
    file expansions (which recurse through here.)

    Returns a list of PV names.

    """
    ast = [ x for x in req_file.parseFile(request_file).asList() if len(x) > 0 ]

    result = []
    for n in ast:
        if len(n) == 1: # simple PV name
            pvname = n[0]
            for m,v in macro_values.items(): # please forgive me this awful macro expansion method
                pvname = pvname.replace("$(%s)" % m, v)
            result.append(pvname)
        elif n[0] == 'file': # include file
            subfile = n[1]
            subfile = os.path.normpath(os.path.join(os.path.dirname(request_file), subfile))
            sub_macro_vals = macro_values.copy()
            sub_macro_vals.update(dict(n[2:]))
            result += _parse_request_file(subfile, sub_macro_vals)
        else:
            raise Exception("Unexpected entry parsed from request file: %s" % n)
    return result

# request & save file grammar (combined because lots of it is pretty similar)
point = Literal('.')
minus = Literal('-')
ignored_quote = Literal('"').suppress()
ignored_comma = Literal(',').suppress()

file_name = Word(alphanums+":._-+/\\")

number = Word(nums)
integer = Combine( Optional(minus) + number )
float_number = Combine( integer +
                        Optional( point + Optional(number) )
                        ).setParseAction(lambda t:float(t[0]))

# PV names according to app developer guide and tech-talk email thread at:
# https://epics.anl.gov/tech-talk/2019/msg01429.php
pv_name = Combine(Word(alphanums+'_-+:[]<>;{}')
                  + Optional(Combine('.') + Word(printables)))
pv_value = (float_number | Word(printables))

pv_assignment = pv_name + pv_value

comment = Literal("#") + Regex(r".*")

macro = Group( Word(alphas) + Literal("=").suppress() + pv_name )
macros = Optional(macro + ZeroOrMore(Word(";,").suppress() + macro) )

#file_include = Literal("file") + pv_name + macros
file_include = Literal("file") + \
               (file_name | ignored_quote + file_name + ignored_quote) \
               + Optional(ignored_comma) + macros

def line(contents):
    return LineStart() + ZeroOrMore(Group(contents)) + LineEnd().suppress()

req_line = line( file_include | comment.suppress() | pv_name )
req_file = OneOrMore(req_line) + StringEnd().suppress()

sav_line = line( comment.suppress() | Literal("<END>").suppress() | pv_assignment)
sav_file = OneOrMore(sav_line) + StringEnd().suppress()