from io import BytesIO
from os.path import getmtime
import tempfile
from time import gmtime
import os
import shutil
import unittest

from webob import static
from webob.compat import bytes_
from webob.request import Request, environ_from_url
from webob.response import Response


def get_response(app, path='/', **req_kw):
    """Convenient function to query an application"""
    req = Request(environ_from_url(path), **req_kw)
    return req.get_response(app)


def create_file(content, *paths):
    """Convenient function to create a new file with some content"""
    path = os.path.join(*paths)
    with open(path, 'wb') as fp:
        fp.write(bytes_(content))
    return path


class TestFileApp(unittest.TestCase):
    def setUp(self):
        fp = tempfile.NamedTemporaryFile(suffix=".py", delete=False)
        self.tempfile = fp.name
        fp.write(b"import this\n")
        fp.close()

    def tearDown(self):
        os.unlink(self.tempfile)

    def test_fileapp(self):
        app = static.FileApp(self.tempfile)
        resp1 = get_response(app)
        assert resp1.content_type in ('text/x-python', 'text/plain')
        self.assertEqual(resp1.charset, 'UTF-8')
        self.assertEqual(resp1.last_modified.timetuple(), gmtime(getmtime(self.tempfile)))
        self.assertEqual(resp1.body, b"import this\n")

        resp2 = get_response(app)
        assert resp2.content_type in ('text/x-python', 'text/plain')
        self.assertEqual(resp2.last_modified.timetuple(), gmtime(getmtime(self.tempfile)))
        self.assertEqual(resp2.body, b"import this\n")

        resp3 = get_response(app, range=(7, 11))
        self.assertEqual(resp3.status_code, 206)
        self.assertEqual(tuple(resp3.content_range)[:2], (7, 11))
        self.assertEqual(resp3.last_modified.timetuple(), gmtime(getmtime(self.tempfile)))
        self.assertEqual(resp3.body, bytes_('this'))

    def test_unexisting_file(self):
        app = static.FileApp('/tmp/this/doesnt/exist')
        self.assertEqual(404, get_response(app).status_code)

    def test_allowed_methods(self):
        app = static.FileApp(self.tempfile)

        # Alias
        resp = lambda method: get_response(app, method=method)

        self.assertEqual(200, resp(method='GET').status_code)
        self.assertEqual(200, resp(method='HEAD').status_code)
        self.assertEqual(405, resp(method='POST').status_code)
        # Actually any other method is not allowed
        self.assertEqual(405, resp(method='xxx').status_code)

    def test_exception_while_opening_file(self):
        # Mock the built-in ``open()`` function to allow finner control about
        # what we are testing.
        def open_ioerror(*args, **kwargs):
            raise IOError()

        def open_oserror(*args, **kwargs):
            raise OSError()

        app = static.FileApp(self.tempfile)

        app._open = open_ioerror
        self.assertEqual(403, get_response(app).status_code)

        app._open = open_oserror
        self.assertEqual(403, get_response(app).status_code)

    def test_use_wsgi_filewrapper(self):
        class TestWrapper(object):
            def __init__(self, file, block_size):
                self.file = file
                self.block_size = block_size

        environ = environ_from_url('/')
        environ['wsgi.file_wrapper'] = TestWrapper
        app = static.FileApp(self.tempfile)
        app_iter = Request(environ).get_response(app).app_iter

        self.assertTrue(isinstance(app_iter, TestWrapper))
        self.assertEqual(bytes_('import this\n'), app_iter.file.read())
        self.assertEqual(static.BLOCK_SIZE, app_iter.block_size)


class TestFileIter(unittest.TestCase):
    def test_empty_file(self):
        fp = BytesIO()
        fi = static.FileIter(fp)
        self.assertRaises(StopIteration, next, iter(fi))

    def test_seek(self):
        fp = BytesIO(bytes_("0123456789"))
        i = static.FileIter(fp).app_iter_range(seek=4)

        self.assertEqual(bytes_("456789"), next(i))
        self.assertRaises(StopIteration, next, i)

    def test_limit(self):
        fp = BytesIO(bytes_("0123456789"))
        i = static.FileIter(fp).app_iter_range(limit=4)

        self.assertEqual(bytes_("0123"), next(i))
        self.assertRaises(StopIteration, next, i)

    def test_limit_and_seek(self):
        fp = BytesIO(bytes_("0123456789"))
        i = static.FileIter(fp).app_iter_range(limit=4, seek=1)

        self.assertEqual(bytes_("123"), next(i))
        self.assertRaises(StopIteration, next, i)

    def test_multiple_reads(self):
        fp = BytesIO(bytes_("012"))
        i = static.FileIter(fp).app_iter_range(block_size=1)

        self.assertEqual(bytes_("0"), next(i))
        self.assertEqual(bytes_("1"), next(i))
        self.assertEqual(bytes_("2"), next(i))
        self.assertRaises(StopIteration, next, i)

    def test_seek_bigger_than_limit(self):
        fp = BytesIO(bytes_("0123456789"))
        i = static.FileIter(fp).app_iter_range(limit=1, seek=2)

        # XXX: this should not return anything actually, since we are starting
        # to read after the place we wanted to stop.
        self.assertEqual(bytes_("23456789"), next(i))
        self.assertRaises(StopIteration, next, i)

    def test_limit_is_zero(self):
        fp = BytesIO(bytes_("0123456789"))
        i = static.FileIter(fp).app_iter_range(limit=0)

        self.assertRaises(StopIteration, next, i)



