File: wiki.py

package info (click to toggle)
trac-xmlrpc 1.2.0%2Bsvn18657-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 296 kB
  • sloc: python: 3,024; javascript: 26; makefile: 3
file content (241 lines) | stat: -rw-r--r-- 10,030 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
# -*- coding: utf-8 -*-
"""
License: BSD

(c) 2005-2008 ::: Alec Thomas (alec@swapoff.org)
(c) 2009      ::: www.CodeResort.com - BV Network AS (simon-code@bvnetwork.no)
"""

import io
import os
from datetime import datetime

from trac.attachment import Attachment
from trac.core import Component, ExtensionPoint, TracError, implements
from trac.resource import Resource, ResourceNotFound
from trac.util.datefmt import from_utimestamp, to_utimestamp
from trac.util.text import to_unicode
from trac.wiki.api import WikiSystem, IWikiPageManipulator
from trac.wiki.model import WikiPage
from trac.wiki.formatter import format_to_html

from .api import IXMLRPCHandler, Binary, unicode
from .util import getargspec, web_context


__all__ = ['WikiRPC']


if 'remote_addr' in getargspec(WikiPage.save)[0]:
    def _page_save(page, author, comment, remote_addr=None, t=None):
        page.save(author, comment, remote_addr=remote_addr, t=t)
else:
    def _page_save(page, author, comment, remote_addr=None, t=None):
        page.save(author, comment, t=t)


