File: test_requests_file.py

package info (click to toggle)
requests-file 3.0.1-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 104 kB
  • sloc: python: 221; makefile: 3
file content (210 lines) | stat: -rw-r--r-- 8,185 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
import unittest
import requests
from requests_file import FileAdapter

import os, stat
import tempfile
import shutil
import platform


class FileRequestTestCase(unittest.TestCase):
    def setUp(self):
        self._session = requests.Session()
        self._session.mount("file://", FileAdapter())

    def _pathToURL(self, path):
        """Convert a filesystem path to a URL path"""
        urldrive, urlpath = os.path.splitdrive(path)

        # Split the path on the os spearator and recombine it with / as the
        # separator. There probably aren't any OS's that allow / as a path
        # component, but just in case, encode any remaining /'s.
        urlsplit = (part.replace("/", "%2F") for part in urlpath.split(os.sep))
        urlpath = "/".join(urlsplit)

        # Encode /'s in the drive for the imaginary case where that can be a thing
        urldrive = urldrive.replace("/", "%2F")

        # Add the leading /. If there is a drive component, this needs to be
        # placed before the drive.
        urldrive = "/" + urldrive

        return urldrive + urlpath

    def test_fetch_regular(self):
        # Fetch this file using requests
        with open(__file__, "rb") as f:
            testdata = f.read()
        response = self._session.get(
            "file://%s" % self._pathToURL(os.path.abspath(__file__))
        )

        self.assertEqual(response.status_code, requests.codes.ok)
        self.assertEqual(response.headers["Content-Length"], str(len(testdata)))
        self.assertEqual(response.content, testdata)
        self.assertEqual(response.url, "file://%s" % self._pathToURL(os.path.abspath(__file__)))
        self.assertIsInstance(response.request, requests.PreparedRequest)

        response.close()

    def test_fetch_missing(self):
        # Fetch a file that (hopefully) doesn't exist, look for a 404
        response = self._session.get("file:///no/such/path")
        self.assertEqual(response.status_code, requests.codes.not_found)
        self.assertTrue(response.text)
        self.assertEqual(response.url, "file:///no/such/path")
        self.assertIsInstance(response.request, requests.PreparedRequest)

        # Ensure the missing file's path is in the response reason
        self.assertIn("/no/such/path", response.reason)

        response.close()

    @unittest.skipIf(
        hasattr(os, "geteuid") and os.geteuid() == 0,
        "Skipping permissions test since running as root",
    )
    def test_fetch_no_access(self):
        # Create a file and remove read permissions, try to get a 403
        # probably doesn't work on windows
        with tempfile.NamedTemporaryFile() as tmp:
            os.chmod(tmp.name, 0)
            response = self._session.get(
                "file://%s" % self._pathToURL(os.path.abspath(tmp.name))
            )

            self.assertEqual(response.status_code, requests.codes.forbidden)
            self.assertTrue(response.text)
            self.assertEqual(response.url, "file://%s" % self._pathToURL(os.path.abspath(tmp.name)))
            self.assertIn(os.path.abspath(tmp.name), response.reason)

            response.close()

    @unittest.skipIf(platform.system() == "Windows", "skipping locale test on windows")
    def test_fetch_missing_localized(self):
        # Make sure translated error messages don't cause any problems
        import locale

        saved_locale = locale.setlocale(locale.LC_MESSAGES, None)
        try:
            locale.setlocale(locale.LC_MESSAGES, "ru_RU.UTF-8")
            response = self._session.get("file:///no/such/path")
            self.assertEqual(response.status_code, requests.codes.not_found)
            self.assertTrue(response.text)
            self.assertEqual(response.url, "file:///no/such/path")
            self.assertIn("/no/such/path", response.reason)
            response.close()
        except locale.Error:
            unittest.SkipTest("ru_RU.UTF-8 locale not available")
        finally:
            locale.setlocale(locale.LC_MESSAGES, saved_locale)

    def test_head(self):
        # Check that HEAD returns the content-length
        testlen = os.stat(__file__).st_size
        response = self._session.head(
            "file://%s" % self._pathToURL(os.path.abspath(__file__))
        )

        self.assertEqual(response.status_code, requests.codes.ok)
        self.assertEqual(response.headers["Content-Length"], str(testlen))
        self.assertEqual(response.url, "file://%s" % self._pathToURL(os.path.abspath(__file__)))

        response.close()

    def test_fetch_post(self):
        # Make sure that non-GET methods are rejected
        self.assertRaises(
            ValueError,
            self._session.post,
            ("file://%s" % self._pathToURL(os.path.abspath(__file__))),
        )

    def test_fetch_nonlocal(self):
        # Make sure that network locations are rejected
        self.assertRaises(
            ValueError,
            self._session.get,
            ("file://example.com%s" % self._pathToURL(os.path.abspath(__file__))),
        )
        self.assertRaises(
            ValueError,
            self._session.get,
            ("file://localhost:8080%s" % self._pathToURL(os.path.abspath(__file__))),
        )

        # localhost is ok, though
        with open(__file__, "rb") as f:
            testdata = f.read()
        response = self._session.get(
            "file://localhost%s" % self._pathToURL(os.path.abspath(__file__))
        )
        self.assertEqual(response.status_code, requests.codes.ok)
        self.assertEqual(response.content, testdata)
        response.close()

    def test_funny_names(self):
        testdata = "yo wassup man\n".encode("ascii")
        tmpdir = tempfile.mkdtemp()

        try:
            with open(os.path.join(tmpdir, "spa ces"), "w+b") as space_file:
                space_file.write(testdata)
                space_file.flush()
                response = self._session.get(
                    "file://%s/spa%%20ces" % self._pathToURL(tmpdir)
                )
                self.assertEqual(response.status_code, requests.codes.ok)
                self.assertEqual(response.content, testdata)
                response.close()

            with open(os.path.join(tmpdir, "per%cent"), "w+b") as percent_file:
                percent_file.write(testdata)
                percent_file.flush()
                response = self._session.get(
                    "file://%s/per%%25cent" % self._pathToURL(tmpdir)
                )
                self.assertEqual(response.status_code, requests.codes.ok)
                self.assertEqual(response.content, testdata)
                response.close()

            # percent-encoded directory separators should be rejected
            with open(os.path.join(tmpdir, "badname"), "w+b") as bad_file:
                response = self._session.get(
                    "file://%s%%%Xbadname" % (self._pathToURL(tmpdir), ord(os.sep))
                )
                self.assertEqual(response.status_code, requests.codes.not_found)
                response.close()

        finally:
            shutil.rmtree(tmpdir)

    def test_close(self):
        # Open a request for this file
        response = self._session.get(
            "file://%s" % self._pathToURL(os.path.abspath(__file__))
        )

        # Try closing it
        response.close()

    def test_missing_close(self):
        # Make sure non-200 responses can be closed
        response = self._session.get("file:///no/such/path")
        response.close()

    @unittest.skipIf(platform.system() != "Windows", "skipping windows URL test")
    def test_windows_legacy(self):
        """Test |-encoded drive characters on Windows"""
        with open(__file__, "rb") as f:
            testdata = f.read()

        drive, path = os.path.splitdrive(os.path.abspath(__file__))
        response = self._session.get(
            "file:///%s|%s" % (drive[:-1], path.replace(os.sep, "/"))
        )
        self.assertEqual(response.status_code, requests.codes.ok)
        self.assertEqual(response.headers["Content-Length"], len(testdata))
        self.assertEqual(response.content, testdata)
        response.close()