| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
 100
 101
 102
 103
 104
 105
 106
 107
 108
 109
 110
 111
 112
 113
 114
 115
 116
 117
 118
 119
 120
 121
 122
 123
 124
 125
 126
 127
 128
 129
 130
 131
 132
 133
 134
 135
 136
 137
 138
 139
 140
 141
 142
 143
 144
 145
 146
 147
 148
 149
 150
 151
 152
 153
 154
 155
 156
 157
 158
 159
 160
 161
 162
 163
 164
 165
 166
 167
 168
 169
 170
 171
 172
 173
 174
 175
 176
 177
 178
 179
 180
 181
 182
 183
 184
 185
 186
 187
 188
 189
 190
 191
 192
 193
 194
 195
 196
 197
 198
 199
 200
 201
 202
 203
 204
 205
 206
 207
 208
 209
 210
 211
 212
 213
 214
 215
 216
 217
 218
 219
 220
 221
 222
 223
 224
 225
 226
 227
 228
 229
 230
 231
 232
 233
 
 | import unittest
from test import support
from test.support import os_helper
from test.support import socket_helper
from test.support.testcase import ExtraAssertions
import contextlib
import socket
import urllib.parse
import urllib.request
import os
import email.message
import time
support.requires('network')
class URLTimeoutTest(unittest.TestCase):
    # XXX this test doesn't seem to test anything useful.
    def setUp(self):
        socket.setdefaulttimeout(support.INTERNET_TIMEOUT)
    def tearDown(self):
        socket.setdefaulttimeout(None)
    def testURLread(self):
        # clear _opener global variable
        self.addCleanup(urllib.request.urlcleanup)
        domain = urllib.parse.urlparse(support.TEST_HTTP_URL).netloc
        with socket_helper.transient_internet(domain):
            f = urllib.request.urlopen(support.TEST_HTTP_URL)
            f.read()
class urlopenNetworkTests(unittest.TestCase, ExtraAssertions):
    """Tests urllib.request.urlopen using the network.
    These tests are not exhaustive.  Assuming that testing using files does a
    good job overall of some of the basic interface features.  There are no
    tests exercising the optional 'data' and 'proxies' arguments.  No tests
    for transparent redirection have been written.
    setUp is not used for always constructing a connection to
    http://www.pythontest.net/ since there a few tests that don't use that address
    and making a connection is expensive enough to warrant minimizing unneeded
    connections.
    """
    url = 'http://www.pythontest.net/'
    def setUp(self):
        # clear _opener global variable
        self.addCleanup(urllib.request.urlcleanup)
    @contextlib.contextmanager
    def urlopen(self, *args, **kwargs):
        resource = args[0]
        with socket_helper.transient_internet(resource):
            r = urllib.request.urlopen(*args, **kwargs)
            try:
                yield r
            finally:
                r.close()
    def test_basic(self):
        # Simple test expected to pass.
        with self.urlopen(self.url) as open_url:
            for attr in ("read", "readline", "readlines", "fileno", "close",
                         "info", "geturl"):
                self.assertHasAttr(open_url, attr)
            self.assertTrue(open_url.read(), "calling 'read' failed")
    def test_readlines(self):
        # Test both readline and readlines.
        with self.urlopen(self.url) as open_url:
            self.assertIsInstance(open_url.readline(), bytes,
                                  "readline did not return a string")
            self.assertIsInstance(open_url.readlines(), list,
                                  "readlines did not return a list")
    def test_info(self):
        # Test 'info'.
        with self.urlopen(self.url) as open_url:
            info_obj = open_url.info()
            self.assertIsInstance(info_obj, email.message.Message,
                                  "object returned by 'info' is not an "
                                  "instance of email.message.Message")
            self.assertEqual(info_obj.get_content_subtype(), "html")
    def test_geturl(self):
        # Make sure same URL as opened is returned by geturl.
        with self.urlopen(self.url) as open_url:
            gotten_url = open_url.geturl()
            self.assertEqual(gotten_url, self.url)
    def test_getcode(self):
        # test getcode() with the fancy opener to get 404 error codes
        URL = self.url + "XXXinvalidXXX"
        with socket_helper.transient_internet(URL):
            with self.assertWarns(DeprecationWarning):
                open_url = urllib.request.FancyURLopener().open(URL)
            try:
                code = open_url.getcode()
            finally:
                open_url.close()
            self.assertEqual(code, 404)
    @support.requires_resource('walltime')
    def test_bad_address(self):
        # Make sure proper exception is raised when connecting to a bogus
        # address.
        # Given that both VeriSign and various ISPs have in
        # the past or are presently hijacking various invalid
        # domain name requests in an attempt to boost traffic
        # to their own sites, finding a domain name to use
        # for this test is difficult.  RFC2606 leads one to
        # believe that '.invalid' should work, but experience
        # seemed to indicate otherwise.  Single character
        # TLDs are likely to remain invalid, so this seems to
        # be the best choice. The trailing '.' prevents a
        # related problem: The normal DNS resolver appends
        # the domain names from the search path if there is
        # no '.' the end and, and if one of those domains
        # implements a '*' rule a result is returned.
        # However, none of this will prevent the test from
        # failing if the ISP hijacks all invalid domain
        # requests.  The real solution would be to be able to
        # parameterize the framework with a mock resolver.
        bogus_domain = "sadflkjsasf.i.nvali.d."
        try:
            socket.gethostbyname(bogus_domain)
        except OSError:
            # socket.gaierror is too narrow, since getaddrinfo() may also
            # fail with EAI_SYSTEM and ETIMEDOUT (seen on Ubuntu 13.04),
            # i.e. Python's TimeoutError.
            pass
        else:
            # This happens with some overzealous DNS providers such as OpenDNS
            self.skipTest("%r should not resolve for test to work" % bogus_domain)
        failure_explanation = ('opening an invalid URL did not raise OSError; '
                               'can be caused by a broken DNS server '
                               '(e.g. returns 404 or hijacks page)')
        with self.assertRaises(OSError, msg=failure_explanation):
            urllib.request.urlopen("http://{}/".format(bogus_domain))
