File: test_pipelines.py

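"""Tests for item pipeline support: each pipeline style below (synchronous,
Deferred-returning, coroutine, and asyncio coroutine) must be awaited
correctly by the engine and flag the scraped item as processed."""
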
import asyncio

from pytest import mark
from twisted.internet import defer
from twisted.internet.defer import Deferred
from twisted.trial import unittest

from scrapy import Spider, signals, Request
from scrapy.utils.test import get_crawler, get_from_asyncio_queue

from tests.mockserver import MockServer


class SimplePipeline:
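    """Pipeline whose process_item returns the processed item synchronously."""
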
    def process_item(self, item, spider):
        item['pipeline_passed'] = True
        return item


class DeferredPipeline:
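    """Pipeline whose process_item returns a Deferred that fires with the item."""
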
    def cb(self, item):
        item['pipeline_passed'] = True
        return item

    def process_item(self, item, spider):
        d = Deferred()
        d.addCallback(self.cb)
        d.callback(item)
        return d


class AsyncDefPipeline:
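    """Pipeline whose process_item is a coroutine that awaits a Deferred."""
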
    async def process_item(self, item, spider):
        await defer.succeed(42)
        item['pipeline_passed'] = True
        return item


class AsyncDefAsyncioPipeline:
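    """Pipeline whose process_item awaits real asyncio code, so it requires
    the asyncio reactor (enforced by the only_asyncio mark on its test)."""
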
    async def process_item(self, item, spider):
        await asyncio.sleep(0.2)
        item['pipeline_passed'] = await get_from_asyncio_queue(True)
        return item


class ItemSpider(Spider):
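    """Spider that requests a single mock URL and yields one dict item."""
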
    name = 'itemspider'

    def start_requests(self):
        yield Request(self.mockserver.url('/status?n=200'))

    def parse(self, response):
        return {'field': 42}


class PipelineTestCase(unittest.TestCase):
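    """Crawl once with each pipeline flavour and check that exactly one
    processed item was scraped."""
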
    def setUp(self):
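        # Start the mock HTTP server that ItemSpider will request;
        # it is shut down again in tearDown().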
        self.mockserver = MockServer()
        self.mockserver.__enter__()

    def tearDown(self):
        self.mockserver.__exit__(None, None, None)

    def _on_item_scraped(self, item):
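        """Record each scraped item, asserting the pipeline flagged it."""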
        self.assertIsInstance(item, dict)
        self.assertTrue(item.get('pipeline_passed'))
        self.items.append(item)

    def _create_crawler(self, pipeline_class):
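        """Build a crawler running ItemSpider with only the given pipeline."""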
        settings = {
            'ITEM_PIPELINES': {pipeline_class: 1},
        }
        crawler = get_crawler(ItemSpider, settings)
        crawler.signals.connect(self._on_item_scraped, signals.item_scraped)
        self.items = []
        return crawler

    @defer.inlineCallbacks
    def test_simple_pipeline(self):
        crawler = self._create_crawler(SimplePipeline)
        yield crawler.crawl(mockserver=self.mockserver)
        self.assertEqual(len(self.items), 1)

    @defer.inlineCallbacks
    def test_deferred_pipeline(self):
        crawler = self._create_crawler(DeferredPipeline)
        yield crawler.crawl(mockserver=self.mockserver)
        self.assertEqual(len(self.items), 1)

    @defer.inlineCallbacks
    def test_asyncdef_pipeline(self):
        crawler = self._create_crawler(AsyncDefPipeline)
        yield crawler.crawl(mockserver=self.mockserver)
        self.assertEqual(len(self.items), 1)

    # Awaiting asyncio code in process_item only works with the asyncio
    # reactor installed, which the only_asyncio mark enforces.
    @mark.only_asyncio()
    @defer.inlineCallbacks
    def test_asyncdef_asyncio_pipeline(self):
        crawler = self._create_crawler(AsyncDefAsyncioPipeline)
        yield crawler.crawl(mockserver=self.mockserver)
        self.assertEqual(len(self.items), 1)