1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239
|
from __future__ import annotations
import shutil
from pathlib import Path
from tempfile import mkdtemp
from typing import TYPE_CHECKING, Any
from testfixtures import LogCapture
from twisted.internet import defer
from twisted.trial.unittest import TestCase
from w3lib.url import add_or_replace_parameter
from scrapy import Spider, signals
from scrapy.utils.misc import load_object
from scrapy.utils.test import get_crawler
from tests.mockserver import MockServer
from tests.spiders import SimpleSpider
if TYPE_CHECKING:
from scrapy.crawler import Crawler
class MediaDownloadSpider(SimpleSpider):
    """Spider that yields a single item listing the media links of a page.

    The item field names are read from ``self.media_key`` and
    ``self.media_urls_key``, which this class does not define itself.
    """

    name = "mediadownload"

    def _process_url(self, url):
        """Return *url* unchanged; subclasses override this hook."""
        return url

    def parse(self, response):
        """Log the response, then yield an item with the collected URLs."""
        self.logger.info(response.headers)
        self.logger.info(response.text)

        # Read both field names before querying the page.
        results_field, urls_field = self.media_key, self.media_urls_key

        # Links found in the body of the table that has a "Filename" header.
        hrefs = response.xpath(
            '//table[thead/tr/th="Filename"]/tbody//a/@href'
        ).getall()

        yield {
            results_field: [],
            urls_field: [
                self._process_url(response.urljoin(link)) for link in hrefs
            ],
        }
class BrokenLinksMediaDownloadSpider(MediaDownloadSpider):
    """Variant of the media spider that appends ``.foo`` to every media URL."""

    name = "brokenmedia"

    def _process_url(self, url):
        """Return *url* with a ``.foo`` suffix appended."""
        suffix = ".foo"
        return url + suffix
class RedirectedMediaDownloadSpider(MediaDownloadSpider):
    """Variant of the media spider that wraps media URLs in a redirect URL.

    Reads ``self.mockserver``, which this class does not define itself.
    """

    name = "redirectedmedia"

    def _process_url(self, url):
        """Return the mock server's ``/redirect-to`` URL with ``goto`` set to *url*."""
        redirect_endpoint = self.mockserver.url("/redirect-to")
        return add_or_replace_parameter(redirect_endpoint, "goto", url)
