#!/usr/bin/env python
#
# Copyright 2011 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""WSGI utility library tests."""

__author__ = 'rafe@google.com (Rafe Kaplan)'


import httplib
import unittest

from protorpc import test_util
from protorpc import util
from protorpc import webapp_test_util
from protorpc.wsgi import util as wsgi_util

# Shared WSGI applications used as fixtures by the tests below.
# APP1 and APP2 are static pages whose response bodies are 'App1' and 'App2'.
APP1 = wsgi_util.static_page('App1')
APP2 = wsgi_util.static_page('App2')
# Application built by wsgi_util.error for the 404 status; the first_found
# tests place it ahead of another application to check that it is skipped.
NOT_FOUND = wsgi_util.error(httplib.NOT_FOUND)


class WsgiTestBase(webapp_test_util.WebServerTestBase):
  """Base class for tests that send HTTP requests to a local WSGI server.

  DoHttpRequest connects to 'localhost' on self.port; the port attribute is
  not set in this class and is presumably provided by
  webapp_test_util.WebServerTestBase (verify against that module).
  """

  server_thread = None

  def CreateWsgiApplication(self):
    """Returns None, i.e. no default WSGI application is supplied here."""
    return None

  def DoHttpRequest(self,
                    path='/',
                    content=None,
                    content_type='text/plain; charset=utf-8',
                    headers=None):
    """Sends one HTTP request to the test server and returns the response.

    Args:
      path: Request path.
      content: Request body.  When None a GET is sent, otherwise a POST.
      content_type: Value of the content-type request header.
      headers: Optional mapping of additional request headers.  Entries are
        merged over the default content-type header.

    Returns:
      Tuple (status, reason, content, headers) where status is the integer
      status code, reason is the status text, content is the response body
      and headers is a dict of the response headers with the 'date' and
      'server' entries removed.
    """
    connection = httplib.HTTPConnection('localhost', self.port)
    try:
      if content is None:
        method = 'GET'
      else:
        method = 'POST'

      # Build the outgoing headers in a separate dict so that the caller's
      # headers are actually merged in rather than discarded.
      request_headers = {'content-type': content_type}
      if headers:
        request_headers.update(headers)

      connection.request(method, path, content, request_headers)
      response = connection.getresponse()

      # The body must be read before the connection is closed.
      body = response.read()

      # 'date' and 'server' vary between runs and servers, so they are
      # dropped to keep header comparisons in tests exact.
      response_headers = dict(
          (name, value) for name, value in response.getheaders()
          if name not in ('date', 'server'))

      return response.status, response.reason, body, response_headers
    finally:
      connection.close()


class StaticPageBase(WsgiTestBase):
  """Exercises applications built by wsgi_util.static_page over HTTP."""

  def _ServeAndCheck(self, page, want_status, want_reason, want_content,
                     want_headers):
    """Serves page, requests the default path and compares the response.

    Args:
      page: WSGI application to install with ResetServer.
      want_status: Expected integer status code.
      want_reason: Expected status text.
      want_content: Expected response body.
      want_headers: Expected dict of response headers.

    Returns:
      The response headers dict, for tests that need further checks.
    """
    self.ResetServer(page)
    got_status, got_reason, got_content, got_headers = self.DoHttpRequest()
    self.assertEqual(want_status, got_status)
    self.assertEqual(want_reason, got_reason)
    self.assertEqual(want_content, got_content)
    self.assertEqual(want_headers, got_headers)
    return got_headers

  def testDefault(self):
    """A page built with no arguments is an empty 200 HTML response."""
    self._ServeAndCheck(
        wsgi_util.static_page(),
        200, 'OK', '',
        {'content-length': '0',
         'content-type': 'text/html; charset=utf-8',
        })

  def testHasContent(self):
    """The content argument becomes the body and sets content-length."""
    body = 'my content'
    self._ServeAndCheck(
        wsgi_util.static_page(body),
        200, 'OK', body,
        {'content-length': str(len(body)),
         'content-type': 'text/html; charset=utf-8',
        })

  def testHasContentType(self):
    """The content_type argument replaces the default content-type."""
    self._ServeAndCheck(
        wsgi_util.static_page(content_type='text/plain'),
        200, 'OK', '',
        {'content-length': '0',
         'content-type': 'text/plain',
        })

  def testHasStatus(self):
    """A status given as a full status-line string is used as is."""
    self._ServeAndCheck(
        wsgi_util.static_page(status='400 Not Good Request'),
        400, 'Not Good Request', '',
        {'content-length': '0',
         'content-type': 'text/html; charset=utf-8',
        })

  def testHasStatusInt(self):
    """A known integer status gets its standard reason text."""
    self._ServeAndCheck(
        wsgi_util.static_page(status=401),
        401, 'Unauthorized', '',
        {'content-length': '0',
         'content-type': 'text/html; charset=utf-8',
        })

  def testHasStatusUnknown(self):
    """An unrecognized integer status is reported as 'Unknown Error'."""
    self._ServeAndCheck(
        wsgi_util.static_page(status=909),
        909, 'Unknown Error', '',
        {'content-length': '0',
         'content-type': 'text/html; charset=utf-8',
        })

  def testHasStatusTuple(self):
    """A (code, reason) tuple supplies both parts of the status line."""
    self._ServeAndCheck(
        wsgi_util.static_page(status=(500, 'Bad Thing')),
        500, 'Bad Thing', '',
        {'content-length': '0',
         'content-type': 'text/html; charset=utf-8',
        })

  def testHasHeaders(self):
    """Extra headers given as a list of pairs appear in the response."""
    extra_headers = [('x', 'foo'), ('a', 'bar'), ('z', 'bin')]
    self._ServeAndCheck(
        wsgi_util.static_page(headers=extra_headers),
        200, 'OK', '',
        {'content-length': '0',
         'content-type': 'text/html; charset=utf-8',
         'x': 'foo',
         'a': 'bar',
         'z': 'bin',
        })

  def testHeadersUnicodeSafe(self):
    """A unicode header value arrives at the client as a str."""
    received = self._ServeAndCheck(
        wsgi_util.static_page(headers=[('x', u'foo')]),
        200, 'OK', '',
        {'content-length': '0',
         'content-type': 'text/html; charset=utf-8',
         'x': 'foo',
        })
    self.assertTrue(isinstance(received['x'], str))

  def testHasHeadersDict(self):
    """Extra headers given as a dict appear in the response."""
    extra_headers = {'x': 'foo', 'a': 'bar', 'z': 'bin'}
    self._ServeAndCheck(
        wsgi_util.static_page(headers=extra_headers),
        200, 'OK', '',
        {'content-length': '0',
         'content-type': 'text/html; charset=utf-8',
         'x': 'foo',
         'a': 'bar',
         'z': 'bin',
        })


