# -*- coding: utf-8 -*- """ pocoo.pkg.core.acl ~~~~~~~~~~~~~~~~~~ Pocoo ACL System. :copyright: 2006 by Armin Ronacher. :license: GNU GPL, see LICENSE for more details. """ from pocoo.db import meta from pocoo.pkg.core.forum import Site, Forum, Thread from pocoo.pkg.core.user import User, Group from pocoo.pkg.core.db import users, groups, group_members, privileges, \ forums, posts, acl_mapping, acl_subjects, acl_objects class AclManager(object): """ Manager object to manage ALCs. """ STRONG_NO = -1 WEAK_NO = 0 WEAK_YES = 1 STRONG_YES = 2 def __init__(self, ctx, subject): self.ctx = ctx self.subject = subject if isinstance(subject, User): self._type = 'user' elif isinstance(subject, Group): self._type = 'group' else: raise ValueError('neither user or group specified') def allow(self, privilege, obj, force=False): """Allows the subject privilege on obj.""" return self._set(privilege, obj, 1 + bool(force)) def default(self, privilege, obj): """Sets the state for privilege on obj back to weak yes.""" return self._set(privilege, obj, 0) def deny(self, privilege, obj, force=False): """Denies the subject privilege on obj.""" return self._set(privilege, obj, -1 - bool(force)) def can_access(self, privilege, obj): """Checks if the current subject with the required privilege somehow. Either directly or when the subject is a user and one of its groups can access it.""" #XXX: maybe this could be one big query instead of 4 #XXX: this currently does not work correctly, therefore return True return True if not isinstance(obj, (Forum, Thread, Site.__class__)): raise TypeError('obj must be a forum, thread or site') privilege = privilege.upper() s = self._get_subject_join().alias('s').c def do_check(obj, tendency): db = self.ctx.engine o = self._get_object_join(obj).alias('o').c # self check r = db.execute(meta.select([acl_mapping.c.state], (acl_mapping.c.priv_id == privileges.c.priv_id) & (acl_mapping.c.subject_id == s.subject_id) & (acl_mapping.c.object_id == o.object_id) & (privileges.c.name == privilege) )) row = r.fetchone() if row is not None: if row['state'] in (self.STRONG_NO, self.STRONG_YES): return row['state'] == self.STRONG_YES tendency = row['state'] # if the controlled subject is a user check all groups if isinstance(self.subject, User): r = db.execute(meta.select([acl_mapping.c.state], (acl_mapping.c.object_id == o.object_id) & (acl_mapping.c.subject_id == groups.c.subject_id) & (groups.c.group_id == group_members.c.group_id) & (group_members.c.user_id == self.subject.user_id) )) while True: row = r.fetchone() if row is None: break state = row[0] if state in (self.STRONG_YES, self.STRONG_NO): return state == self.STRONG_YES if tendency is None: tendency = state elif tendency == self.WEAK_NO and state == self.WEAK_YES: tendency = self.WEAK_YES # check related objects if isinstance(obj, Thread): return do_check(obj.forum, tendency) elif isinstance(obj, Forum): return do_check(Site, tendency) else: return tendency return do_check(obj, None) in (self.WEAK_YES, self.STRONG_YES) def _set(self, privilege, obj, state): """Helper functions for settings privileges.""" privilege = privilege.upper() if self.subject.subject_id is None: self._bootstrap() if obj.object_id is None: self._bootstrap_object(obj) # special state "0" which means delete if not state: p = meta.select([privileges.c.priv_id], privileges.c.name == privilege) self.ctx.engine.execute(acl_mapping.delete( (acl_mapping.c.priv_id == p.c.priv_id) & (acl_mapping.c.subject_id == self.subject.subject_id) & (acl_mapping.c.object_id == obj.object_id) )) return # touch privilege and check existing mapping priv_id = self._fetch_privilege(privilege) r = self.ctx.engine.execute(meta.select([acl_mapping.c.state], (acl_mapping.c.priv_id == priv_id) & (acl_mapping.c.subject_id == self.subject.subject_id) & (acl_mapping.c.object_id == obj.object_id) )) row = r.fetchone() if row is not None: # this rule exists already if row['state'] == state: return # goddamn, same rule - different state, delete old first self._set(privilege, obj, 0) # insert new rule self.ctx.engine.execute(acl_mapping.insert(), priv_id = priv_id, subject_id = self.subject.subject_id, object_id = obj.object_id, state = state ) def _bootstrap(self): """This method is automatically called when subject_id is None and an subject_id is required.""" r = self.ctx.engine.execute(acl_subjects.insert(), subject_type = self._type ) self.subject.subject_id = r.last_inserted_ids()[0] self.subject.save() def _bootstrap_object(self, obj): """Like _bootstrap but works for objects.""" objtype = self._get_object_type(obj) r = self.ctx.engine.execute(acl_objects.insert(), object_type = objtype ) obj.object_id = r.last_inserted_ids()[0] obj.save() def _get_object_type(self, obj): if isinstance(obj, Forum): return 'forum' elif isinstance(obj, Thread): return 'thread' elif obj is Site: return 'site' raise TypeError('obj isn\'t a forum or thread') def _get_object_join(self, obj): """Returns a subjoin for the object id.""" t = self._get_object_type(obj) if t == 'forum': return meta.select([forums.c.object_id], forums.c.forum_id == obj.forum_id ) elif t == 'thread': return meta.select([posts.c.object_id], posts.c.post_id == obj.post_id ) else: # XXX: it works ^^ # i really want something like meta.select('0 as group_id') class Fake(object): def alias(self, n): class _C(object): class c(object): object_id = 0 return _C return Fake() def _get_subject_join(self): """Returns a subjoin for the subject id.""" if self._type == 'user': return meta.select([users.c.subject_id], users.c.user_id == self.subject.user_id ) return meta.select([groups.c.subject_id], groups.c.group_id == self.subject.group_id ) def _fetch_privilege(self, name): """Returns the priv_id for the given privilege. If it doesn\'t exist by now the system will create a new privilege.""" r = self.ctx.engine.execute(meta.select([privileges.c.priv_id], privileges.c.name == name )) row = r.fetchone() if row is not None: return row[0] r = self.ctx.engine.execute(privileges.insert(), name = name ) return r.last_inserted_ids()[0] def __repr__(self): if self._type == 'user': id_ = self.subject.user_id else: id_ = self.subject.group_id if self.subject.subject_id is None: return '<%s %s:%d inactive>' % ( self.__class__.__name__, self._type, id_ ) return '<%s %s:%d active as %d>' % ( self.__class__.__name__, self._type, id_, self.subject.subject_id ) # -*- coding: utf-8 -*- """ pocoo.pkg.core.auth ~~~~~~~~~~~~~~~~~~~ Default authentication module. :copyright: 2006 by Armin Ronacher. :license: GNU GPL, see LICENSE for more details. """ from datetime import datetime from pocoo.context import Component from pocoo.utils.net import IP from pocoo.application import RequestWrapper from pocoo.settings import cfg from pocoo.pkg.core.user import User, check_login_data class AuthProvider(Component): @property def auth_name(self): """ has to return the name of the auth module for the configuration file. This name defaults to the classname. """ return self.__class__.__name__ def get_user(self, req): """ This method should either return a valid `User object`_ or ``None``. .. _User object: pocoo.pkg.core.user """ def get_user_id(self, session_dict): """ This method should either return the user_id of the user or ``None``. """ def do_login(self, req, username, password): """ This method should update the user session so that the auth provider can recognize the user in the ``get_user`` method. It has to return a valid ``HttpResponse``, for redirecting to external login scripts or ``False``, to display an error message (login failed). If it returns ``True`` pocoo will redirect to the last visited page. """ def do_logout(self, req): """ This method should return a valid ``Response`` for redirecting to external scripts or ``None``. """ class SessionAuth(AuthProvider): def get_user(self, req): try: user_id = req.session['user_id'] return User(self.ctx, user_id) except (KeyError, User.NotFound): return None def do_login(self, req, username, password): user_id = check_login_data(req.ctx, username, password) if user_id is not None: req.session['user_id'] = user_id return True return False def do_logout(self, req): if 'user_id' in req.session: req.session.pop('user_id') def get_user_id(self, session_dict): return session_dict.get('user_id') class AuthWrapper(RequestWrapper): def get_priority(self): # after SessionWrapper return 3 def process_request(self, req): # XXX: what to do with uid? uid = req.session.get('user_id', -1) req.auth = AuthController(req) req.user = req.auth.get_user() def process_response(self, req, resp): return resp def get_auth_provider_mapping(ctx): """Returns a list of auth providers.""" providers = {} for comp in ctx.get_components(AuthProvider): providers[comp.auth_name] = comp return providers def get_auth_provider(ctx): """Returns the enabled auth provider.""" if 'auth/provider' not in ctx._cache: providers = get_auth_provider_mapping(ctx) provider = providers[ctx.cfg.get('general', 'auth_module')] ctx._cache['auth/provider'] = provider return ctx._cache['auth/provider'] class AuthController(object): auth_provider = cfg.str('general', 'auth_module') def __init__(self, req): self.ctx = req.ctx self.req = req self.provider = get_auth_provider(req.ctx) def get_user(self): """ Returns the user for this request """ user = self.provider.get_user(self.req) if user is not None: user.ip = IP(self.req.environ['REMOTE_ADDR']) return user # return anonymous user return User(self.ctx, -1) def do_login(self, username, password): """ Returns a valid ``Response``, for redirecting to external login scripts or ``False``, to display an error message (login failed). If it returns ``True`` pocoo should redirect to the last visited page. """ rv = self.provider.do_login(self.req, username, password) if rv is not False: self.req.user = self.get_user() return rv return False def do_logout(self): """ Loggs the user out. Can eiter return None or a Response for external redirects. """ # update last login time self.req.user.last_login = datetime.now() self.req.user.save() self.provider.do_logout(self.req) #XXX: maybe a bit slow self.req.user = self.get_user() # -*- coding: utf-8 -*- """ pocoo.pkg.core.bbcode ~~~~~~~~~~~~~~~~~~~~~ Pocoo BBCode parser. :copyright: 2006 by Georg Brandl, Armin Ronacher. :license: GNU GPL, see LICENSE for more details. """ import re from pocoo import Component from pocoo.pkg.core.textfmt import MarkupFormat from pocoo.pkg.core.smilies import get_smiley_buttons, replace_smilies from pocoo.utils.html import escape_html, translate_color from pocoo.utils.activecache import Node, CallbackNode, NodeList tag_re = re.compile(r'(\[(/?[a-zA-Z0-9]+)(?:=(".+?"|.+?))?\])') class EndOfText(Exception): """Raise when the end of the text is reached.""" class TokenList(list): """A subclass of a list for tokens which allows to flatten the tokens so that the original bbcode is the return value.""" def flatten(self): return u''.join(token.raw for token in self) def __repr__(self): return '<%s %s>' % ( self.__class__.__name__, list.__repr__(self) ) class Token(object): """Token Baseclass""" def __repr__(self): return '<%s %s>' % ( self.__class__.__name__, self.raw ) class TextToken(Token): """A token for plain text.""" def __init__(self, data): self.data = self.raw = data class TagToken(Token): """A token for tags.""" def __init__(self, raw, tagname, attr): self.raw = raw self.name = tagname self.attr = attr class Parser(object): """ BBCode Parser Class """ def __init__(self, ctx, text, handlers, allowed_tags): self.ctx = ctx self._tokens = tag_re.split(text) self._tokens.reverse() self._is_text = True self._cache = [] self._handlers = handlers self._allowed_tags = allowed_tags def tag_allowed(self, tagname): """ Check if a tagname is allowed for this parser. """ if self._allowed_tags is None: return True return tagname in self._allowed_tags def get_next_token(self): """ Fetch the next raw token from the text Raise ``EndOfText`` if not further token exists. """ if self._cache: return self._cache.pop() get_token = self._tokens.pop if not self._tokens: raise EndOfText() if self._is_text: self._is_text = False return TextToken(get_token()) else: self._is_text = True raw = get_token() tagname = get_token().lower() attr = get_token() if attr and attr[:6] == attr[-6:] == '"': attr = attr[6:-6] return TagToken(raw, tagname, attr) def push_token(self, token): """ Pushes the last fetched token in a cache so that the next time you call ``get_next_token`` returns the pushed token. """ self._cache.append(token) def parse(self, needle=None, preserve_needle=False): """ Parses the text until ``needle`` or the end of text if not defined. If it finds the needle it will delete the needle token. If you want the needle token too set ``preserve_needle`` to ``True``. In comparison with the ``get_tokens`` method this method will call the node handlers for each node. """ result = NodeList() try: while True: token = self.get_next_token() if isinstance(token, TagToken) and token.name == needle: if preserve_needle: self.push_token(token) break result.append(self.get_node(token)) except EndOfText: pass return result def get_tokens(self, needle=None, preserve_needle=False): """ Like ``parse`` but returns an unparsed TokenList. Basically you would never need this method except for preserved areas like Code blocks etc. """ result = TokenList() try: while True: token = self.get_next_token() if isinstance(token, TagToken) and token.name == needle: if preserve_needle: self.push_token(token) break result.append(token) except EndOfText: pass return result def get_node(self, token): """ Return the node for a token. If the token was a ``TextToken`` the resulting node will call ``get_text_node`` which returns a \n to <br/> replaced version of the token value wrapped in a plain ``Node``. In all other cases it will try to lookup the node in the list of registered token handlers. If this fails it wraps the raw token value in a ``Node``. """ if isinstance(token, TextToken): return self.get_text_node(token.data) if self.tag_allowed(token.name): for handler in self._handlers: rv = handler.get_node(token, self) if rv is not None: if isinstance(rv, Node): return rv return Node(rv) return self.get_text_node(token.raw) def get_text_node(self, data): """ Newline replaces the text and wraps it in an ``Node``. """ text = replace_smilies(self.ctx, data) return Node(re.sub(r'\r?\n', '<br />\n', text)) def wrap_render(self, tag, parse_until): """ Renders untile ``parse_until`` and wraps it in the html tag ``tag``. """ return NodeList(Node('<%s>' % tag), self.parse(parse_until), Node('</%s>' % tag)) def joined_render(self, *args): """ Takes a number of arguments which are either strings, unicode objects or nodes. It creates a new newlist, iterates over all arguments and converts all to nodes if not happened by now. """ result = NodeList() for arg in args: if isinstance(arg, Node): result.append(arg) else: result.append(Node(arg)) return result def callback(self, callback, data): """ Returns a new ``CallbackNode``. Don't create callback nodes on your own, this method might do some further magic in the future. """ return CallbackNode(callback, *data) class BBCodeTagProvider(Component): #: list of handled tags tags = [] #: list of callbacks callbacks = [] def get_node(self, token, parser): """ Is called when a tag is found. It must return a valid ``Node`` or a string which is automatically wrapped into a plain ``Node``. """ def render_callback(self, req, callback, data): """ Has to handle a callback for ``callback`` with ``data`` and return a string """ return u'' def get_buttons(self, req): """ Return a valid button definition for "tagname" or None if no button is required. A valid button definition is a dict in the following form:: {'name': _('Bold'), 'description': _('Insert bold text'), 'icon': self.ctx.make_url('!cobalt/...'), 'insert': '[b]{text}[/b]'} """ return () class BBCode(MarkupFormat): """ BBCode markup format. """ name = 'bbcode' editor_javascript = '!cobalt/core/pocoo/app/BBCodeEditor.js' def __init__(self, ctx): super(BBCode, self).__init__(ctx) self.handlers = {} self.callbacks = {} for comp in ctx.get_components(BBCodeTagProvider): for tag in comp.tags: self.handlers.setdefault(tag, []).append(comp) for callback in comp.callbacks: self.callbacks[callback] = comp def get_signature_tags(self): """Returns the allowed signature tags or None if all""" if not hasattr(self, '_signature_tags'): r = self.ctx.cfg.get('board', 'bbcode_signature_tags', 'ALL') if r == 'ALL': self._signature_tags = None else: self._signature_tags = [s.strip().lower() for s in r.split(',')] return self._signature_tags def parse(self, text, signature): handlers = self.ctx.get_components(BBCodeTagProvider) allowed_tags = None if signature: allowed_tags = self.get_signature_tags() p = Parser(self.ctx, escape_html(text), handlers, allowed_tags) return p.parse() def render_callback(self, req, callback, data): """Redirect the callback to the BBCode Provider.""" for comp in self.ctx.get_components(BBCodeTagProvider): rv = comp.render_callback(req, callback, data) if rv is not None: return rv raise Exception('unhandled callback %r' % callback) def quote_text(self, req, text, username=None): if username is None: return '[quote]%s[/quote]' % text return '[quote="%s"]%s[/quote]' % (username, text) def get_editor_options(self, req, signature): buttons = [] if signature: signature_tags = self.get_signature_tags() for comp in self.ctx.get_components(BBCodeTagProvider): for button in comp.get_buttons(req): if signature and button['tagname'] not in signature_tags: continue buttons.append(button) return { 'buttons': buttons, 'smilies': get_smiley_buttons(req.ctx) } class BasicBBCodeTagProvider(BBCodeTagProvider): tags = ['b', 'i', 'u', 's', 'url', 'email', 'color', 'size', 'code', 'quote', 'list'] callbacks = ['quote', 'list'] def get_node(self, token, parser): ctx = self.ctx if token.name == 'b': if token.attr: return return parser.wrap_render('strong', '/b') if token.name == 'i': if token.attr: return return parser.wrap_render('em', '/i') if token.name == 'u': if token.attr: return return parser.wrap_render('ins', '/u') if token.name == 's': if token.attr: return return parser.wrap_render('del', '/s') if token.name == 'url': if token.attr: content = parser.parse('/url') url = token.attr else: tokenlist = parser.get_tokens('/url') content = url = tokenlist.flatten() if url.startswith('javascript:'): url = url[11:] return parser.joined_render('<a href="', url, '">', content, '</a>') if token.name == 'email': if token.attr: content = parser.parse('/email') mail = token.attr else: tokenlist = parser.get_tokens('/email') mail = content = tokenlist.flatten() return parser.joined_render('<a href="mailto:"', mail, '">', content, '</a>') if token.name == 'color': content = parser.parse('/color') try: color = translate_color(token.attr) except ValueError: return token.raw return parser.joined_render('<span style="color: ', color, '">', content, '</span>') if token.name == 'size': content = parser.parse('/size') if not token.attr or not token.attr.isdigit() or len(token.attr) > 2: return token.raw return parser.joined_render('<span style="font-size: ', token.attr, 'px">', content, '</span>') if token.name == 'img': if token.attr: return tokenlist = parser.get_tokens('/img') url = tokenlist.flatten() if url.startswith('javascript:'): url = url[11:] return u'<img src="%s" />' % url if token.name == 'code': if token.attr: return return u'<pre>%s</pre>' % parser.get_tokens('/code').flatten() if token.name == 'quote': return parser.callback('quote', (token.attr or u'', parser.parse('/quote'))) if token.name == 'list': return parser.callback('list', (token.attr or u'*', parser.parse('/list'))) def render_callback(self, req, callback, data): if callback == 'quote': _ = req.gettext written, body = data if written: if not written.endswith(':'): written = (_('%s wrote') % written) + u':' written = u'<div class="written_by">%s</div>' % written return u'<blockquote>%s%s</blockquote>' % ( written, body.render(req, self) ) if callback == 'list': type, body = data lines = [] for line in re.split(r'^\s*\[\*\](?m)', body.render(req, self)): line = line.strip() if line: lines.append(u'<li>%s</li>' % line) return u'<ul>%s</ul>' % u'\n'.join(lines) def get_buttons(self, req): _ = req.gettext make_url = self.ctx.make_url #XXX: themeable icon_url = lambda x: make_url('!cobalt/core/default/img/bbcode/' + x) return [ {'tagname': 'b', 'name': _('Bold'), 'description': _('Insert bold text'), 'insert': '[b]{text}[/b]', 'icon': icon_url('bold.png')}, {'tagname': 'i', 'name': _('Italic'), 'description': _('Insert italic text'), 'insert': '[i]{text}[/i]', 'icon': icon_url('italic.png')}, {'tagname': 'u', 'name': _('Underline'), 'description': _('Insert underlined text'), 'insert': '[u]{text}[/u]', 'icon': icon_url('underline.png')}, {'tagname': 's', 'name': _('Strikethrough'), 'description': _('Insert striked text'), 'insert': '[i]{text}[/i]', 'icon': icon_url('strikethrough.png')}, {'tagname': 'size', 'name': _('Font Size'), 'description': _('Change the font size'), 'insert': '[size={attr}]{text}[/size]', 'values': [ (8, _('Tiny')), (11, _('Small')), (13, _('Normal')), (18, _('Big')), (24, _('Huge')) ]}, {'tagname': 'color', 'name': _('Font Color'), 'description': _('Change Font Color'), 'insert': '[color={attr}]{text}[/size]', 'values': [ ('black', _('Black')), ('blue', _('Blue')), ('brown', _('Brown')), ('cyan', _('Cyan')), ('gray', _('Gray')), ('green', _('Green')), ('magenta', _('Magenta')), ('purple', _('Purple')), ('red', _('Red')), ('white', _('White')), ('yellow', _('Yellow')) ]}, {'tagname': 'url', 'name': _('Link'), 'description': _('Create a Link'), 'icon': icon_url('link.png'), 'insert': '[url]{text}[/url]'}, {'tagname': 'img', 'name': _('Image'), 'description': _('Insert an image'), 'icon': icon_url('img.png'), 'insert': '[img]{text}[/img]'}, {'tagname': 'code', 'name': _('Code'), 'description': _('Insert a codeblock'), 'icon': icon_url('code.png'), 'insert': '[code]{text}[/code]'}, {'tagname': 'quote', 'name': _('Quote'), 'description': _('Insert a blockquote'), 'icon': icon_url('quote.png'), 'insert': '[quote]{text}[/quote]'} ] # -*- coding: utf-8 -*- """ pocoo.pkg.core.cache ~~~~~~~~~~~~~~~~~~~~ Provides a very simple caching system for persistent processes. :copyright: 2006 by Armin Ronacher. :license: GNU GPL, see LICENSE for more details. """ from pocoo.application import RequestWrapper from pocoo.exceptions import PocooRuntimeError from pocoo.utils.cache import Cache # This is currently unused. class CacheSystem(RequestWrapper): def __init__(self, ctx): self.cache = Cache(autoprune=ctx.cfg.get('cache', 'autoprune', False)) self.uri2key = {} RequestWrapper.__init__(self, ctx) def get_priority(self): # caching has highest priority return 1 def process_request(self, req): req.cache_control = None req.cache = self.cache if req.environ['REQUEST_METHOD'] != 'GET': return if req.environ['REQUEST_URI'] not in self.uri2key: return key = self.uri2key[req.environ['REQUEST_URI']] return self.cache.fetch(key, None) def process_response(self, req, resp): if not req.cache_control: return resp action, key = req.cache_control if action == 'set': self.cache.dump(key, resp) self.uri2key[req.environ['REQUEST_URI']] = key elif action == 'update': if isinstance(key, basestring): self.cache.remove(key) else: for k in key: self.cache.remove(k) else: raise PocooRuntimeError('req.cache_control invalid') return resp # -*- coding: utf-8 -*- """ pocoo.pkg.core.captcha ~~~~~~~~~~~~~~~~~~~~~~ Captcha URL Handler. Displays a random captcha picture (debugging only). :copyright: 2006 by Armin Ronacher. :license: GNU GPL, see LICENSE for more details. """ from pocoo.application import RequestHandler from pocoo.http import Response class CaptchaImage(RequestHandler): handler_regexes = ['!captcha$'] def handle_request(self, req): from pocoo.utils.captcha import Captcha c = Captcha() response = Response(c.generate_image()) response['Content-Type'] = 'image/png' return response # -*- coding: utf-8 -*- """ pocoo.pkg.core.cobalt ~~~~~~~~~~~~~~~~~~~~~ Provides static content serving like mozilla's chrome:// scheme. :copyright: 2006 by Armin Ronacher, Georg Brandl. :license: GNU GPL, see LICENSE for more details. """ import os import time from mimetypes import guess_type from pocoo.template import FileRequirements class CobaltMiddleware(object): """ The Cobalt middleware serves static files. """ def __init__(self, app, ctx): self.app = app self.ctx = ctx self.cache_enabled = ctx.cfg.get_bool('cache', 'static_cache') def get_stylesheet_imports(self): if not self.cache_enabled or 'cobalt/stylesheet_imports' not in self.ctx._cache: handled = set() lines = [] for comp in self.ctx.get_components(FileRequirements): for name in comp.get_stylesheet_imports(): item = (comp.package, name) if item in handled: continue handled.add(item) url = '!cobalt/%s/%s' % item lines.append('@import url(%s);' % str(self.ctx.make_url(url))) self.ctx._cache['cobalt/stylesheet_imports'] = '\n'.join(lines) return self.ctx._cache['cobalt/stylesheet_imports'] def get_javascript_imports(self): if not self.cache_enabled or 'cobalt/javascript_imports' not in self.ctx._cache: handled = set() lines = [] onload = [] for comp in self.ctx.get_components(FileRequirements): for name in comp.get_javascript_imports(): item = (comp.package, name) if item in handled: continue handled.add(item) imp = self.ctx.pkgmanager.importers[comp.package] lines.append(imp.get_data(os.path.join('static', name))) self.ctx._cache['cobalt/javascript_imports'] = '\n\n'.join(lines) return self.ctx._cache['cobalt/javascript_imports'] def __call__(self, environ, start_response): path = environ.get('PATH_INFO', '/') if path.startswith('/!cobalt/'): mime_type = None try: pkgname, fname = path[9:].split('/', 1) if pkgname == '_import_': if fname == 'styles.css': mime_type = 'text/css' content = self.get_stylesheet_imports() elif fname == 'script.js': mime_type = 'application/x-javascript' content = self.get_javascript_imports() else: guessed_type = guess_type(fname) mime_type = guessed_type[0] or 'text/plain' imp = self.ctx.pkgmanager.importers[pkgname] content = imp.get_data(os.path.join('static', fname)) if mime_type is not None: expiry = time.time() + 3600 # cache for one hour expiry = time.asctime(time.gmtime(expiry)) headers = [('Content-Type', mime_type), ('Cache-Control', 'public'), ('Expires', expiry)] start_response('200 OK', headers) if environ.get('REQUEST_METHOD', 'GET') == 'HEAD': return [] else: return [content] except (ValueError, KeyError, IOError): # XXX: output custom error message? pass return self.app(environ, start_response) # -*- coding: utf-8 -*- """ pocoo.pkg.core.db ~~~~~~~~~~~~~~~~~ Pocoo core database definition. :copyright: 2006 by Armin Ronacher, Georg Brandl. :license: GNU GPL, see LICENSE for more details. """ from pocoo.db import meta, DatabaseObserver ANONYMOUS_USER_ID = -1 DEFAULT_USER_ID = 0 sessions = meta.Table('core_sessions', meta.Column('session_key', meta.Unicode(40), primary_key=True), meta.Column('ip_addr', meta.Unicode(15)), meta.Column('expires', meta.DateTime), meta.Column('last_reload', meta.DateTime), meta.Column('data', meta.Pickled(dict)), meta.Column('action', meta.Unicode), ) users = meta.Table('core_users', meta.Column('user_id', meta.Integer, primary_key=True), meta.Column('subject_id', meta.Integer, meta.ForeignKey('core_acl_subjects.subject_id')), meta.Column('username', meta.Unicode(40)), meta.Column('email', meta.Unicode(250)), meta.Column('pwhash', meta.Unicode(60)), meta.Column('act_key', meta.Unicode(8)), meta.Column('language', meta.Unicode(2)), meta.Column('profile', meta.Pickled(dict)), meta.Column('settings', meta.Pickled(dict)), meta.Column('last_login', meta.DateTime), meta.Column('register_date', meta.DateTime), meta.Column('post_count', meta.Integer), meta.Column('read_threads', meta.Binary), meta.Column('read_posts', meta.Binary), ) groups = meta.Table('core_groups', meta.Column('group_id', meta.Integer, primary_key=True), meta.Column('subject_id', meta.Integer, meta.ForeignKey('core_acl_subjects.subject_id')), meta.Column('name', meta.Unicode(40)), meta.Column('public', meta.Boolean), meta.Column('hidden', meta.Boolean) ) group_members = meta.Table('core_group_members', meta.Column('user_id', meta.Integer, meta.ForeignKey('core_users.user_id')), meta.Column('group_id', meta.Integer, meta.ForeignKey('core_groups.group_id')) ) forums = meta.Table('core_forums', meta.Column('forum_id', meta.Integer, primary_key=True), meta.Column('parent_id', meta.Integer, meta.ForeignKey('core_forums.forum_id')), meta.Column('object_id', meta.Integer, meta.ForeignKey('core_acl_objects.object_id')), meta.Column('name', meta.Unicode(100)), meta.Column('description', meta.Unicode), meta.Column('position', meta.Integer), meta.Column('link', meta.Unicode(100)), #XXX: foreign key doesn't work meta.Column('last_post_id', meta.Integer), meta.Column('post_count', meta.Integer), meta.Column('thread_count', meta.Integer) ) posts = meta.Table('core_posts', meta.Column('post_id', meta.Integer, primary_key=True), meta.Column('forum_id', meta.Integer, meta.ForeignKey('core_forums.forum_id')), meta.Column('parent_id', meta.Integer, meta.ForeignKey('core_posts.post_id')), meta.Column('root_post_id', meta.Integer, meta.ForeignKey('core_posts.post_id')), meta.Column('object_id', meta.Integer, meta.ForeignKey('core_acl_objects.object_id')), meta.Column('post_count', meta.Integer), meta.Column('view_count', meta.Integer), meta.Column('author_id', meta.Integer, meta.ForeignKey('core_users.user_id')), meta.Column('username', meta.Unicode(200)), meta.Column('title', meta.Unicode(200)), meta.Column('text', meta.Unicode), meta.Column('timestamp', meta.DateTime) ) privileges = meta.Table('core_privileges', meta.Column('priv_id', meta.Integer, primary_key=True), meta.Column('name', meta.Unicode(100)) ) acl_mapping = meta.Table('core_acl_mapping', meta.Column('priv_id', meta.Integer, meta.ForeignKey('core_privileges.priv_id')), meta.Column('subject_id', meta.Integer, meta.ForeignKey('core_acl_subjects.subject_id')), meta.Column('object_id', meta.Integer, meta.ForeignKey('core_acl_objects.object_id')), meta.Column('state', meta.Integer) ) acl_subjects = meta.Table('core_acl_subjects', meta.Column('subject_id', meta.Integer, primary_key=True), meta.Column('subject_type', meta.String(10)) ) acl_objects = meta.Table('core_acl_objects', meta.Column('object_id', meta.Integer, primary_key=True), meta.Column('object_type', meta.String(10)) ) acl_subject_join = meta.polymorphic_union({ 'user': users, 'group': groups }, 'subject_type') acl_object_join = meta.polymorphic_union({ 'forum': forums, 'post': posts }, 'object_type') class CoreTableObserver(DatabaseObserver): def after_table_creation(self, table): if table is users: e = self.ctx.engine.execute e(users.insert(), user_id=ANONYMOUS_USER_ID, username='anonymous') e(users.insert(), user_id=DEFAULT_USER_ID, username='default') # -*- coding: utf-8 -*- """ pocoo.pkg.core.feeds ~~~~~~~~~~~~~~~~~~~~ Provides RSS Feeds. :copyright: 2006 by Armin Ronacher. :license: GNU GPL, see LICENSE for more details. """ from pocoo import Component from pocoo.http import PageNotFound, Response from pocoo.application import RequestHandler from pocoo.db import meta from pocoo.utils.feed import Feed from pocoo.pkg.core.db import forums, posts, users from pocoo.pkg.core.textfmt import parse_and_render class FeedProvider(Component): #: identifier for this feed. must be lowercase identifier = 'unknown' def get_feed(self, req, parameter): """ Return a dict in the following form:: {'title': 'Title of this feed', 'description': 'Description of this feed', 'items': [{ 'title': 'title of this item', 'link': 'relative link of this item', 'author': 'author of this item', 'description': 'description of this item', 'pub_date': 'date of this item' }]} Can raise a `FeedNotFound` exception. """ @property def url(self): return self.ctx.make_url('feeds/%s.xml' % self.identifier) class FeedNotFound(Exception): pass class ThreadFeed(FeedProvider): identifier = 'thread' def get_feed(self, req, post_id): _ = req.gettext try: post_id = int(post_id) except: raise FeedNotFound() row = self.ctx.engine.execute(meta.select([posts.c.root_post_id], (posts.c.post_id == post_id) )).fetchone() if row is None: raise FeedNotFound() root_post_id = row[0] # select data result = self.ctx.engine.execute(meta.select( [posts.c.post_id, posts.c.title, posts.c.text, posts.c.timestamp, users.c.username], (posts.c.root_post_id == root_post_id) & (users.c.user_id == posts.c.author_id), order_by=[meta.desc(posts.c.post_id)], limit=10 )) return { 'title': _('Last Posts in Thread %d') % root_post_id, 'description': _('The last 10 posts in Thread %d') % root_post_id, 'items': [{ 'title': post['title'], 'link': 'post/%d' % post['post_id'], 'description': parse_and_render(req, post['text']), 'author': post['username'], 'pub_date': post['timestamp'] } for post in result] } class ForumFeed(FeedProvider): identifier = 'forum' def get_feed(self, req, forum_id): _ = req.gettext try: forum_id = int(forum_id) except: raise FeedNotFound() if self.ctx.engine.execute(meta.select([forums.c.forum_id], (forums.c.forum_id == forum_id) )).fetchone() is None: raise FeedNotFound() # select data result = self.ctx.engine.execute(meta.select( [posts.c.post_id, posts.c.title, posts.c.text, posts.c.timestamp, users.c.username], (posts.c.forum_id == forum_id) & (users.c.user_id == posts.c.author_id), order_by=[meta.desc(posts.c.post_id)], limit=10 )) return { 'title': _('Last Posts in Forum %d') % forum_id, 'description': _('The last 10 posts of forum %d') % forum_id, 'items': [{ 'title': post['title'], 'link': 'post/%d' % post['post_id'], 'description': parse_and_render(req, post['text']), 'author': post['username'], 'pub_date': post['timestamp'] } for post in result] } class RecentChangesFeed(FeedProvider): identifier = 'recent' def get_title(self, req): _ = req.gettext return _('Recent Changes') def get_description(self, req): _ = req.gettext return _('The recent posts') def get_feed(self, req, parameter): _ = req.gettext if parameter: raise FeedNotFound() result = self.ctx.engine.execute(meta.select( [posts.c.post_id, posts.c.title, posts.c.text, posts.c.timestamp, users.c.username], (users.c.user_id == posts.c.author_id), order_by=[meta.desc(posts.c.post_id)], limit=10 )) return { 'title': _('Recent Changes'), 'description': _('The most recent posts'), 'items': [{ 'title': post['title'], 'link': 'post/%d' % post['post_id'], 'description': parse_and_render(req, post['text']), 'author': post['username'], 'pub_date': post['timestamp'] } for post in result] } class FeedDisplay(RequestHandler): handler_regexes = [ r'^feeds/(?P<feed>[a-z0-9_-]+)\.xml$', r'^feeds/(?P<feed>[a-z0-9_-]+)/(?P<parameter>.+)\.xml$' ] def handle_request(self, req, feed, parameter=None): for feed_provider in self.ctx.get_components(FeedProvider): if feed_provider.identifier == feed: data = feed_provider.get_feed(req, parameter) feed = Feed(self.ctx, data['title'], data['description'], self.ctx.make_external_url('')) try: for item in data['items']: feed.add_item(**item) except FeedNotFound: return PageNotFound() resp = Response(feed.generate()) resp['Content-Type'] = 'text/xml' return resp return PageNotFound() # -*- coding: utf-8 -*- """ pocoo.pkg.core.forum ~~~~~~~~~~~~~~~~~~~~ Forum Utilities. :copyright: 2006 by Armin Ronacher, Benjamin Wiegand. :license: GNU GPL, see LICENSE for more details. """ from datetime import datetime from math import ceil from pocoo import Component from pocoo.db import meta, DatabaseModel, lazy_column from pocoo.pkg.core.user import User from pocoo.pkg.core.db import forums, posts, users, ANONYMOUS_USER_ID from pocoo.pkg.core.template import Pagination, LazyPagination from pocoo.pkg.core.textfmt import parse_and_render, quote_text from pocoo.utils.uri import urlencode from pocoo.utils.iterators import inciter # for default arguments in Thread _missing = object() class PostProcessor(Component): """ Process a posting before it is stored in the database. """ def process_post(self, text, title, reason): """ Process a posting. :param text: Text of the posting, possibly changed by another PostProcessor. :param title: The subject of the posting :param reason: Can be ``'new'`` or ``'edit'``. :returns: * ``True``: store the posting as-is, or * ``False``: refuse to store the posting, or * a string: use as the new posting text, or * a tuple: (text, title) for the posting """ return True def apply_post_processors(ctx, text, title, reason): """ Apply all `PostProcessor` components to the posting. Return (``text``, ``title``) tuple. """ for comp in ctx.get_components(PostProcessor): rv = comp.process_post(text, title, 'new') if not rv: raise ValueError('creation of posting denied') elif isinstance(rv, basestring): text = unicode(rv) elif isinstance(rv, tuple): text = unicode(rv[0]) title = unicode(rv[1]) return text, title def get_forum_index(req): """ Return a list of dicts with forum information so that the template can use it. If the request object has an identified user object attached the returned dict will include status information. (read, unread) """ ctx = req.ctx f = forums.c p = posts.c u = users.c columns = [f.forum_id, f.description, f.name, f.link, f.post_count, f.thread_count] categories = [] def do(con): for category in con.execute(meta.select(columns, f.parent_id == None)): category = dict(category) category['is_external_link'] = bool(category.pop('link')) category['url'] = ctx.make_url('forum', category['forum_id']) forums = [] for forum in con.execute(meta.select(columns + [f.last_post_id], f.parent_id == category['forum_id'])): forum = dict(forum) forum['is_external_link'] = bool(forum.pop('link')) forum['url'] = ctx.make_url('forum', forum['forum_id']) # get last post last_post_id = forum.pop('last_post_id') if last_post_id is not None: result = con.execute(meta.select([u.user_id, u.username, p.post_id, p.title, p.timestamp, p.username.label('guestname')], (p.post_id == last_post_id) & (p.post_id != None) & (u.user_id == p.author_id) )) last_post = dict(result.fetchone()) username = urlencode(last_post['username']) last_post['author'] = { 'user_id': last_post['user_id'], 'registered': last_post.pop('user_id') > 0, 'username': last_post.pop('guestname') or\ last_post.pop('username'), 'url': ctx.make_url('users', username) } last_post['url'] = ctx.make_url('post', last_post['post_id']) else: last_post = None forum['last_post'] = last_post subforums = [] for sf in con.execute(meta.select([f.forum_id, f.name, f.link], f.parent_id == forum['forum_id'])): sf = dict(sf) sf['is_external_url'] = bool(sf['forum_id']) sf['url'] = ctx.make_url('forum', sf.pop('forum_id')) subforums.append(sf) forum['subforums'] = subforums forums.append(forum) category['forums'] = forums categories.append(category) ctx.engine.transaction(do) return categories def get_forum(req, forum_id, page=1): """ Return a list of dicts so that the template can use it. Return ``None`` if the forum does not exist. """ ctx = req.ctx f = forums.c p = posts.c u = users.c columns = [f.forum_id, f.description, f.name, f.link, f.post_count, f.thread_count] forum_columns = [f.forum_id, f.name, f.description, f.link, f.post_count, f.thread_count, f.last_post_id] sf_columns = [f.forum_id, f.name, f.link] thread_columns = [p.post_id, p.title, p.timestamp, u.user_id, u.username, p.post_count, p.view_count, p.username.label('guestname')] def do(con): category = con.execute(meta.select(columns, f.forum_id == forum_id )).fetchone() if category is None: return category = dict(category) category['url'] = ctx.make_url('forum', category['forum_id']) # wo don't pop link here so that the ForumPage request handler # can use the link key for redirecting. That means we don't # need a second query. But template designers shouldn't # ever access {{ forum.link }} category['is_external_url'] = bool(category['link']) forums = [] for forum in con.execute(meta.select(forum_columns, f.parent_id == category['forum_id'])): forum = dict(forum) forum['url'] = ctx.make_url('forum', forum['forum_id']) # wo don't pop that here so that the ForumPage request handler # can use the link key for redirecting. That means we don't # need a second query. But template designers shouldn't # ever access {{ forum.link }} forum['is_external_link'] = bool(forum.pop('link')) subforums = [] for sf in con.execute(meta.select(sf_columns, f.parent_id == forum['forum_id'])): sf = dict(sf) sf['is_external_link'] = bool(sf.pop('link')) sf['url'] = ctx.make_url('forum', sf['forum_id']) subforums.append(sf) forum['subforums'] = subforums # get last post last_post_id = forum.pop('last_post_id') if last_post_id is not None: result = con.execute(meta.select([u.user_id, u.username, p.post_id, p.title, p.username.label('guestname'), p.timestamp], (p.post_id == last_post_id) & (p.post_id != None) & (u.user_id == p.author_id) )) last_post = result.fetchone() last_post = dict(last_post) username = urlencode(last_post['username']) last_post['author'] = { 'registered': last_post['user_id'] > 0, 'user_id': last_post.pop('user_id'), 'username': last_post.pop('guestname') or\ last_post.pop('username'), 'url': ctx.make_url('users', username), } last_post['url'] = ctx.make_url('post', last_post['post_id']) else: last_post = None forum['last_post'] = last_post forums.append(forum) category['forums'] = forums # pagination def get_page_link(number): link = 'forum/%d' % forum_id if number > 1: link += '?page=%d' % number return link threads_per_page = get_threads_per_page(req) page_count = int(ceil(category['thread_count'] / (threads_per_page * 1.0))) pagination = LazyPagination(req, page, threads_per_page, page_count, get_page_link) threads = [] for thread in con.execute(meta.select(thread_columns, (p.forum_id == category['forum_id']) & (p.parent_id == None) & (u.user_id == p.author_id), order_by=[meta.desc(p.post_id)], limit=threads_per_page, offset=threads_per_page * (page - 1) )): thread = dict(thread) thread['url'] = ctx.make_url('post', thread['post_id']) thread['author'] = { 'registered': thread['user_id'] > 0, 'user_id': thread.pop('user_id'), 'username': thread.pop('guestname') or thread['username'], 'url': ctx.make_url('users', urlencode(thread.pop('username'))) } # get last post result = con.execute(meta.select([u.user_id, u.username, p.post_id, p.title, p.timestamp, p.username.label('guestname')], (p.root_post_id == thread['post_id']) & (u.user_id == p.author_id), order_by=[meta.desc(p.post_id)], limit=1 )) last_post = result.fetchone() if last_post is not None: last_post = dict(last_post) username = last_post.pop('username') last_post['author'] = { 'registered': last_post['user_id'] > 0, 'user_id': last_post.pop('user_id'), 'username': last_post.pop('guestname') or username, 'url': ctx.make_url('users', urlencode(username)), } last_post['url'] = ctx.make_url('post', last_post['post_id']) thread['last_post'] = last_post threads.append(thread) category['threads'] = threads category['pagination'] = pagination return category return ctx.engine.transaction(do) def get_forum_pathbar(ctx, forum_id): """Return the pathbar for a given forum.""" f = forums.c pathbar = [] def do(con, fid=None): if fid is None: fid = forum_id row = con.execute(meta.select([f.parent_id, f.name], forums.c.forum_id == fid )).fetchone() if row is not None: l = 'forum/%d' % fid pathbar.append({ 'url': ctx.make_url(l), 'forum_id': fid, 'name': row['name'] }) if row['parent_id'] is not None: do(con, row['parent_id']) ctx.engine.transaction(do) pathbar.reverse() return pathbar def get_post_pathbar(ctx, post_id): """Returns the pathbar for a given post including all forums and subforums""" thread = Thread.by_child(ctx, post_id) pathbar = get_forum_pathbar(ctx, thread.forum_id) post_list = [ thread.root_post_id ] p = posts.c if thread.root_post_id != int(post_id): post_list.append(post_id) def do(con): for id in post_list: row = con.execute(meta.select([p.title], p.post_id == id )).fetchone() pathbar.append({ 'url': ctx.make_url('post/%s' % id), 'name': row["title"] }) ctx.engine.transaction(do) return pathbar def get_post_tree(req, post_id, inc_view_count=True): """ Return a dict with the thread information and a tree of posts. Per default it will increment the view counter of the thread requested. If you don't want that set ``inc_view_count`` to ``False``. """ ctx = req.ctx p = posts.c u = users.c f = forums.c # load the post requested and lookup root_post_id result = ctx.engine.execute(meta.select([p.root_post_id, p.post_id, p.title, p.text, p.timestamp, u.user_id, u.register_date, u.username, u.profile, u.post_count, p.username.label('guestname')], (p.post_id == post_id) & (u.user_id == p.author_id) )) row = result.fetchone() if row is None: # XXX: need error return here return post = dict(row) post['url'] = ctx.make_url('post', row['post_id']) post['author'] = { 'user_id': post['user_id'], 'username': post.pop('guestname') or post['username'], 'self': req.user.user_id == post['user_id'], 'registered': post.pop('user_id') > 0, 'register_date': post.pop('register_date'), 'url': ctx.make_url('users', urlencode(post.pop('username'))), 'profile': post.pop('profile'), 'post_count': post.pop('post_count'), } signature = None if post['author']['profile'].get('signature'): signature = parse_and_render(req, post['author']['profile']['signature'], signature=True) post['author']['signature'] = signature #XXX: cache here post['parsed_text'] = parse_and_render(req, post['text']) root_post_id = post.pop('root_post_id') result = ctx.engine.execute(meta.select([p.post_id, p.root_post_id, p.title, p.parent_id, p.timestamp, u.username, u.user_id, p.username.label('guestname')], (p.root_post_id == root_post_id) & (u.user_id == p.author_id) )) def prepare(row): d = dict(row) d['author'] = { 'user_id': d['user_id'], 'username': d.pop('guestname') or d['username'], 'self': req.user.user_id == d['user_id'], 'registered': d.pop('user_id') > 0, 'url': ctx.make_url('users', urlencode(d.pop('username'))), } d['active'] = d['post_id'] == post_id d['url'] = ctx.make_url('post', row['post_id']) return d # map threads by their parents and prepare the context mapping = {} flat_posts = [] for row in result: tmp = prepare(row) mapping.setdefault(row['parent_id'], []).append(tmp) flat_posts.append(tmp) root = mapping.pop(None, None) if root is None: return assert len(root) == 1, 'something went seriously wrong' # reassamble thread def reassamble(nodes): for node in nodes: node['children'] = n = mapping.pop(node['post_id'], []) reassamble(n) reassamble(root) # increment view_count if inc_view_count: result = ctx.engine.execute(meta.select([p.view_count], p.post_id == root_post_id)) ctx.engine.execute(posts.update(p.post_id == root_post_id), view_count = result.fetchone()[0] + 1 ) # fetch overall information for whole thread result = ctx.engine.execute(meta.select([p.post_id, p.title, p.forum_id, p.timestamp, u.user_id, u.username, f.name, p.username.label('guestname')], (p.post_id == root_post_id) & (u.user_id == p.author_id) & (f.forum_id == p.forum_id) )) row = result.fetchone() return { 'post_id': row['post_id'], 'url': ctx.make_url('post', row['post_id']), 'title': row['title'], 'timestamp': row['timestamp'], 'forum': { 'forum_id': row['forum_id'], 'name': row['name'], 'url': ctx.make_url('forum', row['forum_id']), }, 'author': { 'user_id': row['user_id'], 'registered': row['user_id'] > 0, 'username': row['guestname'] or row['username'], 'url': ctx.make_url('users', urlencode(row['username'])), }, 'posts': root, 'post': post } def get_post(req, post_id): """ Return exactly one post. If the post does not exist the result will be ``None``. """ ctx = req.ctx p = posts.c u = users.c r = ctx.engine.execute(meta.select([p.post_id, p.title, p.text, p.timestamp, u.user_id, u.username, u.profile, u.register_date, u.post_count, p.username.label('guestname')], (p.post_id == post_id) & (u.user_id == p.author_id) )) row = r.fetchone() if row is None: return post = dict(row) post['url'] = ctx.make_url('post', post['post_id']) post['author'] = { 'user_id': post['user_id'], 'username': post.pop('guestname') or post['username'], 'self': req.user.user_id == post['user_id'], 'registered': post.pop('user_id') > 0, 'register_date': post.pop('register_date'), 'url': ctx.make_url('users', urlencode(post.pop('username'))), 'profile': post.pop('profile'), 'post_count': post.pop('post_count') } signature = None if post['author']['profile'].get('signature'): signature = parse_and_render(req, post['author']['profile']['signature']) post['author']['signature'] = signature #XXX: cache here post['parsed_text'] = parse_and_render(req, post['text']) return post def get_last_posts(req, root_post_id, n=1, offset=0): """ Returns a flat view of the n latest posts that are children of root_post_id. """ p = posts.c u = users.c result = req.ctx.engine.execute(meta.select([p.post_id, p.title, p.text, p.timestamp, u.username, u.user_id, u.profile, u.post_count, u.register_date, p.username.label('guestname')], (p.root_post_id == root_post_id) & (u.user_id == p.author_id), order_by=[meta.desc(p.post_id)], limit=n, offset=offset )) def prepare(row): d = dict(row) user_id = d.pop('user_id') d['url'] = req.ctx.make_url('post', row['post_id']) d['author'] = { 'user_id': user_id, 'registered': user_id > 0, 'username': d.pop('guestname') or d.pop('username'), 'self': req.user.user_id == user_id, 'profile': d.pop('profile'), 'post_count': d.pop('post_count'), 'register_date': d.pop('register_date'), 'url': req.ctx.make_url('users', urlencode(row['username'])), } signature = None if d['author']['profile'].get('signature'): signature = parse_and_render(req, d['author']['profile']['signature'], signature=True) d['author']['signature'] = signature #XXX: this doesn't cache by now d['parsed_text'] = parse_and_render(req, d['text']) return d post_list = [prepare(row) for row in result] return post_list def get_flat_view(req, post_id, inc_view_count=True, order='asc'): """ Returns the flat view of an post and the next n posts so that the template can render a page. n is the number of posts per page defined in either the user settings or the global forum configuration. Per default it will increment the view counter of the thread requested. If you don't want that set ``inc_view_count`` to ``False``. If you want to get the latest post first, set ``order`` to ``'desc'``. """ ctx = req.ctx p = posts.c f = forums.c u = users.c # find root_post_id result = ctx.engine.execute(meta.select([p.root_post_id], p.post_id == post_id )) # XXX: This raises TypeError on failure. root_post_id = result.fetchone()[0] # select all post ids to calculate the position of the post on a page # the purpose of this calculation is to find the first and last post # on the page if the post_id given the function result = ctx.engine.execute(meta.select([p.post_id], p.root_post_id == root_post_id )) posts_per_page = get_posts_per_page(req) postlist = [row[0] for row in result] post_index = postlist.index(post_id) page = post_index // posts_per_page page_start = page * posts_per_page post_range_low = postlist[page_start] post_range_high = postlist[page_start:page_start + posts_per_page][-1] pagination = Pagination(req, postlist, page_start, posts_per_page, lambda x: 'post/%d' % x) orderfunc = (order == 'desc' and meta.desc or meta.asc) # select matching posts result = ctx.engine.execute(meta.select([p.post_id, p.root_post_id, p.title, p.forum_id, p.parent_id, p.text, p.timestamp, u.username, u.user_id, u.profile, u.post_count, u.register_date, p.username.label('guestname')], (p.root_post_id == root_post_id) & (p.post_id >= post_range_low) & (p.post_id <= post_range_high) & (u.user_id == p.author_id), order_by=[orderfunc(p.post_id)] )) def prepare(number, row): d = dict(row) user_id = d.pop('user_id') d['post_number'] = number d['url'] = ctx.make_url('post', row['post_id']) d['author'] = { 'user_id': user_id, 'registered': user_id > 0, 'username': d.pop('guestname') or d.pop('username'), 'self': req.user.user_id == user_id, 'profile': d.pop('profile'), 'post_count': d.pop('post_count'), 'register_date': d.pop('register_date'), 'url': ctx.make_url('users', urlencode(row['username'])), } signature = None if d['author']['profile'].get('signature'): signature = parse_and_render(req, d['author']['profile']['signature'], signature=True) d['author']['signature'] = signature #XXX: this doesn't cache by now d['parsed_text'] = parse_and_render(req, d['text']) return d real_posts = [prepare(num, row) for num, row in inciter(result, page_start)] # increment view_count if inc_view_count: result = ctx.engine.execute(meta.select([p.view_count], p.post_id == root_post_id)) ctx.engine.execute(posts.update(p.post_id == root_post_id), view_count = result.fetchone()[0] + 1 ) # and another query for the overview page result = ctx.engine.execute(meta.select([p.post_id, p.title, p.forum_id, p.timestamp, u.user_id, u.username, f.name, p.username.label('guestname')], (p.post_id == root_post_id) & (u.user_id == p.author_id) & (f.forum_id == p.forum_id) )) row = result.fetchone() return { 'post_id': row['post_id'], 'url': ctx.make_url('post', row['post_id']), 'title': row['title'], 'timestamp': row['timestamp'], 'forum': { 'forum_id': row['forum_id'], 'name': row['name'], 'url': ctx.make_url('forum', row['forum_id']), }, 'author': { 'user_id': row['user_id'], 'username': row['guestname'] or row['username'], 'url': ctx.make_url('users', urlencode(row['username'])), }, 'pagination': pagination, 'posts': real_posts } def get_last_thread_change(req, post_id): """ Return the timestamp of the last change in the thread. ``post_id`` must be the root_post_id, there is no further check done. Return ``None`` if something in the query went wrong (eg. no thread with the requested root_post_id exists) """ #XXX: doesn't cover edits result = req.ctx.engine.execute(meta.select([posts.c.timestamp], (posts.c.root_post_id == post_id), order_by=[meta.desc(posts.c.post_id)], limit=1 )) row = result.fetchone() if row is None: return return row[0] def get_posts_per_page(req): """ Return the number of posts a user wishes to display on the flat view page. """ try: posts_per_page = req.user.settings['posts_per_page'] if posts_per_page is not None: return posts_per_page except KeyError: pass return req.ctx.cfg.get_int('board', 'posts_per_page', 15) def get_threads_per_page(req): """ Return the number of posts a users whishes to display on the viewforum page. """ try: threads_per_page = req.user.settings['threads_per_page'] if threads_per_page is not None: return threads_per_page except KeyError: pass return req.ctx.cfg.get_int('board', 'threads_per_page', 20) def get_view_mode(req): """ Return the display mode a user has defined in the user settings or fall back to the default mode from the pocoo.conf. :return: either ``'flat'`` or ``'threaded'``. """ val = req.user.settings.get('view_mode') if val in ('flat', 'threaded'): return val val = req.ctx.cfg.get('board', 'default_view', None) return (val in ('flat', 'threaded')) and val or 'threaded' def quote_post(req, post_id): """ Return a tuple in the form ``(text, title)`` which is useful for replying and quoting existing posts. The title is prefixed with a local representation of 'Re:' and the text is quoted with the selected markup. """ p = posts.c u = users.c _ = req.gettext result = req.ctx.engine.execute(meta.select([p.title, p.text, u.username], (p.post_id == post_id) & (u.user_id == p.author_id) )) row = result.fetchone() if row is None: # XXX: ValueError? raise ValueError('post %s does not exist') suffix = _('Re:') title = row['title'] if not title.startswith(suffix): title = u'%s %s' % (suffix, title) text = quote_text(req, row['text'], row['username']) return text, title def edit_post(req, post_id): """ Return a tuple in the form (``text``, ``title``, ``username``) for the edit view. :see: `quote_post` """ p = posts.c result = req.ctx.engine.execute(meta.select([p.text, p.title, p.username], (p.post_id == post_id) )) row = result.fetchone() if row is None: # XXX: ValueError? raise ValueError('post %s does not exist') return tuple(row) class _Site(object): """A special singleton representing a whole site.""" object_id = 0 def __repr__(self): return '<%s>' % self.__class__.__name__ Site = _Site() class Forum(DatabaseModel): """ This class represents one forum. Don't pass instances of this class to templates, therefore there are some other functions in this module. The main purpose of this class is the creation and management of forums. You can also use this class for the ACL functions. """ def __init__(self, ctx, forum_id): self.ctx = ctx self.forum_id = forum_id super(Forum, self).__init__(ctx, forums, 'forum_id') parent_id = lazy_column('parent_id') object_id = lazy_column('object_id') name = lazy_column('name') description = lazy_column('description') position = lazy_column('position') link = lazy_column('link') @staticmethod def create(ctx, name, description="", parent=None, position=None, link=None): """Create a new forum.""" if isinstance(parent, Forum): parent = parent.forum_id result = ctx.engine.execute(forums.insert(), parent_id=parent, name=name, description=description, position=position, link=link, post_count=0, thread_count=0 ) return Forum(ctx, result.last_inserted_ids()[0]) def parent_get(self): return Forum(self.ctx, self.parent_id) def parent_set(self, value): if value is None: self.parent_id = None if isinstance(value, Forum): self.parent_id = value.forum_id parent = property(parent_get, parent_set) del parent_get, parent_set def __repr__(self): return '<%s %d: %r>' % ( self.__class__.__name__, self.forum_id, self.name ) class Thread(DatabaseModel): """ This class represents a root post with all of its children. You can use this class to manage a thread, add a new reply or edit one of its children. """ def __init__(self, ctx, root_post_id): self.ctx = ctx self.post_id = root_post_id super(Thread, self).__init__(ctx, posts, 'post_id', posts.c.parent_id == None ) forum_id = lazy_column('forum_id') parent_id = lazy_column('parent_id') root_post_id = lazy_column('root_post_id') object_id = lazy_column('object_id') author_id = lazy_column('author_id') title = lazy_column('title') text = lazy_column('text') timestamp = lazy_column('timestamp') @staticmethod def create(ctx, forum, author, title, text, timestamp=None): """Create a new thread. If author is a string it will be an anonymous posting.""" username = None if isinstance(forum, Forum): forum = forum.forum_id if isinstance(author, User): author = author.user_id elif isinstance(author, basestring): username = author author = ANONYMOUS_USER_ID if timestamp is None: timestamp = datetime.utcnow() def do(con): result = con.execute(posts.insert(), forum_id = forum, author_id = author, username = username, title = title, text = text, timestamp = timestamp, post_count = 1, view_count = 0 ) thread_id = result.last_inserted_ids()[-1] # increment author post count if author > -1: old = meta.select([users.c.post_count], users.c.user_id == author) con.execute(users.update(users.c.user_id == author), post_count = con.execute(old).fetchone()[0] + 1 ) # increment forum post and thread count old = meta.select([forums.c.post_count, forums.c.thread_count], forums.c.forum_id == forum) row = con.execute(old).fetchone() con.execute(forums.update(forums.c.forum_id == forum), post_count = row[0] + 1, thread_count = row[1] + 1, last_post_id = thread_id ) return thread_id thread_id = ctx.engine.transaction(do) # XXX: this feels a bit strange t = Thread(ctx, thread_id) t.root_post_id = t.post_id t.save() return t @staticmethod def by_child(ctx, post_id): """ Return the thread of a given ``post_id``. """ result = ctx.engine.execute(meta.select([posts.c.root_post_id], (posts.c.post_id == post_id) )) row = result.fetchone() if row is None: # XXX: ValueError? raise ValueError('post %s does not exist' % post_id) return Thread(ctx, row[0]) def reply(self, post_id, author, title, text, timestamp=None, no_processor=False): """ Reply to post ``post_id`` which is a child of the thread. Return the id of the new post. If ``author`` is a string it will be an anonymous posting. """ username = None if post_id is None: post_id = self.post_id if isinstance(author, User): author = author.user_id elif isinstance(author, basestring): username = author author = ANONYMOUS_USER_ID if timestamp is None: timestamp = datetime.utcnow() if not no_processor: text, title = apply_post_processors(self.ctx, text, title, 'new') def do(con): result = con.execute(meta.select([posts.c.root_post_id], posts.c.post_id == post_id )) row = result.fetchone() if row is None or row[0] != int(self.post_id): # XXX: ValueError? raise ValueError('The post either does not exist or does not ' 'belong to this thread') new_post_id = con.execute(posts.insert(), forum_id = self.forum_id, parent_id = post_id, root_post_id = self.post_id, author_id = author, username = username, title = title, text = text, timestamp = timestamp ).last_inserted_ids()[0] # increment author post count if author > -1: old = meta.select([users.c.post_count], users.c.user_id == author) con.execute(users.update(users.c.user_id == author), post_count = con.execute(old).fetchone()[0] + 1 ) # increment forum post count and update last_post_id old = meta.select([forums.c.post_count], forums.c.forum_id == self.forum_id) con.execute(forums.update(forums.c.forum_id == self.forum_id), post_count = con.execute(old).fetchone()[0] + 1, last_post_id = new_post_id ) # increment thread post count old = meta.select([posts.c.post_count], posts.c.post_id == self.post_id) con.execute(posts.update(posts.c.post_id == self.post_id), post_count = con.execute(old).fetchone()[0] + 1 ) return new_post_id return self.ctx.engine.transaction(do) def edit_reply(self, post_id, author=_missing, title=_missing, text=_missing, timestamp=_missing, no_processor=False): """Edit a reply.""" d = {} if author is not _missing: if isinstance(author, User): d['author_id'] = author.user_id # pylint: disable-msg=E1101 else: d['author_id'] = author if title is not _missing: d['title'] = title if text is not _missing: d['text'] = text if timestamp is not _missing: d['timestamp'] = timestamp if not no_processor and 'title' in d and 'text' in d: rv = apply_post_processors(self.ctx, d['text'], d['title'], 'edit') d['text'], d['title'] = rv self.ctx.engine.execute(posts.update(posts.c.post_id == post_id), **d) def has_child(self, post_id): """Check if a post_id is a child of this thread.""" result = self.ctx.engine.execute(meta.select([posts.c.root_post_id], posts.c.post_id == post_id )) row = result.fetchone() return row is not None and row['root_post_id'] == self.post_id def get_post_list(self): """ Return a flat list of all posts in this thread sorted by their post_id. """ result = self.ctx.engine.execute(posts.select( posts.c.root_post_id == self.post_id )) return map(dict, result) def count_children(self): """ Return the number of direct or indirect children of this thread. """ p = posts.c result = self.ctx.engine.execute(meta.select([meta.func.count(p.post_id)], p.root_post_id == self.post_id )) return result.fetchone()[0] __len__ = count_children def forum_get(self): if self.forum_id is not None: return Forum(self.ctx, self.forum_id) def forum_set(self, value): if not isinstance(value, Forum): raise TypeError('Can only set Forum instances') self.forum_id = value.forum_id forum = property(forum_get, forum_set) del forum_get, forum_set def __eq__(self, other): return self.post_id == other.post_id def __ne__(self, other): return not self.__eq__(other) def __repr__(self): return '<%s %d: %r>' % ( self.__class__.__name__, self.post_id, self.title ) # -*- coding: utf-8 -*-