class TestFileDownloadCrawl(TestCase):
    """Crawl the mock server's ``/files/images/`` page through a media pipeline.

    The class attributes below select the pipeline under test and the item
    fields it uses; ``ImageDownloadCrawlTestCase`` overrides them to run the
    same tests against the images pipeline.
    """

    # Import path of the pipeline under test (used as the ITEM_PIPELINES key).
    pipeline_class = "scrapy.pipelines.files.FilesPipeline"
    # Setting that receives the temporary storage directory.
    store_setting_key = "FILES_STORE"
    # Item field checked for download results.
    media_key = "files"
    # Item field the spider fills with the URLs to download.
    media_urls_key = "file_urls"
    # Checksums the downloaded media must have; None skips the comparison.
    expected_checksums: set[str] | None = {
        "5547178b89448faf0015a13f904c936e",
        "c2281c83670e31d8aaab7cb642b824db",
        "ed3f6538dc15d4d9179dae57319edc5f",
    }

    @classmethod
    def setUpClass(cls):
        """Create the class-wide ``MockServer`` and enter its context."""
        cls.mockserver = MockServer()
        cls.mockserver.__enter__()

    @classmethod
    def tearDownClass(cls):
        """Exit the ``MockServer`` context entered in ``setUpClass``."""
        cls.mockserver.__exit__(None, None, None)

    def setUp(self):
        """Create a temporary media store, the base settings and the item list."""
        # every test stores its media in a fresh temporary directory
        store_dir = Path(mkdtemp())
        self.tmpmediastore = store_dir
        self.settings = {
            "ITEM_PIPELINES": {self.pipeline_class: 1},
            self.store_setting_key: str(store_dir),
        }
        self.items = []

    def tearDown(self):
        """Delete the temporary media store and reset the item list."""
        shutil.rmtree(self.tmpmediastore)
        self.items = []

    def _on_item_scraped(self, item):
        """Record *item*; connected to the ``item_scraped`` signal."""
        self.items.append(item)

    def _create_crawler(
        self, spider_class: type[Spider], settings: dict[str, Any] | None = None
    ) -> Crawler:
        """Return a crawler for *spider_class* that records scraped items.

        ``self.settings`` is used when *settings* is None.
        """
        effective_settings = self.settings if settings is None else settings
        crawler = get_crawler(spider_class, effective_settings)
        crawler.signals.connect(self._on_item_scraped, signals.item_scraped)
        return crawler

    def _assert_files_downloaded(self, items, logs):
        """Assert that exactly one item was scraped and its media downloaded."""
        assert len(items) == 1
        assert self.media_key in items[0]

        # the log must report exactly three successful downloads
        assert logs.count("File (downloaded): Downloaded file from") == 3

        media_entries = [
            entry for scraped in items for entry in scraped[self.media_key]
        ]

        # every media entry must be marked as `downloaded`
        for entry in media_entries:
            assert entry["status"] == "downloaded"

        # the checksums must match the known values, when any are configured
        if self.expected_checksums is not None:
            checksums = {entry["checksum"] for entry in media_entries}
            assert checksums == self.expected_checksums

        # the media files were actually written to the media store
        for entry in media_entries:
            assert (self.tmpmediastore / entry["path"]).exists()

    def _assert_files_download_failure(self, crawler, items, code, logs):
        """Assert that all three media requests failed with status *code*."""
        # the single scraped item has the media field, but it is empty
        assert len(items) == 1
        scraped = items[0]
        assert self.media_key in scraped
        assert not scraped[self.media_key]

        # one successful page fetch plus three responses with the given code
        expected_stats = (
            ("downloader/request_method_count/GET", 4),
            ("downloader/response_count", 4),
            ("downloader/response_status_count/200", 1),
            (f"downloader/response_status_count/{code}", 3),
        )
        for stat_name, expected_value in expected_stats:
            assert crawler.stats.get_value(stat_name) == expected_value

        # the log must report the three failed downloads
        failure_message = f"File (code: {code}): Error downloading file from"
        assert logs.count(failure_message) == 3

        # nothing was written to the media store
        assert not list(self.tmpmediastore.iterdir())

    @defer.inlineCallbacks
    def test_download_media(self):
        """Crawl with unmodified media URLs and expect successful downloads."""
        crawler = self._create_crawler(MediaDownloadSpider)
        with LogCapture() as captured:
            page_url = self.mockserver.url("/files/images/")
            yield crawler.crawl(
                page_url,
                media_key=self.media_key,
                media_urls_key=self.media_urls_key,
            )
        self._assert_files_downloaded(self.items, str(captured))

    @defer.inlineCallbacks
    def test_download_media_wrong_urls(self):
        """Crawl with ``.foo``-suffixed media URLs and expect 404 failures."""
        crawler = self._create_crawler(BrokenLinksMediaDownloadSpider)
        with LogCapture() as captured:
            page_url = self.mockserver.url("/files/images/")
            yield crawler.crawl(
                page_url,
                media_key=self.media_key,
                media_urls_key=self.media_urls_key,
            )
        self._assert_files_download_failure(
            crawler, self.items, 404, str(captured)
        )

    @defer.inlineCallbacks
    def test_download_media_redirected_default_failure(self):
        """Crawl redirected media URLs with base settings; expect 302 failures."""
        crawler = self._create_crawler(RedirectedMediaDownloadSpider)
        with LogCapture() as captured:
            page_url = self.mockserver.url("/files/images/")
            yield crawler.crawl(
                page_url,
                media_key=self.media_key,
                media_urls_key=self.media_urls_key,
                mockserver=self.mockserver,
            )
        self._assert_files_download_failure(
            crawler, self.items, 302, str(captured)
        )

    @defer.inlineCallbacks
    def test_download_media_redirected_allowed(self):
        """Crawl redirected media URLs with ``MEDIA_ALLOW_REDIRECTS`` enabled."""
        settings = dict(self.settings, MEDIA_ALLOW_REDIRECTS=True)
        crawler = self._create_crawler(RedirectedMediaDownloadSpider, settings)
        with LogCapture() as captured:
            page_url = self.mockserver.url("/files/images/")
            yield crawler.crawl(
                page_url,
                media_key=self.media_key,
                media_urls_key=self.media_urls_key,
                mockserver=self.mockserver,
            )
        self._assert_files_downloaded(self.items, str(captured))
        # the three redirect responses are still counted in the stats
        redirect_count = crawler.stats.get_value(
            "downloader/response_status_count/302"
        )
        assert redirect_count == 3

    @defer.inlineCallbacks
    def test_download_media_file_path_error(self):
        """Expect an exception raised by ``file_path`` to show up in the log."""
        pipeline_base = load_object(self.pipeline_class)

        class ExceptionRaisingMediaPipeline(pipeline_base):
            def file_path(self, request, response=None, info=None, *, item=None):
                # deliberately raises ZeroDivisionError
                return 1 / 0

        settings = dict(self.settings)
        settings["ITEM_PIPELINES"] = {ExceptionRaisingMediaPipeline: 1}
        crawler = self._create_crawler(MediaDownloadSpider, settings)
        with LogCapture() as captured:
            page_url = self.mockserver.url("/files/images/")
            yield crawler.crawl(
                page_url,
                media_key=self.media_key,
                media_urls_key=self.media_urls_key,
                mockserver=self.mockserver,
            )
        assert "ZeroDivisionError" in str(captured)
# Skip reason used as the ``skip`` attribute of ImageDownloadCrawlTestCase:
# None when Pillow can be imported, otherwise a message explaining the skip.
skip_pillow: str | None = None
try:
    from PIL import Image  # noqa: F401
except ImportError:
    skip_pillow = (
        "Missing Python Imaging Library, install https://pypi.org/pypi/Pillow"
    )
class ImageDownloadCrawlTestCase(TestFileDownloadCrawl):
    """Run the crawl tests of ``TestFileDownloadCrawl`` with the images pipeline."""

    # Set to the skip message when Pillow could not be imported.
    skip = skip_pillow

    # Pipeline under test and the setting that holds its storage directory.
    pipeline_class = "scrapy.pipelines.images.ImagesPipeline"
    store_setting_key = "IMAGES_STORE"

    # Item fields used by the images pipeline tests.
    media_urls_key = "image_urls"
    media_key = "images"

    # No checksum comparison for images: the original note here says the
    # checksums were somehow different on Python 3.3.
    expected_checksums = None
|