File: api_handler.py

package info (click to toggle)
mozjs128 128.14.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,132,528 kB
  • sloc: javascript: 2,181,430; cpp: 1,371,420; python: 776,651; ansic: 641,398; xml: 117,736; sh: 17,537; asm: 13,468; makefile: 11,191; yacc: 4,504; perl: 2,221; lex: 1,414; ruby: 1,032; exp: 756; java: 185; sql: 66; sed: 18
file content (99 lines) | stat: -rw-r--r-- 3,318 bytes parent folder | download | duplicates (20)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# mypy: allow-untyped-defs

import json
import sys
import traceback
import logging

from urllib.parse import parse_qsl

# Module-level logger used by the API handler helpers in this file.
# (A `global` statement is a no-op at module scope, so none is needed.)
logger = logging.getLogger("wave-api-handler")


class ApiHandler:
    """Shared helpers for API request handling.

    Offers utilities to write JSON and file responses, split a request path
    into segments, parse the query string, log the exception currently being
    handled, and build paginated list documents with a ``_links`` section.
    """

    def __init__(self, web_root):
        """Store the path prefix that ``parse_uri`` strips from request paths.

        Args:
            web_root: Prefix of the request path to drop, or ``None`` to keep
                the path unchanged.
        """
        self._web_root = web_root

    def set_headers(self, response, headers):
        """Append ``headers`` to ``response.headers``.

        If ``response.headers`` is not a list it is replaced by a fresh empty
        list first, so whatever it held before is discarded.

        Args:
            response: Object exposing a writable ``headers`` attribute.
            headers: Iterable of header entries (``(name, value)`` tuples in
                this class).
        """
        if not isinstance(response.headers, list):
            response.headers = []
        response.headers.extend(headers)

    def send_json(self, data, response, status=None):
        """Serialize ``data`` as indented JSON and write it to ``response``.

        Args:
            data: JSON-serializable payload.
            response: Object receiving ``content``, ``headers`` and ``status``.
            status: HTTP status code; ``None`` means 200.
        """
        if status is None:
            status = 200
        response.content = json.dumps(data, indent=4)
        self.set_headers(response, [("Content-Type", "application/json")])
        response.status = status

    def send_file(self, blob, file_name, response):
        """Write ``blob`` to ``response`` as an attachment named ``file_name``.

        Args:
            blob: Response body.
            file_name: Name placed in the ``Content-Disposition`` header.
            response: Object receiving ``headers`` and ``content``.
        """
        disposition = "attachment;filename=" + file_name
        self.set_headers(response, [("Content-Disposition", disposition)])
        response.content = blob

    def send_zip(self, data, file_name, response):
        """Write ``data`` to ``response`` as a compressed attachment.

        NOTE: unlike ``send_json`` this *replaces* any headers already present
        on ``response`` before adding the attachment header.

        Args:
            data: Response body.
            file_name: Name placed in the ``Content-Disposition`` header.
            response: Object receiving ``headers`` and ``content``.
        """
        response.headers = [("Content-Type", "application/x-compressed")]
        self.send_file(data, file_name, response)

    def parse_uri(self, request):
        """Return the non-empty ``/``-separated segments of the request path.

        When a web root is configured, as many leading characters as the web
        root is long are dropped from the path first (the path is not checked
        to actually start with the web root).

        Args:
            request: Object exposing ``url_parts.path``.

        Returns:
            List of path segments with empty segments removed.
        """
        path = request.url_parts.path
        if self._web_root is not None:
            path = path[len(self._web_root):]
        return [segment for segment in path.split("/") if segment]

    def parse_query_parameters(self, request):
        """Return the query string of ``request`` as a dict.

        Args:
            request: Object exposing ``url_parts.query``.

        Returns:
            Dict mapping parameter names to values; for repeated names the
            last occurrence wins.
        """
        return dict(parse_qsl(request.url_parts.query))

    def handle_exception(self, message):
        """Print the traceback of the active exception and log an error.

        Intended to be called from inside an ``except`` block. The log entry
        has the form ``"<message>: <ExceptionName>: <detail>"`` where the
        detail is the first exception argument, or the exception's string
        form when it carries no arguments. When no exception is being handled
        only ``message`` is logged.

        Args:
            message: Context text placed at the start of the log entry.
        """
        exc_type, exc_value, exc_tb = sys.exc_info()
        if exc_type is None:
            # Called outside of an ``except`` block: nothing to describe.
            logger.error("%s", message)
            return

        traceback.print_tb(exc_tb)
        # Exceptions raised without arguments have an empty ``args`` tuple;
        # indexing it would raise IndexError and mask the original error.
        detail = exc_value.args[0] if exc_value.args else exc_value
        logger.error("%s: %s: %s", message, exc_type.__name__, detail)

    @staticmethod
    def _page_link(uri, index, count):
        """Return a link object for the page of ``uri`` starting at ``index``."""
        return {"href": f"{uri}?index={index}&count={count}"}

    def create_hal_list(self, items, uris, index, count, total):
        """Build a list document with ``_links`` and ``items`` entries.

        Every relation in ``uris`` other than ``"self"`` becomes a link that is
        flagged as templated when its URI contains ``"{"``. If ``uris`` has a
        ``"self"`` entry, pagination links (``self``, ``first``, ``last`` and,
        where applicable, ``next`` and ``previous``) are derived from it by
        appending ``?index=...&count=...``; these override same-named
        relations supplied in ``uris``.

        Args:
            items: Value stored under ``"items"``.
            uris: Mapping of relation name to URI, or ``None`` for no links.
            index: Offset of the first item of the current page.
            count: Page size.
            total: Total number of items across all pages.

        Returns:
            Dict with the keys ``"_links"`` and ``"items"``.
        """
        links = {}
        if uris is not None:
            for relation, uri in uris.items():
                if relation == "self":
                    continue
                links[relation] = {"href": uri, "templated": "{" in uri}

            if "self" in uris:
                self_uri = uris["self"]
                links["self"] = self._page_link(self_uri, index, count)
                links["first"] = self._page_link(self_uri, 0, count)

                # Offset of the last non-empty page. Guard against a
                # non-positive page size (division by zero) and make sure an
                # exact multiple of ``count`` does not point past the end.
                if count > 0 and total > 0:
                    last_index = ((total - 1) // count) * count
                else:
                    last_index = 0
                links["last"] = self._page_link(self_uri, last_index, count)

                # Only offer a next page when it would contain an item.
                if count > 0 and index + count < total:
                    links["next"] = self._page_link(
                        self_uri, index + count, count)

                if index != 0:
                    previous_index = max(index - count, 0)
                    links["previous"] = self._page_link(
                        self_uri, previous_index, count)

        return {"_links": links, "items": items}