class TestDirectoryApp(unittest.TestCase):
    def setUp(self):
        self.test_dir = tempfile.mkdtemp()

    def tearDown(self):
        shutil.rmtree(self.test_dir)

    def test_empty_directory(self):
        app = static.DirectoryApp(self.test_dir)
        self.assertEqual(404, get_response(app).status_code)
        self.assertEqual(404, get_response(app, '/foo').status_code)

    def test_serve_file(self):
        app = static.DirectoryApp(self.test_dir)
        create_file('abcde', self.test_dir, 'bar')
        self.assertEqual(404, get_response(app).status_code)
        self.assertEqual(404, get_response(app, '/foo').status_code)

        resp = get_response(app, '/bar')
        self.assertEqual(200, resp.status_code)
        self.assertEqual(bytes_('abcde'), resp.body)

    def test_dont_serve_file_in_parent_directory(self):
        # We'll have:
        #   /TEST_DIR/
        #   /TEST_DIR/bar
        #   /TEST_DIR/foo/   <- serve this directory
        create_file('abcde', self.test_dir, 'bar')
        serve_path = os.path.join(self.test_dir, 'foo')
        os.mkdir(serve_path)
        app = static.DirectoryApp(serve_path)

        # The file exists, but is outside the served dir.
        self.assertEqual(403, get_response(app, '/../bar').status_code)

    def test_dont_leak_parent_directory_file_existance(self):
        # We'll have:
        #   /TEST_DIR/
        #   /TEST_DIR/foo/   <- serve this directory
        serve_path = os.path.join(self.test_dir, 'foo')
        os.mkdir(serve_path)
        app = static.DirectoryApp(serve_path)

        # The file exists, but is outside the served dir.
        self.assertEqual(403, get_response(app, '/../bar2').status_code)

    def test_file_app_arguments(self):
        app = static.DirectoryApp(self.test_dir, content_type='xxx/yyy')
        create_file('abcde', self.test_dir, 'bar')

        resp = get_response(app, '/bar')
        self.assertEqual(200, resp.status_code)
        self.assertEqual('xxx/yyy', resp.content_type)

    def test_file_app_factory(self):
        def make_fileapp(*args, **kwargs):
            make_fileapp.called = True
            return Response()
        make_fileapp.called = False

        app = static.DirectoryApp(self.test_dir)
        app.make_fileapp = make_fileapp
        create_file('abcde', self.test_dir, 'bar')

        get_response(app, '/bar')
        self.assertTrue(make_fileapp.called)

    def test_must_serve_directory(self):
        serve_path = create_file('abcde', self.test_dir, 'bar')
        self.assertRaises(IOError, static.DirectoryApp, serve_path)

    def test_index_page(self):
        os.mkdir(os.path.join(self.test_dir, 'index-test'))
        create_file(bytes_('index'), self.test_dir, 'index-test', 'index.html')
        app = static.DirectoryApp(self.test_dir)
        resp = get_response(app, '/index-test')
        self.assertEqual(resp.status_code, 301)
        self.assertTrue(resp.location.endswith('/index-test/'))
        resp = get_response(app, '/index-test?test')
        self.assertTrue(resp.location.endswith('/index-test/?test'))
        resp = get_response(app, '/index-test/')
        self.assertEqual(resp.status_code, 200)
        self.assertEqual(resp.body, bytes_('index'))
        self.assertEqual(resp.content_type, 'text/html')
        resp = get_response(app, '/index-test/index.html')
        self.assertEqual(resp.status_code, 200)
        self.assertEqual(resp.body, bytes_('index'))
        redir_app = static.DirectoryApp(self.test_dir, hide_index_with_redirect=True)
        resp = get_response(redir_app, '/index-test/index.html')
        self.assertEqual(resp.status_code, 301)
        self.assertTrue(resp.location.endswith('/index-test/'))
        resp = get_response(redir_app, '/index-test/index.html?test')
        self.assertTrue(resp.location.endswith('/index-test/?test'))
        page_app = static.DirectoryApp(self.test_dir, index_page='something-else.html')
        self.assertEqual(get_response(page_app, '/index-test/').status_code, 404)
