File: test__socket_dns.py

package info (click to toggle)
python-gevent 0.13.6-1%2Bnmu3
  • links: PTS
  • area: main
  • in suites: wheezy
  • size: 2,324 kB
  • sloc: python: 13,296; makefile: 95; ansic: 37
file content (204 lines) | stat: -rw-r--r-- 5,961 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
#!/usr/bin/python
# -*- coding: utf-8 -*-
import sys
import re
import greentest
import socket as real_socket
from gevent.socket import *


ACCEPTED_GAIERROR_MISMATCH = {
    "gaierror(-5, 'No address associated with hostname')":
    ("DNSError(3, 'name does not exist')", "DNSError(66, 'unknown')"),
}

assert gaierror is real_socket.gaierror
assert error is real_socket.error

VERBOSE = '-v' in sys.argv
IGNORE_ERRORS = '--ignore' in sys.argv
DEBUG = '-d' in sys.argv
for arg in ['--ignore', '-d']:
    try:
        sys.argv.remove(arg)
    except ValueError:
        pass


class TestCase(greentest.TestCase):
    __timeout__ = 30

    if IGNORE_ERRORS:
        def assertEqual(self, a, b):
            if a != b:
                print 'ERROR: %r != %r' % (a, b)

    def _test(self, hostname, check_ip=None):
        self._test_gethostbyname(hostname, check_ip=check_ip)
        self._test_getaddrinfo(hostname)

    def _test_gethostbyname(self, hostname, check_ip=None):
        try:
            if VERBOSE:
                print 'real_socket.gethostbyname(%r)' % (hostname, )
            real_ip = real_socket.gethostbyname(hostname)
        except Exception, ex:
            if DEBUG:
                raise
            real_ip = ex
        try:
            if VERBOSE:
                print 'gevent.socket.gethostbyname(%r)' % (hostname, )
            ip = gethostbyname(hostname)
        except Exception, ex:
            if DEBUG:
                raise
            ip = ex
        if self.equal(real_ip, ip):
            return ip
        self.assertEqual(real_ip, ip)
        if check_ip is not None:
            self.assertEqual(check_ip, ip)
        return ip

    PORTS = [80, 0, 53, 'http']
    getaddrinfo_args = [(),
                        (AF_UNSPEC, ),
                        (AF_UNSPEC, SOCK_STREAM, 0, 0),
                        (AF_INET, SOCK_STREAM, ),
                        (AF_UNSPEC, SOCK_DGRAM, ),
                        (AF_INET, SOCK_RAW, ),
                        (AF_UNSPEC, SOCK_STREAM, 6),
                        (AF_INET, SOCK_DGRAM, 17)]

    def _test_getaddrinfo(self, hostname):
        for port in self.PORTS:
            for args in self.getaddrinfo_args:
                if VERBOSE:
                    print
                    print 'real_socket.getaddrinfo(%r, %r, %r)' % (hostname, port, args)
                try:
                    real_ip = real_socket.getaddrinfo(hostname, port, *args)
                    if VERBOSE:
                        print '    returned %r' % (real_ip, )
                except Exception, ex:
                    if DEBUG:
                        raise
                    real_ip = ex
                if VERBOSE:
                    print 'gevent.socket.getaddrinfo(%r, %r, %r)' % (hostname, port, args)
                try:
                    ip = getaddrinfo(hostname, port, *args)
                    if VERBOSE:
                        print '    returned %r' % (ip, )
                except Exception, ex:
                    if DEBUG:
                        raise
                    ip = ex
                if not self.equal(real_ip, ip):
                    args_str = ', '.join(repr(x) for x in (hostname, port) + args)
                    print 'WARNING: getaddrinfo(%s):\n    %r    (stdlib)\n != %r    (gevent)' % (args_str, real_ip, ip)
        # QQQ switch_expected becomes useless when a bunch of unrelated tests are merged
        #     into a single one like above. Generate individual test cases instead?

    def equal(self, a, b):
        if a == b:
            return True
        if isinstance(a, Exception) and isinstance(b, Exception):
            if repr(a) == repr(b):
                return True
            if repr(b) in ACCEPTED_GAIERROR_MISMATCH.get(repr(a), (repr(b), )):
                return True

    def checkEqual(self, a, b):
        if a == b:
            return
        print 'WARNING: %s.%s:\n    %r\n != %r' % (self.__class__.__name__, self.testname, a, b)


def get_test(ip, host):

    def test(self):
        self._test(host, check_ip=ip)
    test.__name__ = 'test_' + re.sub('[^\w]', '_', host + '__' + ip)

    return test


class TestLocal(TestCase):

    switch_expected = False

    def test_hostname(self):
        hostname = real_socket.gethostname()
        self._test(hostname)

    def test_localhost(self):
        self._test('localhost')

    def test_127_0_0_1(self):
        self._test('127.0.0.1')

    def test_1_2_3_4(self):
        self._test('1.2.3.4')

    def SKIP_test_notexistent(self):
        # not really interesting because the original gethostbyname() is called for everything without dots
        # disabled because it takes too much time on windows for some reason
        self._test('notexistent')

    def test_None(self):
        self._test(None)

    def test_25(self):
        self._test(25)

    try:
        etc_hosts = open('/etc/hosts').read()
    except IOError:
        etc_hosts = ''

    for ip, host in re.findall(r'^\s*(\d+\.\d+\.\d+\.\d+)\s+([^\s]+)', etc_hosts, re.M)[:10]:
        func = get_test(ip, host)
        print 'Adding %s' % func.__name__
        locals()[func.__name__] = func
        del func


class TestRemote(TestCase):

    switch_expected = True

    def test_www_python_org(self):
        self._test('www.python.org')

    def test_notexistent_tld(self):
        self._test('myhost.mytld')

    def test_notexistent_dot_com(self):
        self._test('sdfsdfgu5e66098032453245wfdggd.com')


class TestInternational(TestCase):

    def test(self):
        self._test(u'президент.рф')


class TestIPv6(TestCase):

    def test(self):
        #self.PORTS = ['http']
        #self.getaddrinfo_args = [(), (AF_UNSPEC, ), (AF_INET, ), (AF_INET6, )]
        self._test('aaaa.test-ipv6.com')


class TestBadPort(TestCase):

    def test(self):
        self.PORTS = ['xxxxxx']
        self._test('www.google.com')


if __name__ == '__main__':
    greentest.main()