File: testproc.py

package info (click to toggle)
python-scrapy 0.24.2-1
  • links: PTS, VCS
  • area: main
  • in suites: jessie, jessie-kfreebsd
  • size: 3,240 kB
  • ctags: 4,259
  • sloc: python: 21,170; xml: 199; makefile: 67; sh: 44
file content (48 lines) | stat: -rw-r--r-- 1,465 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import sys
import os

from twisted.internet import reactor, defer, protocol

class ProcessTest(object):
    """Helper for tests that run a scrapy command in a child process.

    ``command`` is placed right after ``prefix`` on the command line and is
    expected to be overridden (it defaults to ``None``).  ``prefix`` runs
    ``scrapy.cmdline`` as a module with the current interpreter.
    """

    command = None
    prefix = [sys.executable, '-m', 'scrapy.cmdline']
    # Captured once, when the class body is evaluated, because trial later
    # chdirs into a temporary directory.
    cwd = os.getcwd()

    def execute(self, args, check_code=True, settings=None):
        """Spawn ``prefix + [command] + args`` and return a Deferred.

        :param args: iterable of extra command-line arguments.
        :param check_code: when true, a non-zero exit code is turned into a
            ``RuntimeError`` by ``_process_finished``.
        :param settings: optional value for ``SCRAPY_SETTINGS_MODULE`` in the
            child's environment; the parent environment is otherwise copied.
        :return: the protocol's Deferred, with ``_process_finished`` attached.
        """
        child_env = os.environ.copy()
        if settings is not None:
            child_env['SCRAPY_SETTINGS_MODULE'] = settings
        argv = self.prefix + [self.command] + list(args)
        proto = TestProcessProtocol()
        finished = proto.deferred
        finished.addBoth(self._process_finished, argv, check_code)
        reactor.spawnProcess(proto, argv[0], argv, env=child_env, path=self.cwd)
        return finished

    def _process_finished(self, pp, cmd, check_code):
        """Return ``(exitcode, out, err)`` from ``pp``, or raise on failure.

        :raises RuntimeError: if ``check_code`` is true and ``pp.exitcode`` is
            truthy; the message carries the command, the exit code and the
            captured stdout/stderr.
        """
        failed = bool(pp.exitcode) and bool(check_code)
        if not failed:
            return pp.exitcode, pp.out, pp.err

        report = [
            "process %s exit with code %d" % (cmd, pp.exitcode),
            "\n>>> stdout <<<\n%s" % pp.out,
            "\n",
            "\n>>> stderr <<<\n%s" % pp.err,
        ]
        raise RuntimeError("".join(report))


class TestProcessProtocol(protocol.ProcessProtocol):
    """Process protocol that buffers a child's output and exit code.

    Attributes:
        deferred: fired with this protocol instance from ``processEnded``.
        out: everything passed to ``outReceived`` so far, concatenated.
        err: everything passed to ``errReceived`` so far, concatenated.
        exitcode: ``None`` until ``processEnded`` stores the exit code.
    """

    def __init__(self):
        self.deferred = defer.Deferred()
        # Byte-string buffers: Twisted passes bytes to outReceived and
        # errReceived, and on Python 3 appending bytes to a text '' raises
        # TypeError.  On Python 2 b'' is the same type as '', so behaviour
        # there is unchanged.
        self.out = b''
        self.err = b''
        self.exitcode = None

    def outReceived(self, data):
        """Append a chunk of output data to ``out``."""
        self.out += data

    def errReceived(self, data):
        """Append a chunk of error data to ``err``."""
        self.err += data

    def processEnded(self, status):
        """Store the exit code from ``status`` and fire ``deferred``."""
        self.exitcode = status.value.exitCode
        self.deferred.callback(self)