File: xml_rpc.py

package info (click to toggle)
trac-xmlrpc 1.2.0%2Bsvn18657-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 296 kB
  • sloc: python: 3,024; javascript: 26; makefile: 3
file content (235 lines) | stat: -rw-r--r-- 8,678 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
# -*- coding: utf-8 -*-
"""
License: BSD

(c) 2005-2008 ::: Alec Thomas (alec@swapoff.org)
(c) 2009      ::: www.CodeResort.com - BV Network AS (simon-code@bvnetwork.no)
"""

import datetime
import re
import sys
import time

try:
    import babel
except ImportError:
    babel = None

from trac.core import Component, implements
from trac.perm import PermissionError
from trac.resource import ResourceNotFound
from trac.util.datefmt import utc
from trac.util.html import Fragment, Markup
from trac.util.text import empty, exception_to_unicode, to_unicode
from trac.web.api import RequestDone

from .api import (IRPCProtocol, Binary, MethodNotFound, ProtocolException,
                  ServiceException)
from .util import basestring, cleandoc_, gettext, unichr, xmlrpclib

__all__ = ['XmlRpcProtocol']

# Stand-in written wherever `_illegal_xml_chars_RE` matches.
REPLACEMENT_CHAR = u'\uFFFD'  # Unicode replacement character

# Inclusive (low, high) code point pairs that are treated as illegal in
# XML output. The first entries cover control characters and the
# non-characters of the basic plane.
_illegal_unichrs = [
    (0x00, 0x08), (0x0B, 0x0C), (0x0E, 0x1F),
    (0x7F, 0x84), (0x86, 0x9F),
    (0xFDD0, 0xFDDF), (0xFFFE, 0xFFFF),
]

if sys.maxunicode >= 0x10000:  # not narrow build
    # Planes 1 through 16: the last two code points of each plane
    # (0x1FFFE-0x1FFFF, 0x2FFFE-0x2FFFF, ..., 0x10FFFE-0x10FFFF).
    _illegal_unichrs.extend(
        ((plane << 16) | 0xFFFE, (plane << 16) | 0xFFFF)
        for plane in range(0x01, 0x11))

# One "a-b" character-class fragment per pair above.
_illegal_ranges = ["%s-%s" % (unichr(first), unichr(last))
                   for first, last in _illegal_unichrs]

# Single character class matching any code point in the listed ranges.
_illegal_xml_chars_RE = re.compile(u'[%s]' % u''.join(_illegal_ranges))


def to_xmlrpc_datetime(dt):
    """Wrap a `datetime.datetime` in an `xmlrpclib.DateTime`.

    The value handed to `xmlrpclib.DateTime` is the UTC time tuple of
    `dt` (as produced by `dt.utctimetuple()`).
    """
    utc_fields = dt.utctimetuple()
    return xmlrpclib.DateTime(utc_fields)


def from_xmlrpc_datetime(data):
    """Build a UTC-tagged `datetime.datetime` from an XML-RPC date value.

    `data.value` is parsed with the format ``%Y%m%dT%H:%M:%S``; the
    resulting fields are used unchanged and `utc` is attached as tzinfo.
    """
    parsed = time.strptime(data.value, "%Y%m%dT%H:%M:%S")
    # Only the calendar and clock fields are needed; weekday, yearday and
    # DST flag from the struct_time are dropped.
    year, month, day, hour, minute, second = parsed[:6]
    return datetime.datetime(year, month, day, hour, minute, second,
                             tzinfo=utc)


