File: request.py

package info (click to toggle)
python-tuspy 1.0.3-0.1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 212 kB
  • sloc: python: 884; makefile: 3
file content (122 lines) | stat: -rw-r--r-- 3,945 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
from typing import Optional
import base64
import asyncio
from functools import wraps

import requests
import aiohttp

from tusclient.exceptions import TusUploadFailed, TusCommunicationError


# Catches requests exceptions and throws custom tuspy errors.
def catch_requests_error(func):
    """
    Decorator to catch requests exceptions.

    The wrapped callable is invoked with the arguments it was given and its
    return value is passed through unchanged. Any
    ``requests.exceptions.RequestException`` raised by the call is re-raised
    as ``TusCommunicationError`` (constructed with the original exception),
    with the original exception attached as its explicit cause.

    :param func: The callable to wrap.
    :return: The wrapping callable; it carries ``func``'s metadata.
    :raises TusCommunicationError: If ``func`` raises a requests exception.
    """

    @wraps(func)
    def _wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except requests.exceptions.RequestException as error:
            # Chain explicitly so the traceback reports the requests error as
            # the direct cause instead of "another exception occurred".
            raise TusCommunicationError(error) from error

    return _wrapper


class BaseTusRequest:
    """
    Shared state for a single tus upload request.

    Everything the request needs is read from the 'uploader' argument
    (an instance of tusclient.uploader.Uploader) at instantiation: the
    target url, the request headers, the file stream, the length of the
    chunk to send and the checksum settings.

    :Attributes:
        - response_headers (dict):
            Starts out empty; filled in once a request has been performed.
        - status_code:
            Starts out as None; filled in once a request has been performed.
        - response_content:
            Starts out as None; filled in once a request has been performed.
        - verify_tls_cert (bool):
            The uploader's 'verify_tls_cert' setting coerced to a bool.
        - file (file):
            The file that is being uploaded, positioned at the uploader's
            current offset.
    """

    def __init__(self, uploader):
        # Response placeholders; nothing has been sent yet.
        self.response_headers = {}
        self.status_code = None
        self.response_content = None

        self._url = uploader.url
        self.verify_tls_cert = bool(uploader.verify_tls_cert)

        # Position the stream where the next chunk starts.
        stream = uploader.get_file_stream()
        self.file = stream
        stream.seek(uploader.offset)

        # Defaults first, so headers supplied by the uploader take precedence.
        headers = {
            "upload-offset": str(uploader.offset),
            "Content-Type": "application/offset+octet-stream",
        }
        headers.update(uploader.get_headers())
        self._request_headers = headers

        self._content_length = uploader.get_request_length()
        self._upload_checksum = uploader.upload_checksum
        self._checksum_algorithm = uploader.checksum_algorithm
        self._checksum_algorithm_name = uploader.checksum_algorithm_name

    def add_checksum(self, chunk: bytes):
        """
        Set the 'upload-checksum' request header for the given chunk.

        Does nothing unless checksums were enabled on the uploader. The
        header value is the algorithm name, a space, and the base64 encoded
        digest of the chunk.

        :param chunk: The bytes about to be sent.
        """
        if not self._upload_checksum:
            return

        digest = self._checksum_algorithm(chunk).digest()
        encoded_digest = base64.b64encode(digest).decode("ascii")
        self._request_headers["upload-checksum"] = " ".join(
            [self._checksum_algorithm_name, encoded_digest]
        )


class TusRequest(BaseTusRequest):
    """Class to handle synchronous (blocking) Tus upload requests"""

    def perform(self):
        """
        Perform actual request.

        Reads the next chunk from the file, adds the checksum header when
        checksums are enabled, and sends the chunk with a PATCH request.
        Afterwards 'status_code', 'response_content' and 'response_headers'
        (with lower-cased header names) hold the server's response.

        :raises TusUploadFailed: If the requests library reports an error;
            the original exception is attached as the cause.
        """
        try:
            chunk = self.file.read(self._content_length)
            self.add_checksum(chunk)
            resp = requests.patch(
                self._url,
                data=chunk,
                headers=self._request_headers,
                verify=self.verify_tls_cert,
            )
            self.status_code = resp.status_code
            self.response_content = resp.content
            # Lower-case the names so lookups do not depend on server casing.
            self.response_headers = {k.lower(): v for k, v in resp.headers.items()}
        except requests.exceptions.RequestException as error:
            # Chain explicitly so the underlying requests error is reported
            # as the direct cause of the upload failure.
            raise TusUploadFailed(error) from error


class AsyncTusRequest(BaseTusRequest):
    """Class to handle async Tus upload requests"""

    def __init__(
        self, *args, io_loop: Optional[asyncio.AbstractEventLoop] = None, **kwargs
    ):
        """
        Set up the request.

        :param io_loop: Optional event loop handed to the aiohttp client
            session as its 'loop' argument when the request is performed.

        All other arguments are forwarded to the base class.
        """
        self.io_loop = io_loop
        super().__init__(*args, **kwargs)

    async def perform(self):
        """
        Perform actual request.

        Reads the next chunk from the file, adds the checksum header when
        checksums are enabled, and sends the chunk with a PATCH request
        through a fresh aiohttp client session. Afterwards 'status_code',
        'response_headers' (with lower-cased header names) and
        'response_content' hold the server's response.

        :raises TusUploadFailed: If aiohttp reports a client error; the
            original exception is attached as the cause.
        """
        chunk = self.file.read(self._content_length)
        self.add_checksum(chunk)
        try:
            async with aiohttp.ClientSession(loop=self.io_loop) as session:
                # False turns certificate verification off; None is passed
                # otherwise so aiohttp applies its default handling.
                ssl = None if self.verify_tls_cert else False
                async with session.patch(
                    self._url, data=chunk, headers=self._request_headers, ssl=ssl
                ) as resp:
                    self.status_code = resp.status
                    self.response_headers = {
                        k.lower(): v for k, v in resp.headers.items()
                    }
                    self.response_content = await resp.content.read()
        except aiohttp.ClientError as error:
            # Chain explicitly so the underlying aiohttp error is reported
            # as the direct cause of the upload failure.
            raise TusUploadFailed(error) from error