1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235
|
# -*- coding: utf-8 -*-
"""
License: BSD
(c) 2005-2008 ::: Alec Thomas (alec@swapoff.org)
(c) 2009 ::: www.CodeResort.com - BV Network AS (simon-code@bvnetwork.no)
"""
import datetime
import re
import sys
import time
try:
import babel
except ImportError:
babel = None
from trac.core import Component, implements
from trac.perm import PermissionError
from trac.resource import ResourceNotFound
from trac.util.datefmt import utc
from trac.util.html import Fragment, Markup
from trac.util.text import empty, exception_to_unicode, to_unicode
from trac.web.api import RequestDone
from .api import (IRPCProtocol, Binary, MethodNotFound, ProtocolException,
ServiceException)
from .util import basestring, cleandoc_, gettext, unichr, xmlrpclib
__all__ = ['XmlRpcProtocol']

REPLACEMENT_CHAR = u'\uFFFD'  # Unicode replacement character

# Inclusive (low, high) code point ranges that are substituted with
# REPLACEMENT_CHAR wherever `_illegal_xml_chars_RE` is applied.
# The list combines the control characters that XML 1.0 does not allow
# with the "discouraged" ranges from section 2.2 of the XML 1.0
# specification. The noncharacter block is U+FDD0..U+FDEF (32 code points).
_illegal_unichrs = [(0x00, 0x08), (0x0B, 0x0C), (0x0E, 0x1F), (0x7F, 0x84),
                    (0x86, 0x9F), (0xFDD0, 0xFDEF), (0xFFFE, 0xFFFF)]
if sys.maxunicode >= 0x10000:  # not narrow build
    # The last two code points of every supplementary plane can only be
    # expressed as a single character on wide builds.
    _illegal_unichrs.extend([(0x1FFFE, 0x1FFFF), (0x2FFFE, 0x2FFFF),
                             (0x3FFFE, 0x3FFFF), (0x4FFFE, 0x4FFFF),
                             (0x5FFFE, 0x5FFFF), (0x6FFFE, 0x6FFFF),
                             (0x7FFFE, 0x7FFFF), (0x8FFFE, 0x8FFFF),
                             (0x9FFFE, 0x9FFFF), (0xAFFFE, 0xAFFFF),
                             (0xBFFFE, 0xBFFFF), (0xCFFFE, 0xCFFFF),
                             (0xDFFFE, 0xDFFFF), (0xEFFFE, 0xEFFFF),
                             (0xFFFFE, 0xFFFFF), (0x10FFFE, 0x10FFFF)])

# Each pair becomes a "low-high" range inside one regex character class.
_illegal_ranges = ["%s-%s" % (unichr(low), unichr(high))
                   for (low, high) in _illegal_unichrs]
_illegal_xml_chars_RE = re.compile(u'[%s]' % u''.join(_illegal_ranges))
def to_xmlrpc_datetime(dt):
    """Wrap a ``datetime.datetime`` in an ``xmlrpclib.DateTime``.

    The value handed to xmlrpclib is the UTC time tuple of *dt*
    (``dt.utctimetuple()``).
    """
    utc_fields = dt.utctimetuple()
    return xmlrpclib.DateTime(utc_fields)
def from_xmlrpc_datetime(data):
    """Build a UTC-aware ``datetime.datetime`` from an XML-RPC DateTime.

    ``data.value`` must be a string of the form ``YYYYMMDDTHH:MM:SS``; it is
    interpreted as UTC. A value in any other format makes ``time.strptime``
    raise ``ValueError``.
    """
    parsed = time.strptime(data.value, "%Y%m%dT%H:%M:%S")
    # Only the first six struct_time fields (date and time of day) are used.
    year, month, day, hour, minute, second = parsed[0:6]
    return datetime.datetime(year, month, day, hour, minute, second,
                             tzinfo=utc)
