#!/usr/bin/env python
# -*- coding: utf-8 -*-

import base64
import urllib
import urlparse
from saml2.authn_context import INTERNETPROTOCOLPASSWORD
from saml2.response import LogoutResponse

from saml2.client import Saml2Client
from saml2 import samlp, BINDING_HTTP_POST, BINDING_HTTP_REDIRECT
from saml2 import saml, config, class_name
from saml2.config import SPConfig
from saml2.saml import NAMEID_FORMAT_PERSISTENT
from saml2.saml import NAMEID_FORMAT_TRANSIENT
from saml2.saml import NameID
from saml2.server import Server
from saml2.time_util import in_a_while

from py.test import raises
from fakeIDP import FakeIDP, unpack_form


# Authentication information passed as ``authn`` to
# ``Server.create_authn_response`` in ``TestClient.test_response``.
AUTHN = dict(
    class_ref=INTERNETPROTOCOLPASSWORD,
    authn_auth="http://www.example.com/login",
)


def for_me(condition, me):
    """Tell whether *me* is named as an audience in *condition*.

    :param condition: object whose ``audience_restriction`` attribute is an
        iterable of restrictions; each restriction has an ``audience`` with
        a ``text`` string
    :param me: the identifier to look for
    :return: True if some audience text, stripped of surrounding
        whitespace, equals *me*; otherwise False
    """
    for restriction in condition.audience_restriction:
        if restriction.audience.text.strip() == me:
            return True
    # Answer explicitly instead of falling off the end with an implicit None.
    return False


def ava(attribute_statement):
    """Flatten an attribute statement into a ``{name: [values]}`` dict.

    Attribute names and value texts are stripped of surrounding whitespace.
    If the same name occurs more than once, the values of the last
    occurrence are the ones kept.

    :param attribute_statement: object whose ``attribute`` is an iterable of
        items having ``name`` and ``attribute_value`` (items with ``text``)
    :return: dictionary mapping each attribute name to its list of values
    """
    # Check name_format ??
    return dict(
        (attr.name.strip(),
         [item.text.strip() for item in attr.attribute_value])
        for attr in attribute_statement.attribute)


def _leq(l1, l2):
    """Return True when both iterables hold the same distinct elements.

    Order and repetition are ignored; only set membership is compared.
    """
    first, second = set(l1), set(l2)
    # Two sets are equal exactly when each is a subset of the other.
    return first <= second and second <= first

# def test_parse_3():
#     xml_response = open(XML_RESPONSE_FILE3).read()
#     response = samlp.response_from_string(xml_response)
#     client = Saml2Client({})
#     (ava, name_id, real_uri) = \
#             client.do_response(response, "xenosmilus.umdc.umu.se")
#     print 40*"="
#     print ava
#     print 40*","
#     print name_id
#     assert False

# AttributeQuery XML templates, keyed by what look like version strings
# ("1.2.14", "1.2.16"); what they are versions of is not shown here.
# Each template carries a single "%s" placeholder, in the IssueInstant
# attribute.
# NOTE(review): the literals contain line breaks inside attribute values and
# element text (e.g. "urn:oasis:names:tc:SAML:2" / ".0:protocol"), which looks
# like accidental line wrapping -- confirm before relying on them. REQ1 is
# not referenced in the visible part of this file.
REQ1 = {"1.2.14": """<?xml version='1.0' encoding='UTF-8'?>
<ns0:AttributeQuery Destination="https://idp.example.com/idp/" ID="id1"
IssueInstant="%s" Version="2.0" xmlns:ns0="urn:oasis:names:tc:SAML:2
.0:protocol"><ns1:Issuer Format="urn:oasis:names:tc:SAML:2
.0:nameid-format:entity" xmlns:ns1="urn:oasis:names:tc:SAML:2
.0:assertion">urn:mace:example.com:saml:roland:sp</ns1:Issuer><ns1:Subject
xmlns:ns1="urn:oasis:names:tc:SAML:2.0:assertion"><ns1:NameID
Format="urn:oasis:names:tc:SAML:2
.0:nameid-format:persistent">E8042FB4-4D5B-48C3-8E14-8EDD852790DD</ns1:NameID
></ns1:Subject></ns0:AttributeQuery>""",
        "1.2.16": """<?xml version='1.0' encoding='UTF-8'?>
<ns0:AttributeQuery xmlns:ns0="urn:oasis:names:tc:SAML:2.0:protocol"
xmlns:ns1="urn:oasis:names:tc:SAML:2.0:assertion" Destination="https://idp
.example.com/idp/" ID="id1" IssueInstant="%s" Version="2.0"><ns1:Issuer
Format="urn:oasis:names:tc:SAML:2.0:nameid-format:entity">urn:mace:example
.com:saml:roland:sp</ns1:Issuer><ns1:Subject><ns1:NameID
Format="urn:oasis:names:tc:SAML:2
.0:nameid-format:persistent">E8042FB4-4D5B-48C3-8E14-8EDD852790DD</ns1:NameID
></ns1:Subject></ns0:AttributeQuery>"""}

