File: base.py

package info (click to toggle)
python-pweave 0.30.3-1
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 5,064 kB
  • sloc: python: 30,281; makefile: 167
file content (283 lines) | stat: -rw-r--r-- 9,365 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
# Processors that execute code from code chunks
import sys
import re
import os
import io
import copy
from ..config import *
import pickle

class PwebProcessorBase(object):
    """Processors run code from parsed Pweave documents. This is an abstract base
    class for specific implementations"""

    def __init__(self, parsed, kernel, source, docmode,
                       figdir, outdir):
        self.parsed = parsed
        self.source = source
        self.documentationmode = docmode
        self.figdir = figdir
        self.outdir = outdir
        self.executed = []

        self.cwd = os.path.dirname(os.path.abspath(source))
        self.basename = os.path.basename(os.path.abspath(source)).split(".")[0]
        self.pending_code = ""  # Used for multichunk splits

    def run(self):
        # Create directory for figures
        self.ensureDirectoryExists(self.getFigDirectory())
        # Documentation mode uses results from previous  executions
        # so that compilation is fast if you only work on doc chunks
        if self.documentationmode:
            success = self._getoldresults()
            if success:
                print("Restoring cached results")
                return
            else:
                sys.stderr.write(
                    "DOCUMENTATION MODE ERROR:\nCan't find stored results, running the code and caching results for the next documentation mode run\n")
                rcParams["storeresults"] = True

        self.executed = []

        # Term chunk returns a list of dicts, this flattens the results
        for chunk in self.parsed:
            res = self._runcode(chunk)
            if isinstance(res, list):
                self.executed = self.executed + res
            else:
                self.executed.append(res)


        self.isexecuted = True
        if rcParams["storeresults"]:
            self.store(self.executed)
        self.close()

    def close(self):
        pass

    def ensureDirectoryExists(self, figdir):
        if not os.path.isdir(figdir):
            os.makedirs(figdir)

    def getresults(self):
        #flattened = list(itertools.chain.from_iterable(self.executed))
        return copy.deepcopy(self.executed)

    def store(self, data):
        """Cache the results"""
        cachedir = os.path.join(self.cwd, rcParams["cachedir"])
        self.ensureDirectoryExists(cachedir)

        name = cachedir + "/" + self.basename + ".pkl"
        f = open(name, 'wb')
        pickle.dump(data, f, pickle.HIGHEST_PROTOCOL)
        f.close()

    def restore(self):
        """Restore results from cache"""
        cachedir = os.path.join(self.cwd, rcParams["cachedir"])
        name = cachedir + "/" + self.basename + ".pkl"

        if os.path.exists(name):
            f = open(name, 'rb')
            self._oldresults = pickle.load(f)
            f.close()
            return True
        else:
            return False

    def _runcode(self, chunk):
        """Execute code from a code chunk based on options"""
        if chunk['type'] != 'doc' and chunk['type'] != 'code':
            return chunk

        # Add defaultoptions to parsed options
        if chunk['type'] == 'code':
            defaults = rcParams["chunk"]["defaultoptions"].copy()
            defaults.update(chunk["options"])
            chunk.update(defaults)
            # This is a bit redundant,
            # it is added afterwards to support adding options as
            # metadata to notebooks
            chunk["options"] = defaults
            #del chunk['options']

            # Read the content from file or object
        if 'source' in chunk:
            source = chunk["source"]
            if os.path.isfile(source):
                chunk["content"] = "\n" + io.open(source, "r", encoding='utf-8').read().rstrip() + "\n" + chunk[
                    'content']
            else:
                chunk_text = chunk["content"]  # Get the text from chunk
                module_text = self.loadstring(
                    "import inspect\nprint(inspect.getsource(%s))" % source)  # Get the module source using inspect
                chunk["content"] = module_text[0]["text"].rstrip()
                if chunk_text.strip() != "":
                    chunk["content"] += "\n" + chunk_text

        if chunk['type'] == 'doc':
            chunk['content'] = self.loadinline(chunk['content'])
            return chunk


        if chunk['type'] == 'code':
            sys.stdout.write("Processing chunk %(number)s named %(name)s from line %(start_line)s\n" % chunk)

            old_content = None
            if not chunk["complete"]:
                self.pending_code += chunk["content"]
                chunk['result'] = ''
                return chunk
            elif self.pending_code != "":
                old_content = chunk["content"]
                chunk["content"] = self.pending_code + old_content  # Code from all pending chunks for running the code
                self.pending_code = ""

            if not chunk['evaluate']:
                chunk['result'] = ''
                return chunk

            self.pre_run_hook(chunk)

            if chunk['term']:
                # Running in term mode can return a list of chunks
                chunks = []
                sources, results = self.loadterm(chunk['content'], chunk=chunk)
                n = len(sources)
                content = ""
                for i in range(n):
                    if len(results[i]) == 0:
                        content += sources[i]
                    else:
                        new_chunk = chunk.copy()
                        new_chunk["content"] = content + sources[i].rstrip()
                        content = ""
                        new_chunk["result"] = results[i]
                        chunks.append(new_chunk)

                #Deal with not output, #73
                if len(content) > 0:
                    new_chunk = chunk.copy()
                    new_chunk["content"] = content
                    new_chunk["result"] = ""
                    chunks.append(new_chunk)

                return(chunks)
            else:
                chunk['result'] = self.loadstring(chunk['content'], chunk=chunk)

        #After executing the code save the figure
        if chunk['fig']:
            chunk['figure'] = self.savefigs(chunk)

        if old_content is not None:
            chunk['content'] = old_content  # The code from current chunk for display

        self.post_run_hook(chunk)


        return chunk

    def post_run_hook(self, chunk):
        pass

    def pre_run_hook(self, chunk):
        pass

    def init_matplotlib(self):
        pass

    def savefigs(self, chunk):
        pass

    def getFigDirectory(self):
        return os.path.join(self.outdir, self.figdir)

    def _getoldresults(self):
        """Get the results of previous run for documentation mode"""

        success = self.restore()
        if not success:
            return False

        executed = []

        n = len(self.parsed)

        for i in range(n):
            chunk = self.parsed[i]
            if chunk['type'] != "code":
                executed.append(self._hideinline(chunk.copy()))
            else:
                chunks = [c for c in self._oldresults if c["number"] == i and c["type"] == "code"]
                executed = executed + chunks

        self.executed = executed
        return True

    def load_shell(self, chunk):
        pass

    def loadstring(self, code, chunk=None):
        pass

    def loadterm(self, code_string, chunk=None):
        pass

    def load_inline_string(self, code_string):
        pass

    def loadinline(self, content):
        """Evaluate code from doc chunks using ERB markup"""
        # Flags don't work with ironpython
        splitted = re.split('(<%[\w\s\W]*?%>)', content)  # , flags = re.S)
        # No inline code
        if len(splitted) < 2:
            return content

        n = len(splitted)

        for i in range(n):
            elem = splitted[i]
            if not elem.startswith('<%'):
                continue
            if elem.startswith('<%='):
                code_str = elem.replace('<%=', '').replace('%>', '').lstrip()
                result = self.load_inline_string(code_str).strip()
                splitted[i] = result
                continue
            if elem.startswith('<%'):
                code_str = elem.replace('<%', '').replace('%>', '').lstrip()
                result = self.load_inline_string(code_str).strip()
                splitted[i] = result
        return ''.join(splitted)

    def add_echo(self, code_str):
        return 'print(%s),' % code_str

    def _hideinline(self, chunk):
        """Hide inline code in doc mode"""
        splitted = re.split('<%[\w\s\W]*?%>', chunk['content'])
        chunk['content'] = ''.join(splitted)
        return chunk

class ProtectStdStreams(object):
    def __init__(self, obj=None):
        self.__obj = obj

    def __enter__(self):
        self.__stdout = sys.stdout
        self.__stderr = sys.stderr
        self.__stdin = sys.stdin
        self.__displayhook = sys.displayhook
        return self.__obj

    def __exit__(self, type, value, traceback):
        sys.stdout = self.__stdout
        sys.stderr = self.__stderr
        sys.stdin = self.__stdin
        sys.displayhook = self.__displayhook