#!/usr/bin/env python3
# Copyright 2021-2022 python-tuf contributors
# SPDX-License-Identifier: MIT OR Apache-2.0
"""Simple repository example application
The application stores metadata and targets in memory, and serves them via http.
Nothing is persisted on disk or loaded from disk. The application simulates a
live repository by adding new target files periodically.
"""
import argparse
import logging
import sys
from datetime import datetime
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from time import time
from typing import Dict, List
from _simplerepo import SimpleRepository
from tuf.api.serialization.json import JSONSerializer
logger = logging.getLogger(__name__)
class ReqHandler(BaseHTTPRequestHandler):
    """HTTP handler for the repository example application

    Serves metadata, targets and a small upload API using a SimpleRepository.
    The repository is reached through ``self.server.repo``.
    """

    def do_POST(self):
        """Handle POST requests, aka the 'uploader API'

        Supported paths:
          * ``/api/delegation/<role>``: body is passed to
            ``repo.submit_delegation(role, data)``
          * ``/api/role/<role>``: body is passed to
            ``repo.submit_role(role, data)``

        Responds 200 on success, 400 if the Content-Length header is invalid
        or the repository rejects the submission (falsy return value), and
        404 for any other path.
        """
        try:
            content_len = int(self.headers.get("content-length", 0))
        except ValueError:
            content_len = -1
        if content_len < 0:
            # A malformed header must not crash the handler, and a negative
            # length would make rfile.read() block until the peer closes.
            return self.send_error(400, "Invalid Content-Length header")

        data = self.rfile.read(content_len)

        if self.path.startswith("/api/delegation/"):
            role = self.path[len("/api/delegation/") :]
            if not self.server.repo.submit_delegation(role, data):
                return self.send_error(400, f"Failed to delegate to {role}")
        elif self.path.startswith("/api/role/"):
            role = self.path[len("/api/role/") :]
            if not self.server.repo.submit_role(role, data):
                return self.send_error(400, f"Failed to submit role {role}")
        else:
            return self.send_error(404)

        self.send_response(200)
        self.end_headers()

    def do_GET(self):
        """Handle GET: metadata and target files

        ``/metadata/<name>.json`` is answered by get_metadata() and
        ``/targets/<path>`` by get_target(). Anything else, or a lookup that
        yields None, results in a 404 response.
        """
        data = None

        if self.path.startswith("/metadata/") and self.path.endswith(".json"):
            data = self.get_metadata(
                self.path[len("/metadata/") : -len(".json")]
            )
        elif self.path.startswith("/targets/"):
            data = self.get_target(self.path[len("/targets/") :])

        if data is None:
            self.send_error(404)
        else:
            self.send_response(200)
            # Header values are text; convert the length explicitly
            self.send_header("Content-length", str(len(data)))
            self.end_headers()
            self.wfile.write(data)

    def get_metadata(self, ver_and_role: str):
        """Return serialized metadata for a request name, or None

        Arguments:
            ver_and_role: Either ``<role>`` (latest version is returned) or
                ``<version>.<role>`` where version is a positive integer.

        Returns None if the role is unknown, has no versions yet, or the
        version is not a positive integer within the stored range.
        """
        repo = self.server.repo

        ver_str, sep, role = ver_and_role.rpartition(".")
        if role not in repo.role_cache:
            return None

        versions = repo.role_cache[role]
        if not versions:
            return None

        if sep == "":
            # No version prefix: serve the most recent version
            return versions[-1].to_bytes(JSONSerializer())

        # Only plain decimal versions are valid. Anything else used to raise
        # ValueError, and zero or negative values indexed the list from the
        # end, returning the wrong metadata version.
        if not (ver_str.isascii() and ver_str.isdigit()):
            return None
        ver = int(ver_str)
        if ver < 1 or ver > len(versions):
            return None

        # Versions start at 1, list indices at 0
        return versions[ver - 1].to_bytes(JSONSerializer())

    def get_target(self, targetpath: str):
        """Return the content of a target file, or None if it is unknown

        Arguments:
            targetpath: Requested path, where the file name carries a prefix
                (the hash in this example's naming scheme) that ends at the
                first ``.``, e.g. ``dir/<hash>.<name>``.
        """
        repo = self.server.repo

        # Undo the hash-prefixed naming: drop everything up to and including
        # the first "." of the file name to get the target_cache key
        # TODO: maybe use hashed paths as the target_cache key
        dirname, sep, hashname = targetpath.rpartition("/")
        _, _, name = hashname.partition(".")
        target = f"{dirname}{sep}{name}"

        if target not in repo.target_cache:
            return None

        # send the target content
        return repo.target_cache[target]
class RepositoryServer(ThreadingHTTPServer):
    """HTTP server for the example, listening on 127.0.0.1 only

    Holds the SimpleRepository instance as ``repo`` and uses ReqHandler to
    process requests.
    """

    def __init__(self, port: int):
        """Bind to the given local port and create a new repository"""
        bind_address = ("127.0.0.1", port)
        super().__init__(bind_address, ReqHandler)

        # Seconds handle_request() waits for a request before returning
        self.timeout = 1
        self.repo = SimpleRepository()
def main(argv: List[str]) -> None:
    """Run the example repository server

    Recognized arguments are ``-v/--verbose`` (DEBUG level logging) and
    ``-p/--port`` (default 8001); unrecognized arguments are ignored.
    The function serves requests in an endless loop and does not return.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument("-v", "--verbose", action="count")
    arg_parser.add_argument("-p", "--port", type=int, default=8001)
    options, _unknown = arg_parser.parse_known_args(argv)

    if options.verbose:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)

    httpd = RepositoryServer(options.port)
    root_url = f"http://127.0.0.1:{httpd.server_port}/metadata/1.root.json"
    logger.info(f"Now serving. Root v1 at {root_url}")

    previous_update = 0
    files_added = 0
    while True:
        # Simulate a live repository: once more than 10 seconds have passed
        # since the previous addition, publish a new target file whose
        # content is the timestamp of the addition
        if time() - previous_update > 10:
            previous_update = int(time())
            files_added += 1
            stamp = datetime.fromtimestamp(previous_update)
            httpd.repo.add_target(f"file{files_added}.txt", str(stamp))

        httpd.handle_request()
# Start the example server only when this file is executed as a script
if __name__ == "__main__":
    # The complete argv (program name included) is handed to main(), which
    # parses it with parse_known_args()
    main(sys.argv)
|