# NameID shared by ``TestClientWithDummy.test_logout_1``: stored with the
# session information and then used as the subject of the global logout.
nid = NameID(
    text="123456",
    format=NAMEID_FORMAT_TRANSIENT,
    name_qualifier="foo",
)


class TestClient:
    """Tests of Saml2Client request creation and response parsing.

    The client is configured from "server_conf"; a ``Server`` loaded from
    "idp_conf" creates the authn responses that the client parses.
    """

    def setup_class(self):
        """Create the Server and Saml2Client used by the tests."""
        self.server = Server("idp_conf")

        conf = config.SPConfig()
        conf.load_file("server_conf")
        self.client = Saml2Client(conf)

    def test_create_attribute_query1(self):
        """Attribute query without attributes survives an XML round trip."""
        req_id, req = self.client.create_attribute_query(
            "https://idp.example.com/idp/",
            "E8042FB4-4D5B-48C3-8E14-8EDD852790DD",
            format=saml.NAMEID_FORMAT_PERSISTENT,
            message_id="id1")
        reqstr = "%s" % req.to_string()

        assert req.destination == "https://idp.example.com/idp/"
        assert req.id == "id1"
        assert req.version == "2.0"
        subject = req.subject
        name_id = subject.name_id
        assert name_id.format == saml.NAMEID_FORMAT_PERSISTENT
        assert name_id.text == "E8042FB4-4D5B-48C3-8E14-8EDD852790DD"
        issuer = req.issuer
        assert issuer.text == "urn:mace:example.com:saml:roland:sp"

        # Parse the serialised query back and compare it with the original
        attrq = samlp.attribute_query_from_string(reqstr)

        print(attrq.keyswv())
        assert _leq(attrq.keyswv(), ['destination', 'subject', 'issue_instant',
                                     'version', 'id', 'issuer'])

        assert attrq.destination == req.destination
        assert attrq.id == req.id
        assert attrq.version == req.version
        assert attrq.issuer.text == issuer.text
        assert attrq.issue_instant == req.issue_instant
        assert attrq.subject.name_id.format == name_id.format
        assert attrq.subject.name_id.text == name_id.text

    def test_create_attribute_query2(self):
        """Attribute query asking for three attributes, one unnamed."""
        req_id, req = self.client.create_attribute_query(
            "https://idp.example.com/idp/",
            "E8042FB4-4D5B-48C3-8E14-8EDD852790DD",
            attribute={
                ("urn:oid:2.5.4.42",
                 "urn:oasis:names:tc:SAML:2.0:attrname-format:uri",
                 "givenName"): None,
                ("urn:oid:2.5.4.4",
                 "urn:oasis:names:tc:SAML:2.0:attrname-format:uri",
                 "surname"): None,
                ("urn:oid:1.2.840.113549.1.9.1",
                 "urn:oasis:names:tc:SAML:2.0:attrname-format:uri"): None,
            },
            format=saml.NAMEID_FORMAT_PERSISTENT,
            message_id="id1")

        print(req.to_string())
        assert req.destination == "https://idp.example.com/idp/"
        assert req.id == "id1"
        assert req.version == "2.0"
        subject = req.subject
        name_id = subject.name_id
        assert name_id.format == saml.NAMEID_FORMAT_PERSISTENT
        assert name_id.text == "E8042FB4-4D5B-48C3-8E14-8EDD852790DD"
        assert len(req.attribute) == 3
        # Record which of the requested attributes show up in the query
        seen = []
        for attribute in req.attribute:
            if attribute.name == "urn:oid:2.5.4.42":
                assert attribute.name_format == saml.NAME_FORMAT_URI
                assert attribute.friendly_name == "givenName"
                seen.append("givenName")
            elif attribute.name == "urn:oid:2.5.4.4":
                assert attribute.name_format == saml.NAME_FORMAT_URI
                assert attribute.friendly_name == "surname"
                seen.append("surname")
            elif attribute.name == "urn:oid:1.2.840.113549.1.9.1":
                assert attribute.name_format == saml.NAME_FORMAT_URI
                # Requested without a friendly name, so none may be set
                assert not attribute.friendly_name
                seen.append("email")
        assert _leq(seen, ["givenName", "surname", "email"])

    def test_create_attribute_query_3(self):
        """Attribute query using a transient name id."""
        req_id, req = self.client.create_attribute_query(
            "https://aai-demo-idp.switch.ch/idp/shibboleth",
            "_e7b68a04488f715cda642fbdd90099f5",
            format=saml.NAMEID_FORMAT_TRANSIENT,
            message_id="id1")

        assert isinstance(req, samlp.AttributeQuery)
        assert req.destination == \
            "https://aai-demo-idp.switch.ch/idp/shibboleth"
        assert req.id == "id1"
        assert req.version == "2.0"
        assert req.issue_instant
        assert req.issuer.text == "urn:mace:example.com:saml:roland:sp"
        nameid = req.subject.name_id
        assert nameid.format == saml.NAMEID_FORMAT_TRANSIENT
        assert nameid.text == "_e7b68a04488f715cda642fbdd90099f5"

    def test_create_auth_request_0(self):
        """Authn request created with only a destination and a message id."""
        ar_str = "%s" % self.client.create_authn_request(
            "http://www.example.com/sso", message_id="id1")[1]

        ar = samlp.authn_request_from_string(ar_str)
        print(ar)
        assert ar.assertion_consumer_service_url == \
            "http://lingon.catalogix.se:8087/"
        assert ar.destination == "http://www.example.com/sso"
        assert ar.protocol_binding == BINDING_HTTP_POST
        assert ar.version == "2.0"
        assert ar.provider_name == "urn:mace:example.com:saml:roland:sp"
        assert ar.issuer.text == "urn:mace:example.com:saml:roland:sp"
        nid_policy = ar.name_id_policy
        assert nid_policy.allow_create == "false"
        assert nid_policy.format == saml.NAMEID_FORMAT_PERSISTENT or \
            nid_policy.format == saml.NAMEID_FORMAT_TRANSIENT
        assert nid_policy.format == saml.NAMEID_FORMAT_TRANSIENT

    def test_create_auth_request_vo(self):
        """Authn request whose second argument (marked "vo") is expected to
        end up as the NameIDPolicy ``sp_name_qualifier``."""
        # list() so the comparison does not depend on keys() returning a list
        assert list(self.client.config.vorg.keys()) == [
            "urn:mace:example.com:it:tek"]

        ar_str = "%s" % self.client.create_authn_request(
            "http://www.example.com/sso",
            "urn:mace:example.com:it:tek",  # vo
            nameid_format=NAMEID_FORMAT_PERSISTENT,
            message_id="666")[1]

        ar = samlp.authn_request_from_string(ar_str)
        print(ar)
        assert ar.id == "666"
        assert ar.assertion_consumer_service_url == \
            "http://lingon.catalogix.se:8087/"
        assert ar.destination == "http://www.example.com/sso"
        assert ar.protocol_binding == BINDING_HTTP_POST
        assert ar.version == "2.0"
        assert ar.provider_name == "urn:mace:example.com:saml:roland:sp"
        assert ar.issuer.text == "urn:mace:example.com:saml:roland:sp"
        nid_policy = ar.name_id_policy
        assert nid_policy.allow_create == "false"
        assert nid_policy.format == saml.NAMEID_FORMAT_PERSISTENT
        assert nid_policy.sp_name_qualifier == "urn:mace:example.com:it:tek"

    def test_sign_auth_request_0(self):
        """Signed authn request carries one reference to "#id1" and verifies."""
        req_id, areq = self.client.create_authn_request(
            "http://www.example.com/sso", sign=True, message_id="id1")

        ar_str = "%s" % areq
        ar = samlp.authn_request_from_string(ar_str)

        assert ar
        assert ar.signature
        assert ar.signature.signature_value
        signed_info = ar.signature.signed_info
        assert len(signed_info.reference) == 1
        assert signed_info.reference[0].uri == "#id1"
        assert signed_info.reference[0].digest_value
        print("------------------------------------------------")
        try:
            verified = self.client.sec.correctly_signed_authn_request(
                ar_str, self.client.config.xmlsec_binary,
                self.client.config.metadata)
        except Exception:  # missing certificate
            # NOTE(review): presumably verify_signature raises on a bad
            # signature; its return value is not checked here -- confirm.
            self.client.sec.verify_signature(ar_str, node_name=class_name(ar))
        else:
            # Asserted outside the ``try`` so that a failed check is reported
            # instead of being swallowed by the handler above.
            assert verified

    def test_response(self):
        """Parse two authn responses and check the client's user cache."""
        IDP = "urn:mace:example.com:saml:roland:idp"

        identity = {"givenName": ["Derek"], "surName": ["Jeter"],
                    "mail": ["derek@nyy.mlb.com"], "title": ["The man"]}

        nameid_policy = samlp.NameIDPolicy(allow_create="false",
                                           format=saml.NAMEID_FORMAT_PERSISTENT)

        resp = self.server.create_authn_response(
            identity=identity,
            in_response_to="id1",
            destination="http://lingon.catalogix.se:8087/",
            sp_entity_id="urn:mace:example.com:saml:roland:sp",
            name_id_policy=nameid_policy,
            userid="foba0001@example.com",
            authn=AUTHN)

        resp_str = "%s" % resp

        resp_str = base64.encodestring(resp_str)

        # The dict maps the outstanding request id to where the user came from
        authn_response = self.client.parse_authn_request_response(
            resp_str, BINDING_HTTP_POST,
            {"id1": "http://foo.example.com/service"})

        assert authn_response is not None
        assert authn_response.issuer() == IDP
        assert authn_response.response.assertion[0].issuer.text == IDP
        session_info = authn_response.session_info()

        print(session_info)
        # "surName" was sent; the session information reports it as "sn"
        assert session_info["ava"] == {'mail': ['derek@nyy.mlb.com'],
                                       'givenName': ['Derek'],
                                       'sn': ['Jeter'],
                                       'title': ["The man"]}
        assert session_info["issuer"] == IDP
        assert session_info["came_from"] == "http://foo.example.com/service"
        response = samlp.response_from_string(authn_response.xmlstr)
        assert response.destination == "http://lingon.catalogix.se:8087/"

        # One person in the cache
        assert len(self.client.users.subjects()) == 1
        subject_id = self.client.users.subjects()[0]
        print("|||| %s" % (self.client.users.get_info_from(subject_id, IDP),))
        # The information I have about the subject comes from one source
        assert self.client.users.issuers_of_info(subject_id) == [IDP]

        # --- authenticate another person

        identity = {"givenName": ["Alfonson"], "surName": ["Soriano"],
                    "mail": ["alfonson@chc.mlb.com"], "title": ["outfielder"]}

        resp_str = "%s" % self.server.create_authn_response(
            identity=identity,
            in_response_to="id2",
            destination="http://lingon.catalogix.se:8087/",
            sp_entity_id="urn:mace:example.com:saml:roland:sp",
            name_id_policy=nameid_policy,
            userid="also0001@example.com",
            authn=AUTHN)

        resp_str = base64.encodestring(resp_str)

        self.client.parse_authn_request_response(
            resp_str, BINDING_HTTP_POST,
            {"id2": "http://foo.example.com/service"})

        # Two persons in the cache
        assert len(self.client.users.subjects()) == 2
        issuers = [self.client.users.issuers_of_info(s) for s in
                   self.client.users.subjects()]
        # The information I have about the subjects comes from the same source
        print(issuers)
        assert issuers == [[IDP], [IDP]]

    def test_init_values(self):
        """Entity id, SSO location and name the client derives from config."""
        entityid = self.client.config.entityid
        print(entityid)
        assert entityid == "urn:mace:example.com:saml:roland:sp"
        print(self.client.metadata.with_descriptor("idpsso"))
        location = self.client._sso_location()
        print(location)
        assert location == 'http://localhost:8088/sso'
        my_name = self.client._my_name()
        print(my_name)
        assert my_name == "urn:mace:example.com:saml:roland:sp"

