File: test_pipeline_crawl.py

Package: python-scrapy 2.13.3-1
from __future__ import annotations

import shutil
from pathlib import Path
from tempfile import mkdtemp
from typing import TYPE_CHECKING, Any

from testfixtures import LogCapture
from twisted.internet import defer
from twisted.trial.unittest import TestCase
from w3lib.url import add_or_replace_parameter

from scrapy import Spider, signals
from scrapy.utils.misc import load_object
from scrapy.utils.test import get_crawler
from tests.mockserver import MockServer
from tests.spiders import SimpleSpider

if TYPE_CHECKING:
    from scrapy.crawler import Crawler


class MediaDownloadSpider(SimpleSpider):
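    """Spider that scrapes a mock-server listing page and yields a single item
    whose ``media_urls_key`` field holds the file URLs found in the table.

    ``media_key`` and ``media_urls_key`` are passed in as crawl kwargs by the
    tests below; ``_process_url`` is a hook that subclasses override to mangle
    the URLs before download.
    """
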
    name = "mediadownload"

    def _process_url(self, url):
        return url

    def parse(self, response):
        self.logger.info(response.headers)
        self.logger.info(response.text)
        item = {
            self.media_key: [],
            self.media_urls_key: [
                self._process_url(response.urljoin(href))
                for href in response.xpath(
                    '//table[thead/tr/th="Filename"]/tbody//a/@href'
                ).getall()
            ],
        }
        yield item


class BrokenLinksMediaDownloadSpider(MediaDownloadSpider):
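    """Appends a bogus ``.foo`` suffix to every file URL so each download 404s."""
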
    name = "brokenmedia"

    def _process_url(self, url):
        return url + ".foo"


class RedirectedMediaDownloadSpider(MediaDownloadSpider):
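    """Routes every file URL through the mock server's ``/redirect-to``
    endpoint, so each download first receives a 302 response."""
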
    name = "redirectedmedia"

    def _process_url(self, url):
        return add_or_replace_parameter(
            self.mockserver.url("/redirect-to"), "goto", url
        )


class TestFileDownloadCrawl(TestCase):
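    """End-to-end crawl tests for FilesPipeline against a local mock server.

    The class attributes below parametrize the pipeline under test;
    ImageDownloadCrawlTestCase overrides them to exercise ImagesPipeline.
    """
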
    pipeline_class = "scrapy.pipelines.files.FilesPipeline"
    store_setting_key = "FILES_STORE"
    media_key = "files"
    media_urls_key = "file_urls"
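    # checksums of the three fixture files served by the mock server;
    # Scrapy's media pipelines compute an MD5 hash of the downloaded bytes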
    expected_checksums: set[str] | None = {
        "5547178b89448faf0015a13f904c936e",
        "c2281c83670e31d8aaab7cb642b824db",
        "ed3f6538dc15d4d9179dae57319edc5f",
    }

    @classmethod
    def setUpClass(cls):
        cls.mockserver = MockServer()
        cls.mockserver.__enter__()

    @classmethod
    def tearDownClass(cls):
        cls.mockserver.__exit__(None, None, None)

    def setUp(self):
        # prepare a directory for storing files
        self.tmpmediastore = Path(mkdtemp())
        self.settings = {
            "ITEM_PIPELINES": {self.pipeline_class: 1},
            self.store_setting_key: str(self.tmpmediastore),
        }
        self.items = []

    def tearDown(self):
        shutil.rmtree(self.tmpmediastore)
        self.items = []

    def _on_item_scraped(self, item):
        self.items.append(item)

    def _create_crawler(
        self, spider_class: type[Spider], settings: dict[str, Any] | None = None
    ) -> Crawler:
        if settings is None:
            settings = self.settings
        crawler = get_crawler(spider_class, settings)
        crawler.signals.connect(self._on_item_scraped, signals.item_scraped)
        return crawler

    def _assert_files_downloaded(self, items, logs):
        assert len(items) == 1
        assert self.media_key in items[0]

        # check that logs show the expected number of successful file downloads
        file_dl_success = "File (downloaded): Downloaded file from"
        assert logs.count(file_dl_success) == 3

        # check that the images/files status is `downloaded`
        for item in items:
            for i in item[self.media_key]:
                assert i["status"] == "downloaded"

        # check that the images/files checksums are what we know they should be
        if self.expected_checksums is not None:
            checksums = {i["checksum"] for item in items for i in item[self.media_key]}
            assert checksums == self.expected_checksums

        # check that the images/files were actually written to the media store
        for item in items:
            for i in item[self.media_key]:
                assert (self.tmpmediastore / i["path"]).exists()

    def _assert_files_download_failure(self, crawler, items, code, logs):
        # check that the item does NOT have the "images/files" field populated
        assert len(items) == 1
        assert self.media_key in items[0]
        assert not items[0][self.media_key]

        # check that there was 1 successful fetch and 3 responses with the expected non-200 code
        assert crawler.stats.get_value("downloader/request_method_count/GET") == 4
        assert crawler.stats.get_value("downloader/response_count") == 4
        assert crawler.stats.get_value("downloader/response_status_count/200") == 1
        assert crawler.stats.get_value(f"downloader/response_status_count/{code}") == 3

        # check that the logs show the expected number of failed file downloads
        file_dl_failure = f"File (code: {code}): Error downloading file from"
        assert logs.count(file_dl_failure) == 3

        # check that no files were written to the media store
        assert not list(self.tmpmediastore.iterdir())

    @defer.inlineCallbacks
    def test_download_media(self):
        crawler = self._create_crawler(MediaDownloadSpider)
        with LogCapture() as log:
            yield crawler.crawl(
                self.mockserver.url("/files/images/"),
                media_key=self.media_key,
                media_urls_key=self.media_urls_key,
            )
        self._assert_files_downloaded(self.items, str(log))

    @defer.inlineCallbacks
    def test_download_media_wrong_urls(self):
        crawler = self._create_crawler(BrokenLinksMediaDownloadSpider)
        with LogCapture() as log:
            yield crawler.crawl(
                self.mockserver.url("/files/images/"),
                media_key=self.media_key,
                media_urls_key=self.media_urls_key,
            )
        self._assert_files_download_failure(crawler, self.items, 404, str(log))

    @defer.inlineCallbacks
    def test_download_media_redirected_default_failure(self):
        crawler = self._create_crawler(RedirectedMediaDownloadSpider)
        with LogCapture() as log:
            yield crawler.crawl(
                self.mockserver.url("/files/images/"),
                media_key=self.media_key,
                media_urls_key=self.media_urls_key,
                mockserver=self.mockserver,
            )
        self._assert_files_download_failure(crawler, self.items, 302, str(log))

    @defer.inlineCallbacks
    def test_download_media_redirected_allowed(self):
        settings = {
            **self.settings,
            "MEDIA_ALLOW_REDIRECTS": True,
        }
        crawler = self._create_crawler(RedirectedMediaDownloadSpider, settings)
        with LogCapture() as log:
            yield crawler.crawl(
                self.mockserver.url("/files/images/"),
                media_key=self.media_key,
                media_urls_key=self.media_urls_key,
                mockserver=self.mockserver,
            )
        self._assert_files_downloaded(self.items, str(log))
        assert crawler.stats.get_value("downloader/response_status_count/302") == 3

    @defer.inlineCallbacks
    def test_download_media_file_path_error(self):
        cls = load_object(self.pipeline_class)

        class ExceptionRaisingMediaPipeline(cls):
            def file_path(self, request, response=None, info=None, *, item=None):
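                # deliberately raise ZeroDivisionError so the test can assert
                # that errors in file_path() surface in the crawl log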
                return 1 / 0

        settings = {
            **self.settings,
            "ITEM_PIPELINES": {ExceptionRaisingMediaPipeline: 1},
        }
        crawler = self._create_crawler(MediaDownloadSpider, settings)
        with LogCapture() as log:
            yield crawler.crawl(
                self.mockserver.url("/files/images/"),
                media_key=self.media_key,
                media_urls_key=self.media_urls_key,
                mockserver=self.mockserver,
            )
        assert "ZeroDivisionError" in str(log)


skip_pillow: str | None
try:
    from PIL import Image  # noqa: F401
except ImportError:
    skip_pillow = "Missing Python Imaging Library, install https://pypi.org/pypi/Pillow"
else:
    skip_pillow = None


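# twisted.trial skips every test in a TestCase whose ``skip`` attribute is set
# to a non-empty string (here: the reason Pillow is missing).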
class ImageDownloadCrawlTestCase(TestFileDownloadCrawl):
    skip = skip_pillow

    pipeline_class = "scrapy.pipelines.images.ImagesPipeline"
    store_setting_key = "IMAGES_STORE"
    media_key = "images"
    media_urls_key = "image_urls"

    # image checksums depend on how Pillow re-encodes the images and thus vary
    # across environments, so don't pin exact values here
    expected_checksums = None
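

# Assuming the upstream Scrapy repository layout, this module is normally run
# as part of the test suite, e.g.:
#
#     pytest tests/test_pipeline_crawl.py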