"""Tests for Scrapy item pipelines written in several callable styles."""
import asyncio
import pytest
from twisted.internet import defer
from twisted.internet.defer import Deferred
from twisted.trial import unittest
from scrapy import Request, Spider, signals
from scrapy.utils.defer import deferred_to_future, maybe_deferred_to_future
from scrapy.utils.test import get_crawler, get_from_asyncio_queue
from tests.mockserver import MockServer
class SimplePipeline:
    """Pipeline that marks items synchronously, with no deferred work."""

    def process_item(self, item, spider):
        """Flag *item* as having passed the pipeline and hand it back."""
        item.update(pipeline_passed=True)
        return item
class DeferredPipeline:
    """Pipeline whose process_item returns an already-fired Deferred."""

    def cb(self, item):
        """Deferred callback: flag the item and pass it down the chain."""
        item["pipeline_passed"] = True
        return item

    def process_item(self, item, spider):
        """Wrap *item* in a fired Deferred whose callback marks it."""
        # defer.succeed() yields a Deferred that has already fired with
        # *item*, so cb runs synchronously as soon as it is attached —
        # same ordering as firing an explicit Deferred by hand.
        return defer.succeed(item).addCallback(self.cb)
class AsyncDefPipeline:
    """Pipeline with a coroutine process_item that awaits a Deferred."""

    async def process_item(self, item, spider):
        """Wait one reactor turn, then mark and return the item."""
        from twisted.internet import reactor

        # A Deferred fired on the next reactor iteration forces a real
        # asynchronous hop before the item is flagged.
        hop = Deferred()
        reactor.callLater(0, hop.callback, None)
        await maybe_deferred_to_future(hop)
        item["pipeline_passed"] = True
        return item
class AsyncDefAsyncioPipeline:
    """Pipeline coroutine mixing Deferred-based and asyncio-based waits."""

    async def process_item(self, item, spider):
        """Await a reactor-fired Deferred plus an asyncio sleep, then mark."""
        from twisted.internet import reactor

        hop = Deferred()
        reactor.callLater(0, hop.callback, None)
        # deferred_to_future() only works when the asyncio reactor is
        # installed, which is why this pipeline is asyncio-only.
        await deferred_to_future(hop)
        await asyncio.sleep(0.2)
        # Round-trip the flag through an asyncio queue as an extra check.
        item["pipeline_passed"] = await get_from_asyncio_queue(True)
        return item
class AsyncDefNotAsyncioPipeline:
    """Pipeline coroutine that awaits Deferreds without asyncio helpers."""

    async def process_item(self, item, spider):
        """Await one Deferred directly, one via maybe_deferred_to_future()."""
        from twisted.internet import reactor

        # Awaiting a Deferred directly is only supported when the asyncio
        # event loop is NOT in use, hence the not-asyncio marker on its test.
        first = Deferred()
        reactor.callLater(0, first.callback, None)
        await first
        second = Deferred()
        reactor.callLater(0, second.callback, None)
        await maybe_deferred_to_future(second)
        item["pipeline_passed"] = True
        return item
class ItemSpider(Spider):
    """Spider that fetches a single 200 response and yields one item."""

    name = "itemspider"

    async def start(self):
        # One request against the mock server's always-200 endpoint;
        # self.mockserver is injected via crawler.crawl(mockserver=...).
        yield Request(self.mockserver.url("/status?n=200"))

    def parse(self, response):
        """Produce the single dict item that the pipeline tests inspect."""
        return {"field": 42}
class TestPipeline(unittest.TestCase):
    """Crawl once per pipeline flavour and check exactly one item passes."""

    @classmethod
    def setUpClass(cls):
        # One shared mock HTTP server for the whole class.
        cls.mockserver = MockServer()
        cls.mockserver.__enter__()

    @classmethod
    def tearDownClass(cls):
        cls.mockserver.__exit__(None, None, None)

    def _on_item_scraped(self, item):
        # Signal handler: every scraped item must be a dict that the
        # pipeline under test has already marked.
        assert isinstance(item, dict)
        assert item.get("pipeline_passed")
        self.items.append(item)

    def _create_crawler(self, pipeline_class):
        # Build a crawler that routes items through *pipeline_class* only,
        # and reset the per-test collection of scraped items.
        crawler = get_crawler(ItemSpider, {"ITEM_PIPELINES": {pipeline_class: 1}})
        crawler.signals.connect(self._on_item_scraped, signals.item_scraped)
        self.items = []
        return crawler

    @defer.inlineCallbacks
    def _run(self, pipeline_class):
        # Shared driver: crawl once, then require exactly one scraped item.
        crawler = self._create_crawler(pipeline_class)
        yield crawler.crawl(mockserver=self.mockserver)
        assert len(self.items) == 1

    @defer.inlineCallbacks
    def test_simple_pipeline(self):
        yield self._run(SimplePipeline)

    @defer.inlineCallbacks
    def test_deferred_pipeline(self):
        yield self._run(DeferredPipeline)

    @defer.inlineCallbacks
    def test_asyncdef_pipeline(self):
        yield self._run(AsyncDefPipeline)

    @pytest.mark.only_asyncio
    @defer.inlineCallbacks
    def test_asyncdef_asyncio_pipeline(self):
        yield self._run(AsyncDefAsyncioPipeline)

    @pytest.mark.only_not_asyncio
    @defer.inlineCallbacks
    def test_asyncdef_not_asyncio_pipeline(self):
        yield self._run(AsyncDefNotAsyncioPipeline)
|