1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238
|
# -*- coding: utf-8 -*-
"""
This module contains the server socket definition.
"""
import argparse
import inspect
import logging
import json
import os
import struct
import sys
import time
import traceback
import threading
try:
import socketserver
PY33 = True
except ImportError:
import SocketServer as socketserver
PY33 = False
def _logger():
""" Returns the module's logger """
return logging.getLogger(__name__)
HEARTBEAT_DELAY = 60 # delay max without heartbeat signal
def import_class(klass):
"""
Imports a class from a fully qualified name string.
:param klass: class string, e.g.
"pyqode.core.backend.workers.CodeCompletionWorker"
:return: The corresponding class
"""
path = klass.rfind(".")
class_name = klass[path + 1: len(klass)]
try:
module = __import__(klass[0:path], globals(), locals(), [class_name])
klass = getattr(module, class_name)
except ImportError as e:
raise ImportError('%s: %s' % (klass, str(e)))
except AttributeError:
raise ImportError(klass)
else:
return klass
class JsonServer(socketserver.TCPServer):
"""
A server socket based on a json messaging system.
"""
class _Handler(socketserver.BaseRequestHandler):
def read_bytes(self, size):
"""
Read x bytes
:param size: number of bytes to read.
"""
if not PY33:
data = ''
else:
data = bytes()
while len(data) < size:
tmp = self.request.recv(size - len(data))
data += tmp
if tmp == '':
raise RuntimeError("socket connection broken")
return data
def get_msg_len(self):
""" Gets message len """
data = self.read_bytes(4)
payload = struct.unpack('=I', data)
return payload[0]
def read(self):
""" Reads a json string from socket and load it. """
size = self.get_msg_len()
data = self.read_bytes(size).decode('utf-8')
return json.loads(data)
def send(self, obj):
"""
Sends a python obj as a json string on the socket.
:param obj: The object to send, must be Json serializable.
"""
msg = json.dumps(obj).encode('utf-8')
_logger().log(1, 'sending %d bytes for the payload', len(msg))
header = struct.pack('=I', len(msg))
self.request.sendall(header)
self.request.sendall(msg)
def handle(self):
"""
Hanlde the request and keep it alive while shutdown signal
has not been received
"""
self.srv.reset_heartbeat()
# make sure to have enough time to handle the request
self.srv.timeout = HEARTBEAT_DELAY * 10
data = self.read()
self._handle(data)
self.srv.timeout = HEARTBEAT_DELAY
self.srv.reset_heartbeat()
def _handle(self, data):
"""
Handles a work request.
"""
try:
_logger().log(1, 'handling request %r', data)
assert data['worker']
assert data['request_id']
assert data['data'] is not None
response = {'request_id': data['request_id'], 'results': []}
try:
worker = import_class(data['worker'])
except ImportError:
_logger().exception('Failed to import worker class')
else:
if inspect.isclass(worker):
worker = worker()
_logger().log(1, 'worker: %r', worker)
_logger().log(1, 'data: %r', data['data'])
try:
ret_val = worker(data['data'])
except Exception:
_logger().exception(
'something went bad with worker %r(data=%r)',
worker, data['data'])
ret_val = None
if ret_val is None:
ret_val = []
response = {'request_id': data['request_id'],
'results': ret_val}
finally:
_logger().log(1, 'sending response: %r', response)
try:
self.send(response)
except ConnectionAbortedError:
pass
except:
_logger().warn('error with data=%r', data)
exc1, exc2, exc3 = sys.exc_info()
traceback.print_exception(exc1, exc2, exc3, file=sys.stderr)
def __init__(self, args=None):
"""
:param args: Argument parser args. If None, the server will setup and
use its own argument parser (using
:meth:`pyqode.core.backend.default_parser`)
"""
self.reset_heartbeat()
if not args:
args = default_parser().parse_args()
self.port = args.port
self.timeout = HEARTBEAT_DELAY
self._Handler.srv = self
socketserver.TCPServer.__init__(
self, ('127.0.0.1', int(args.port)), self._Handler)
print('started on 127.0.0.1:%d' % int(args.port))
print('running with python %d.%d.%d' % (sys.version_info[:3]))
self._heartbeat_thread = threading.Thread(target=self.heartbeat)
self._heartbeat_thread.setDaemon(True)
self._heartbeat_thread.start()
def reset_heartbeat(self):
self.last_time = time.time()
self.elapsed_time = 0
def heartbeat(self):
while True:
elapsed_time = time.time() - self.last_time
if elapsed_time > self.timeout:
self.shutdown()
sys.exit(1)
time.sleep(1)
def default_parser():
"""
Configures and return the default argument parser. You should use this
parser as a base if you want to add custom arguments.
The default parser only has one argument, the tcp port used to start the
server socket. *(CodeEdit picks up a free port and use it to run
the server and connect its client socket)*
:returns: The default server argument parser.
"""
parser = argparse.ArgumentParser()
parser.add_argument("port", help="the local tcp port to use to run "
"the server")
return parser
def serve_forever(args=None):
"""
Creates the server and serves forever
:param args: Optional args if you decided to use your own
argument parser. Default is None to let the JsonServer setup its own
parser and parse command line arguments.
"""
class Unbuffered(object):
def __init__(self, stream):
self.stream = stream
def write(self, data):
self.stream.write(data)
self.stream.flush()
def __getattr__(self, attr):
return getattr(self.stream, attr)
sys.stdout = Unbuffered(sys.stdout)
sys.stderr = Unbuffered(sys.stderr)
server = JsonServer(args=args)
server.serve_forever()
# Server script example
if __name__ == '__main__':
from pyqode.core import backend
backend.CodeCompletionWorker.providers.append(
backend.DocumentWordsProvider())
serve_forever()
|