class XmlRpcProtocol(Component):
    """XML-RPC implementation of the `IRPCProtocol` interface.

    Decodes incoming XML-RPC calls, and encodes results and errors as
    XML-RPC responses and faults.
    """

    _description = cleandoc_(r"""
    There should be XML-RPC client implementations available for all
    popular programming languages.
    Example call using `curl`:

    {{{
    user: ~ > cat body.xml
    <?xml version="1.0"?>
    <methodCall>
    <methodName>wiki.getPage</methodName>
    <params>
    <param><string>WikiStart</string></param>
    </params>
    </methodCall>

    user: ~ > curl -H "Content-Type: application/xml" --data @body.xml %(url_anon)s
    <?xml version='1.0'?>
    <methodResponse>
    <params>
    <param>
    <value><string>= Welcome to....
    }}}

    The following snippet illustrates how to perform authenticated calls in Python.

    {{{
    >>> try:
    ...     from xmlrpc import client as cli
    ... except ImportError:
    ...     import xmlrpclib as cli
    ...
    >>> p = cli.ServerProxy(%(url_auth)r)
    >>> p.system.getAPIVersion()
    %(version)r
    }}}
    """)

    implements(IRPCProtocol)

    # IRPCProtocol methods

    def rpc_info(self):
        """Return the protocol name and its (translated) description."""
        return 'XML-RPC', gettext(self._description)

    def rpc_match(self):
        """Yield the (path, content type) pairs handled by this protocol."""
        # Legacy path xmlrpc provided for backwards compatibility:
        # Using this order to get better docs
        yield 'rpc', 'application/xml'
        yield 'xmlrpc', 'application/xml'
        yield 'rpc', 'text/xml'
        yield 'xmlrpc', 'text/xml'

    def parse_rpc_request(self, req, content_type):
        """Parse an XML-RPC request body.

        Reads `Content-Length` bytes from `req` and decodes them with
        `xmlrpclib.loads`. `content_type` is not used here.

        :return: dict with keys ``'method'`` (method name) and
                 ``'params'`` (list of normalized arguments)
        :raises ProtocolException: wrapping an `xmlrpclib.Fault` with code
                 -32700 when reading or decoding the body fails
        """
        try:
            args, method = xmlrpclib.loads(
                        req.read(int(req.get_header('Content-Length'))))
        except Exception as e:
            # Any failure to read or decode the body (including a missing
            # or malformed Content-Length) is reported as a parse error.
            self.log.debug("RPC(xml) parse error: %s", to_unicode(e))
            raise ProtocolException(xmlrpclib.Fault(-32700, to_unicode(e)))
        else:
            self.log.debug("RPC(xml) call by '%s', method '%s' with args: %s",
                           req.authname, method, repr(args))
            args = self._normalize_xml_input(args)
            return {'method': method, 'params': args}

    def send_rpc_result(self, req, result):
        """Send the result of the XML-RPC call back to the client.

        The result is normalized for XML-RPC and written as a method
        response using the mimetype stored in ``req.rpc``. Ends with
        `RequestDone` being raised by `_send_response`.
        """
        rpcreq = req.rpc
        method = rpcreq.get('method')
        self.log.debug("RPC(xml) '%s' result: %s", method, repr(result))
        # A method response is dumped from a 1-tuple holding the value.
        result = tuple(self._normalize_xml_output([result]))
        self._send_response(req,
                xmlrpclib.dumps(result, methodresponse=True), rpcreq['mimetype'])

    def send_rpc_error(self, req, e):
        """Send an XML-RPC fault message back to the caller.

        Fault selection:

        - `ProtocolException`: the wrapped fault is sent as is
        - `MethodNotFound`: fault code -32601
        - `PermissionError`: fault code 403
        - `ResourceNotFound`: fault code 404
        - anything else (including the exception wrapped by a
          `ServiceException`): logged with traceback and sent with the
          exception's ``code`` attribute, or 1 if that is missing or falsy

        Ends with `RequestDone` being raised by `_send_response`.
        """
        rpcreq = req.rpc
        fault = None
        if isinstance(e, ProtocolException):
            fault = e._exc
        elif isinstance(e, ServiceException):
            # Unwrap only; the inner exception is reported by the generic
            # branch below.
            e = e._exc
        elif isinstance(e, MethodNotFound):
            fault = xmlrpclib.Fault(-32601, to_unicode(e))
        elif isinstance(e, PermissionError):
            fault = xmlrpclib.Fault(403, to_unicode(e))
        elif isinstance(e, ResourceNotFound):
            fault = xmlrpclib.Fault(404, to_unicode(e))

        if fault is None:
            self.log.error('%s%s', e, exception_to_unicode(e, traceback=True))
            err_code = getattr(e, 'code', None) or 1
            method = rpcreq.get('method')
            # to_unicode() rather than str(): consistent with the faults
            # built above and safe for non-ASCII exception messages.
            fault = xmlrpclib.Fault(
                err_code,
                "'%s' while executing '%s()'" % (to_unicode(e), method))
        self._send_response(req, xmlrpclib.dumps(fault), rpcreq['mimetype'])

    # Internal methods

    def _send_response(self, req, response, content_type='application/xml'):
        """Write `response` to the client with status 200 and finish.

        Characters matched by `_illegal_xml_chars_RE` are replaced with
        `REPLACEMENT_CHAR` and the text is sent UTF-8 encoded.

        :raises RequestDone: always, after the body has been written
        """
        response = to_unicode(response)
        response = _illegal_xml_chars_RE.sub(REPLACEMENT_CHAR, response)
        response = response.encode("utf-8")
        req.send_response(200)
        req.send_header('Content-Type', content_type)
        # Length of the encoded bytes, not of the unicode text.
        req.send_header('Content-Length', len(response))
        req.end_headers()
        req.write(response)
        raise RequestDone

    def _normalize_xml_input(self, args):
        """Normalize arguments (at any level - traversing dicts and lists).

        1. `xmlrpclib.DateTime` is converted to a Python datetime
        2. `xmlrpclib.Binary` is converted to `Binary` (re-classed in place)
        3. String line-endings same as from web (LF => CR LF)

        Dicts are updated in place; lists and tuples are returned as new
        lists. Everything else is passed through unchanged.

        :return: list with the normalized items of `args`
        """
        new_args = []
        for arg in args:
            if isinstance(arg, xmlrpclib.DateTime):
                new_args.append(from_xmlrpc_datetime(arg))
            elif isinstance(arg, xmlrpclib.Binary):
                arg.__class__ = Binary
                new_args.append(arg)
            elif isinstance(arg, basestring):
                new_args.append(arg.replace("\n", "\r\n"))
            elif isinstance(arg, dict):
                # Iterate a snapshot since values are reassigned below.
                for key, val in list(arg.items()):
                    arg[key], = self._normalize_xml_input([val])
                new_args.append(arg)
            elif isinstance(arg, (list, tuple)):
                new_args.append(self._normalize_xml_input(arg))
            else:
                new_args.append(arg)
        return new_args

    def _normalize_xml_output(self, result):
        """Normalize and convert output (traversing dicts and lists).

        1. None (or `empty`) => ''
        2. datetime => `xmlrpclib.DateTime`
        3. `Binary` => `xmlrpclib.Binary` (re-classed in place)
        4. Fragment|Markup, and babel lazy proxies => unicode

        Dicts are updated in place; lists and tuples are returned as new
        lists. Everything else is passed through unchanged.

        :return: list with the normalized items of `result`
        """
        new_result = []
        for res in result:
            if isinstance(res, datetime.datetime):
                new_result.append(to_xmlrpc_datetime(res))
            elif isinstance(res, Binary):
                res.__class__ = xmlrpclib.Binary
                new_result.append(res)
            elif res is None or res is empty:
                new_result.append('')
            elif isinstance(res, (Fragment, Markup)):
                new_result.append(to_unicode(res))
            elif babel and isinstance(res, babel.support.LazyProxy):
                new_result.append(to_unicode(res))
            elif isinstance(res, dict):
                # Iterate a snapshot since values are reassigned below.
                for key, val in list(res.items()):
                    res[key], = self._normalize_xml_output([val])
                new_result.append(res)
            elif isinstance(res, (list, tuple)):
                new_result.append(self._normalize_xml_output(res))
            else:
                new_result.append(res)
        return new_result