class FirstFoundTest(WsgiTestBase):
  """Exercises wsgi_util.first_found over HTTP."""

  def _CheckServesApp(self, want_body):
    """Requests '/' and expects a 200 HTML page with a four byte body.

    Args:
      want_body: Expected response body.  The expected content-length is
        fixed at '4', which matches the 'App1' and 'App2' fixtures.
    """
    code, reason, body, response_headers = self.DoHttpRequest('/')
    self.assertEqual(httplib.OK, code)
    self.assertEqual(httplib.responses[httplib.OK], reason)
    self.assertEqual(want_body, body)
    self.assertEqual({'content-length': '4',
                      'content-type': 'text/html; charset=utf-8',
                     },
                     response_headers)

  def testEmptyConfiguration(self):
    """With no applications configured the response is a padded 404."""
    self.ResetServer(wsgi_util.first_found([]))
    not_found_text = httplib.responses[httplib.NOT_FOUND]

    code, reason, body, response_headers = self.DoHttpRequest('/')
    self.assertEqual(httplib.NOT_FOUND, code)
    self.assertEqual(not_found_text, reason)
    self.assertEqual(util.pad_string(not_found_text), body)
    self.assertEqual({'content-length': '512',
                      'content-type': 'text/plain; charset=utf-8',
                     },
                     response_headers)

  def testOneApp(self):
    """A single application serves the request."""
    self.ResetServer(wsgi_util.first_found([APP1]))
    self._CheckServesApp('App1')

  def testIterator(self):
    """Applications may be supplied as a one-shot iterator."""
    self.ResetServer(wsgi_util.first_found(iter([APP1])))
    self._CheckServesApp('App1')

    # Do request again to make sure iterator was properly copied.
    self._CheckServesApp('App1')

  def testTwoApps(self):
    """The first application wins when it handles the request."""
    self.ResetServer(wsgi_util.first_found([APP1, APP2]))
    self._CheckServesApp('App1')

  def testFirstNotFound(self):
    """A 404 from the first application falls through to the second."""
    self.ResetServer(wsgi_util.first_found([NOT_FOUND, APP2]))
    self._CheckServesApp('App2')

  def testOnlyNotFound(self):
    """Statuses other than 404 from the first application are returned."""
    def current_error(environ, start_response):
      """Responds with the status that the loop below is currently checking.

      status_under_test is bound by the for loop that runs after ResetServer.
      """
      start_response('%03d Whatever' % status_under_test,
                     [('content-type', 'text/plain')])
      return []

    self.ResetServer(wsgi_util.first_found([current_error, APP2]))

    # 100, 204 and 304 have slightly different expectations, so they are left
    # out of this test in order to keep the code simple.  404 is presumably
    # skipped because first_found falls through to the next application on it
    # (see testFirstNotFound).  NOTE(review): the reason 200 is skipped is not
    # evident from this file.
    skipped = (100, 200, 204, 304, 404)
    statuses_to_check = sorted(
        code for code in httplib.responses if code not in skipped)

    for status_under_test in statuses_to_check:
      code, reason, _, _ = self.DoHttpRequest('/')
      self.assertEqual(status_under_test, code)
      self.assertEqual('Whatever', reason)


# Run the test cases in this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()
