"""
Scrapy engine tests
"""
import sys, os, re, urlparse, unittest
from twisted.internet import reactor
from twisted.web import server, resource, static, util
from scrapy.core import signals
from scrapy.core.manager import scrapymanager
from scrapy.xlib.pydispatch import dispatcher
from scrapy.tests import tests_datadir
from scrapy.spider import BaseSpider
from scrapy.item import Item, Field
from scrapy.contrib.linkextractors.sgml import SgmlLinkExtractor
from scrapy.http import Request


class TestItem(Item):
name = Field()
url = Field()
    price = Field()


class TestSpider(BaseSpider):
domain_name = "scrapytest.org"
extra_domain_names = ["localhost"]
start_urls = ['http://localhost']
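
    # The patterns below match the pages served by the test site: links to
    # item pages such as "item1.html", and the <h1> name and "Price: $..."
    # markup inside each item page.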
    itemurl_re = re.compile(r"item\d+\.html")
    name_re = re.compile(r"<h1>(.*?)</h1>", re.M)
    price_re = re.compile(r">Price: \$(.*?)<", re.M)

    def parse(self, response):
xlink = SgmlLinkExtractor()
        for link in xlink.extract_links(response):
            if self.itemurl_re.search(link.url):
                yield Request(url=link.url, callback=self.parse_item)

def parse_item(self, response):
item = TestItem()
m = self.name_re.search(response.body)
if m:
item['name'] = m.group(1)
item['url'] = response.url
m = self.price_re.search(response.body)
if m:
item['price'] = m.group(1)
return item


#class TestResource(resource.Resource):
# isLeaf = True
#
# def render_GET(self, request):
# return "hello world!"
def start_test_site():
root_dir = os.path.join(tests_datadir, "test_site")
r = static.File(root_dir)
# r.putChild("test", TestResource())
r.putChild("redirect", util.Redirect("/redirected"))
r.putChild("redirected", static.Data("Redirected here", "text/plain"))
port = reactor.listenTCP(0, server.Site(r), interface="127.0.0.1")
return port
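

# CrawlingSession drives one full crawl of the test site and records every
# signal, request, response and item the engine emits, so the test cases
# below can all make assertions against the same single crawl.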
class CrawlingSession(object):

    def __init__(self):
        self.domain = 'scrapytest.org'
        self.spider = None
        self.respplug = []   # (response, spider) tuples, one per download
        self.reqplug = []    # (request, spider) tuples, one per received request
        self.itemresp = []   # (item, response) tuples, one per scraped item
        self.signals_catched = {}  # signal -> kwargs it was last sent with
        self.wasrun = False

def run(self):
self.port = start_test_site()
self.portno = self.port.getHost().port
        self.spider = TestSpider()
        self.spider.start_urls = [
            self.geturl("/"),
            self.geturl("/redirect"),
        ]
dispatcher.connect(self.record_signal, signals.engine_started)
dispatcher.connect(self.record_signal, signals.engine_stopped)
dispatcher.connect(self.record_signal, signals.spider_opened)
dispatcher.connect(self.record_signal, signals.spider_idle)
dispatcher.connect(self.record_signal, signals.spider_closed)
dispatcher.connect(self.item_scraped, signals.item_scraped)
dispatcher.connect(self.request_received, signals.request_received)
dispatcher.connect(self.response_downloaded, signals.response_downloaded)
scrapymanager.configure()
scrapymanager.runonce(self.spider)
self.port.stopListening()
self.wasrun = True

    def geturl(self, path):
return "http://localhost:%s%s" % (self.portno, path)

    def getpath(self, url):
u = urlparse.urlparse(url)
return u.path

    def item_scraped(self, item, spider, response):
self.itemresp.append((item, response))

    def request_received(self, request, spider):
self.reqplug.append((request, spider))

    def response_downloaded(self, response, spider):
self.respplug.append((response, spider))

    def record_signal(self, *args, **kwargs):
"""Record a signal and its parameters"""
signalargs = kwargs.copy()
sig = signalargs.pop('signal')
signalargs.pop('sender', None)
self.signals_catched[sig] = signalargs
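

# A single module-level session shared by every test case: the crawl runs
# once, on the first setUp() call, and each test then inspects its results.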
session = CrawlingSession()


class EngineTest(unittest.TestCase):

    def setUp(self):
if not session.wasrun:
session.run()

    def test_spider_locator(self):
        """
        Check that the spider is loaded and located properly via the SpiderLocator
        """
assert session.spider is not None
self.assertEqual(session.spider.domain_name, session.domain)

    def test_visited_urls(self):
        """
        Make sure certain URLs were actually visited
        """
# expected urls that should be visited
must_be_visited = ["/", "/redirect", "/redirected",
"/item1.html", "/item2.html", "/item999.html"]
urls_visited = set([rp[0].url for rp in session.respplug])
urls_expected = set([session.geturl(p) for p in must_be_visited])
assert urls_expected <= urls_visited, "URLs not visited: %s" % list(urls_expected - urls_visited)

    def test_requests_received(self):
"""
Check requests received
"""
# 3 requests should be received from the spider. start_urls and redirects don't count
self.assertEqual(3, len(session.reqplug))
paths_expected = ['/item999.html', '/item2.html', '/item1.html']
urls_requested = set([rq[0].url for rq in session.reqplug])
urls_expected = set([session.geturl(p) for p in paths_expected])
        assert urls_expected <= urls_requested, "URLs not requested: %s" % list(urls_expected - urls_requested)

    def test_responses_downloaded(self):
"""
Check responses downloaded
"""
# response tests
self.assertEqual(6, len(session.respplug))
for response, spider in session.respplug:
if session.getpath(response.url) == '/item999.html':
self.assertEqual(404, response.status)
if session.getpath(response.url) == '/redirect':
self.assertEqual(302, response.status)

    def test_item_data(self):
"""
Check item data
"""
# item tests
self.assertEqual(2, len(session.itemresp))
for item, response in session.itemresp:
self.assertEqual(item['url'], response.url)
if 'item1.html' in item['url']:
self.assertEqual('Item 1 name', item['name'])
self.assertEqual('100', item['price'])
if 'item2.html' in item['url']:
self.assertEqual('Item 2 name', item['name'])
self.assertEqual('200', item['price'])

    def test_signals(self):
        """
        Check signals were sent properly
        """
assert signals.engine_started in session.signals_catched
assert signals.engine_stopped in session.signals_catched
assert signals.spider_opened in session.signals_catched
assert signals.spider_idle in session.signals_catched
assert signals.spider_closed in session.signals_catched
self.assertEqual({'spider': session.spider},
session.signals_catched[signals.spider_opened])
self.assertEqual({'spider': session.spider},
session.signals_catched[signals.spider_idle])
self.assertEqual({'spider': session.spider, 'reason': 'finished'},
session.signals_catched[signals.spider_closed])
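

# Running this file directly executes the test suite; passing "runserver" as
# the first argument only starts the test site, which is handy for manually
# inspecting the pages the tests crawl.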
if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == 'runserver':
port = start_test_site()
print "Test server running at http://localhost:%d/ - hit Ctrl-C to finish." % port.getHost().port
reactor.run()
else:
unittest.main()