import asyncio
from pytest import mark
from twisted.internet import defer
from twisted.internet.defer import Deferred
from twisted.trial import unittest
from scrapy import Spider, signals, Request
from scrapy.utils.test import get_crawler, get_from_asyncio_queue
from tests.mockserver import MockServer
class SimplePipeline:
    """Pipeline that synchronously marks each item as processed."""

    def process_item(self, item, spider):
        # Flag the item so the test's signal handler can verify that
        # the pipeline actually ran.
        item['pipeline_passed'] = True
        return item
class DeferredPipeline:
    """Pipeline that returns an already-fired Twisted ``Deferred``."""

    def cb(self, item):
        # Callback that tags the item once the Deferred fires.
        item['pipeline_passed'] = True
        return item

    def process_item(self, item, spider):
        deferred = Deferred()
        deferred.addCallback(self.cb)
        # Fire immediately so the item flows straight through ``cb``.
        deferred.callback(item)
        return deferred
class AsyncDefPipeline:
    """Coroutine pipeline that awaits a pre-fired Deferred."""

    async def process_item(self, item, spider):
        # Await a trivial already-resolved Deferred to exercise the
        # coroutine/Deferred integration path.
        await defer.succeed(42)
        item['pipeline_passed'] = True
        return item
class AsyncDefAsyncioPipeline:
    """Coroutine pipeline that uses asyncio primitives directly."""

    async def process_item(self, item, spider):
        # Yield control to the asyncio event loop to prove it is running.
        await asyncio.sleep(0.2)
        item['pipeline_passed'] = await get_from_asyncio_queue(True)
        return item
class ItemSpider(Spider):
    """Spider that fetches one mock URL and yields a single dict item."""

    name = 'itemspider'

    def start_requests(self):
        # ``mockserver`` is injected via crawl(mockserver=...) in the tests.
        yield Request(self.mockserver.url('/status?n=200'))

    def parse(self, response):
        return {'field': 42}
class PipelineTestCase(unittest.TestCase):
    """End-to-end checks that each pipeline flavour tags scraped items."""

    def setUp(self):
        self.mockserver = MockServer()
        self.mockserver.__enter__()

    def tearDown(self):
        self.mockserver.__exit__(None, None, None)

    def _on_item_scraped(self, item):
        # Every pipeline under test must have tagged the item before
        # the item_scraped signal fires.
        self.assertIsInstance(item, dict)
        self.assertTrue(item.get('pipeline_passed'))
        self.items.append(item)

    def _create_crawler(self, pipeline_class):
        # Reset the collected-items list so each test counts only its own.
        self.items = []
        crawler = get_crawler(
            ItemSpider,
            {'ITEM_PIPELINES': {pipeline_class: 1}},
        )
        crawler.signals.connect(self._on_item_scraped, signals.item_scraped)
        return crawler

    @defer.inlineCallbacks
    def test_simple_pipeline(self):
        crawler = self._create_crawler(SimplePipeline)
        yield crawler.crawl(mockserver=self.mockserver)
        self.assertEqual(len(self.items), 1)

    @defer.inlineCallbacks
    def test_deferred_pipeline(self):
        crawler = self._create_crawler(DeferredPipeline)
        yield crawler.crawl(mockserver=self.mockserver)
        self.assertEqual(len(self.items), 1)

    @defer.inlineCallbacks
    def test_asyncdef_pipeline(self):
        crawler = self._create_crawler(AsyncDefPipeline)
        yield crawler.crawl(mockserver=self.mockserver)
        self.assertEqual(len(self.items), 1)

    @mark.only_asyncio()
    @defer.inlineCallbacks
    def test_asyncdef_asyncio_pipeline(self):
        crawler = self._create_crawler(AsyncDefAsyncioPipeline)
        yield crawler.crawl(mockserver=self.mockserver)
        self.assertEqual(len(self.items), 1)