File: test_pipelines.py

package info (click to toggle)
python-scrapy 2.13.3-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 5,664 kB
  • sloc: python: 52,028; xml: 199; makefile: 25; sh: 7
file content (133 lines) | stat: -rw-r--r-- 3,866 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import asyncio

import pytest
from twisted.internet import defer
from twisted.internet.defer import Deferred
from twisted.trial import unittest

from scrapy import Request, Spider, signals
from scrapy.utils.defer import deferred_to_future, maybe_deferred_to_future
from scrapy.utils.test import get_crawler, get_from_asyncio_queue
from tests.mockserver import MockServer


class SimplePipeline:
    def process_item(self, item, spider):
        item["pipeline_passed"] = True
        return item


class DeferredPipeline:
    def cb(self, item):
        item["pipeline_passed"] = True
        return item

    def process_item(self, item, spider):
        d = Deferred()
        d.addCallback(self.cb)
        d.callback(item)
        return d


class AsyncDefPipeline:
    async def process_item(self, item, spider):
        d = Deferred()
        from twisted.internet import reactor

        reactor.callLater(0, d.callback, None)
        await maybe_deferred_to_future(d)
        item["pipeline_passed"] = True
        return item


class AsyncDefAsyncioPipeline:
    async def process_item(self, item, spider):
        d = Deferred()
        from twisted.internet import reactor

        reactor.callLater(0, d.callback, None)
        await deferred_to_future(d)
        await asyncio.sleep(0.2)
        item["pipeline_passed"] = await get_from_asyncio_queue(True)
        return item


class AsyncDefNotAsyncioPipeline:
    async def process_item(self, item, spider):
        d1 = Deferred()
        from twisted.internet import reactor

        reactor.callLater(0, d1.callback, None)
        await d1
        d2 = Deferred()
        reactor.callLater(0, d2.callback, None)
        await maybe_deferred_to_future(d2)
        item["pipeline_passed"] = True
        return item


class ItemSpider(Spider):
    name = "itemspider"

    async def start(self):
        yield Request(self.mockserver.url("/status?n=200"))

    def parse(self, response):
        return {"field": 42}


class TestPipeline(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.mockserver = MockServer()
        cls.mockserver.__enter__()

    @classmethod
    def tearDownClass(cls):
        cls.mockserver.__exit__(None, None, None)

    def _on_item_scraped(self, item):
        assert isinstance(item, dict)
        assert item.get("pipeline_passed")
        self.items.append(item)

    def _create_crawler(self, pipeline_class):
        settings = {
            "ITEM_PIPELINES": {pipeline_class: 1},
        }
        crawler = get_crawler(ItemSpider, settings)
        crawler.signals.connect(self._on_item_scraped, signals.item_scraped)
        self.items = []
        return crawler

    @defer.inlineCallbacks
    def test_simple_pipeline(self):
        crawler = self._create_crawler(SimplePipeline)
        yield crawler.crawl(mockserver=self.mockserver)
        assert len(self.items) == 1

    @defer.inlineCallbacks
    def test_deferred_pipeline(self):
        crawler = self._create_crawler(DeferredPipeline)
        yield crawler.crawl(mockserver=self.mockserver)
        assert len(self.items) == 1

    @defer.inlineCallbacks
    def test_asyncdef_pipeline(self):
        crawler = self._create_crawler(AsyncDefPipeline)
        yield crawler.crawl(mockserver=self.mockserver)
        assert len(self.items) == 1

    @pytest.mark.only_asyncio
    @defer.inlineCallbacks
    def test_asyncdef_asyncio_pipeline(self):
        crawler = self._create_crawler(AsyncDefAsyncioPipeline)
        yield crawler.crawl(mockserver=self.mockserver)
        assert len(self.items) == 1

    @pytest.mark.only_not_asyncio
    @defer.inlineCallbacks
    def test_asyncdef_not_asyncio_pipeline(self):
        crawler = self._create_crawler(AsyncDefNotAsyncioPipeline)
        yield crawler.crawl(mockserver=self.mockserver)
        assert len(self.items) == 1