File: shell.py

package info (click to toggle)
python-scrapy 1.5.1-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, buster, sid
  • size: 4,308 kB
  • sloc: python: 25,583; xml: 199; makefile: 95; sh: 33
file content (207 lines) | stat: -rw-r--r-- 7,545 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
"""Scrapy Shell

See documentation in docs/topics/shell.rst

"""
from __future__ import print_function

import os
import signal
import warnings

from twisted.internet import reactor, threads, defer
from twisted.python import threadable
from w3lib.url import any_to_uri

from scrapy.crawler import Crawler
from scrapy.exceptions import IgnoreRequest, ScrapyDeprecationWarning
from scrapy.http import Request, Response
from scrapy.item import BaseItem
from scrapy.settings import Settings
from scrapy.spiders import Spider
from scrapy.utils.console import start_python_console
from scrapy.utils.datatypes import SequenceExclude
from scrapy.utils.misc import load_object
from scrapy.utils.response import open_in_browser
from scrapy.utils.conf import get_config
from scrapy.utils.console import DEFAULT_PYTHON_SHELLS


class Shell(object):
    """Interactive console (or one-off expression evaluator) for a crawler.

    The shell keeps a dict of variables (``self.vars``) holding the crawler,
    its settings, the spider and the most recent request/response, plus a few
    shortcuts, and hands that dict to the Python console it starts.

    Constructor arguments:

    * ``crawler``: crawler whose settings and engine the shell uses.
    * ``update_vars``: optional callable invoked with the variables dict each
      time it is repopulated; defaults to a no-op.
    * ``code``: optional Python expression source; when set, ``start()``
      evaluates and prints it instead of opening a console.
    """

    # Only variables holding instances of these types are listed in the
    # "Available Scrapy objects" part of the help text.
    relevant_classes = (Crawler, Spider, Request, Response, BaseItem,
                        Settings)

    def __init__(self, crawler, update_vars=None, code=None):
        self.crawler = crawler
        self.update_vars = update_vars or (lambda x: None)
        self.item_class = load_object(crawler.settings['DEFAULT_ITEM_CLASS'])
        # Spider opened by this shell; set on the first fetch and then reused.
        self.spider = None
        # True when the shell is NOT running in Twisted's I/O thread. Only
        # then is the blocking ``fetch`` shortcut exposed (see populate_vars).
        self.inthread = not threadable.isInIOThread()
        self.code = code
        self.vars = {}

    def start(self, url=None, request=None, response=None, spider=None, redirect=True):
        """Seed the shell variables, then evaluate ``code`` or open a console.

        The first truthy argument among ``url``, ``request`` and ``response``
        decides how the variables are seeded: a URL or request is fetched,
        a response is used as-is together with ``response.request``. With
        none of them the variables are populated without request/response.
        ``redirect`` is only forwarded when fetching by ``url``.

        Side effect: SIGINT is set to be ignored and is not restored here.
        """
        # disable accidental Ctrl-C key press from shutting down the engine
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        if url:
            self.fetch(url, spider, redirect=redirect)
        elif request:
            self.fetch(request, spider)
        elif response:
            request = response.request
            self.populate_vars(response, request, spider)
        else:
            self.populate_vars()
        if self.code:
            # NOTE(review): eval() executes arbitrary Python. ``self.code``
            # is whatever the caller passed to the constructor -- it must
            # never originate from untrusted input.
            print(eval(self.code, globals(), self.vars))
        else:
            start_python_console(self.vars, shells=self._python_shells(),
                                 banner=self.vars.pop('banner', ''))

    def _python_shells(self):
        """Return the console names to try, in order of preference.

        Sources, highest priority first:

        1. the ``SCRAPY_PYTHON_SHELL`` environment variable, a
           comma-separated list of names;
        2. the ``shell`` option of the ``[settings]`` section in scrapy.cfg
           (e.g. ~/.config/scrapy.cfg or ~/.scrapy.cfg), for instance::

               [settings]
               # shell can be one of ipython, bpython or python;
               # to be used as the interactive python console, if available.
               # (default is ipython, fallbacks in the order listed above)
               shell = python

        3. every console listed in ``DEFAULT_PYTHON_SHELLS``.

        ``'python'`` is always appended as the last-resort fallback.
        """
        cfg = get_config()
        section, option = 'settings', 'shell'
        env = os.environ.get('SCRAPY_PYTHON_SHELL')
        shells = []
        if env:
            # Trim every entry and drop empty ones so that values such as
            # "ipython, bpython" or "ipython,,python" resolve as intended
            # instead of producing names like " bpython" that match nothing.
            names = (name.strip() for name in env.lower().split(','))
            shells.extend(name for name in names if name)
        elif cfg.has_option(section, option):
            shells.append(cfg.get(section, option).strip().lower())
        else:  # try all by default
            shells.extend(DEFAULT_PYTHON_SHELLS.keys())
        # always add standard shell as fallback
        shells.append('python')
        return shells

    def _schedule(self, request, spider):
        """Hand ``request`` to the engine; return a Deferred for its result.

        The Deferred fires with a ``(result, spider)`` tuple, where
        ``spider`` is the spider the request was scheduled on.
        """
        spider = self._open_spider(request, spider)
        d = _request_deferred(request)
        d.addCallback(lambda x: (x, spider))
        self.crawler.engine.crawl(request, spider)
        return d

    def _open_spider(self, request, spider):
        """Return the spider used for fetching, opening it on first use.

        A spider already opened by this shell always wins. Otherwise the
        given ``spider`` is used, falling back to the crawler's spider or a
        newly created one. ``request`` is accepted but not used.
        """
        if self.spider:
            return self.spider

        if spider is None:
            spider = self.crawler.spider or self.crawler._create_spider()

        self.crawler.spider = spider
        # close_if_idle=False: the spider is meant to stay open between
        # fetches (it is cached in self.spider and reused above).
        self.crawler.engine.open_spider(spider, close_if_idle=False)
        self.spider = spider
        return spider

    def fetch(self, request_or_url, spider=None, redirect=True, **kwargs):
        """Fetch a request or URL and refresh the shell variables.

        A ``Request`` instance is scheduled unchanged; ``redirect`` and
        ``kwargs`` are ignored in that case. Anything else is converted to a
        URI and wrapped in a new ``Request`` (``dont_filter=True``, extra
        ``kwargs`` forwarded to the constructor).

        If the request is dropped with ``IgnoreRequest``, the variables are
        still refreshed, with ``response`` set to ``None``.
        """
        if isinstance(request_or_url, Request):
            request = request_or_url
        else:
            url = any_to_uri(request_or_url)
            request = Request(url, dont_filter=True, **kwargs)
            if redirect:
                # NOTE(review): presumably "handle every status except 3xx",
                # leaving redirects to be followed -- confirm against
                # SequenceExclude and the redirect middleware.
                request.meta['handle_httpstatus_list'] = SequenceExclude(range(300, 400))
            else:
                request.meta['handle_httpstatus_all'] = True
        response = None
        try:
            # Runs _schedule in the reactor thread and blocks this thread
            # until the Deferred it returns has fired.
            response, spider = threads.blockingCallFromThread(
                reactor, self._schedule, request, spider)
        except IgnoreRequest:
            # Deliberate best-effort: keep response=None and carry on.
            pass
        self.populate_vars(response, request, spider)

    def populate_vars(self, response=None, request=None, spider=None):
        """Rebuild the variables exposed inside the shell.

        ``update_vars`` is called last with the dict so callers can add or
        override entries. The help banner is only stored when no ``code``
        was given, i.e. when an interactive console will be opened.
        """
        import scrapy

        self.vars['scrapy'] = scrapy
        self.vars['crawler'] = self.crawler
        self.vars['item'] = self.item_class()
        self.vars['settings'] = self.crawler.settings
        self.vars['spider'] = spider
        self.vars['request'] = request
        self.vars['response'] = response
        self.vars['sel'] = _SelectorProxy(response)
        if self.inthread:
            self.vars['fetch'] = self.fetch
        self.vars['view'] = open_in_browser
        self.vars['shelp'] = self.print_help
        self.update_vars(self.vars)
        if not self.code:
            self.vars['banner'] = self.get_help()

    def print_help(self):
        """Print the help text returned by ``get_help()``."""
        print(self.get_help())

    def get_help(self):
        """Return the help text: available objects and shortcuts.

        Every line is prefixed with ``[s] ``. The ``fetch`` shortcuts are
        listed only when ``fetch`` is actually exposed (``self.inthread``).
        """
        b = []
        b.append("Available Scrapy objects:")
        b.append("  scrapy     scrapy module (contains scrapy.Request, scrapy.Selector, etc)")
        for k, v in sorted(self.vars.items()):
            if self._is_relevant(v):
                b.append("  %-10s %s" % (k, v))
        b.append("Useful shortcuts:")
        if self.inthread:
            b.append("  fetch(url[, redirect=True]) "
                     "Fetch URL and update local objects "
                     "(by default, redirects are followed)")
            b.append("  fetch(req)                  "
                     "Fetch a scrapy.Request and update local objects ")
        b.append("  shelp()           Shell help (print this help)")
        b.append("  view(response)    View response in a browser")

        return "\n".join("[s] %s" % line for line in b)

    def _is_relevant(self, value):
        """Return True if ``value`` is an instance of ``relevant_classes``."""
        return isinstance(value, self.relevant_classes)


