File: my_plugin_2.py

package info (click to toggle)
datasette 0.65.2%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 3,540 kB
  • sloc: python: 19,371; javascript: 10,089; sh: 71; makefile: 47; ansic: 26
file content (211 lines) | stat: -rw-r--r-- 5,974 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
from datasette import hookimpl
from datasette.utils.asgi import Response
from functools import wraps
import markupsafe
import json


@hookimpl
def extra_js_urls():
    return [
        {
            "url": "https://plugin-example.datasette.io/jquery.js",
            "sri": "SRIHASH",
        },
        "https://plugin-example.datasette.io/plugin2.js",
    ]


@hookimpl
def render_cell(value, database):
    # Render {"href": "...", "label": "..."} as link
    if not isinstance(value, str):
        return None
    stripped = value.strip()
    if not stripped.startswith("{") and stripped.endswith("}"):
        return None
    try:
        data = json.loads(value)
    except ValueError:
        return None
    if not isinstance(data, dict):
        return None
    if set(data.keys()) != {"href", "label"}:
        return None
    href = data["href"]
    if not (
        href.startswith("/")
        or href.startswith("http://")
        or href.startswith("https://")
    ):
        return None
    return markupsafe.Markup(
        '<a data-database="{database}" href="{href}">{label}</a>'.format(
            database=database,
            href=markupsafe.escape(data["href"]),
            label=markupsafe.escape(data["label"] or "") or "&nbsp;",
        )
    )


@hookimpl
def extra_template_vars(template, database, table, view_name, request, datasette):
    # This helps unit tests that want to run assertions against the request object:
    datasette._last_request = request

    async def query_database(sql):
        first_db = list(datasette.databases.keys())[0]
        return (await datasette.execute(first_db, sql)).rows[0][0]

    async def inner():
        return {
            "extra_template_vars_from_awaitable": json.dumps(
                {
                    "template": template,
                    "scope_path": request.scope["path"] if request else None,
                    "awaitable": True,
                },
                default=lambda b: b.decode("utf8"),
            ),
            "query_database": query_database,
        }

    return inner


@hookimpl
def asgi_wrapper(datasette):
    def wrap_with_databases_header(app):
        @wraps(app)
        async def add_x_databases_header(scope, receive, send):
            async def wrapped_send(event):
                if event["type"] == "http.response.start":
                    original_headers = event.get("headers") or []
                    event = {
                        "type": event["type"],
                        "status": event["status"],
                        "headers": original_headers
                        + [
                            [
                                b"x-databases",
                                ", ".join(datasette.databases.keys()).encode("utf-8"),
                            ]
                        ],
                    }
                await send(event)

            await app(scope, receive, wrapped_send)

        return add_x_databases_header

    return wrap_with_databases_header


@hookimpl
def actor_from_request(datasette, request):
    async def inner():
        if request.args.get("_bot2"):
            result = await datasette.get_database().execute("select 1 + 1")
            return {"id": "bot2", "1+1": result.first()[0]}
        else:
            return None

    return inner


@hookimpl
def permission_allowed(datasette, actor, action):
    # Testing asyncio version of permission_allowed
    async def inner():
        assert 2 == (await datasette.get_database().execute("select 1 + 1")).first()[0]
        if action == "this_is_allowed_async":
            return True
        elif action == "this_is_denied_async":
            return False

    return inner


@hookimpl
def prepare_jinja2_environment(env, datasette):
    env.filters["format_numeric"] = lambda s: f"{float(s):,.0f}"
    env.filters["to_hello"] = lambda s: datasette._HELLO


@hookimpl
def startup(datasette):
    async def inner():
        result = await datasette.get_database().execute("select 1 + 1")
        datasette._startup_hook_calculation = result.first()[0]

    return inner


@hookimpl
def canned_queries(datasette, database):
    async def inner():
        return {
            "from_async_hook": "select {}".format(
                (
                    await datasette.get_database(database).execute("select 1 + 1")
                ).first()[0]
            )
        }

    return inner


@hookimpl(trylast=True)
def menu_links(datasette, actor):
    async def inner():
        if actor:
            return [{"href": datasette.urls.instance(), "label": "Hello 2"}]

    return inner


@hookimpl
def table_actions(datasette, database, table, actor, request):
    async def inner():
        if actor:
            label = "From async"
            if request.args.get("_hello"):
                label += " " + request.args["_hello"]
            return [{"href": datasette.urls.instance(), "label": label}]

    return inner


@hookimpl
def register_routes(datasette):
    config = datasette.plugin_config("register-route-demo")
    if not config:
        return
    path = config["path"]

    def new_table(request):
        return Response.text("/db/table: {}".format(sorted(request.url_vars.items())))

    return [
        (r"/{}/$".format(path), lambda: Response.text(path.upper())),
        # Also serves to demonstrate over-ride of default paths:
        (r"/(?P<db_name>[^/]+)/(?P<table_and_format>[^/]+?$)", new_table),
    ]


@hookimpl
def handle_exception(datasette, request, exception):
    datasette._exception_hook_fired = (request, exception)
    if request.args.get("_custom_error"):
        return Response.text("_custom_error")
    elif request.args.get("_custom_error_async"):

        async def inner():
            return Response.text("_custom_error_async")

        return inner


@hookimpl(specname="register_routes")
def register_triger_error():
    return ((r"/trigger-error", lambda: 1 / 0),)