1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
|
#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vi:ts=4:et
from . import localhost
import pycurl
import pytest
import unittest
import select
import sys
import flaky
from . import appmanager
from . import util
setup_module_1, teardown_module_1 = appmanager.setup(('app', 8380))
setup_module_2, teardown_module_2 = appmanager.setup(('app', 8381))
setup_module_3, teardown_module_3 = appmanager.setup(('app', 8382))
def setup_module(mod):
setup_module_1(mod)
setup_module_2(mod)
setup_module_3(mod)
def teardown_module(mod):
teardown_module_3(mod)
teardown_module_2(mod)
teardown_module_1(mod)
@flaky.flaky(max_runs=3)
class MultiSocketSelectTest(unittest.TestCase):
def setUp(self):
self.m = pycurl.CurlMulti()
self.m.handles = []
def tearDown(self):
for c in self.m.handles:
self.m.remove_handle(c)
c.close()
self.m.close()
@pytest.mark.skipif(sys.platform == 'win32', reason='https://github.com/pycurl/pycurl/issues/819')
def test_multi_socket_select(self):
sockets = set()
timeout = 0
urls = [
# we need libcurl to actually wait on the handles,
# and initiate polling.
# thus use urls that sleep for a bit.
'http://%s:8380/short_wait' % localhost,
'http://%s:8381/short_wait' % localhost,
'http://%s:8382/short_wait' % localhost,
]
socket_events = []
# socket callback
def socket(event, socket, multi, data):
if event == pycurl.POLL_REMOVE:
#print("Remove Socket %d"%socket)
sockets.remove(socket)
else:
if socket not in sockets:
#print("Add socket %d"%socket)
sockets.add(socket)
socket_events.append((event, multi))
# init
m = self.m
m.setopt(pycurl.M_SOCKETFUNCTION, socket)
for url in urls:
c = util.DefaultCurl()
# save info in standard Python attributes
c.url = url
c.body = util.BytesIO()
c.http_code = -1
m.handles.append(c)
# pycurl API calls
c.setopt(c.URL, c.url)
c.setopt(c.WRITEFUNCTION, c.body.write)
m.add_handle(c)
# get data
#num_handles = len(m.handles)
while (pycurl.E_CALL_MULTI_PERFORM==m.socket_all()[0]):
pass
timeout = m.timeout()
if timeout == -1:
timeout = 1000
# timeout might be -1, indicating that all work is done
# XXX make sure there is always work to be done here?
while timeout >= 0:
(rr, wr, er) = select.select(sockets,sockets,sockets,timeout/1000.0)
socketSet = set(rr+wr+er)
if socketSet:
for s in socketSet:
while True:
(ret,running) = m.socket_action(s,0)
if ret!=pycurl.E_CALL_MULTI_PERFORM:
break
else:
(ret,running) = m.socket_action(pycurl.SOCKET_TIMEOUT,0)
if running==0:
break
for c in m.handles:
# save info in standard Python attributes
c.http_code = c.getinfo(c.HTTP_CODE)
# at least in and remove events per socket
assert len(socket_events) >= 6, 'Less than 6 socket events: %s' % repr(socket_events)
# print result
for c in m.handles:
self.assertEqual('success', c.body.getvalue().decode())
self.assertEqual(200, c.http_code)
# multi, not curl handle
self.check(pycurl.POLL_IN, m, socket_events)
self.check(pycurl.POLL_REMOVE, m, socket_events)
def check(self, event, multi, socket_events):
for event_, multi_ in socket_events:
if event == event_ and multi == multi_:
return
assert False, '%d %s not found in socket events' % (event, multi)
|