def inspect_response(response, spider):
    """Open a shell to inspect the given response.

    The shell is created on ``spider.crawler`` and started with
    ``response`` and ``spider``. The SIGINT handler that was active before
    the call is put back once the shell returns, so Ctrl-C keeps working
    for the crawl that continues afterwards.
    """
    # Shell.start() replaces the SIGINT handler with SIG_IGN and never
    # restores it, so remember the current one and re-install it afterwards.
    sigint_handler = signal.getsignal(signal.SIGINT)
    try:
        Shell(spider.crawler).start(response=response, spider=spider)
    finally:
        # getsignal() returns None when the previous handler was not
        # installed from Python; signal.signal() cannot re-install that.
        if sigint_handler is not None:
            signal.signal(signal.SIGINT, sigint_handler)


def _request_deferred(request):
    """Return a Deferred that stands in for the callbacks of ``request``.

    This function is harmful, do not use it unless you know what you are
    doing.

    The request's ``callback`` and ``errback`` are swapped for the
    Deferred's own ``callback``/``errback`` methods, so the Deferred fires
    when the request callback or errback would have been invoked. When it
    fires it first puts the original callback and errback back on the
    request and then, if the request had a callback, runs the original
    callback/errback pair on the result.

    WARNING: Do not call request.replace() until after the deferred is called.
    """
    original_callback, original_errback = request.callback, request.errback

    def _put_callbacks_back(outcome):
        # Runs for both success and failure; passes the outcome through.
        request.callback, request.errback = original_callback, original_errback
        return outcome

    deferred = defer.Deferred()
    deferred.addBoth(_put_callbacks_back)
    if original_callback:
        deferred.addCallbacks(original_callback, original_errback)

    # From now on the request reports its outcome through the Deferred.
    request.callback, request.errback = deferred.callback, deferred.errback
    return deferred


class _SelectorProxy(object):
    """Deprecated stand-in that forwards attribute access to a selector.

    Any attribute not found on the proxy itself is looked up on the wrapped
    response's ``selector``, after emitting a deprecation warning.
    """

    def __init__(self, response):
        self._proxiedresponse = response

    def __getattr__(self, name):
        # Only called for attributes missing on the proxy itself.
        message = ('"sel" shortcut is deprecated. Use "response.xpath()", '
                   '"response.css()" or "response.selector" instead')
        # stacklevel=2 attributes the warning to the code touching the proxy.
        warnings.warn(message, category=ScrapyDeprecationWarning, stacklevel=2)
        selector = self._proxiedresponse.selector
        return getattr(selector, name)