# -*- coding: utf-8 -*-
''' Tests for the BaseRequest and BaseResponse objects and their subclasses. '''

import unittest
import sys

import bottle
from bottle import request, tob, touni, tonat, json_dumps, _e, HTTPError, parse_date
import tools
import wsgiref.util
import base64

from bottle import BaseRequest, BaseResponse, LocalRequest


try:
    from itertools import product
except ImportError:
    # Fallback for interpreters whose itertools module lacks product().
    def product(*args):
        """ Yield the cartesian product of the given iterables as tuples,
            with the right-most iterable varying fastest. """
        combos = [[]]
        for choices in map(tuple, args):
            extended = []
            for prefix in combos:
                for item in choices:
                    extended.append(prefix + [item])
            combos = extended
        for combo in combos:
            yield tuple(combo)

class TestRequest(unittest.TestCase):
    """ Tests for BaseRequest (and LocalRequest) using hand-built WSGI
        environ dictionaries. """

    def test_app_property(self):
        """ Request.app raises RuntimeError until 'bottle.app' is set. """
        e = {}
        r = BaseRequest(e)
        self.assertRaises(RuntimeError, lambda: r.app)
        e.update({'bottle.app': 5})
        self.assertEqual(r.app, 5)

    def test_route_property(self):
        """ Request.route returns the 'bottle.route' environ value. """
        e = {'bottle.route': 5}
        r = BaseRequest(e)
        self.assertEqual(r.route, 5)

    def test_url_for_property(self):
        """ Request.url_args raises RuntimeError until 'route.url_args' is set. """
        e = {}
        r = BaseRequest(e)
        self.assertRaises(RuntimeError, lambda: r.url_args)
        e.update({'route.url_args': {'a': 5}})
        self.assertEqual(r.url_args, {'a': 5})

    def test_path(self):
        """ PATH_INFO normalization. """
        # Legal paths
        tests = [('', '/'), ('x','/x'), ('x/', '/x/'), ('/x', '/x'), ('/x/', '/x/')]
        for raw, norm in tests:
            self.assertEqual(norm, BaseRequest({'PATH_INFO': raw}).path)
        # Strange paths
        tests = [('///', '/'), ('//x','/x')]
        for raw, norm in tests:
            self.assertEqual(norm, BaseRequest({'PATH_INFO': raw}).path)
        # No path at all
        self.assertEqual('/', BaseRequest({}).path)

    def test_method(self):
        """ REQUEST_METHOD is upper-cased and defaults to GET. """
        self.assertEqual(BaseRequest({}).method, 'GET')
        self.assertEqual(BaseRequest({'REQUEST_METHOD':'GET'}).method, 'GET')
        self.assertEqual(BaseRequest({'REQUEST_METHOD':'GeT'}).method, 'GET')
        self.assertEqual(BaseRequest({'REQUEST_METHOD':'get'}).method, 'GET')
        self.assertEqual(BaseRequest({'REQUEST_METHOD':'POst'}).method, 'POST')
        self.assertEqual(BaseRequest({'REQUEST_METHOD':'FanTASY'}).method, 'FANTASY')

    def test_script_name(self):
        """ SCRIPT_NAME normalization. """
        # Legal paths
        tests = [('', '/'), ('x','/x/'), ('x/', '/x/'), ('/x', '/x/'), ('/x/', '/x/')]
        for raw, norm in tests:
            self.assertEqual(norm, BaseRequest({'SCRIPT_NAME': raw}).script_name)
        # Strange paths
        tests = [('///', '/'), ('///x///','/x/')]
        for raw, norm in tests:
            self.assertEqual(norm, BaseRequest({'SCRIPT_NAME': raw}).script_name)
        # No path at all
        self.assertEqual('/', BaseRequest({}).script_name)

    def test_pathshift(self):
        """ Request.path_shift() """
        def test_shift(s, p, c):
            # Shift `c` path segments and report the resulting
            # [SCRIPT_NAME, path] pair.
            request = BaseRequest({'SCRIPT_NAME': s, 'PATH_INFO': p})
            request.path_shift(c)
            return [request['SCRIPT_NAME'], request.path]
        self.assertEqual(['/a/b', '/c/d'], test_shift('/a/b', '/c/d', 0))
        self.assertEqual(['/a/b', '/c/d/'], test_shift('/a/b', '/c/d/', 0))
        self.assertEqual(['/a/b/c', '/d'], test_shift('/a/b', '/c/d', 1))
        self.assertEqual(['/a', '/b/c/d'], test_shift('/a/b', '/c/d', -1))
        self.assertEqual(['/a/b/c', '/d/'], test_shift('/a/b', '/c/d/', 1))
        self.assertEqual(['/a', '/b/c/d/'], test_shift('/a/b', '/c/d/', -1))
        self.assertEqual(['/a/b/c', '/d/'], test_shift('/a/b/', '/c/d/', 1))
        self.assertEqual(['/a', '/b/c/d/'], test_shift('/a/b/', '/c/d/', -1))
        self.assertEqual(['/a/b/c/d', '/'], test_shift('/', '/a/b/c/d', 4))
        self.assertEqual(['/', '/a/b/c/d/'], test_shift('/a/b/c/d', '/', -4))
        # Shifting more segments than available must fail.
        self.assertRaises(AssertionError, test_shift, '/a/b', '/c/d', 3)
        self.assertRaises(AssertionError, test_shift, '/a/b', '/c/d', -3)

    def test_url(self):
        """ Environ: URL building """
        request = BaseRequest({'HTTP_HOST':'example.com'})
        self.assertEqual('http://example.com/', request.url)
        request = BaseRequest({'SERVER_NAME':'example.com'})
        self.assertEqual('http://example.com/', request.url)
        request = BaseRequest({'SERVER_NAME':'example.com', 'SERVER_PORT':'81'})
        self.assertEqual('http://example.com:81/', request.url)
        request = BaseRequest({'wsgi.url_scheme':'https', 'SERVER_NAME':'example.com'})
        self.assertEqual('https://example.com/', request.url)
        request = BaseRequest({'HTTP_HOST':'example.com', 'PATH_INFO':'/path',
                               'QUERY_STRING':'1=b&c=d', 'SCRIPT_NAME':'/sp'})
        self.assertEqual('http://example.com/sp/path?1=b&c=d', request.url)
        request = BaseRequest({'HTTP_HOST':'example.com', 'PATH_INFO':'/pa th',
                               'SCRIPT_NAME':'/s p'})
        self.assertEqual('http://example.com/s%20p/pa%20th', request.url)

    def test_dict_access(self):
        """ Environ: request objects are environment dicts """
        e = {}
        wsgiref.util.setup_testing_defaults(e)
        request = BaseRequest(e)
        self.assertEqual(list(request), list(e.keys()))
        self.assertEqual(len(request), len(e))
        for k, v in e.items():
            self.assertTrue(k in request)
            self.assertEqual(request[k], v)
            # Only existing keys are overwritten here, so the dict size
            # does not change while iterating.
            request[k] = 'test'
            self.assertEqual(request[k], 'test')
        del request['PATH_INFO']
        self.assertTrue('PATH_INFO' not in request)

    def test_readonly_environ(self):
        """ Writing to a request flagged read-only raises KeyError. """
        request = BaseRequest({'bottle.request.readonly':True})
        def test(): request['x']='y'
        self.assertRaises(KeyError, test)

    def test_header_access(self):
        """ Environ: Request objects decode headers """
        e = {}
        wsgiref.util.setup_testing_defaults(e)
        e['HTTP_SOME_HEADER'] = 'some value'
        request = BaseRequest(e)
        request['HTTP_SOME_OTHER_HEADER'] = 'some other value'
        self.assertTrue('Some-Header' in request.headers)
        self.assertEqual(request.headers['Some-Header'], 'some value')
        self.assertEqual(request.headers['Some-Other-Header'], 'some other value')

    def test_header_access_special(self):
        """ CONTENT_TYPE and CONTENT_LENGTH (no HTTP_ prefix) are headers too. """
        e = {}
        wsgiref.util.setup_testing_defaults(e)
        request = BaseRequest(e)
        request['CONTENT_TYPE'] = 'test'
        request['CONTENT_LENGTH'] = '123'
        self.assertEqual(request.headers['Content-Type'], 'test')
        self.assertEqual(request.headers['Content-Length'], '123')

    def test_cookie_dict(self):
        """ Environ: Cookie dict """
        t = dict()
        t['a=a']      = {'a': 'a'}
        t['a=a; b=b'] = {'a': 'a', 'b':'b'}
        t['a=a; a=b'] = {'a': 'b'}
        for k, v in t.items():
            request = BaseRequest({'HTTP_COOKIE': k})
            for n in v:
                self.assertEqual(v[n], request.cookies[n])
                self.assertEqual(v[n], request.get_cookie(n))

    def test_get(self):
        """ Environ: GET data """
        qs = tonat(tob('a=a&a=1&b=b&c=c&cn=%e7%93%b6'), 'latin1')
        request = BaseRequest({'QUERY_STRING':qs})
        self.assertTrue('a' in request.query)
        self.assertTrue('b' in request.query)
        self.assertEqual(['a','1'], request.query.getall('a'))
        self.assertEqual(['b'], request.query.getall('b'))
        self.assertEqual('1', request.query['a'])
        self.assertEqual('b', request.query['b'])
        self.assertEqual(tonat(tob('瓶'), 'latin1'), request.query['cn'])
        self.assertEqual(touni('瓶'), request.query.cn)

    def test_post(self):
        """ Environ: POST data """
        sq = tob('a=a&a=1&b=b&c=&d&cn=%e7%93%b6')
        e = {}
        wsgiref.util.setup_testing_defaults(e)
        e['wsgi.input'].write(sq)
        e['wsgi.input'].seek(0)
        e['CONTENT_LENGTH'] = str(len(sq))
        e['REQUEST_METHOD'] = "POST"
        request = BaseRequest(e)
        self.assertTrue('a' in request.POST)
        self.assertTrue('b' in request.POST)
        self.assertEqual(['a','1'], request.POST.getall('a'))
        self.assertEqual(['b'], request.POST.getall('b'))
        self.assertEqual('1', request.POST['a'])
        self.assertEqual('b', request.POST['b'])
        self.assertEqual('', request.POST['c'])
        self.assertEqual('', request.POST['d'])
        self.assertEqual(tonat(tob('瓶'), 'latin1'), request.POST['cn'])
        self.assertEqual(touni('瓶'), request.POST.cn)

    def test_bodypost(self):
        """ A body without '=' is parsed as a key with an empty value. """
        sq = tob('foobar')
        e = {}
        wsgiref.util.setup_testing_defaults(e)
        e['wsgi.input'].write(sq)
        e['wsgi.input'].seek(0)
        e['CONTENT_LENGTH'] = str(len(sq))
        e['REQUEST_METHOD'] = "POST"
        request = BaseRequest(e)
        self.assertEqual('', request.POST['foobar'])

    def test_body_noclose(self):
        """ Test that the body file handler is not closed after request.POST """
        sq = tob('a=a&a=1&b=b&c=&d')
        e = {}
        wsgiref.util.setup_testing_defaults(e)
        e['wsgi.input'].write(sq)
        e['wsgi.input'].seek(0)
        e['CONTENT_LENGTH'] = str(len(sq))
        e['REQUEST_METHOD'] = "POST"
        request = BaseRequest(e)
        self.assertEqual(sq, request.body.read())
        request.POST # This caused a body.close() with Python 3.x
        self.assertEqual(sq, request.body.read())

    def test_params(self):
        """ Environ: GET and POST are combined in request.param """
        e = {}
        wsgiref.util.setup_testing_defaults(e)
        e['wsgi.input'].write(tob('b=b&c=p'))
        e['wsgi.input'].seek(0)
        e['CONTENT_LENGTH'] = '7'
        e['QUERY_STRING'] = 'a=a&c=g'
        e['REQUEST_METHOD'] = "POST"
        request = BaseRequest(e)
        self.assertEqual(['a','b','c'], sorted(request.params.keys()))
        # For a key present in both, the POST value is expected.
        self.assertEqual('p', request.params['c'])

    def test_getpostleak(self):
        """ Environ: GET and POST should not leak into each other """
        e = {}
        wsgiref.util.setup_testing_defaults(e)
        e['wsgi.input'].write(tob('b=b'))
        e['wsgi.input'].seek(0)
        e['CONTENT_LENGTH'] = '3'
        e['QUERY_STRING'] = 'a=a'
        e['REQUEST_METHOD'] = "POST"
        request = BaseRequest(e)
        self.assertEqual(['a'], list(request.GET.keys()))
        self.assertEqual(['b'], list(request.POST.keys()))

    def test_body(self):
        """ Environ: Request.body should behave like a file object factory """
        e = {}
        wsgiref.util.setup_testing_defaults(e)
        e['wsgi.input'].write(tob('abc'))
        e['wsgi.input'].seek(0)
        e['CONTENT_LENGTH'] = str(3)
        request = BaseRequest(e)
        # Every access to request.body is expected to start at offset 0.
        self.assertEqual(tob('abc'), request.body.read())
        self.assertEqual(tob('abc'), request.body.read(3))
        self.assertEqual(tob('abc'), request.body.readline())
        self.assertEqual(tob('abc'), request.body.readline(3))

    def test_bigbody(self):
        """ Environ: Request.body should handle big uploads using files """
        e = {}
        wsgiref.util.setup_testing_defaults(e)
        e['wsgi.input'].write(tob('x')*1024*1000)
        e['wsgi.input'].seek(0)
        e['CONTENT_LENGTH'] = str(1024*1000)
        request = BaseRequest(e)
        self.assertTrue(hasattr(request.body, 'fileno'))
        self.assertEqual(1024*1000, len(request.body.read()))
        self.assertEqual(1024, len(request.body.read(1024)))
        self.assertEqual(1024*1000, len(request.body.readline()))
        self.assertEqual(1024, len(request.body.readline(1024)))

    def test_tobigbody(self):
        """ Environ: Request.body should truncate to Content-Length bytes """
        e = {}
        wsgiref.util.setup_testing_defaults(e)
        e['wsgi.input'].write(tob('x')*1024)
        e['wsgi.input'].seek(0)
        e['CONTENT_LENGTH'] = '42'
        request = BaseRequest(e)
        self.assertEqual(42, len(request.body.read()))
        self.assertEqual(42, len(request.body.read(1024)))
        self.assertEqual(42, len(request.body.readline()))
        self.assertEqual(42, len(request.body.readline(1024)))

    def _test_chunked(self, body, expect):
        """ Send `body` with 'Transfer-Encoding: chunked' and check the result.

            `expect` is either the decoded payload (a str) that reading the
            request body must return, or an exception class that accessing
            the request body must raise.
        """
        e = {}
        wsgiref.util.setup_testing_defaults(e)
        e['wsgi.input'].write(tob(body))
        e['wsgi.input'].seek(0)
        e['HTTP_TRANSFER_ENCODING'] = 'chunked'
        if isinstance(expect, str):
            # assertEqual, not the deprecated assertEquals alias, which was
            # removed from unittest in Python 3.12.
            self.assertEqual(tob(expect), BaseRequest(e).body.read())
        else:
            self.assertRaises(expect, lambda: BaseRequest(e).body)

    def test_chunked(self):
        """ Well-formed chunked bodies are decoded. """
        self._test_chunked('1\r\nx\r\nff\r\n' + 'y'*255 + '\r\n0\r\n',
                           'x' + 'y'*255)
        self._test_chunked('8\r\nxxxxxxxx\r\n0\r\n','xxxxxxxx')
        self._test_chunked('0\r\n', '')

    def test_chunked_meta_fields(self):
        """ Text after ';' in the chunk-size line does not affect the payload. """
        self._test_chunked('8 ; foo\r\nxxxxxxxx\r\n0\r\n','xxxxxxxx')
        self._test_chunked('8;foo\r\nxxxxxxxx\r\n0\r\n','xxxxxxxx')
        self._test_chunked('8;foo=bar\r\nxxxxxxxx\r\n0\r\n','xxxxxxxx')

    def test_chunked_not_terminated(self):
        """ A missing terminating zero-size chunk raises HTTPError. """
        self._test_chunked('1\r\nx\r\n', HTTPError)

    def test_chunked_wrong_size(self):
        """ A chunk shorter than its declared size raises HTTPError. """
        self._test_chunked('2\r\nx\r\n', HTTPError)

    def test_chunked_illegal_size(self):
        """ A non-hexadecimal chunk size raises HTTPError. """
        self._test_chunked('x\r\nx\r\n', HTTPError)

    def test_chunked_not_chunked_at_all(self):
        """ A plain body sent as 'chunked' raises HTTPError. """
        self._test_chunked('abcdef', HTTPError)

    def test_multipart(self):
        """ Environ: POST (multipart files and multiple values per key) """
        fields = [('field1','value1'), ('field2','value2'), ('field2','value3')]
        files = [('file1','filename1.txt','content1'), ('万难','万难foo.py', 'ä\nö\rü')]
        e = tools.multipart_environ(fields=fields, files=files)
        request = BaseRequest(e)
        # File content
        self.assertTrue('file1' in request.POST)
        self.assertTrue('file1' in request.files)
        self.assertTrue('file1' not in request.forms)
        # The expected type of the file content depends on the interpreter.
        expected = tob('content1') if sys.version_info >= (3,2,0) else 'content1'
        self.assertEqual(expected, request.POST['file1'].file.read())
        # File name and meta data
        self.assertTrue('万难' in request.POST)
        self.assertTrue('万难' in request.files)
        self.assertTrue('万难' not in request.forms)
        self.assertEqual('foo.py', request.POST['万难'].filename)
        self.assertTrue(request.files['万难'])
        self.assertFalse(request.files.file77)
        # UTF-8 files
        x = request.POST['万难'].file.read()
        if (3,2,0) > sys.version_info >= (3,0,0):
            x = x.encode('utf8')
        self.assertEqual(tob('ä\nö\rü'), x)
        # No file
        self.assertTrue('file3' not in request.POST)
        self.assertTrue('file3' not in request.files)
        self.assertTrue('file3' not in request.forms)
        # Field (single)
        self.assertEqual('value1', request.POST['field1'])
        self.assertTrue('field1' not in request.files)
        self.assertEqual('value1', request.forms['field1'])
        # Field (multi)
        self.assertEqual(2, len(request.POST.getall('field2')))
        self.assertEqual(['value2', 'value3'], request.POST.getall('field2'))
        self.assertEqual(['value2', 'value3'], request.forms.getall('field2'))
        self.assertTrue('field2' not in request.files)

    def test_json_empty(self):
        """ Environ: Request.json property with empty body. """
        self.assertEqual(BaseRequest({}).json, None)

    def test_json_noheader(self):
        """ Environ: Request.json property with missing content-type header. """
        test = dict(a=5, b='test', c=[1,2,3])
        e = {}
        wsgiref.util.setup_testing_defaults(e)
        e['wsgi.input'].write(tob(json_dumps(test)))
        e['wsgi.input'].seek(0)
        e['CONTENT_LENGTH'] = str(len(json_dumps(test)))
        self.assertEqual(BaseRequest(e).json, None)

    def test_json_tobig(self):
        """ Environ: Request.json property with huge body. """
        test = dict(a=5, tobig='x' * bottle.BaseRequest.MEMFILE_MAX)
        e = {'CONTENT_TYPE': 'application/json'}
        wsgiref.util.setup_testing_defaults(e)
        e['wsgi.input'].write(tob(json_dumps(test)))
        e['wsgi.input'].seek(0)
        e['CONTENT_LENGTH'] = str(len(json_dumps(test)))
        self.assertRaises(HTTPError, lambda: BaseRequest(e).json)

    def test_json_valid(self):
        """ Environ: Request.json property. """
        test = dict(a=5, b='test', c=[1,2,3])
        e = {'CONTENT_TYPE': 'application/json; charset=UTF-8'}
        wsgiref.util.setup_testing_defaults(e)
        e['wsgi.input'].write(tob(json_dumps(test)))
        e['wsgi.input'].seek(0)
        e['CONTENT_LENGTH'] = str(len(json_dumps(test)))
        self.assertEqual(BaseRequest(e).json, test)

    def test_json_forged_header_issue616(self):
        """ 'application/json' after the ';' of another type is not JSON. """
        test = dict(a=5, b='test', c=[1,2,3])
        e = {'CONTENT_TYPE': 'text/plain;application/json'}
        wsgiref.util.setup_testing_defaults(e)
        e['wsgi.input'].write(tob(json_dumps(test)))
        e['wsgi.input'].seek(0)
        e['CONTENT_LENGTH'] = str(len(json_dumps(test)))
        self.assertEqual(BaseRequest(e).json, None)

    def test_json_header_empty_body(self):
        """ Request Content-Type is application/json but body is empty """
        e = {'CONTENT_TYPE': 'application/json'}
        wsgiref.util.setup_testing_defaults(e)
        e['CONTENT_LENGTH'] = "0"
        self.assertEqual(BaseRequest(e).json, None)

    def test_isajax(self):
        """ Request.is_ajax follows the X-Requested-With header. """
        e = {}
        wsgiref.util.setup_testing_defaults(e)
        self.assertFalse(BaseRequest(e.copy()).is_ajax)
        e['HTTP_X_REQUESTED_WITH'] = 'XMLHttpRequest'
        self.assertTrue(BaseRequest(e.copy()).is_ajax)

    def test_auth(self):
        """ Request.auth from HTTP basic auth, falling back to REMOTE_USER. """
        user, pwd = 'marc', 'secret'
        basic = touni(base64.b64encode(tob('%s:%s' % (user, pwd))))
        r = BaseRequest({})
        self.assertEqual(r.auth, None)
        r.environ['HTTP_AUTHORIZATION'] = 'basic %s' % basic
        self.assertEqual(r.auth, (user, pwd))
        r.environ['REMOTE_USER'] = user
        self.assertEqual(r.auth, (user, pwd))
        del r.environ['HTTP_AUTHORIZATION']
        self.assertEqual(r.auth, (user, None))

    def test_remote_route(self):
        """ Request.remote_route prefers X-Forwarded-For over REMOTE_ADDR. """
        ips = ['1.2.3.4', '2.3.4.5', '3.4.5.6']
        r = BaseRequest({})
        self.assertEqual(r.remote_route, [])
        r.environ['HTTP_X_FORWARDED_FOR'] = ', '.join(ips)
        self.assertEqual(r.remote_route, ips)
        r.environ['REMOTE_ADDR'] = ips[1]
        self.assertEqual(r.remote_route, ips)
        del r.environ['HTTP_X_FORWARDED_FOR']
        self.assertEqual(r.remote_route, [ips[1]])

    def test_remote_addr(self):
        """ Request.remote_addr is the first forwarded IP, else REMOTE_ADDR. """
        ips = ['1.2.3.4', '2.3.4.5', '3.4.5.6']
        r = BaseRequest({})
        self.assertEqual(r.remote_addr, None)
        r.environ['HTTP_X_FORWARDED_FOR'] = ', '.join(ips)
        self.assertEqual(r.remote_addr, ips[0])
        r.environ['REMOTE_ADDR'] = ips[1]
        self.assertEqual(r.remote_addr, ips[0])
        del r.environ['HTTP_X_FORWARDED_FOR']
        self.assertEqual(r.remote_addr, ips[1])

    def test_user_defined_attributes(self):
        """ Custom attributes are stored in the environ dict. """
        for cls in (BaseRequest, LocalRequest):
            r = cls()

            # New attributes go to the environ dict.
            r.foo = 'somevalue'
            self.assertEqual(r.foo, 'somevalue')
            self.assertTrue('somevalue' in r.environ.values())

            # Unknown attributes raise AttributeError.
            self.assertRaises(AttributeError, getattr, r, 'somevalue')



