1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
|
# Copyright (c) 2015 Hewlett-Packard Development Company, L.P. (HP)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
from __future__ import print_function
import os
import re
import subprocess
import sys
import sysconfig
from pbr._compat.five import urlopen
from pbr.tests.functional import base
class TestPackaging(base.BaseWheelTestCase):
def test_data_directory_has_wsgi_scripts(self):
# Build the path to the scripts directory
scripts_dir = os.path.join(
self.extracted_wheel_dir, 'pbr_testpackage-0.0.data/scripts'
)
self.assertTrue(os.path.exists(scripts_dir))
scripts = os.listdir(scripts_dir)
self.assertIn('pbr_test_wsgi', scripts)
self.assertIn('pbr_test_wsgi_with_class', scripts)
self.assertNotIn('pbr_test_cmd', scripts)
self.assertNotIn('pbr_test_cmd_with_class', scripts)
class TestWsgiScripts(base.BaseTestCase):
cmd_names = ('pbr_test_wsgi', 'pbr_test_wsgi_with_class')
def _get_path(self):
if sys.version_info[0] > 3:
return sysconfig.get_path("platlib", vars={"base": self.temp_dir})
if os.path.isdir("%s/lib64" % self.temp_dir):
path = "%s/lib64" % self.temp_dir
elif os.path.isdir("%s/lib" % self.temp_dir):
path = "%s/lib" % self.temp_dir
elif os.path.isdir("%s/site-packages" % self.temp_dir):
return ".:%s/site-packages" % self.temp_dir
else:
raise Exception("Could not determine path for test")
return ".:%s/python%s.%s/site-packages" % (
path,
sys.version_info[0],
sys.version_info[1],
)
def test_wsgi_script_install(self):
"""Test that we install a non-pkg-resources wsgi script."""
if os.name == 'nt':
self.skipTest('Windows support is passthrough')
stdout, _, return_code = self.run_setup(
'install', '--prefix=%s' % self.temp_dir
)
self._check_wsgi_install_content(stdout)
def test_wsgi_script_run(self):
"""Test that we install a runnable wsgi script.
This test actually attempts to start and interact with the
wsgi script in question to demonstrate that it's a working
wsgi script using simple server.
"""
if os.name == 'nt':
self.skipTest('Windows support is passthrough')
stdout, _, return_code = self.run_setup(
'install', '--prefix=%s' % self.temp_dir
)
self._check_wsgi_install_content(stdout)
# Live test run the scripts and see that they respond to wsgi
# requests.
for cmd_name in self.cmd_names:
self._test_wsgi(cmd_name, b'Hello World')
def _test_wsgi(self, cmd_name, output, extra_args=None):
cmd = os.path.join(self.temp_dir, 'bin', cmd_name)
print("Running %s -p 0 -b 127.0.0.1" % cmd)
popen_cmd = [cmd, '-p', '0', '-b', '127.0.0.1']
if extra_args:
popen_cmd.extend(extra_args)
env = {'PYTHONPATH': self._get_path()}
p = subprocess.Popen(
popen_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=self.temp_dir,
env=env,
)
self.addCleanup(p.kill)
stdoutdata = p.stdout.readline() # ****...
stdoutdata = p.stdout.readline() # STARTING test server...
self.assertIn(b"STARTING test server pbr_testpackage.wsgi", stdoutdata)
stdoutdata = p.stdout.readline() # Available at ...
print(stdoutdata)
m = re.search(br'(http://[^:]+:\d+)/', stdoutdata)
self.assertIsNotNone(m, "Regex failed to match on %s" % stdoutdata)
stdoutdata = p.stdout.readline() # DANGER! ...
self.assertIn(
b"DANGER! For testing only, do not use in production", stdoutdata
)
stdoutdata = p.stdout.readline() # ***...
f = urlopen(m.group(1).decode('utf-8'))
self.assertEqual(output, f.read())
# Request again so that the application can force stderr.flush(),
# otherwise the log is buffered and the next readline() will hang.
urlopen(m.group(1).decode('utf-8'))
stdoutdata = p.stderr.readline()
# we should have logged an HTTP request, return code 200, that
# returned the right amount of bytes
status = '"GET / HTTP/1.1" 200 %d' % len(output)
self.assertIn(status.encode('utf-8'), stdoutdata)
def _check_wsgi_install_content(self, install_stdout):
for cmd_name in self.cmd_names:
install_txt = 'Installing %s script to %s' % (
cmd_name,
self.temp_dir,
)
self.assertIn(install_txt, install_stdout)
cmd_filename = os.path.join(self.temp_dir, 'bin', cmd_name)
script_txt = open(cmd_filename, 'r').read()
self.assertNotIn('pkg_resources', script_txt)
main_block = """if __name__ == "__main__":
import argparse
import socket
import sys
import wsgiref.simple_server as wss"""
if cmd_name == 'pbr_test_wsgi':
app_name = "main"
else:
app_name = "WSGI.app"
starting_block = (
"STARTING test server pbr_testpackage.wsgi.%s" % app_name
)
else_block = """else:
application = None"""
self.assertIn(main_block, script_txt)
self.assertIn(starting_block, script_txt)
self.assertIn(else_block, script_txt)
def test_with_argument(self):
if os.name == 'nt':
self.skipTest('Windows support is passthrough')
stdout, _, return_code = self.run_setup(
'install', '--prefix=%s' % self.temp_dir
)
self._test_wsgi('pbr_test_wsgi', b'Foo Bar', ["--", "-c", "Foo Bar"])
|