class urlretrieveNetworkTests(unittest.TestCase):
    """Tests urllib.request.urlretrieve using the network."""
    def setUp(self):
        # remove temporary files created by urlretrieve()
        self.addCleanup(urllib.request.urlcleanup)
    @contextlib.contextmanager
    def urlretrieve(self, *args, **kwargs):
        resource = args[0]
        with socket_helper.transient_internet(resource):
            file_location, info = urllib.request.urlretrieve(*args, **kwargs)
            try:
                yield file_location, info
            finally:
                os_helper.unlink(file_location)
    def test_basic(self):
        # Test basic functionality.
        with self.urlretrieve(self.logo) as (file_location, info):
            self.assertTrue(os.path.exists(file_location), "file location returned by"
                            " urlretrieve is not a valid path")
            with open(file_location, 'rb') as f:
                self.assertTrue(f.read(), "reading from the file location returned"
                                " by urlretrieve failed")
    def test_specified_path(self):
        # Make sure that specifying the location of the file to write to works.
        with self.urlretrieve(self.logo,
                              os_helper.TESTFN) as (file_location, info):
            self.assertEqual(file_location, os_helper.TESTFN)
            self.assertTrue(os.path.exists(file_location))
            with open(file_location, 'rb') as f:
                self.assertTrue(f.read(), "reading from temporary file failed")
    def test_header(self):
        # Make sure header returned as 2nd value from urlretrieve is good.
        with self.urlretrieve(self.logo) as (file_location, info):
            self.assertIsInstance(info, email.message.Message,
                                  "info is not an instance of email.message.Message")
    logo = "http://www.pythontest.net/"
    @support.requires_resource('walltime')
    def test_data_header(self):
        with self.urlretrieve(self.logo) as (file_location, fileheaders):
            datevalue = fileheaders.get('Date')
            dateformat = '%a, %d %b %Y %H:%M:%S GMT'
            try:
                time.strptime(datevalue, dateformat)
            except ValueError:
                self.fail('Date value not in %r format' % dateformat)
    def test_reporthook(self):
        records = []
        def recording_reporthook(blocks, block_size, total_size):
            records.append((blocks, block_size, total_size))
        with self.urlretrieve(self.logo, reporthook=recording_reporthook) as (
                file_location, fileheaders):
            expected_size = int(fileheaders['Content-Length'])
        records_repr = repr(records)  # For use in error messages.
        self.assertGreater(len(records), 1, msg="There should always be two "
                           "calls; the first one before the transfer starts.")
        self.assertEqual(records[0][0], 0)
        self.assertGreater(records[0][1], 0,
                           msg="block size can't be 0 in %s" % records_repr)
        self.assertEqual(records[0][2], expected_size)
        self.assertEqual(records[-1][2], expected_size)
        block_sizes = {block_size for _, block_size, _ in records}
        self.assertEqual({records[0][1]}, block_sizes,
                         msg="block sizes in %s must be equal" % records_repr)
        self.assertGreaterEqual(records[-1][0]*records[0][1], expected_size,
                                msg="number of blocks * block size must be"
                                " >= total size in %s" % records_repr)
if __name__ == "__main__":
    unittest.main()
 |