# The tests below need a dummy server: they run against FakeIDP.

# Identifier of the IdP addressed by the TestClientWithDummy tests.
IDP = "urn:mace:example.com:saml:roland:idp"


class TestClientWithDummy():
    """Client tests in which a FakeIDP receives what the client sends."""

    def setup_class(self):
        """Create the FakeIDP and a client wired directly to it."""
        self.server = FakeIDP("idp_all_conf")

        sp_conf = SPConfig()
        sp_conf.load_file("servera_conf")
        self.client = Saml2Client(sp_conf)

        # Whatever the client sends is handed straight to the fake IdP
        self.client.send = self.server.receive

    def test_do_authn(self):
        """Redirect-binding authn request can be parsed by the IdP."""
        request_binding = BINDING_HTTP_REDIRECT
        reply_binding = BINDING_HTTP_POST
        session_id, http_info = self.client.prepare_for_authenticate(
            IDP, "http://www.example.com/relay_state",
            binding=request_binding, response_binding=reply_binding)

        assert isinstance(session_id, basestring)
        assert len(http_info) == 4
        first_header = http_info["headers"][0]
        assert first_header[0] == "Location"
        assert http_info["data"] == []

        # The request travels in the query part of the redirect URL
        query = urlparse.urlparse(first_header[1]).query
        saml_request = urlparse.parse_qs(query)["SAMLRequest"][0]
        parsed = self.server.parse_authn_request(saml_request,
                                                 request_binding)
        reply_args = self.server.response_args(parsed.message,
                                               [reply_binding])
        assert reply_args["binding"] == reply_binding

    def test_do_attribute_query(self):
        """Run an attribute query; only checks that the call completes."""
        self.client.do_attribute_query(
            IDP, "_e7b68a04488f715cda642fbdd90099f5",
            attribute={"eduPersonAffiliation": None},
            nameid_format=NAMEID_FORMAT_TRANSIENT)

    def test_logout_1(self):
        """Global logout when information comes from a single issuer."""
        idp_entity_id = "urn:mace:example.com:saml:roland:idp"

        # What one IdP has told us about the user
        person = {
            "name_id": nid,
            "issuer": idp_entity_id,
            "not_on_or_after": in_a_while(minutes=15),
            "ava": {
                "givenName": "Anders",
                "surName": "Andersson",
                "mail": "anders.andersson@example.com"
            }
        }
        self.client.users.add_information_about_person(person)
        issuers = self.client.users.issuers_of_info(nid)
        assert issuers == [idp_entity_id]

        result = self.client.global_logout(nid, "Tired",
                                           in_a_while(minutes=5))
        print(result)
        assert result
        assert len(result) == 1
        assert list(result.keys()) == issuers
        assert isinstance(result[issuers[0]], LogoutResponse)

    def test_post_sso(self):
        """POST-binding authn request, answered through the fake IdP."""
        request_binding = BINDING_HTTP_POST
        reply_binding = BINDING_HTTP_POST
        session_id, http_info = self.client.prepare_for_authenticate(
            "urn:mace:example.com:saml:roland:idp", relay_state="really",
            binding=request_binding, response_binding=reply_binding)
        form = unpack_form(http_info["data"][3])

        parsed = self.server.parse_authn_request(form["SAMLRequest"],
                                                 request_binding)
        reply_args = self.server.response_args(parsed.message,
                                               [reply_binding])
        assert reply_args["binding"] == reply_binding

        # Normally a response would now be sent back to the user's web
        # client; here the form post that client would make is faked.
        http_info["data"] = urllib.urlencode(form)
        http_info["method"] = "POST"
        http_info["dummy"] = form["SAMLRequest"]
        http_info["headers"] = [('Content-type',
                                 'application/x-www-form-urlencoded')]

        reply = self.client.send(**http_info)
        print(reply.text)
        form = unpack_form(reply.text[3], "SAMLResponse")
        authn_response = self.client.parse_authn_request_response(
            form["SAMLResponse"], BINDING_HTTP_POST, {session_id: "/"})

        context = authn_response.assertion.authn_statement[0].authn_context
        authority = context.authenticating_authority[0]
        assert authority.text == 'http://www.example.com/login'
        assert context.authn_context_class_ref.text == \
            INTERNETPROTOCOLPASSWORD


# if __name__ == "__main__":
#     tc = TestClient()
#     tc.setup_class()
#     tc.test_response()

if __name__ == "__main__":
    # Run the signed authn request test directly, without a test runner.
    client_tests = TestClient()
    client_tests.setup_class()
    client_tests.test_sign_auth_request_0()