class WikiRPC(Component):
    """Superset of the
    [http://www.jspwiki.org/Wiki.jsp?page=WikiRPCInterface2 WikiRPC API]. """

    implements(IXMLRPCHandler)

    manipulators = ExtensionPoint(IWikiPageManipulator)

    def __init__(self):
        self.wiki = WikiSystem(self.env)

    def xmlrpc_namespace(self):
        return 'wiki'

    def xmlrpc_methods(self):
        yield (None, ((dict, datetime),), self.getRecentChanges)
        yield ('WIKI_VIEW', ((int,),), self.getRPCVersionSupported)
        yield (None, ((str, str), (str, str, int),), self.getPage)
        yield (None, ((str, str, int),), self.getPage, 'getPageVersion')
        yield (None, ((str, str), (str, str, int)), self.getPageHTML)
        yield (None, ((str, str), (str, str, int)), self.getPageHTML, 'getPageHTMLVersion')
        yield (None, ((list,),), self.getAllPages)
        yield (None, ((dict, str), (dict, str, int)), self.getPageInfo)
        yield (None, ((dict, str, int),), self.getPageInfo, 'getPageInfoVersion')
        yield (None, ((bool, str, str, dict),), self.putPage)
        yield (None, ((list, str),), self.listAttachments)
        yield (None, ((Binary, str),), self.getAttachment)
        yield (None, ((bool, str, Binary),), self.putAttachment)
        yield (None, ((bool, str, str, str, Binary),
                               (bool, str, str, str, Binary, bool)),
                               self.putAttachmentEx)
        yield (None, ((bool, str),(bool, str, int)), self.deletePage)
        yield (None, ((bool, str),), self.deleteAttachment)
        yield ('WIKI_VIEW', ((list, str),), self.listLinks)
        yield ('WIKI_VIEW', ((str, str),), self.wikiToHtml)

    def _fetch_page(self, req, pagename, version=None):
        # Helper for getting the WikiPage that performs basic checks
        page = WikiPage(self.env, pagename, version)
        req.perm(page.resource).require('WIKI_VIEW')
        if page.exists:
            return page
        else:
            msg = 'Wiki page "%s" does not exist' % pagename
            if version is not None:
                msg += ' at version %s' % version
            raise ResourceNotFound(msg)

    def _page_info(self, name, when, author, version, comment):
        return dict(name=name, lastModified=when,
                    author=author, version=int(version), comment=comment)

    def getRecentChanges(self, req, since):
        """ Get list of changed pages since timestamp """
        since = to_utimestamp(since)
        wiki_realm = Resource('wiki')
        query = ('SELECT name, time, author, version, comment '
                 'FROM wiki w1 '
                 'WHERE time >= %s '
                 'AND version = (SELECT MAX(version) '
                 '               FROM wiki w2 '
                 '               WHERE w2.name=w1.name) '
                 'ORDER BY time DESC')
        if hasattr(self.env, 'db_query'):
            generator = self.env.db_query(query, (since,))
        else:
            db = self.env.get_db_cnx()
            cursor = db.cursor()
            cursor.execute(query, (since,))
            generator = cursor
        result = []
        for name, when, author, version, comment in generator:
            if 'WIKI_VIEW' in req.perm(wiki_realm(id=name, version=version)):
                result.append(
                    self._page_info(name, from_utimestamp(when),
                                    author, version, comment))
        return result

    def getRPCVersionSupported(self, req):
        """ Returns 2 with this version of the Trac API. """
        return 2

    def getPage(self, req, pagename, version=None):
        """ Get the raw Wiki text of page, latest version. """
        page = self._fetch_page(req, pagename, version)
        return page.text

    def getPageHTML(self, req, pagename, version=None):
        """ Return latest version of page as rendered HTML, utf8 encoded. """
        page = self._fetch_page(req, pagename, version)
        fields = {'text': page.text}
        for manipulator in self.manipulators:
            manipulator.prepare_wiki_page(req, page, fields)
        context = web_context(req, page.resource, absurls=True)
        html = unicode(format_to_html(self.env, context, fields['text']))
        return '<html><body>%s</body></html>' % html

    def getAllPages(self, req):
        """ Returns a list of all pages. The result is an array of utf8 pagenames. """
        pages = []
        for page in self.wiki.get_pages():
            if 'WIKI_VIEW' in req.perm(Resource('wiki', page)):
                pages.append(page)
        return pages

    def getPageInfo(self, req, pagename, version=None):
        """ Returns information about the given page. """
        page = WikiPage(self.env, pagename, version)
        req.perm(page.resource).require('WIKI_VIEW')
        if page.exists:
            for item in page.get_history():
                return self._page_info(page.name, item[1], item[2],
                                       page.version, page.comment)

    def putPage(self, req, pagename, content, attributes):
        """ writes the content of the page. """
        page = WikiPage(self.env, pagename)
        if page.readonly:
            req.perm(page.resource).require('WIKI_ADMIN')
        elif not page.exists:
            req.perm(page.resource).require('WIKI_CREATE')
        else:
            req.perm(page.resource).require('WIKI_MODIFY')

        page.text = content
        if req.perm(page.resource).has_permission('WIKI_ADMIN'):
            page.readonly = attributes.get('readonly') and 1 or 0

        _page_save(page, attributes.get('author', req.authname),
                   attributes.get('comment'), remote_addr=req.remote_addr)
        return True

    def deletePage(self, req, name, version=None):
        """Delete a Wiki page (all versions) or a specific version by
        including an optional version number. Attachments will also be
        deleted if page no longer exists. Returns True for success."""
        wp = WikiPage(self.env, name, version)
        req.perm(wp.resource).require('WIKI_DELETE')
        try:
            wp.delete(version)
            return True
        except:
            return False

    def listAttachments(self, req, pagename):
        """ Lists attachments on a given page. """
        for a in Attachment.select(self.env, 'wiki', pagename):
            if 'ATTACHMENT_VIEW' in req.perm(a.resource):
                yield pagename + '/' + a.filename

    def getAttachment(self, req, path):
        """ returns the content of an attachment. """
        pagename, filename = os.path.split(path)
        attachment = Attachment(self.env, 'wiki', pagename, filename)
        req.perm(attachment.resource).require('ATTACHMENT_VIEW')
        return Binary(attachment.open().read())

    def putAttachment(self, req, path, data):
        """ (over)writes an attachment. Returns True if successful.
        
        This method is compatible with WikiRPC.  `putAttachmentEx` has a more
        extensive set of (Trac-specific) features. """
        pagename, filename = os.path.split(path)
        self.putAttachmentEx(req, pagename, filename, None, data)
        return True

    def putAttachmentEx(self, req, pagename, filename, description, data, replace=True):
        """ Attach a file to a Wiki page. Returns the (possibly transformed)
        filename of the attachment.
        
        Use this method if you don't care about WikiRPC compatibility. """
        if not WikiPage(self.env, pagename).exists:
            raise ResourceNotFound('Wiki page "%s" does not exist' % pagename)
        if replace:
            try:
                attachment = Attachment(self.env, 'wiki', pagename, filename)
                req.perm(attachment.resource).require('ATTACHMENT_DELETE')
                attachment.delete()
            except TracError:
                pass
        attachment = Attachment(self.env, 'wiki', pagename)
        req.perm(attachment.resource).require('ATTACHMENT_CREATE')
        attachment.author = req.authname
        attachment.description = description
        attachment.insert(filename, io.BytesIO(data.data), len(data.data))
        return attachment.filename

    def deleteAttachment(self, req, path):
        """ Delete an attachment. """
        pagename, filename = os.path.split(path)
        if not WikiPage(self.env, pagename).exists:
            raise ResourceNotFound('Wiki page "%s" does not exist' % pagename)
        attachment = Attachment(self.env, 'wiki', pagename, filename)
        req.perm(attachment.resource).require('ATTACHMENT_DELETE')
        attachment.delete()
        return True

    def listLinks(self, req, pagename):
        """ ''Not implemented'' """
        return []

    def wikiToHtml(self, req, text):
        """ Render arbitrary Wiki text as HTML. """
        context = web_context(req, absurls=1)
        return(to_unicode(format_to_html(self.env, context, text)))