class TestResponse(unittest.TestCase):
    """ Tests for BaseResponse: constructor arguments, status handling,
        headers and cookies. """

    def test_constructor_body(self):
        """ The first constructor argument becomes the body. """
        self.assertEqual('',
            BaseResponse('').body)

        self.assertEqual('YAY',
            BaseResponse('YAY').body)

    def test_constructor_status(self):
        """ The second constructor argument sets the status (int or str). """
        self.assertEqual(200,
            BaseResponse('YAY', 200).status_code)

        self.assertEqual('200 OK',
            BaseResponse('YAY', 200).status_line)

        self.assertEqual('200 YAY',
            BaseResponse('YAY', '200 YAY').status_line)

    def test_constructor_headerlist(self):
        """ Headers passed as a list of (name, value) pairs are set. """
        from functools import partial
        make_res = partial(BaseResponse, '', 200)

        # assertEqual, not assertTrue: assertTrue('yay', x) treats x as the
        # failure message and therefore always passes.
        self.assertEqual('yay',
            make_res([('x-test','yay')])['x-test'])

    def test_constructor_header_keywords(self):
        """ Headers passed as keyword arguments are set. """
        # This test used to share its name with test_constructor_headerlist,
        # which silently replaced that test in the class namespace.
        from functools import partial
        make_res = partial(BaseResponse, '', 200)

        self.assertEqual('yay', make_res(x_test='yay')['x-test'])


    def test_set_status(self):
        """ Assigning to .status validates the value and keeps .status_code
            and .status_line in sync; a rejected value leaves the previous
            status untouched. """
        rs = BaseResponse()

        rs.status = 200
        self.assertEqual(rs.status, rs.status_line)
        self.assertEqual(rs.status_code, 200)
        self.assertEqual(rs.status_line, '200 OK')

        rs.status = 999
        self.assertEqual(rs.status, rs.status_line)
        self.assertEqual(rs.status_code, 999)
        self.assertEqual(rs.status_line, '999 Unknown')

        rs.status = 404
        self.assertEqual(rs.status, rs.status_line)
        self.assertEqual(rs.status_code, 404)
        self.assertEqual(rs.status_line, '404 Not Found')

        def test(): rs.status = -200
        self.assertRaises(ValueError, test)
        self.assertEqual(rs.status, rs.status_line) # last value
        self.assertEqual(rs.status_code, 404) # last value
        self.assertEqual(rs.status_line, '404 Not Found') # last value

        def test(): rs.status = 5
        self.assertRaises(ValueError, test)
        self.assertEqual(rs.status, rs.status_line) # last value
        self.assertEqual(rs.status_code, 404) # last value
        self.assertEqual(rs.status_line, '404 Not Found') # last value

        rs.status = '999 Who knows?' # Illegal, but acceptable three digit code
        self.assertEqual(rs.status, rs.status_line)
        self.assertEqual(rs.status_code, 999)
        self.assertEqual(rs.status_line, '999 Who knows?')

        rs.status = 555 # Strange code
        self.assertEqual(rs.status, rs.status_line)
        self.assertEqual(rs.status_code, 555)
        self.assertEqual(rs.status_line, '555 Unknown')

        rs.status = '404 Brain not Found' # Custom reason
        self.assertEqual(rs.status, rs.status_line)
        self.assertEqual(rs.status_code, 404)
        self.assertEqual(rs.status_line, '404 Brain not Found')

        def test(): rs.status = '5 Illegal Code'
        self.assertRaises(ValueError, test)
        self.assertEqual(rs.status, rs.status_line) # last value
        self.assertEqual(rs.status_code, 404) # last value
        self.assertEqual(rs.status_line, '404 Brain not Found') # last value

        def test(): rs.status = '-99 Illegal Code'
        self.assertRaises(ValueError, test)
        self.assertEqual(rs.status, rs.status_line) # last value
        self.assertEqual(rs.status_code, 404) # last value
        self.assertEqual(rs.status_line, '404 Brain not Found') # last value

        def test(): rs.status = '1000 Illegal Code'
        self.assertRaises(ValueError, test)
        self.assertEqual(rs.status, rs.status_line) # last value
        self.assertEqual(rs.status_code, 404) # last value
        self.assertEqual(rs.status_line, '404 Brain not Found') # last value

        def test(): rs.status = '555' # No reason
        self.assertRaises(ValueError, test)
        self.assertEqual(rs.status, rs.status_line) # last value
        self.assertEqual(rs.status_code, 404) # last value
        self.assertEqual(rs.status_line, '404 Brain not Found') # last value

    def test_content_type(self):
        """ .content_type writes the Content-Type header. """
        rs = BaseResponse()
        rs.content_type = 'test/some'
        # assertEqual: the assertEquals alias was removed in Python 3.12.
        self.assertEqual('test/some', rs.headers.get('Content-Type'))

    def test_charset(self):
        """ .charset is parsed from Content-Type and defaults to UTF-8. """
        rs = BaseResponse()
        self.assertEqual(rs.charset, 'UTF-8')
        rs.content_type = 'text/html; charset=latin9'
        self.assertEqual(rs.charset, 'latin9')
        rs.content_type = 'text/html'
        self.assertEqual(rs.charset, 'UTF-8')

    def test_set_cookie(self):
        """ set_cookie() emits one Set-Cookie header per cookie. """
        r = BaseResponse()
        r.set_cookie('name1', 'value', max_age=5)
        r.set_cookie('name2', 'value 2', path='/foo')
        cookies = [value for name, value in r.headerlist
                   if name.title() == 'Set-Cookie']
        cookies.sort()
        self.assertEqual(cookies[0], 'name1=value; Max-Age=5')
        self.assertEqual(cookies[1], 'name2="value 2"; Path=/foo')

    def test_set_cookie_maxage(self):
        """ max_age accepts seconds as int or a datetime.timedelta. """
        import datetime
        r = BaseResponse()
        r.set_cookie('name1', 'value', max_age=5)
        r.set_cookie('name2', 'value', max_age=datetime.timedelta(days=1))
        cookies = sorted([value for name, value in r.headerlist
                   if name.title() == 'Set-Cookie'])
        self.assertEqual(cookies[0], 'name1=value; Max-Age=5')
        self.assertEqual(cookies[1], 'name2=value; Max-Age=86400')

    def test_set_cookie_expires(self):
        """ expires accepts a timestamp or a datetime.datetime. """
        import datetime
        r = BaseResponse()
        r.set_cookie('name1', 'value', expires=42)
        r.set_cookie('name2', 'value', expires=datetime.datetime(1970,1,1,0,0,43))
        cookies = sorted([value for name, value in r.headerlist
                   if name.title() == 'Set-Cookie'])
        self.assertEqual(cookies[0], 'name1=value; expires=Thu, 01 Jan 1970 00:00:42 GMT')
        self.assertEqual(cookies[1], 'name2=value; expires=Thu, 01 Jan 1970 00:00:43 GMT')

    @unittest.skip("broken")
    def test_delete_cookie(self):
        """ delete_cookie() should emit an emptied cookie (currently skipped). """
        response = BaseResponse()
        response.set_cookie('name', 'value')
        response.delete_cookie('name')
        cookies = [value for name, value in response.headerlist
                   if name.title() == 'Set-Cookie']
        self.assertTrue('name=;' in cookies[0])

    def test_set_header(self):
        """ Item assignment replaces an existing header, case-insensitively. """
        response = BaseResponse()
        response['x-test'] = 'foo'
        headers = [value for name, value in response.headerlist
                   if name.title() == 'X-Test']
        self.assertEqual(['foo'], headers)
        self.assertEqual('foo', response['x-test'])

        response['X-Test'] = 'bar'
        headers = [value for name, value in response.headerlist
                   if name.title() == 'X-Test']
        self.assertEqual(['bar'], headers)
        self.assertEqual('bar', response['x-test'])

    def test_append_header(self):
        """ add_header() appends; item access returns the last value. """
        response = BaseResponse()
        response.set_header('x-test', 'foo')
        headers = [value for name, value in response.headerlist
                   if name.title() == 'X-Test']
        self.assertEqual(['foo'], headers)
        self.assertEqual('foo', response['x-test'])

        response.add_header('X-Test', 'bar')
        headers = [value for name, value in response.headerlist
                   if name.title() == 'X-Test']
        self.assertEqual(['foo', 'bar'], headers)
        self.assertEqual('bar', response['x-test'])

    def test_delete_header(self):
        """ Deleting a header is case-insensitive; later access raises KeyError. """
        response = BaseResponse()
        response['x-test'] = 'foo'
        self.assertEqual('foo', response['x-test'])
        del response['X-tESt']
        self.assertRaises(KeyError, lambda: response['x-test'])

    def test_non_string_header(self):
        """ Non-string header values are converted to native strings. """
        response = BaseResponse()
        response['x-test'] = 5
        self.assertEqual('5', response['x-test'])
        response['x-test'] = None
        self.assertEqual('None', response['x-test'])
        response['x-test'] = touni('瓶')
        self.assertEqual(tonat(touni('瓶')), response['x-test'])

    def test_prevent_control_characters_in_headers(self):
        """ Header names and values containing newline, carriage return or
            NUL are rejected with ValueError. """
        masks = '{}test', 'test{}', 'te{}st'
        tests = '\n', '\r', '\n\r', '\0'

        # Test HeaderDict
        apis = 'append', 'replace', '__setitem__', 'setdefault'
        for api, mask, test in product(apis, masks, tests):
            hd = bottle.HeaderDict()
            func = getattr(hd, api)
            value = mask.replace("{}", test)
            self.assertRaises(ValueError, func, value, "test-value")
            self.assertRaises(ValueError, func, "test-name", value)

        # Test functions on BaseResponse
        apis = 'add_header', 'set_header', '__setitem__'
        for api, mask, test in product(apis, masks, tests):
            rs = bottle.BaseResponse()
            func = getattr(rs, api)
            value = mask.replace("{}", test)
            self.assertRaises(ValueError, func, value, "test-value")
            self.assertRaises(ValueError, func, "test-name", value)

    def test_expires_header(self):
        """ .expires round-trips a datetime through the Expires header with
            whole-second precision. """
        import datetime
        response = BaseResponse()
        now = datetime.datetime.now()
        response.expires = now

        def seconds(a, b):
            # Absolute difference in whole seconds; microseconds are
            # deliberately ignored. A day has 24 * 3600 seconds.
            td = max(a,b) - min(a,b)
            return td.days * 24 * 3600 + td.seconds

        self.assertEqual(0, seconds(response.expires, now))
        now2 = datetime.datetime.utcfromtimestamp(
            parse_date(response.headers['Expires']))
        self.assertEqual(0, seconds(now, now2))

