1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212
|
# -*- coding: utf-8 -*-
# Python side of the testing protocol.
if PYTHON3:
import os, signal, subprocess
else:
__metaclass__ = type
import os
try:
import signal
except ImportError:
# Jython misses this module.
signal = None
try:
import subprocess
except ImportError:
# Jython misses this module.
subprocess = None
# Make sure that ../Pymacs will be found within this process.
import sys
if '..' not in sys.path:
sys.path.insert(0, '..')
import Pymacs
lisp = Pymacs.lisp
class Launch:
# Make sure that ../Pymacs will be found in external processes.
def __init__(self):
self.pythonpath_saved = os.environ.get('PYTHONPATH')
os.environ['PYTHONPATH'] = '..'
def __del__(self):
if self.pythonpath_saved is None:
del os.environ['PYTHONPATH']
else:
os.environ['PYTHONPATH'] = self.pythonpath_saved
class Emacs(Launch):
# Requests towards Emacs are written to file "_request", while
# replies from Emacs are read from file "_reply". We call Emacs
# attention by erasing "_reply", and Emacs calls our attention by
# erasing "_request". These rules guarantee that no file is ever
# read by one side before it has been fully written by the other.
# Busy waiting, with built-in delays, is used on both sides.
popen = None
def __init__(self):
Launch.__init__(self)
self.cleanup()
import atexit
atexit.register(self.cleanup)
emacs = os.environ.get('EMACS') or 'emacs'
self.command = emacs, '-batch', '--no-site', '-q', '-l', 'setup.el'
if subprocess is None:
self.command = self.command + ('-f', 'run-one-request')
else:
self.command = self.command + ('-f', 'run-all-requests')
def cleanup(self):
if self.popen is not None:
self.popen.poll()
if self.popen.returncode is None:
if signal is not None:
os.kill(self.popen.pid, signal.SIGINT)
os.waitpid(self.popen.pid, 0)
self.popen = None
if os.path.exists('_request'):
os.remove('_request')
if os.path.exists('_reply'):
os.remove('_reply')
def receive(self):
if subprocess is None:
handle = open('_reply')
buffer = handle.read()
handle.close()
else:
import time
while os.path.exists('_request'):
self.popen.poll()
assert self.popen.returncode is None, self.popen.returncode
time.sleep(0.005)
self.popen.poll()
assert self.popen.returncode is None, self.popen.returncode
if PYTHON3:
handle = open('_reply', newline='')
else:
handle = open('_reply')
buffer = handle.read()
handle.close()
return buffer
def send(self, text):
if PYTHON3:
handle = open('_request', 'w', newline='')
else:
handle = open('_request', 'w')
handle.write(text)
handle.close()
if subprocess is None:
status = os.system(' '.join(self.command))
assert status == 0, status
else:
if os.path.exists('_reply'):
os.remove('_reply')
if self.popen is None:
self.popen = subprocess.Popen(self.command)
self.popen.poll()
assert self.popen.returncode is None, self.popen.returncode
def start_emacs():
Emacs.services = Emacs()
def stop_emacs():
Emacs.services.cleanup()
def ask_emacs(text, printer=None):
if printer is not None:
text = '(%s %s)' % (printer, text)
Emacs.services.send(text)
return Emacs.services.receive()
class Python(Launch):
def __init__(self):
Launch.__init__(self)
# Start a Pymacs helper subprocess for executing Python code.
import subprocess
self.process = subprocess.Popen(
[os.environ.get('PYTHON') or 'python',
'-c', 'from Pymacs import main; main(\'..\')'],
stdin=subprocess.PIPE, stdout=subprocess.PIPE)
text = self.receive()
assert text == '(version "@VERSION@")\n', repr(text)
if PYTHON3:
def receive(self):
# Receive a Lisp expression from the Pymacs helper.
stdout = self.process.stdout
data = stdout.read(3)
if not data or data[0] != ord(b'<'):
if data == b'Tra':
# Likely a traceback, and the Pymacs helper terminated.
diagnostic = (
'got:\n' + (data + stdout.read()).decode('UTF-8'))
else:
diagnostic = 'got ' + repr(data)
raise Pymacs.ProtocolError("'<' expected, %s\n" % diagnostic)
while data[-1] != ord('\t'):
data += stdout.read(1)
return str(stdout.read(int(data[1:-1])), 'UTF-8')
else:
def receive(self):
# Receive a Lisp expression from the Pymacs helper.
stdout = self.process.stdout
data = stdout.read(3)
if not data or data[0] != '<':
if data == 'Tra':
# Likely a traceback, and the Pymacs helper terminated.
diagnostic = 'got:\n' + data + stdout.read()
else:
diagnostic = 'got ' + repr(data)
raise Pymacs.ProtocolError("'<' expected, %s\n" % diagnostic)
while data[-1] != '\t':
data += stdout.read(1)
return stdout.read(int(data[1:-1]))
if PYTHON3:
def send(self, text):
# Send TEXT, a Python expression, to the Pymacs helper.
stdin = self.process.stdin
data = text.encode('UTF-8')
if text[-1] == '\n':
stdin.write(('>%d\t' % len(data)).encode('ASCII'))
stdin.write(data)
else:
stdin.write(('>%d\t' % (len(data) + 1)).encode('ASCII'))
stdin.write(data)
stdin.write(b'\n')
stdin.flush()
else:
def send(self, text):
# Send TEXT, a Python expression, to the Pymacs helper.
stdin = self.process.stdin
if text[-1] == '\n':
stdin.write('>%d\t%s' % (len(text), text))
else:
stdin.write('>%d\t%s\n' % (len(text) + 1, text))
stdin.flush()
def start_python():
Python.services = Python()
def stop_python():
Python.services.process.kill()
def ask_python(text):
Python.services.send(text)
return Python.services.receive()
|