1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120
|
import time
from os import path
from threading import Thread
from jinja2 import Environment
from jinja2 import PackageLoader
from werkzeug.exceptions import HTTPException
from werkzeug.exceptions import NotFound
from werkzeug.middleware.shared_data import SharedDataMiddleware
from werkzeug.routing import Map
from werkzeug.routing import Rule
from werkzeug.wrappers import Request
from werkzeug.wrappers import Response
from .db import Database
from .network import ServerBrowser
# Filesystem path of the "templates" directory located next to this module.
templates = path.join(path.dirname(__file__), "templates")
# Registry of page classes keyed by their lowercase class name. PageMeta
# fills it in for every class whose body declares a non-None ``url_rule``.
pages = {}
# Routing table. It starts out with a single rule for "/shared/<file>"
# (endpoint "shared"); PageMeta adds one rule per registered page class.
url_map = Map([Rule("/shared/<file>", endpoint="shared")])
def make_app(database, interval=120):
    """Create the complete WSGI application.

    A :class:`Cup` instance is built from ``database`` and ``interval`` and
    wrapped in ``SharedDataMiddleware`` so that URLs below ``/shared`` are
    served from the ``shared`` directory next to this module.
    """
    application = Cup(database, interval)
    shared_dir = path.join(path.dirname(__file__), "shared")
    return SharedDataMiddleware(application, {"/shared": shared_dir})
class PageMeta(type):
    """Metaclass that registers routable page classes as they are defined.

    A class whose own body sets ``url_rule`` to something other than ``None``
    is stored in the module-level ``pages`` registry and gets a matching
    ``Rule`` appended to ``url_map``. Both use the class identifier as key /
    endpoint name.
    """

    def __init__(cls, name, bases, d):
        super().__init__(name, bases, d)
        # Only the class's own namespace is inspected, so an inherited
        # ``url_rule`` does not register the subclass a second time.
        if d.get("url_rule") is None:
            return
        endpoint = cls.identifier
        pages[endpoint] = cls
        url_map.add(Rule(cls.url_rule, endpoint=endpoint, **cls.url_arguments))

    @property
    def identifier(cls):
        """The lowercase class name, used as registry key and endpoint."""
        return cls.__name__.lower()
def _with_metaclass(meta, *bases):
"""Create a base class with a metaclass."""
class metaclass(type):
def __new__(metacls, name, this_bases, d):
return meta(name, bases, d)
return type.__new__(metaclass, "temporary_class", (), {})
class Page(_with_metaclass(PageMeta, object)):
    """Base class for pages; one instance is created per request.

    Subclasses that define ``url_rule`` are registered by ``PageMeta``.
    ``process`` is the hook the application calls with the matched URL
    values, and ``get_response`` builds the default HTML response.
    """

    # Extra keyword arguments handed to ``Rule`` when a subclass is registered.
    url_arguments = {}

    def __init__(self, cup, request, url_adapter):
        """Remember the application, the request and the bound URL adapter."""
        self.cup, self.request, self.url_adapter = cup, request, url_adapter

    def url_for(self, endpoint, **values):
        """Build the URL for ``endpoint`` from the given rule values."""
        adapter = self.url_adapter
        return adapter.build(endpoint, values)

    def process(self):
        """Hook for subclasses; the base implementation does nothing."""
        return None

    def render_template(self, template=None):
        """Render ``template``, defaulting to ``<identifier>.html``.

        The template context consists of the instance attributes plus
        ``url_for`` and ``self``.
        """
        if template is not None:
            name = template
        else:
            # ``identifier`` is a property of the metaclass, so it has to be
            # read from the class rather than from the instance.
            name = f"{type(self).identifier}.html"
        context = {**self.__dict__, "url_for": self.url_for, "self": self}
        return self.cup.render_template(name, context)

    def get_response(self):
        """Return the rendered default template as a ``text/html`` response."""
        body = self.render_template()
        return Response(body, mimetype="text/html")
class Cup:
    """WSGI application that routes requests to the registered page classes.

    On construction it sets up the Jinja environment, the database and the
    server browser, and starts a daemon thread that keeps the server browser
    synchronised.
    """

    def __init__(self, database, interval=120):
        """Set up the application and start the background updater.

        :param database: passed unchanged to ``Database``.
        :param interval: number of seconds the updater sleeps after a
            successful sync (half of that after an unsuccessful one).
        """
        self.jinja_env = Environment(loader=PackageLoader("cupoftee"), autoescape=True)
        self.interval = interval
        self.db = Database(database)
        self.server_browser = ServerBrowser(self)
        # Daemon thread: it must not keep the interpreter alive on shutdown.
        self.updater = Thread(target=self.update_server_browser, daemon=True)
        self.updater.start()

    def update_server_browser(self):
        """Sync the server browser forever; runs in the updater thread."""
        while True:
            # A falsy sync result shortens the delay so the retry comes sooner.
            if self.server_browser.sync():
                wait = self.interval
            else:
                wait = self.interval // 2
            time.sleep(wait)

    def dispatch_request(self, request):
        """Match the request URL and return a WSGI-callable response.

        Unknown URLs, and endpoints that have no registered page class, are
        answered by ``MissingPage``. Any other ``HTTPException`` is returned
        as the response itself.
        """
        url_adapter = url_map.bind_to_environ(request.environ)
        try:
            endpoint, values = url_adapter.match()
            page_cls = pages.get(endpoint)
            if page_cls is None:
                # The URL map can contain endpoints without a page class (the
                # "shared" rule is one in the visible code). Treat them as
                # missing instead of failing with a KeyError.
                raise NotFound()
            page = page_cls(self, request, url_adapter)
            response = page.process(**values)
        except NotFound:
            page = MissingPage(self, request, url_adapter)
            response = page.process()
        except HTTPException as e:
            return e
        # Pages may return their own response from process(); otherwise the
        # page's default response is used.
        return response or page.get_response()

    def __call__(self, environ, start_response):
        """WSGI entry point."""
        request = Request(environ)
        return self.dispatch_request(request)(environ, start_response)

    def render_template(self, name, *args, **context):
        """Render the template ``name`` and return the result.

        The context may be given as a mapping passed positionally, as keyword
        arguments, or both; the arguments are forwarded to the template's
        ``render`` method. ``Page.render_template`` passes a dict
        positionally, which the previous keyword-only signature rejected
        with a ``TypeError``.
        """
        template = self.jinja_env.get_template(name)
        return template.render(*args, **context)
from cupoftee.pages import MissingPage
|