File: test_wsgi.py

package info (click to toggle)
python-pbr 5.5.0-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye, sid
  • size: 772 kB
  • sloc: python: 4,499; sh: 117; ansic: 24; makefile: 23
file content (165 lines) | stat: -rw-r--r-- 5,835 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
# Copyright (c) 2015 Hewlett-Packard Development Company, L.P. (HP)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import os
import re
import subprocess
import sys
try:
    # python 2
    from urllib2 import urlopen
except ImportError:
    # python 3
    from urllib.request import urlopen

from pbr.tests import base


class TestWsgiScripts(base.BaseTestCase):
    """Checks for the wsgi scripts installed by ``setup.py install``.

    The tests install the test package under ``self.temp_dir`` and then
    inspect (and, where not skipped, execute) the scripts listed in
    ``cmd_names``.
    """

    # Names of the wsgi scripts expected under ``<temp_dir>/bin``.
    cmd_names = ('pbr_test_wsgi', 'pbr_test_wsgi_with_class')

    def _get_path(self):
        """Return the PYTHONPATH value used to run the installed scripts.

        ``lib64`` is preferred over ``lib``; a bare ``site-packages``
        directory directly under ``self.temp_dir`` is the last fallback.

        :raises Exception: if none of the candidate directories exist.
        """
        if os.path.isdir("%s/lib64" % self.temp_dir):
            path = "%s/lib64" % self.temp_dir
        elif os.path.isdir("%s/lib" % self.temp_dir):
            path = "%s/lib" % self.temp_dir
        elif os.path.isdir("%s/site-packages" % self.temp_dir):
            return ".:%s/site-packages" % self.temp_dir
        else:
            raise Exception("Could not determine path for test")
        return ".:%s/python%s.%s/site-packages" % (
            path,
            sys.version_info[0],
            sys.version_info[1])

    def test_wsgi_script_install(self):
        """Test that we install a non-pkg-resources wsgi script."""
        if os.name == 'nt':
            self.skipTest('Windows support is passthrough')

        stdout, _, _ = self.run_setup(
            'install', '--prefix=%s' % self.temp_dir)

        self._check_wsgi_install_content(stdout)

    def test_wsgi_script_run(self):
        """Test that we install a runnable wsgi script.

        This test actually attempts to start and interact with the
        wsgi script in question to demonstrate that it's a working
        wsgi script using simple server.

        """
        # skipTest() raises, so nothing below runs while this skip is here.
        self.skipTest('Not working on Debian')
        if os.name == 'nt':
            self.skipTest('Windows support is passthrough')

        stdout, _, _ = self.run_setup(
            'install', '--prefix=%s' % self.temp_dir)

        self._check_wsgi_install_content(stdout)

        # Live test run the scripts and see that they respond to wsgi
        # requests.
        for cmd_name in self.cmd_names:
            self._test_wsgi(cmd_name, b'Hello World')

    def _test_wsgi(self, cmd_name, output, extra_args=None):
        """Start an installed wsgi script and check that it serves *output*.

        :param cmd_name: name of the script under ``<temp_dir>/bin``.
        :param output: bytes expected as the body of a GET on the URL the
            script announces on stdout.
        :param extra_args: optional list of additional command line
            arguments appended after ``-p 0 -b 127.0.0.1``.
        """
        cmd = os.path.join(self.temp_dir, 'bin', cmd_name)
        print("Running %s -p 0 -b 127.0.0.1" % cmd)
        popen_cmd = [cmd, '-p', '0', '-b', '127.0.0.1']
        if extra_args:
            popen_cmd.extend(extra_args)

        env = {'PYTHONPATH': self._get_path()}

        p = subprocess.Popen(popen_cmd, stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE, cwd=self.temp_dir,
                             env=env)
        self.addCleanup(p.kill)

        # The script prints a fixed five line banner; consume it in order.
        p.stdout.readline()  # ****...

        stdoutdata = p.stdout.readline()  # STARTING test server...
        self.assertIn(
            b"STARTING test server pbr_testpackage.wsgi",
            stdoutdata)

        stdoutdata = p.stdout.readline()  # Available at ...
        print(stdoutdata)
        m = re.search(br'(http://[^:]+:\d+)/', stdoutdata)
        self.assertIsNotNone(m, "Regex failed to match on %s" % stdoutdata)
        url = m.group(1).decode('utf-8')

        stdoutdata = p.stdout.readline()  # DANGER! ...
        self.assertIn(
            b"DANGER! For testing only, do not use in production",
            stdoutdata)

        p.stdout.readline()  # ***...

        # urllib2 responses on python 2 are not context managers, so the
        # response is closed with try/finally rather than ``with``.
        response = urlopen(url)
        try:
            self.assertEqual(output, response.read())
        finally:
            response.close()

        # Request again so that the application can force stderr.flush(),
        # otherwise the log is buffered and the next readline() will hang.
        urlopen(url).close()

        stderrdata = p.stderr.readline()
        # we should have logged an HTTP request, return code 200, that
        # returned the right amount of bytes
        status = '"GET / HTTP/1.1" 200 %d' % len(output)
        self.assertIn(status.encode('utf-8'), stderrdata)

    def _check_wsgi_install_content(self, install_stdout):
        """Assert that every script in ``cmd_names`` was installed correctly.

        For each script this checks that *install_stdout* reports the
        installation and that the generated file does not mention
        ``pkg_resources`` and contains the expected ``__main__`` block,
        startup banner text and ``else`` fallback.

        :param install_stdout: captured stdout of the ``install`` run.
        """
        for cmd_name in self.cmd_names:
            install_txt = 'Installing %s script to %s' % (cmd_name,
                                                          self.temp_dir)
            self.assertIn(install_txt, install_stdout)

            cmd_filename = os.path.join(self.temp_dir, 'bin', cmd_name)

            # Close the file deterministically instead of leaking the
            # handle until garbage collection.
            with open(cmd_filename, 'r') as script_file:
                script_txt = script_file.read()
            self.assertNotIn('pkg_resources', script_txt)

            main_block = """if __name__ == "__main__":
    import argparse
    import socket
    import sys
    import wsgiref.simple_server as wss"""

            if cmd_name == 'pbr_test_wsgi':
                app_name = "main"
            else:
                app_name = "WSGI.app"

            starting_block = ("STARTING test server pbr_testpackage.wsgi."
                              "%s" % app_name)

            else_block = """else:
    application = None"""

            self.assertIn(main_block, script_txt)
            self.assertIn(starting_block, script_txt)
            self.assertIn(else_block, script_txt)

    def test_with_argument(self):
        """Test that arguments after ``--`` reach the wsgi application."""
        # skipTest() raises, so nothing below runs while this skip is here.
        self.skipTest('Not working on Debian')
        if os.name == 'nt':
            self.skipTest('Windows support is passthrough')

        # Only the side effect of installing matters here; the captured
        # output is not inspected.
        self.run_setup('install', '--prefix=%s' % self.temp_dir)

        self._test_wsgi('pbr_test_wsgi', b'Foo Bar', ["--", "-c", "Foo Bar"])