class TestRedirect(unittest.TestCase):
    """ Tests for bottle.redirect(): status code and Location header of the
        raised HTTPResponse. """

    def assertRedirect(self, target, result, query=None, status=303, **args):
        """ Call bottle.redirect(target) and verify the raised response.

            :param target: URL passed to bottle.redirect().
            :param result: Expected value of the Location header.
            :param query: Optional dict passed to bottle.redirect() as
                          keyword arguments.
            :param status: Expected status code of the response.
            :param args: Additional environ entries. Names starting with
                         'wsgi' get their first underscore replaced by a
                         dot (wsgi_url_scheme -> wsgi.url_scheme).
        """
        env = {'SERVER_PROTOCOL':'HTTP/1.1'}
        # Iterate over a copy of the keys because args is modified below.
        for key in list(args):
            if key.startswith('wsgi'):
                args[key.replace('_', '.', 1)] = args[key]
                del args[key]
        env.update(args)
        request.bind(env)
        bottle.response.bind()
        try:
            bottle.redirect(target, **(query or {}))
        except bottle.HTTPResponse:
            r = _e()
            self.assertEqual(status, r.status_code)
            self.assertTrue(r.headers)
            self.assertEqual(result, r.headers['Location'])
        else:
            # Without this branch the check would pass silently if
            # redirect() returned instead of raising.
            self.fail('bottle.redirect() did not raise HTTPResponse')

    def test_absolute_path(self):
        """ Absolute targets ignore PATH_INFO and SCRIPT_NAME. """
        self.assertRedirect('/', 'http://127.0.0.1/')
        self.assertRedirect('/test.html', 'http://127.0.0.1/test.html')
        self.assertRedirect('/test.html', 'http://127.0.0.1/test.html',
                            PATH_INFO='/some/sub/path/')
        self.assertRedirect('/test.html', 'http://127.0.0.1/test.html',
                            PATH_INFO='/some/sub/file.html')
        self.assertRedirect('/test.html', 'http://127.0.0.1/test.html',
                            SCRIPT_NAME='/some/sub/path/')
        self.assertRedirect('/foo/test.html', 'http://127.0.0.1/foo/test.html')
        self.assertRedirect('/foo/test.html', 'http://127.0.0.1/foo/test.html',
                            PATH_INFO='/some/sub/file.html')

    def test_relative_path(self):
        """ Relative targets are resolved against the current request URL. """
        self.assertRedirect('./', 'http://127.0.0.1/')
        self.assertRedirect('./test.html', 'http://127.0.0.1/test.html')
        self.assertRedirect('./test.html', 'http://127.0.0.1/foo/test.html',
                            PATH_INFO='/foo/')
        self.assertRedirect('./test.html', 'http://127.0.0.1/foo/test.html',
                            PATH_INFO='/foo/bar.html')
        self.assertRedirect('./test.html', 'http://127.0.0.1/foo/test.html',
                            SCRIPT_NAME='/foo/')
        self.assertRedirect('./test.html', 'http://127.0.0.1/foo/bar/test.html',
                            SCRIPT_NAME='/foo/', PATH_INFO='/bar/baz.html')
        self.assertRedirect('./foo/test.html', 'http://127.0.0.1/foo/test.html')
        self.assertRedirect('./foo/test.html', 'http://127.0.0.1/bar/foo/test.html',
                            PATH_INFO='/bar/file.html')
        self.assertRedirect('../test.html', 'http://127.0.0.1/test.html',
                            PATH_INFO='/foo/')
        self.assertRedirect('../test.html', 'http://127.0.0.1/foo/test.html',
                            PATH_INFO='/foo/bar/')
        self.assertRedirect('../test.html', 'http://127.0.0.1/test.html',
                            PATH_INFO='/foo/bar.html')
        self.assertRedirect('../test.html', 'http://127.0.0.1/test.html',
                            SCRIPT_NAME='/foo/')
        self.assertRedirect('../test.html', 'http://127.0.0.1/foo/test.html',
                            SCRIPT_NAME='/foo/', PATH_INFO='/bar/baz.html')
        self.assertRedirect('../baz/../test.html', 'http://127.0.0.1/foo/test.html',
                            PATH_INFO='/foo/bar/')

    def test_sheme(self):
        """ The URL scheme is taken from 'wsgi.url_scheme'. """
        self.assertRedirect('./test.html', 'https://127.0.0.1/test.html',
                            wsgi_url_scheme='https')
        self.assertRedirect('./test.html', 'https://127.0.0.1:80/test.html',
                            wsgi_url_scheme='https', SERVER_PORT='80')

    def test_host_http_1_0(self):
        """ HTTP/1.0 requests get a 302 and a host built from SERVER_*. """
        # No HTTP_HOST, just SERVER_NAME and SERVER_PORT.
        self.assertRedirect('./test.html', 'http://example.com/test.html',
                            SERVER_NAME='example.com',
                            SERVER_PROTOCOL='HTTP/1.0', status=302)
        self.assertRedirect('./test.html', 'http://127.0.0.1:81/test.html',
                            SERVER_PORT='81',
                            SERVER_PROTOCOL='HTTP/1.0', status=302)

    def test_host_http_1_1(self):
        """ HTTP_HOST determines the host part of the Location header. """
        self.assertRedirect('./test.html', 'http://example.com/test.html',
                            HTTP_HOST='example.com')
        self.assertRedirect('./test.html', 'http://example.com:81/test.html',
                            HTTP_HOST='example.com:81')
        # Trust HTTP_HOST over SERVER_NAME and PORT.
        self.assertRedirect('./test.html', 'http://example.com:81/test.html',
                            HTTP_HOST='example.com:81', SERVER_NAME='foobar')
        self.assertRedirect('./test.html', 'http://example.com:81/test.html',
                            HTTP_HOST='example.com:81', SERVER_PORT='80')

    def test_host_http_proxy(self):
        """ X-Forwarded-Host takes precedence over Host. """
        # Trust proxy headers over original header.
        self.assertRedirect('./test.html', 'http://example.com/test.html',
                            HTTP_X_FORWARDED_HOST='example.com',
                            HTTP_HOST='127.0.0.1')

    def test_specialchars(self):
        ''' The target URL is not quoted automatically. '''
        self.assertRedirect('./te st.html',
                            'http://example.com/a%20a/b%20b/te st.html',
                            HTTP_HOST='example.com', SCRIPT_NAME='/a a/', PATH_INFO='/b b/')

    def test_redirect_preserve_cookies(self):
        """ Cookies set on bottle.response before redirect() are carried
            over to the raised response. """
        env = {'SERVER_PROTOCOL':'HTTP/1.1'}
        request.bind(env)
        bottle.response.bind()
        # Setting the cookie is preparation, not the call under test, so it
        # stays outside the try block.
        bottle.response.set_cookie('xxx', 'yyy')
        try:
            bottle.redirect('...')
        except bottle.HTTPResponse:
            h = [v for (k, v) in _e().headerlist if k == 'Set-Cookie']
            self.assertEqual(h, ['xxx=yyy'])
        else:
            # Fail instead of passing silently when nothing was raised.
            self.fail('bottle.redirect() did not raise HTTPResponse')

