1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
|
# examples/things_advanced_asgi.py
import json
import logging
import uuid
import falcon
import falcon.asgi
import httpx
class StorageEngine:
async def get_things(self, marker, limit):
return [{'id': str(uuid.uuid4()), 'color': 'green'}]
async def add_thing(self, thing):
thing['id'] = str(uuid.uuid4())
return thing
class StorageError(Exception):
@staticmethod
async def handle(ex, req, resp, params):
# TODO: Log the error, clean up, etc. before raising
raise falcon.HTTPInternalServerError()
class SinkAdapter:
engines = {
'ddg': 'https://duckduckgo.com',
'y': 'https://search.yahoo.com/search',
}
async def __call__(self, req, resp, engine):
url = self.engines[engine]
params = {'q': req.get_param('q', True)}
async with httpx.AsyncClient() as client:
result = await client.get(url, params=params)
resp.status = result.status_code
resp.content_type = result.headers['content-type']
resp.text = result.text
class AuthMiddleware:
async def process_request(self, req, resp):
token = req.get_header('Authorization')
account_id = req.get_header('Account-ID')
challenges = ['Token type="Fernet"']
if token is None:
description = 'Please provide an auth token as part of the request.'
raise falcon.HTTPUnauthorized(
title='Auth token required',
description=description,
challenges=challenges,
href='http://docs.example.com/auth',
)
if not self._token_is_valid(token, account_id):
description = (
'The provided auth token is not valid. '
'Please request a new token and try again.'
)
raise falcon.HTTPUnauthorized(
title='Authentication required',
description=description,
challenges=challenges,
href='http://docs.example.com/auth',
)
def _token_is_valid(self, token, account_id):
return True # Suuuuuure it's valid...
class RequireJSON:
async def process_request(self, req, resp):
if not req.client_accepts_json:
raise falcon.HTTPNotAcceptable(
description='This API only supports responses encoded as JSON.',
href='http://docs.examples.com/api/json',
)
if req.method in ('POST', 'PUT'):
if 'application/json' not in req.content_type:
raise falcon.HTTPUnsupportedMediaType(
title='This API only supports requests encoded as JSON.',
href='http://docs.examples.com/api/json',
)
class JSONTranslator:
# NOTE: Normally you would simply use req.get_media() and resp.media for
# this particular use case; this example serves only to illustrate
# what is possible.
async def process_request(self, req, resp):
# NOTE: Test explicitly for 0, since this property could be None in
# the case that the Content-Length header is missing (in which case we
# can't know if there is a body without actually attempting to read
# it from the request stream.)
if req.content_length == 0:
# Nothing to do
return
body = await req.stream.read()
if not body:
raise falcon.HTTPBadRequest(
title='Empty request body',
description='A valid JSON document is required.',
)
try:
req.context.doc = json.loads(body.decode('utf-8'))
except (ValueError, UnicodeDecodeError):
description = (
'Could not decode the request body. The '
'JSON was incorrect or not encoded as '
'UTF-8.'
)
raise falcon.HTTPBadRequest(title='Malformed JSON', description=description)
async def process_response(self, req, resp, resource, req_succeeded):
if not hasattr(resp.context, 'result'):
return
resp.text = json.dumps(resp.context.result)
def max_body(limit):
async def hook(req, resp, resource, params):
length = req.content_length
if length is not None and length > limit:
msg = (
'The size of the request is too large. The body must not '
'exceed ' + str(limit) + ' bytes in length.'
)
raise falcon.HTTPPayloadTooLarge(
title='Request body is too large', description=msg
)
return hook
class ThingsResource:
def __init__(self, db):
self.db = db
self.logger = logging.getLogger('thingsapp.' + __name__)
async def on_get(self, req, resp, user_id):
marker = req.get_param('marker') or ''
limit = req.get_param_as_int('limit') or 50
try:
result = await self.db.get_things(marker, limit)
except Exception as ex:
self.logger.error(ex)
description = (
'Aliens have attacked our base! We will '
'be back as soon as we fight them off. '
'We appreciate your patience.'
)
raise falcon.HTTPServiceUnavailable(
title='Service Outage', description=description, retry_after=30
)
# NOTE: Normally you would use resp.media for this sort of thing;
# this example serves only to demonstrate how the context can be
# used to pass arbitrary values between middleware components,
# hooks, and resources.
resp.context.result = result
resp.set_header('Powered-By', 'Falcon')
resp.status = falcon.HTTP_200
@falcon.before(max_body(64 * 1024))
async def on_post(self, req, resp, user_id):
try:
doc = req.context.doc
except AttributeError:
raise falcon.HTTPBadRequest(
title='Missing thing',
description='A thing must be submitted in the request body.',
)
proper_thing = await self.db.add_thing(doc)
resp.status = falcon.HTTP_201
resp.location = '/%s/things/%s' % (user_id, proper_thing['id'])
# The app instance is an ASGI callable
app = falcon.asgi.App(
middleware=[
# AuthMiddleware(),
RequireJSON(),
JSONTranslator(),
]
)
db = StorageEngine()
things = ThingsResource(db)
app.add_route('/{user_id}/things', things)
# If a responder ever raises an instance of StorageError, pass control to
# the given handler.
app.add_error_handler(StorageError, StorageError.handle)
# Proxy some things to another service; this example shows how you might
# send parts of an API off to a legacy system that hasn't been upgraded
# yet, or perhaps is a single cluster that all data centers have to share.
sink = SinkAdapter()
app.add_sink(sink, r'/search/(?P<engine>ddg|y)\Z')
|