File: repo

package info (click to toggle)
python-tuf 6.0.0-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,300 kB
  • sloc: python: 7,738; makefile: 8
file content (142 lines) | stat: -rwxr-xr-x 4,485 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
#!/usr/bin/env python3
# Copyright 2021-2022 python-tuf contributors
# SPDX-License-Identifier: MIT OR Apache-2.0

"""Simple repository example application

The application stores metadata and targets in memory, and serves them via http.
Nothing is persisted on disk or loaded from disk. The application simulates a
live repository by adding new target files periodically.
"""

import argparse
import logging
import sys
from datetime import datetime
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from time import time
from typing import Dict, List

from _simplerepo import SimpleRepository

from tuf.api.serialization.json import JSONSerializer

logger = logging.getLogger(__name__)


class ReqHandler(BaseHTTPRequestHandler):
    """HTTP handler for the repository example application

    Serves metadata, targets and a small upload API using a SimpleRepository
    """

    def do_POST(self):
        """Handle POST requests, aka the 'uploader API'"""

        content_len = int(self.headers.get("content-length", 0))
        data = self.rfile.read(content_len)

        if self.path.startswith("/api/delegation/"):
            role = self.path[len("/api/delegation/") :]
            if not self.server.repo.submit_delegation(role, data):
                return self.send_error(400, f"Failed to delegate to {role}")
        elif self.path.startswith("/api/role/"):
            role = self.path[len("/api/role/") :]
            if not self.server.repo.submit_role(role, data):
                return self.send_error(400, f"Failed to submit role {role}")
        else:
            return self.send_error(404)

        self.send_response(200)
        self.end_headers()

    def do_GET(self):
        """Handle GET: metadata and target files"""
        data = None

        if self.path.startswith("/metadata/") and self.path.endswith(".json"):
            data = self.get_metadata(
                self.path[len("/metadata/") : -len(".json")]
            )
        elif self.path.startswith("/targets/"):
            data = self.get_target(self.path[len("/targets/") :])

        if data is None:
            self.send_error(404)
        else:
            self.send_response(200)
            self.send_header("Content-length", len(data))
            self.end_headers()
            self.wfile.write(data)

    def get_metadata(self, ver_and_role: str):
        repo = self.server.repo

        ver_str, sep, role = ver_and_role.rpartition(".")
        if sep == "":
            # 0 will lead to list lookup with -1, meaning latest version
            ver = 0
        else:
            ver = int(ver_str)

        if role not in repo.role_cache or ver > len(repo.role_cache[role]):
            return None

        # return metadata
        return repo.role_cache[role][ver - 1].to_bytes(JSONSerializer())

    def get_target(self, targetpath: str):
        repo = self.server.repo

        # unimplement the dumb hashing scheme
        # TODO: maybe use hashed paths as the target_cache key
        dir, sep, hashname = targetpath.rpartition("/")
        _, _, name = hashname.partition(".")
        target = f"{dir}{sep}{name}"

        if target not in repo.target_cache:
            return None

        # send the target content
        return repo.target_cache[target]


class RepositoryServer(ThreadingHTTPServer):
    def __init__(self, port: int):
        super().__init__(("127.0.0.1", port), ReqHandler)
        self.timeout = 1
        self.repo = SimpleRepository()


def main(argv: List[str]) -> None:
    """Example repository server"""

    parser = argparse.ArgumentParser()
    parser.add_argument("-v", "--verbose", action="count")
    parser.add_argument("-p", "--port", type=int, default=8001)
    args, _ = parser.parse_known_args(argv)

    level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(level=level)

    server = RepositoryServer(args.port)
    last_change = 0
    counter = 0

    logger.info(
        f"Now serving. Root v1 at http://127.0.0.1:{server.server_port}/metadata/1.root.json"
    )

    while True:
        # Simulate a live repository: Add a new target file every few seconds
        if time() - last_change > 10:
            last_change = int(time())
            counter += 1
            content = str(datetime.fromtimestamp(last_change))
            server.repo.add_target(f"file{str(counter)}.txt", content)

        server.handle_request()


if __name__ == "__main__":
    main(sys.argv)