class TestWSGIHeaderDict(unittest.TestCase):
    """ Tests for bottle.WSGIHeaderDict, a header view on a WSGI environ. """

    def setUp(self):
        """ Create an empty environ and a header dict wrapping it. """
        environ = {}
        self.env = environ
        self.headers = bottle.WSGIHeaderDict(environ)

    def test_empty(self):
        """ An empty environ yields no headers. """
        empty = bottle.WSGIHeaderDict({})
        self.assertEqual(0, len(empty))

    def test_native(self):
        """ Native string values are returned unchanged. """
        self.env['HTTP_TEST_HEADER'] = 'foobar'
        found = self.headers['Test-header']
        self.assertEqual(found, 'foobar')

    def test_bytes(self):
        """ Byte string values compare equal to the native string. """
        self.env['HTTP_TEST_HEADER'] = tob('foobar')
        found = self.headers['Test-Header']
        self.assertEqual(found, 'foobar')

    def test_unicode(self):
        """ Unicode values compare equal to the native string. """
        self.env['HTTP_TEST_HEADER'] = touni('foobar')
        found = self.headers['Test-Header']
        self.assertEqual(found, 'foobar')

    def test_dict(self):
        """ Lookup, get() and membership ignore the case of the header name. """
        spellings = 'foo-bar Foo-Bar foo-Bar FOO-BAR'.split()

        # Nothing is set yet: every spelling must be reported as missing.
        for name in spellings:
            self.assertFalse(name in self.headers)
            self.assertEqual(self.headers.get(name), None)
            self.assertEqual(self.headers.get(name, 5), 5)
            self.assertRaises(KeyError, self.headers.__getitem__, name)

        self.env['HTTP_FOO_BAR'] = 'test'

        # One environ entry must now be visible under every spelling.
        for name in spellings:
            self.assertTrue(name in self.headers)
            self.assertEqual(self.headers.get(name), 'test')
            self.assertEqual(self.headers.get(name, 5), 'test')



# Run the test suite when this file is executed directly.
if __name__ == "__main__":  # pragma: no cover
    unittest.main()
