File: kernel.py

package info (click to toggle)
swiftlang 6.0.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, trixie
  • size: 2,519,992 kB
  • sloc: cpp: 9,107,863; ansic: 2,040,022; asm: 1,135,751; python: 296,500; objc: 82,456; f90: 60,502; lisp: 34,951; pascal: 19,946; sh: 18,133; perl: 7,482; ml: 4,937; javascript: 4,117; makefile: 3,840; awk: 3,535; xml: 914; fortran: 619; cs: 573; ruby: 573
file content (188 lines) | stat: -rw-r--r-- 6,172 bytes parent folder | download | duplicates (12)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
# See https://llvm.org/LICENSE.txt for license information.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

from subprocess import Popen
import os
import subprocess
import tempfile
import traceback
from ipykernel.kernelbase import Kernel

__version__ = "0.0.1"


def _get_executable():
    """Find the mlir-opt executable."""

    def is_exe(fpath):
        """Returns whether executable file."""
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    program = os.environ.get("MLIR_OPT_EXECUTABLE", "mlir-opt")
    path, name = os.path.split(program)
    # Attempt to get the executable
    if path:
        if is_exe(program):
            return program
    else:
        for path in os.environ["PATH"].split(os.pathsep):
            file = os.path.join(path, name)
            if is_exe(file):
                return file
    raise OSError("mlir-opt not found, please see README")


class MlirOptKernel(Kernel):
    """Kernel using mlir-opt inside jupyter.

    The reproducer syntax (`// configuration:`) is used to run passes. The
    previous result can be referenced to by using `_` (this variable is reset
    upon error). E.g.,

    ```mlir
    // configuration: --pass
    func.func @foo(%tensor: tensor<2x3xf64>) -> tensor<3x2xf64> { ... }
    ```

    ```mlir
    // configuration: --next-pass
    _
    ```
    """

    implementation = "mlir"
    implementation_version = __version__

    language_version = __version__
    language = "mlir"
    language_info = {
        "name": "mlir",
        "codemirror_mode": {"name": "mlir"},
        "mimetype": "text/x-mlir",
        "file_extension": ".mlir",
        "pygments_lexer": "text",
    }

    @property
    def banner(self):
        """Returns kernel banner."""
        # Just a placeholder.
        return "mlir-opt kernel %s" % __version__

    def __init__(self, **kwargs):
        Kernel.__init__(self, **kwargs)
        self._ = None
        self.executable = None
        self.silent = False

    def get_executable(self):
        """Returns the mlir-opt executable path."""
        if not self.executable:
            self.executable = _get_executable()
        return self.executable

    def process_output(self, output):
        """Reports regular command output."""
        if not self.silent:
            # Send standard output
            stream_content = {"name": "stdout", "text": output}
            self.send_response(self.iopub_socket, "stream", stream_content)

    def process_error(self, output):
        """Reports error response."""
        if not self.silent:
            # Send standard error
            stream_content = {"name": "stderr", "text": output}
            self.send_response(self.iopub_socket, "stream", stream_content)

    def do_execute(
        self, code, silent, store_history=True, user_expressions=None, allow_stdin=False
    ):
        """Execute user code using mlir-opt binary."""

        def ok_status():
            """Returns OK status."""
            return {
                "status": "ok",
                "execution_count": self.execution_count,
                "payload": [],
                "user_expressions": {},
            }

        def run(code):
            """Run the code by pipeing via filesystem."""
            try:
                inputmlir = tempfile.NamedTemporaryFile(delete=False)
                command = [
                    # Specify input and output file to error out if also
                    # set as arg.
                    self.get_executable(),
                    "--color",
                    inputmlir.name,
                    "-o",
                    "-",
                ]
                # Simple handling of repeating last line.
                if code.endswith("\n_"):
                    if not self._:
                        raise NameError("No previous result set")
                    code = code[:-1] + self._
                inputmlir.write(code.encode("utf-8"))
                inputmlir.close()
                pipe = Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                output, errors = pipe.communicate()
                exitcode = pipe.returncode
            finally:
                os.unlink(inputmlir.name)

            # Replace temporary filename with placeholder. This takes the very
            # remote chance where the full input filename (generated above)
            # overlaps with something in the dump unrelated to the file.
            fname = inputmlir.name.encode("utf-8")
            output = output.replace(fname, b"<<input>>")
            errors = errors.replace(fname, b"<<input>>")
            return output, errors, exitcode

        self.silent = silent
        if not code.strip():
            return ok_status()

        try:
            output, errors, exitcode = run(code)

            if exitcode:
                self._ = None
            else:
                self._ = output.decode("utf-8")
        except KeyboardInterrupt:
            return {"status": "abort", "execution_count": self.execution_count}
        except Exception as error:
            # Print traceback for local debugging.
            traceback.print_exc()
            self._ = None
            exitcode = 255
            errors = repr(error).encode("utf-8")

        if exitcode:
            content = {"ename": "", "evalue": str(exitcode), "traceback": []}

            self.send_response(self.iopub_socket, "error", content)
            self.process_error(errors.decode("utf-8"))

            content["execution_count"] = self.execution_count
            content["status"] = "error"
            return content

        if not silent:
            data = {}
            data["text/x-mlir"] = self._
            content = {
                "execution_count": self.execution_count,
                "data": data,
                "metadata": {},
            }
            self.send_response(self.iopub_socket, "execute_result", content)
            self.process_output(self._)
            self.process_error(errors.decode("utf-8"))
        return ok_status()