class XmlRpcProtocol(Component):
    """XML-RPC implementation of the ``IRPCProtocol`` interface.

    Parses XML-RPC method calls from the request body and serializes results
    and errors back to the client as XML-RPC responses and faults.
    """

    # User-facing protocol description returned (translated) by rpc_info().
    # It contains the placeholders %(url_anon)s, %(url_auth)r and
    # %(version)r, which are not filled in by this class.
    _description = cleandoc_(r"""
There should be XML-RPC client implementations available for all
popular programming languages.
Example call using `curl`:
{{{
user: ~ > cat body.xml
<?xml version="1.0"?>
<methodCall>
<methodName>wiki.getPage</methodName>
<params>
<param><string>WikiStart</string></param>
</params>
</methodCall>
user: ~ > curl -H "Content-Type: application/xml" --data @body.xml %(url_anon)s
<?xml version='1.0'?>
<methodResponse>
<params>
<param>
<value><string>= Welcome to....
}}}
The following snippet illustrates how to perform authenticated calls in Python.
{{{
>>> try:
... from xmlrpc import client as cli
... except ImportError:
... import xmlrpclib as cli
...
>>> p = cli.ServerProxy(%(url_auth)r)
>>> p.system.getAPIVersion()
%(version)r
}}}
""")

    implements(IRPCProtocol)

    # IRPCProtocol methods

    def rpc_info(self):
        """Return the protocol name and its translated description."""
        return 'XML-RPC', gettext(self._description)

    def rpc_match(self):
        """Yield the ``(path, content type)`` pairs served by this protocol."""
        # Legacy path xmlrpc provided for backwards compatibility:
        # Using this order to get better docs
        yield 'rpc', 'application/xml'
        yield 'xmlrpc', 'application/xml'
        yield 'rpc', 'text/xml'
        yield 'xmlrpc', 'text/xml'

    def parse_rpc_request(self, req, content_type):
        """Parse an XML-RPC request body.

        Reads ``Content-Length`` bytes from `req` and decodes them with
        ``xmlrpclib.loads``. `content_type` is accepted but not used here.

        Returns a dict with the keys ``'method'`` (the method name) and
        ``'params'`` (the normalized argument list).

        Raises `ProtocolException` wrapping an ``xmlrpclib.Fault`` with code
        -32700 if reading or decoding the body fails for any reason,
        including a missing or non-numeric ``Content-Length`` header.
        """
        try:
            args, method = xmlrpclib.loads(
                req.read(int(req.get_header('Content-Length'))))
        except Exception as e:
            self.log.debug("RPC(xml) parse error: %s", to_unicode(e))
            raise ProtocolException(xmlrpclib.Fault(-32700, to_unicode(e)))
        else:
            self.log.debug("RPC(xml) call by '%s', method '%s' with args: %s",
                           req.authname, method, repr(args))
            args = self._normalize_xml_input(args)
            return {'method': method, 'params': args}

    def send_rpc_result(self, req, result):
        """Send the result of the XML-RPC call back to the client.

        `result` is normalized, serialized as an XML-RPC method response and
        written with the mimetype stored in ``req.rpc['mimetype']``. Sending
        the response ends the request by raising `RequestDone`.
        """
        rpcreq = req.rpc
        method = rpcreq.get('method')
        self.log.debug("RPC(xml) '%s' result: %s", method, repr(result))
        # xmlrpclib.dumps expects the response parameters as a 1-tuple.
        result = tuple(self._normalize_xml_output([result]))
        self._send_response(req,
                            xmlrpclib.dumps(result, methodresponse=True),
                            rpcreq['mimetype'])

    def send_rpc_error(self, req, e):
        """Send an XML-RPC fault message back to the caller.

        Fault selection:

        * `ProtocolException`: the fault it carries in ``_exc`` is sent as is
        * `MethodNotFound`: fault code -32601
        * `PermissionError`: fault code 403
        * `ResourceNotFound`: fault code 404
        * `ServiceException`: unwrapped to the exception in ``_exc``, which
          is then handled like any other exception
        * anything else: logged with traceback and reported using the
          exception's ``code`` attribute if it has a truthy one, else 1

        Sending the response ends the request by raising `RequestDone`.
        """
        rpcreq = req.rpc
        fault = None
        if isinstance(e, ProtocolException):
            fault = e._exc
        elif isinstance(e, ServiceException):
            # Report the wrapped exception through the generic branch below.
            e = e._exc
        elif isinstance(e, MethodNotFound):
            fault = xmlrpclib.Fault(-32601, to_unicode(e))
        elif isinstance(e, PermissionError):
            fault = xmlrpclib.Fault(403, to_unicode(e))
        elif isinstance(e, ResourceNotFound):
            fault = xmlrpclib.Fault(404, to_unicode(e))
        if fault is not None:
            self._send_response(req, xmlrpclib.dumps(fault),
                                rpcreq['mimetype'])
        else:
            self.log.error('%s%s', e, exception_to_unicode(e, traceback=True))
            # A missing or falsy `code` (None, 0, '') falls back to 1.
            err_code = getattr(e, 'code', None) or 1
            method = rpcreq.get('method')
            self._send_response(req,
                                xmlrpclib.dumps(
                                    xmlrpclib.Fault(
                                        err_code,
                                        "'%s' while executing '%s()'"
                                        % (str(e), method))),
                                rpcreq['mimetype'])

    # Internal methods

    def _send_response(self, req, response, content_type='application/xml'):
        """Write `response` to the client as a UTF-8 encoded 200 reply.

        Every character matched by `_illegal_xml_chars_RE` is replaced with
        `REPLACEMENT_CHAR` before encoding. Always raises `RequestDone` once
        the body has been written.
        """
        response = to_unicode(response)
        response = _illegal_xml_chars_RE.sub(REPLACEMENT_CHAR, response)
        response = response.encode("utf-8")
        req.send_response(200)
        req.send_header('Content-Type', content_type)
        # The length is that of the encoded bytes, not of the unicode text.
        req.send_header('Content-Length', len(response))
        req.end_headers()
        req.write(response)
        raise RequestDone

    def _normalize_xml_input(self, args):
        r"""Normalize call arguments, recursing into dicts, lists and tuples.

        1. xmlrpclib.DateTime is converted to a Python datetime (in UTC)
        2. xmlrpclib.Binary is converted to the `Binary` class from `.api`
        3. String line endings are made the same as from web
           (`\n` => `\r\n`)

        Returns a new list. Nested lists and tuples are returned as new
        lists. Dicts and Binary objects are modified in place and the same
        objects are placed in the result.
        """
        new_args = []
        for arg in args:
            if isinstance(arg, xmlrpclib.DateTime):
                new_args.append(from_xmlrpc_datetime(arg))
            elif isinstance(arg, xmlrpclib.Binary):
                arg.__class__ = Binary
                new_args.append(arg)
            elif isinstance(arg, basestring):
                # Collapse existing `\r\n` first so that such line endings
                # are not expanded to `\r\r\n`.
                new_args.append(
                    arg.replace("\r\n", "\n").replace("\n", "\r\n"))
            elif isinstance(arg, dict):
                # Only values of existing keys are reassigned, so the dict
                # does not change size while it is being iterated.
                for key, val in arg.items():
                    arg[key], = self._normalize_xml_input([val])
                new_args.append(arg)
            elif isinstance(arg, (list, tuple)):
                new_args.append(self._normalize_xml_input(arg))
            else:
                new_args.append(arg)
        return new_args

    def _normalize_xml_output(self, result):
        """Normalize and convert output, recursing into dicts, lists, tuples.

        1. None and `empty` => ''
        2. datetime => xmlrpclib.DateTime
        3. Binary => xmlrpclib.Binary
        4. Fragment|Markup => unicode
        5. babel LazyProxy => unicode (only when babel is importable)

        Returns a new list. Nested lists and tuples are returned as new
        lists. Dicts and Binary objects are modified in place and the same
        objects are placed in the result.
        """
        new_result = []
        for res in result:
            if isinstance(res, datetime.datetime):
                new_result.append(to_xmlrpc_datetime(res))
            elif isinstance(res, Binary):
                res.__class__ = xmlrpclib.Binary
                new_result.append(res)
            elif res is None or res is empty:
                new_result.append('')
            elif isinstance(res, (Fragment, Markup)):
                new_result.append(to_unicode(res))
            elif babel and isinstance(res, babel.support.LazyProxy):
                new_result.append(to_unicode(res))
            elif isinstance(res, dict):
                # Only values of existing keys are reassigned, so the dict
                # does not change size while it is being iterated.
                for key, val in res.items():
                    res[key], = self._normalize_xml_output([val])
                new_result.append(res)
            elif isinstance(res, (list, tuple)):
                new_result.append(self._normalize_xml_output(res))
            else:
                new_result.append(res)
        return new_result
|