.. _topics-item-pipeline:

=============
Item Pipeline
=============

After an item has been scraped by a spider, it is sent to the Item Pipeline
which processes it through several components that are executed sequentially.

Each item pipeline component (sometimes referred to as just "Item Pipeline") is
a Python class that implements a simple method. It receives an item and
performs an action on it, also deciding whether the item should continue
through the pipeline or be dropped and no longer processed.

Typical uses of item pipelines are:

* cleansing HTML data
* validating scraped data (checking that the items contain certain fields)
* checking for duplicates (and dropping them)
* storing the scraped item in a database
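
As an illustration of the first use case, here is a minimal sketch of a
component that cleans HTML out of one text field. The class name
``HtmlCleanupPipeline`` and the ``description`` field are assumptions made for
this example, and the item is assumed to be a plain ``dict``; a regular
expression is only a rough way to strip tags.

```python
import html
import re


class HtmlCleanupPipeline:
    """Hypothetical component: strip tags and unescape entities in one field."""

    field = "description"  # assumed field name, chosen for this sketch
    tag_re = re.compile(r"<[^>]+>")

    def process_item(self, item, spider):
        value = item.get(self.field)
        if isinstance(value, str):
            text = self.tag_re.sub(" ", value)  # replace each tag with a space
            text = html.unescape(text)  # turn &amp; into &, and so on
            item[self.field] = " ".join(text.split())  # collapse whitespace
        return item  # returning the item lets it continue through the pipeline


cleaned = HtmlCleanupPipeline().process_item(
    {"description": "<p>Hello &amp; <b>world</b></p>"}, spider=None
)
```

Because ``process_item`` is an ordinary method, it can be called directly like
this to check its behaviour outside a crawl.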


Writing your own item pipeline
==============================

Each item pipeline is a :ref:`component <topics-components>` that must
implement the following method:

.. method:: process_item(self, item, spider)

   This method is called for every item pipeline component.

   `item` is an :ref:`item object <item-types>`, see
   :ref:`supporting-item-types`.

   :meth:`process_item` must either: return an :ref:`item object <item-types>`,
   return a :class:`~twisted.internet.defer.Deferred` or raise a
   :exc:`~scrapy.exceptions.DropItem` exception.

   Dropped items are no longer processed by further pipeline components.

   :param item: the scraped item
   :type item: :ref:`item object <item-types>`

   :param spider: the spider which scraped the item
   :type spider: :class:`~scrapy.Spider` object

Item pipeline components may also implement the following methods:

.. method:: open_spider(self, spider)

   This method is called when the spider is opened.

   :param spider: the spider which was opened
   :type spider: :class:`~scrapy.Spider` object

.. method:: close_spider(self, spider)

   This method is called when the spider is closed.

   :param spider: the spider which was closed
   :type spider: :class:`~scrapy.Spider` object
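
The three methods can be pictured together with a simplified, synchronous
driver. This is only a mental model, not how Scrapy itself is implemented
(Scrapy also handles Deferreds and coroutines); ``run_pipeline``, the two
example components and the local ``DropItem`` stand-in are all assumptions
made for this sketch.

```python
class DropItem(Exception):
    """Local stand-in for scrapy.exceptions.DropItem so the sketch runs alone."""


class RejectEmpty:
    def process_item(self, item, spider):
        if not item.get("name"):
            raise DropItem("Missing name")
        return item


class Uppercase:
    def open_spider(self, spider):
        self.count = 0  # per-crawl state is set up when the spider opens

    def process_item(self, item, spider):
        self.count += 1
        item["name"] = item["name"].upper()
        return item


def run_pipeline(components, items, spider=None):
    """Imitate the item flow: open, process every item in order, close."""
    for component in components:
        if hasattr(component, "open_spider"):
            component.open_spider(spider)
    kept = []
    for item in items:
        try:
            for component in components:
                item = component.process_item(item, spider)
        except DropItem:
            continue  # later components never see a dropped item
        kept.append(item)
    for component in components:
        if hasattr(component, "close_spider"):
            component.close_spider(spider)
    return kept


upper = Uppercase()
kept = run_pipeline([RejectEmpty(), upper], [{"name": "a"}, {"name": ""}, {"name": "b"}])
```

Here the item with an empty name is dropped by the first component, so the
second component only ever counts two items.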


Item pipeline example
=====================

Price validation and dropping items with no prices
--------------------------------------------------

Let's take a look at the following hypothetical pipeline that adjusts the
``price`` attribute for those items that do not include VAT
(``price_excludes_vat`` attribute), and drops those items which don't
contain a price:

.. code-block:: python

    from itemadapter import ItemAdapter
    from scrapy.exceptions import DropItem


    class PricePipeline:
        vat_factor = 1.15

        def process_item(self, item, spider):
            adapter = ItemAdapter(item)
            if adapter.get("price"):
                if adapter.get("price_excludes_vat"):
                    adapter["price"] = adapter["price"] * self.vat_factor
                return item
            else:
                raise DropItem("Missing price")
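
The behaviour of this logic on a few inputs can be checked with a short
sketch. It restates the same rules for plain ``dict`` items so that it runs
without ``itemadapter``; ``DictPricePipeline`` and ``check`` are names
invented here, and the ``DropItem`` fallback is a stand-in used only when
Scrapy is not installed.

```python
try:
    from scrapy.exceptions import DropItem
except ImportError:

    class DropItem(Exception):
        """Stand-in used only when Scrapy is not installed."""


class DictPricePipeline:
    """Same rules as PricePipeline above, restated for dict items."""

    vat_factor = 1.15

    def process_item(self, item, spider):
        if item.get("price"):
            if item.get("price_excludes_vat"):
                item["price"] = item["price"] * self.vat_factor
            return item
        raise DropItem("Missing price")


def check(item):
    """Return the processed item, or the string 'dropped'."""
    try:
        return DictPricePipeline().process_item(item, spider=None)
    except DropItem:
        return "dropped"


net = check({"price": 100, "price_excludes_vat": True})  # VAT gets added
gross = check({"price": 100})  # left unchanged
free = check({"price": 0})  # falsy price, so it is dropped too
missing = check({"name": "no price"})  # no price field at all
```

Note that the truthiness test treats a price of ``0`` the same as a missing
price, and that multiplying by a float factor yields a float that may differ
from the exact decimal result by a rounding error.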


Write items to a JSON lines file
--------------------------------

The following pipeline stores all scraped items (from all spiders) into a
single ``items.jsonl`` file, containing one item per line serialized in JSON
format:

.. code-block:: python

   import json

   from itemadapter import ItemAdapter


   class JsonWriterPipeline:
       def open_spider(self, spider):
           self.file = open("items.jsonl", "w")

       def close_spider(self, spider):
           self.file.close()

       def process_item(self, item, spider):
           line = json.dumps(ItemAdapter(item).asdict()) + "\n"
           self.file.write(line)
           return item

.. note:: The purpose of JsonWriterPipeline is just to introduce how to write
   item pipelines. If you really want to store all scraped items into a JSON
   file you should use the :ref:`Feed exports <topics-feed-exports>`.
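
Since every line of a JSON lines file is an independent JSON document, the
output can be read back one line at a time. A minimal sketch, using only the
standard library; ``read_jsonl`` is a helper name made up for this example,
and a temporary file stands in for ``items.jsonl``.

```python
import json
import tempfile
from pathlib import Path


def read_jsonl(path):
    """Yield one dict per non-empty line of a JSON lines file."""
    with open(path, encoding="utf-8") as f:
        for line in f:
            if line.strip():
                yield json.loads(line)


items = [{"id": 1, "name": "first"}, {"id": 2, "name": "second"}]
with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "items.jsonl"
    # Write the file the same way the pipeline does: one JSON object per line.
    with open(path, "w", encoding="utf-8") as f:
        for item in items:
            f.write(json.dumps(item) + "\n")
    loaded = list(read_jsonl(path))
```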

Write items to MongoDB
----------------------

In this example we'll write items to MongoDB_ using pymongo_.
The MongoDB address and database name are specified in Scrapy settings;
the MongoDB collection name is taken from the ``collection_name`` class
attribute.

The main point of this example is to show how to :ref:`get the crawler
<from-crawler>` and how to clean up the resources properly.

.. skip: next
.. code-block:: python

    import pymongo
    from itemadapter import ItemAdapter


    class MongoPipeline:
        collection_name = "scrapy_items"

        def __init__(self, mongo_uri, mongo_db):
            self.mongo_uri = mongo_uri
            self.mongo_db = mongo_db

        @classmethod
        def from_crawler(cls, crawler):
            return cls(
                mongo_uri=crawler.settings.get("MONGO_URI"),
                mongo_db=crawler.settings.get("MONGO_DATABASE", "items"),
            )

        def open_spider(self, spider):
            self.client = pymongo.MongoClient(self.mongo_uri)
            self.db = self.client[self.mongo_db]

        def close_spider(self, spider):
            self.client.close()

        def process_item(self, item, spider):
            self.db[self.collection_name].insert_one(ItemAdapter(item).asdict())
            return item
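
The settings read in ``from_crawler`` could be defined along these lines. This
is a sketch of a project ``settings.py``: the URI and database name are
placeholder values, and the ``myproject.pipelines`` module path is assumed.

```python
# settings.py (sketch; the values below are placeholders, not defaults)
MONGO_URI = "mongodb://localhost:27017"  # assumed local MongoDB instance
MONGO_DATABASE = "items"  # same name as the fallback used in from_crawler

ITEM_PIPELINES = {
    "myproject.pipelines.MongoPipeline": 300,  # assumed module path
}
```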

.. _MongoDB: https://www.mongodb.com/
.. _pymongo: https://pymongo.readthedocs.io/en/stable/


.. _ScreenshotPipeline:

Take screenshot of item
-----------------------

This example demonstrates how to use :doc:`coroutine syntax <coroutines>` in
the :meth:`process_item` method.

This item pipeline makes a request to a locally running instance of Splash_ to
render a screenshot of the item URL. After the response is downloaded, the
item pipeline saves the screenshot to a file and adds the filename to the
item.

.. code-block:: python

    import hashlib
    from pathlib import Path
    from urllib.parse import quote

    import scrapy
    from itemadapter import ItemAdapter
    from scrapy.http.request import NO_CALLBACK
    from scrapy.utils.defer import maybe_deferred_to_future


    class ScreenshotPipeline:
        """Pipeline that uses Splash to render screenshot of
        every Scrapy item."""

        SPLASH_URL = "http://localhost:8050/render.png?url={}"

        async def process_item(self, item, spider):
            adapter = ItemAdapter(item)
            encoded_item_url = quote(adapter["url"])
            screenshot_url = self.SPLASH_URL.format(encoded_item_url)
            request = scrapy.Request(screenshot_url, callback=NO_CALLBACK)
            response = await maybe_deferred_to_future(
                spider.crawler.engine.download(request)
            )

            if response.status != 200:
                # Error happened, return item.
                return item

            # Save screenshot to file, filename will be hash of url.
            url = adapter["url"]
            url_hash = hashlib.md5(url.encode("utf8")).hexdigest()
            filename = f"{url_hash}.png"
            Path(filename).write_bytes(response.body)

            # Store filename in item.
            adapter["screenshot_filename"] = filename
            return item
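
The two string manipulations in this pipeline, building the Splash request URL
and deriving the output filename, can be tried in isolation. In this sketch
the helper names ``splash_request_url`` and ``screenshot_filename`` and the
sample URL are made up; the logic mirrors the pipeline above.

```python
import hashlib
from urllib.parse import quote

SPLASH_URL = "http://localhost:8050/render.png?url={}"


def splash_request_url(item_url):
    """Percent-encode the item URL and embed it in the Splash endpoint."""
    return SPLASH_URL.format(quote(item_url))


def screenshot_filename(item_url):
    """Name the file after the MD5 hex digest of the item URL."""
    return hashlib.md5(item_url.encode("utf8")).hexdigest() + ".png"


url = "http://example.com/a b?x=1"  # sample URL, for illustration only
request_url = splash_request_url(url)
# request_url == "http://localhost:8050/render.png?url=http%3A//example.com/a%20b%3Fx%3D1"
filename = screenshot_filename(url)
```

With its default arguments, ``quote`` leaves ``/`` unescaped but encodes
``:``, ``?``, ``=`` and spaces, so the item URL cannot be confused with the
query string of the Splash request. Hashing the URL gives a fixed-length
filename that is the same every time the same URL is processed.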

.. _Splash: https://splash.readthedocs.io/en/stable/

Duplicates filter
-----------------

A filter that looks for duplicate items, and drops those items that were
already processed. Let's say that our items have a unique id, but our spider
returns multiple items with the same id:

.. code-block:: python

    from itemadapter import ItemAdapter
    from scrapy.exceptions import DropItem


    class DuplicatesPipeline:
        def __init__(self):
            self.ids_seen = set()

        def process_item(self, item, spider):
            adapter = ItemAdapter(item)
            if adapter["id"] in self.ids_seen:
                raise DropItem(f"Item ID already seen: {adapter['id']}")
            else:
                self.ids_seen.add(adapter["id"])
                return item


Activating an Item Pipeline component
=====================================

To activate an Item Pipeline component you must add its class to the
:setting:`ITEM_PIPELINES` setting, as in the following example:

.. code-block:: python

   ITEM_PIPELINES = {
       "myproject.pipelines.PricePipeline": 300,
       "myproject.pipelines.JsonWriterPipeline": 800,
   }

The integer values you assign to classes in this setting determine the
order in which they run: items go through from lower valued to higher
valued classes. It's customary to define these numbers in the 0-1000 range.
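
The ordering rule can be illustrated by sorting the setting by its values.
This sketch only imitates the rule described above; it is not the code Scrapy
uses internally, and ``execution_order`` is a name chosen for the example.

```python
ITEM_PIPELINES = {
    "myproject.pipelines.JsonWriterPipeline": 800,
    "myproject.pipelines.PricePipeline": 300,
}

# Lower values run first, regardless of the order of the dict entries.
execution_order = [
    path for path, _ in sorted(ITEM_PIPELINES.items(), key=lambda entry: entry[1])
]
```

With these values every item passes through ``PricePipeline`` before it
reaches ``JsonWriterPipeline``, so items dropped for a missing price are never
written to the file.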