File: testproc.py

package info (click to toggle)
python-scrapy 2.4.1-2%2Bdeb11u1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 4,748 kB
  • sloc: python: 32,888; xml: 199; makefile: 90; sh: 7
file content (50 lines) | stat: -rw-r--r-- 1,487 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import sys
import os

from twisted.internet import defer, protocol


class ProcessTest:

    command = None
    prefix = [sys.executable, '-m', 'scrapy.cmdline']
    cwd = os.getcwd()  # trial chdirs to temp dir

    def execute(self, args, check_code=True, settings=None):
        from twisted.internet import reactor
        env = os.environ.copy()
        if settings is not None:
            env['SCRAPY_SETTINGS_MODULE'] = settings
        cmd = self.prefix + [self.command] + list(args)
        pp = TestProcessProtocol()
        pp.deferred.addBoth(self._process_finished, cmd, check_code)
        reactor.spawnProcess(pp, cmd[0], cmd, env=env, path=self.cwd)
        return pp.deferred

    def _process_finished(self, pp, cmd, check_code):
        if pp.exitcode and check_code:
            msg = f"process {cmd} exit with code {pp.exitcode}"
            msg += f"\n>>> stdout <<<\n{pp.out}"
            msg += "\n"
            msg += f"\n>>> stderr <<<\n{pp.err}"
            raise RuntimeError(msg)
        return pp.exitcode, pp.out, pp.err


class TestProcessProtocol(protocol.ProcessProtocol):

    def __init__(self):
        self.deferred = defer.Deferred()
        self.out = b''
        self.err = b''
        self.exitcode = None

    def outReceived(self, data):
        self.out += data

    def errReceived(self, data):
        self.err += data

    def processEnded(self, status):
        self.exitcode = status.value.exitCode
        self.deferred.callback(self)