1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244
|
from zope.interface import (
implementer,
providedBy,
)
from pyramid.interfaces import (
IDebugLogger,
IRequest,
IRequestExtensions,
IRootFactory,
IRouteRequest,
IRouter,
IRequestFactory,
IRoutesMapper,
ITraverser,
IView,
IViewClassifier,
ITweens,
)
from pyramid.events import (
ContextFound,
NewRequest,
NewResponse,
)
from pyramid.exceptions import PredicateMismatch
from pyramid.httpexceptions import HTTPNotFound
from pyramid.request import Request
from pyramid.threadlocal import manager
from pyramid.traversal import (
DefaultRootFactory,
ResourceTreeTraverser,
)
from pyramid.tweens import excview_tween_factory
@implementer(IRouter)
class Router(object):
    """WSGI callable that resolves a request to a view and returns its
    response.

    The collaborators (debug logger, root factory, routes mapper, request
    factory, request extensions and tween chain) are looked up once, at
    construction time, as utilities in the supplied registry.
    """

    debug_notfound = False
    debug_routematch = False
    threadlocal_manager = manager

    def __init__(self, registry):
        """Cache the utilities this router needs from ``registry``.

        ``handle_request`` is wrapped by the registered ``ITweens`` utility
        (``excview_tween_factory`` when none is registered); the unwrapped
        bound method stays reachable as ``orig_handle_request``.  When
        ``registry.settings`` is not ``None`` the ``debug_notfound`` and
        ``debug_routematch`` flags are read from it.
        """
        lookup = registry.queryUtility
        self.logger = lookup(IDebugLogger)
        self.root_factory = lookup(IRootFactory, default=DefaultRootFactory)
        self.routes_mapper = lookup(IRoutesMapper)
        self.request_factory = lookup(IRequestFactory, default=Request)
        self.request_extensions = lookup(IRequestExtensions)

        tween_factory = lookup(ITweens)
        if tween_factory is None:
            tween_factory = excview_tween_factory
        # Remember the raw handler before the tween chain replaces it.
        self.orig_handle_request = self.handle_request
        self.handle_request = tween_factory(self.handle_request, registry)

        self.root_policy = self.root_factory  # b/w compat
        self.registry = registry

        settings = registry.settings
        if settings is not None:
            self.debug_notfound = settings['debug_notfound']
            self.debug_routematch = settings['debug_routematch']

    def handle_request(self, request):
        """Match a route, traverse to a context, then find and call a view.

        ``request`` must already carry a ``registry`` entry in its
        ``__dict__``.  ``NewRequest`` and ``ContextFound`` events are sent
        when the registry reports listeners.  Raises ``HTTPNotFound`` when
        no view callable is registered for the request/context pair, and
        re-raises ``PredicateMismatch`` when the first view and every
        fallback found along the context's interface resolution order
        reject the request.  Returns whatever the selected view returns.
        """
        attrs = request.__dict__
        registry = attrs['registry']

        request.request_iface = IRequest
        context = None
        routes_mapper = self.routes_mapper
        debug_routematch = self.debug_routematch
        adapters = registry.adapters
        has_listeners = registry.has_listeners
        notify = registry.notify
        logger = self.logger

        if has_listeners:
            notify(NewRequest(request))

        # find the root object
        root_factory = self.root_factory
        if routes_mapper is not None:
            info = routes_mapper(request)
            match, route = info['match'], info['route']
            if route is not None:
                attrs['matchdict'] = match
                attrs['matched_route'] = route

                if debug_routematch:
                    msg = (
                        'route matched for url %s; '
                        'route_name: %r, '
                        'path_info: %r, '
                        'pattern: %r, '
                        'matchdict: %r, '
                        'predicates: %r' % (
                            request.url,
                            route.name,
                            request.path_info,
                            route.pattern,
                            match,
                            ', '.join([p.text() for p in route.predicates]))
                        )
                    if logger:
                        logger.debug(msg)

                # A matched route may narrow the request interface and
                # supply its own root factory.
                request.request_iface = registry.queryUtility(
                    IRouteRequest,
                    name=route.name,
                    default=IRequest)
                root_factory = route.factory or self.root_factory
            elif debug_routematch:
                msg = ('no route matched for url %s' %
                       request.url)
                if logger:
                    logger.debug(msg)

        root = root_factory(request)
        attrs['root'] = root

        # find a context
        traverser = adapters.queryAdapter(root, ITraverser)
        if traverser is None:
            traverser = ResourceTreeTraverser(root)
        tdict = traverser(request)

        context = tdict['context']
        view_name = tdict['view_name']
        subpath = tdict['subpath']
        traversed = tdict['traversed']
        vroot = tdict['virtual_root']
        vroot_path = tdict['virtual_root_path']

        attrs.update(tdict)
        if has_listeners:
            notify(ContextFound(request))

        # find a view callable
        context_iface = providedBy(context)
        view_callable = adapters.lookup(
            (IViewClassifier, request.request_iface, context_iface),
            IView, name=view_name, default=None)

        # no view at all: report (verbosely when debugging) and bail out
        if view_callable is None:
            if self.debug_notfound:
                msg = (
                    'debug_notfound of url %s; path_info: %r, '
                    'context: %r, view_name: %r, subpath: %r, '
                    'traversed: %r, root: %r, vroot: %r, '
                    'vroot_path: %r' % (
                        request.url, request.path_info, context,
                        view_name, subpath, traversed, root, vroot,
                        vroot_path)
                    )
                if logger:
                    logger.debug(msg)
            else:
                msg = request.path_info
            raise HTTPNotFound(msg)

        # invoke the view callable
        try:
            response = view_callable(context, request)
        except PredicateMismatch:
            # The most specific view rejected the request; walk the rest of
            # the context's interface resolution order for another view.
            for iface in context_iface.__sro__[1:]:
                previous_view_callable = view_callable
                view_callable = adapters.lookup(
                    (IViewClassifier, request.request_iface, iface),
                    IView, name=view_name, default=None)
                # Skip when nothing is registered, or when an intermediate
                # base resolves to the callable returned by the previous
                # lookup.
                if (view_callable is previous_view_callable
                        or view_callable is None):
                    continue
                try:
                    response = view_callable(context, request)
                except PredicateMismatch:
                    continue
                break
            else:
                # no fallback accepted the request
                raise

        return response

    def invoke_subrequest(self, request, use_tweens=False):
        """Obtain a response object from the Pyramid application for the
        supplied ``request``.

        ``request`` must be an object that implements the Pyramid request
        interface (such as a :class:`pyramid.request.Request` instance).
        With ``use_tweens`` set to ``True`` the request is sent to the
        :term:`tween` in the tween stack closest to the request ingress;
        with ``use_tweens`` set to ``False`` it is sent to the main router
        handler and no tweens are invoked.

        See the API for pyramid.request for complete documentation.
        """
        registry = self.registry
        has_listeners = registry.has_listeners
        notify = registry.notify

        tl_manager = self.threadlocal_manager
        tl_manager.push({'registry': registry, 'request': request})
        request.registry = registry
        request.invoke_subrequest = self.invoke_subrequest

        handle_request = (
            self.handle_request if use_tweens else self.orig_handle_request)

        try:
            try:
                extensions = self.request_extensions
                if extensions is not None:
                    request._set_extensions(extensions)

                response = handle_request(request)

                if request.response_callbacks:
                    request._process_response_callbacks(response)

                if has_listeners:
                    notify(NewResponse(request, response))

                return response
            finally:
                # finished callbacks run whether or not handling succeeded
                if request.finished_callbacks:
                    request._process_finished_callbacks()
        finally:
            # always undo the threadlocal push made above
            tl_manager.pop()

    def __call__(self, environ, start_response):
        """
        Accept ``environ`` and ``start_response``; create a
        :term:`request` and route the request to a :app:`Pyramid`
        view based on introspection of :term:`view configuration`
        within the application registry; call ``start_response`` and
        return an iterable.
        """
        request = self.request_factory(environ)
        # the WSGI entry point always goes through the tween chain
        response = self.invoke_subrequest(request, use_tweens=True)
        return response(request.environ, start_response)
|