#! /usr/bin/env python
# -*- coding: utf-8 -*-
#
# $Id: profiles_tests.py 3254 2007-06-05 21:23:57Z fpeters $
#
# Python unit tests for Lasso library
#
# Copyright (C) 2004-2007 Entr'ouvert
# http://lasso.entrouvert.org
#
# Authors: See AUTHORS file in top-level directory.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.


import base64
import os
import unittest
import sys

# Put the parent directory and its libtool '.libs' directory at the front of
# the module search path (presumably so the in-tree lasso bindings are the
# ones imported below).  '../.libs' is inserted last and therefore wins.
if '..' not in sys.path:
    sys.path.insert(0, '..')
if '../.libs' not in sys.path:
    sys.path.insert(0, '../.libs')

import lasso
import logging

# Give the root logger a default handler (presumably so that messages logged
# while the tests run are displayed).
logging.basicConfig()


# Reuse dataDir when the name is already defined in this namespace (e.g.
# injected by whatever executes this file -- TODO confirm which caller does);
# otherwise derive it from the TOP_SRCDIR environment variable, falling back
# to the current directory.
try:
    dataDir
except NameError:
    srcdir = os.environ.get('TOP_SRCDIR', '.')
    dataDir = '%s/tests/data' % srcdir


def server(local_name, remote_role, remote_name):
    """Build a lasso.Server for one test provider and register one peer.

    The local provider is loaded from ``<dataDir>/<local_name>/metadata.xml``
    and ``<dataDir>/<local_name>/private-key.pem``.  When a ``password`` file
    exists in the same directory its whole content is passed as the private
    key password; otherwise ``None`` is passed.

    :param local_name: directory name (under dataDir) of the local provider.
    :param remote_role: role constant handed to ``addProvider`` for the peer.
    :param remote_name: directory name (under dataDir) of the peer, whose
        ``metadata.xml`` is registered.
    :returns: the configured ``lasso.Server``.
    """
    pwd = os.path.join(dataDir, local_name, 'password')
    password = None
    if os.path.exists(pwd):
        # Close the file deterministically instead of relying on garbage
        # collection of the anonymous file object.
        with open(pwd) as password_file:
            password = password_file.read()
    s = lasso.Server(
        os.path.join(dataDir, local_name, 'metadata.xml'),
        os.path.join(dataDir, local_name, 'private-key.pem'),
        password)
    s.addProvider(remote_role, os.path.join(dataDir, remote_name, 'metadata.xml'))
    return s


class ServerTestCase(unittest.TestCase):
    def test01(self):
        """Server construction, dump & newFromDump."""

        lassoServer = lasso.Server(
            os.path.join(dataDir, 'sp1-la/metadata.xml'),
            os.path.join(dataDir, 'sp1-la/private-key-raw.pem'),
            None,
            os.path.join(dataDir, 'sp1-la/certificate.pem'))
        lassoServer.addProvider(
            lasso.PROVIDER_ROLE_IDP,
            os.path.join(dataDir, 'idp1-la/metadata.xml'),
            os.path.join(dataDir, 'idp1-la/public-key.pem'),
            os.path.join(dataDir, 'idp1-la/certificate.pem'))
        dump = lassoServer.dump()
        lassoServer2 = lassoServer.newFromDump(dump)
        dump2 = lassoServer2.dump()
        self.assertEqual(dump, dump2)

    def test02(self):
        """Server construction without argument, dump & newFromDump."""

        lassoServer = lasso.Server(
            os.path.join(dataDir, 'sp1-la/metadata.xml'))
        lassoServer.addProvider(
            lasso.PROVIDER_ROLE_IDP,
            os.path.join(dataDir, 'idp1-la/metadata.xml'),
            os.path.join(dataDir, 'idp1-la/public-key.pem'))
        dump = lassoServer.dump()
        lassoServer2 = lassoServer.newFromDump(dump)
        dump2 = lassoServer2.dump()
        self.assertEqual(dump, dump2)


class LoginTestCase(unittest.TestCase):
    def test01(self):
        """SP login; testing access to authentication request."""

        lassoServer = lasso.Server(
            os.path.join(dataDir, 'sp1-la/metadata.xml'),
            os.path.join(dataDir, 'sp1-la/private-key-raw.pem'),
            None,
            os.path.join(dataDir, 'sp1-la/certificate.pem'))
        lassoServer.addProvider(
            lasso.PROVIDER_ROLE_IDP,
            os.path.join(dataDir, 'idp1-la/metadata.xml'),
            os.path.join(dataDir, 'idp1-la/public-key.pem'),
            os.path.join(dataDir, 'idp1-la/certificate.pem'))
        login = lasso.Login(lassoServer)
        login.initAuthnRequest()
        login.request
        login.request.protocolProfile = lasso.LIB_PROTOCOL_PROFILE_BRWS_ART
        self.assertEqual(login.request.protocolProfile, lasso.LIB_PROTOCOL_PROFILE_BRWS_ART)

    def test02(self):
        """SP login; testing processing of an empty Response."""

        lassoServer = lasso.Server(
            os.path.join(dataDir, 'sp1-la/metadata.xml'),
            os.path.join(dataDir, 'sp1-la/private-key-raw.pem'),
            None,
            os.path.join(dataDir, 'sp1-la/certificate.pem'))
        lassoServer.addProvider(
            lasso.PROVIDER_ROLE_IDP,
            os.path.join(dataDir, 'idp1-la/metadata.xml'),
            os.path.join(dataDir, 'idp1-la/public-key.pem'),
            os.path.join(dataDir, 'idp1-la/certificate.pem'))
        login = lasso.Login(lassoServer)
        try:
            login.processResponseMsg('')
        except lasso.Error as error:
            if error[0] != lasso.PROFILE_ERROR_INVALID_MSG:
                raise

    def test03(self):
        """Conversion of a lib:AuthnRequest with an AuthnContext into a query and back."""

        sp = lasso.Server(
            os.path.join(dataDir, 'sp1-la/metadata.xml'),
            os.path.join(dataDir, 'sp1-la/private-key-raw.pem'),
            None,
            os.path.join(dataDir, 'sp1-la/certificate.pem'))
        sp.addProvider(
            lasso.PROVIDER_ROLE_IDP,
            os.path.join(dataDir, 'idp1-la/metadata.xml'),
            os.path.join(dataDir, 'idp1-la/public-key.pem'),
            os.path.join(dataDir, 'idp1-la/certificate.pem'))
        spLogin = lasso.Login(sp)
        spLogin.initAuthnRequest()
        requestAuthnContext = lasso.LibRequestAuthnContext()
        authnContextClassRefsList = []
        authnContextClassRefsList.append(
            lasso.LIB_AUTHN_CONTEXT_CLASS_REF_PASSWORD)
        requestAuthnContext.authnContextClassRef = tuple(authnContextClassRefsList)
        spLogin.request.requestAuthnContext = requestAuthnContext
        spLogin.request.protocolProfile = lasso.LIB_PROTOCOL_PROFILE_BRWS_ART
        spLogin.buildAuthnRequestMsg()
        authnRequestQuery = spLogin.msgUrl[spLogin.msgUrl.index('?') + 1:]
        idp = lasso.Server(
            os.path.join(dataDir, 'idp1-la/metadata.xml'),
            os.path.join(dataDir, 'idp1-la/private-key-raw.pem'),
            None,
            os.path.join(dataDir, 'idp1-la/certificate.pem'))
        idp.addProvider(
            lasso.PROVIDER_ROLE_SP,
            os.path.join(dataDir, 'sp1-la/metadata.xml'),
            os.path.join(dataDir, 'sp1-la/public-key.pem'),
            os.path.join(dataDir, 'sp1-la/certificate.pem'))
        idpLogin = lasso.Login(idp)
        idpLogin.processAuthnRequestMsg(authnRequestQuery)
        self.assertTrue(idpLogin.request.requestAuthnContext)
        authnContextClassRefsList = idpLogin.request.requestAuthnContext.authnContextClassRef
        self.assertEqual(len(authnContextClassRefsList), 1)
        self.assertEqual(authnContextClassRefsList[0], lasso.LIB_AUTHN_CONTEXT_CLASS_REF_PASSWORD)

    def test04(self):
        """Conversion of a lib:AuthnRequest with extensions into a query and back."""

        sp = lasso.Server(
            os.path.join(dataDir, 'sp1-la/metadata.xml'),
            os.path.join(dataDir, 'sp1-la/private-key-raw.pem'),
            None,
            os.path.join(dataDir, 'sp1-la/certificate.pem'))
        sp.addProvider(
            lasso.PROVIDER_ROLE_IDP,
            os.path.join(dataDir, 'idp1-la/metadata.xml'),
            os.path.join(dataDir, 'idp1-la/public-key.pem'),
            os.path.join(dataDir, 'idp1-la/certificate.pem'))
        spLogin = lasso.Login(sp)
        spLogin.initAuthnRequest()
        extensionList = []
        for extension in (
                '<action>do</action>',
                '<action2>do action 2</action2><action3>do action 3</action3>'):
            extensionList.append(
                '<lib:Extension xmlns:lib="urn:liberty:iff:2003-08">%s</lib:Extension>'
                % extension)
        spLogin.request.extension = tuple(extensionList)
        spLogin.request.protocolProfile = lasso.LIB_PROTOCOL_PROFILE_BRWS_ART
        spLogin.buildAuthnRequestMsg()
        authnRequestQuery = spLogin.msgUrl[spLogin.msgUrl.index('?') + 1:]
        idp = lasso.Server(
            os.path.join(dataDir, 'idp1-la/metadata.xml'),
            os.path.join(dataDir, 'idp1-la/private-key-raw.pem'),
            None,
            os.path.join(dataDir, 'idp1-la/certificate.pem'))
        idp.addProvider(
            lasso.PROVIDER_ROLE_SP,
            os.path.join(dataDir, 'sp1-la/metadata.xml'),
            os.path.join(dataDir, 'sp1-la/public-key.pem'),
            os.path.join(dataDir, 'sp1-la/certificate.pem'))
        idpLogin = lasso.Login(idp)
        idpLogin.processAuthnRequestMsg(authnRequestQuery)
        self.assertTrue(idpLogin.request.extension)
        extensionsList = idpLogin.request.extension
        self.assertEqual(len(extensionsList), 1)
        self.assertTrue('<action>do</action>' in extensionsList[0])
        self.assertTrue('<action2>do action 2</action2>' in extensionsList[0])
        self.assertTrue('<action3>do action 3</action3>' in extensionsList[0])

    def test05(self):
        '''SAMLv2 Authn request emitted and received using Artifact binding'''
        sp = lasso.Server(
            os.path.join(dataDir, 'sp5-saml2/metadata.xml'),
            os.path.join(dataDir, 'sp5-saml2/private-key.pem'))
        assert sp
        sp.addProvider(
            lasso.PROVIDER_ROLE_IDP,
            os.path.join(dataDir, 'idp5-saml2/metadata.xml'))
        sp_login = lasso.Login(sp)
        assert sp_login
        sp_login.initAuthnRequest(None, lasso.HTTP_METHOD_ARTIFACT_GET)
        sp_login.buildAuthnRequestMsg()
        sp_login_dump = sp_login.dump()
        idp = lasso.Server(
            os.path.join(dataDir, 'idp5-saml2/metadata.xml'),
            os.path.join(dataDir, 'idp5-saml2/private-key.pem'))
        idp.addProvider(
            lasso.PROVIDER_ROLE_SP,
            os.path.join(dataDir, 'sp5-saml2/metadata.xml'))
        idp_login = lasso.Login(idp)
        idp_login.initRequest(sp_login.msgUrl.split('?')[1], lasso.HTTP_METHOD_ARTIFACT_GET)
        idp_login.buildRequestMsg()
        sp_login2 = lasso.Login.newFromDump(sp, sp_login_dump)
        assert isinstance(sp_login2, lasso.Login)
        assert idp_login.msgBody
        sp_login2.processRequestMsg(idp_login.msgBody)
        sp_login2.buildResponseMsg()
        assert sp_login2.msgBody
        try:
            idp_login.processResponseMsg(sp_login2.msgBody)
        except Exception:
            raise
        assert isinstance(idp_login.request, lasso.Samlp2AuthnRequest)

    def test_06(self):
        '''Login test between SP and IdP with encrypted private keys'''
        sp_server = server('sp7-saml2', lasso.PROVIDER_ROLE_IDP, 'idp7-saml2')
        idp_server = server('idp7-saml2', lasso.PROVIDER_ROLE_SP, 'sp7-saml2')

        sp_login = lasso.Login(sp_server)
        sp_login.initAuthnRequest()
        sp_login.request.protocolBinding = lasso.SAML2_METADATA_BINDING_POST
        sp_login.buildAuthnRequestMsg()
        idp_login = lasso.Login(idp_server)
        idp_login.setSignatureVerifyHint(lasso.PROFILE_SIGNATURE_VERIFY_HINT_FORCE)
        idp_login.processAuthnRequestMsg(sp_login.msgUrl.split('?')[1])
        idp_login.validateRequestMsg(True, True)
        idp_login.buildAssertion("None", "None", "None", "None", "None")
        idp_login.buildAuthnResponseMsg()
        sp_login.setSignatureVerifyHint(lasso.PROFILE_SIGNATURE_VERIFY_HINT_FORCE)
        sp_login.processAuthnResponseMsg(idp_login.msgBody)
        sp_login.acceptSso()

    def test07(self):
        '''SAMLv2 SSO with DSA key for the IdP'''
        default_sign_meth = lasso.getDefaultSignatureMethod()
        if default_sign_meth != lasso.SIGNATURE_METHOD_RSA_SHA1:
            self.skipTest("This test requires that lasso is compiled with SHA1 as the default signature method")

        sp = lasso.Server(
            os.path.join(dataDir, 'sp5-saml2/metadata.xml'),
            os.path.join(dataDir, 'sp5-saml2/private-key.pem'))
        assert sp
        sp.addProvider(
            lasso.PROVIDER_ROLE_IDP,
            os.path.join(dataDir, 'idp12-dsa-saml2/metadata.xml'))
        sp_login = lasso.Login(sp)
        assert sp_login
        sp_login.initAuthnRequest(None, lasso.HTTP_METHOD_REDIRECT)
        sp_login.buildAuthnRequestMsg()
        idp = lasso.Server(
            os.path.join(dataDir, 'idp12-dsa-saml2/metadata.xml'),
            os.path.join(dataDir, 'idp12-dsa-saml2/private-key.pem'))
        idp.signatureMethod = lasso.SIGNATURE_METHOD_DSA_SHA1
        idp.addProvider(
            lasso.PROVIDER_ROLE_SP,
            os.path.join(dataDir, 'sp5-saml2/metadata.xml'))
        idp_login = lasso.Login(idp)
        idp_login.processAuthnRequestMsg(sp_login.msgUrl.split('?')[1])
        idp_login.protocolProfile = lasso.LOGIN_PROTOCOL_PROFILE_BRWS_POST
        idp_login.validateRequestMsg(True, True)
        idp_login.buildAssertion("None", "None", "None", "None", "None")
        idp_login.buildAuthnResponseMsg()

    def test08(self):
        '''Verify KeyEncryptionMethod support'''
        sp_server = server('sp5-saml2', lasso.PROVIDER_ROLE_IDP, 'idp5-saml2')
        idp_server = server('idp5-saml2', lasso.PROVIDER_ROLE_SP, 'sp5-saml2')

        def run(key_encryption_method=None):
            sp_login = lasso.Login(sp_server)
            sp_login.initAuthnRequest(None, lasso.HTTP_METHOD_REDIRECT)
            sp_login.buildAuthnRequestMsg()

            provider = idp_server.getProvider('http://sp5/metadata')
            provider.setEncryptionMode(lasso.ENCRYPTION_MODE_ASSERTION)

            if key_encryption_method:
                provider.setKeyEncryptionMethod(key_encryption_method)

            idp_login = lasso.Login(idp_server)
            idp_login.processAuthnRequestMsg(sp_login.msgUrl.split('?')[1])
            idp_login.protocolProfile = lasso.LOGIN_PROTOCOL_PROFILE_BRWS_POST
            idp_login.validateRequestMsg(True, True)
            idp_login.buildAssertion("None", "None", "None", "None", "None")
            idp_login.buildAuthnResponseMsg()

            sp_login.setSignatureVerifyHint(lasso.PROFILE_SIGNATURE_VERIFY_HINT_FORCE)
            sp_login.processAuthnResponseMsg(idp_login.msgBody)
            sp_login.acceptSso()

            if key_encryption_method:
                expected_key_encryption_method = key_encryption_method
            else:
                expected_key_encryption_method = lasso.getDefaultKeyEncryptionMethod()
            assert provider.getKeyEncryptionMethod() == expected_key_encryption_method
            return sp_login.response.debug()

        os.environ['LASSO_DEFAULT_KEY_ENCRYPTION_METHOD'] = 'rsa-pkcs1'
        lasso.init()
        assert 'xmlenc#rsa-1_5' in run()
        assert 'xmlenc#rsa-oaep-mgf1p' not in run()

        os.environ['LASSO_DEFAULT_KEY_ENCRYPTION_METHOD'] = 'rsa-oaep'
        lasso.init()
        assert 'xmlenc#rsa-1_5' not in run()
        assert 'xmlenc#rsa-oaep-mgf1p' in run()

        lasso.setDefaultKeyEncryptionMethod(lasso.KEY_ENCRYPTION_METHOD_PKCS1)
        assert 'xmlenc#rsa-1_5' in run()
        assert 'xmlenc#rsa-oaep-mgf1p' not in run()

        lasso.setDefaultKeyEncryptionMethod(lasso.KEY_ENCRYPTION_METHOD_OAEP)
        assert 'xmlenc#rsa-1_5' not in run()
        assert 'xmlenc#rsa-oaep-mgf1p' in run()

        assert 'xmlenc#rsa-1_5' in run(key_encryption_method=lasso.KEY_ENCRYPTION_METHOD_PKCS1)
        assert 'xmlenc#rsa-oaep-mgf1p' not in run(key_encryption_method=lasso.KEY_ENCRYPTION_METHOD_PKCS1)

        assert 'xmlenc#rsa-1_5' not in run(key_encryption_method=lasso.KEY_ENCRYPTION_METHOD_OAEP)
        assert 'xmlenc#rsa-oaep-mgf1p' in run(key_encryption_method=lasso.KEY_ENCRYPTION_METHOD_OAEP)

    def test_09(self):
        '''Login test between SP and IdP with encrypted private keys'''
        sp_server = server('sp7-saml2', lasso.PROVIDER_ROLE_IDP, 'idp7-saml2')
        idp_server = server('idp7-saml2', lasso.PROVIDER_ROLE_SP, 'sp7-saml2')

        sp_login = lasso.Login(sp_server)
        sp_login.initAuthnRequest()
        sp_login.request.protocolBinding = lasso.SAML2_METADATA_BINDING_POST
        sp_login.buildAuthnRequestMsg()
        idp_login = lasso.Login(idp_server)
        idp_login.setSignatureVerifyHint(lasso.PROFILE_SIGNATURE_VERIFY_HINT_FORCE)
        idp_login.processAuthnRequestMsg(sp_login.msgUrl.split('?')[1])
        idp_login.validateRequestMsg(True, True)
        idp_login.buildAssertion("None", "None", "None", "None", "None")
        content = lasso.MiscTextNode.newWithString("john.doe")
        content.textChild = True
        idp_login.assertion.addAttributeWithNode('login', lasso.SAML2_ATTRIBUTE_NAME_FORMAT_BASIC, content)
        idp_login.buildAuthnResponseMsg()
        sp_login.setSignatureVerifyHint(lasso.PROFILE_SIGNATURE_VERIFY_HINT_FORCE)
        # insert comment inside NameID
        msg = base64.b64decode(idp_login.msgBody).decode()
        msg = msg.replace(idp_login.assertion.subject.nameId.content, idp_login.assertion.subject.nameId.content[:10] + '<!-- coin -->' + idp_login.assertion.subject.nameId.content[10:])
        msg = msg.replace('john.doe', 'john.<!-- coin -->doe')
        msg = base64.b64encode(msg.encode())
        sp_login.processAuthnResponseMsg(msg.decode())
        sp_login.acceptSso()
        assert sp_login.assertion.subject.nameId.content == idp_login.assertion.subject.nameId.content
        nodes = sp_login.assertion.attributeStatement[0].attribute[0].attributeValue[0].any
        assert all(node.textChild for node in nodes)
        assert ''.join(node.content for node in nodes) == 'john.doe'

    def test_10(self):
        '''Login test between SP and IdP with encrypted private keys'''
        sp_server = server('sp7-saml2', lasso.PROVIDER_ROLE_IDP, 'idp7-saml2')
        idp_server = server('idp7-saml2', lasso.PROVIDER_ROLE_SP, 'sp7-saml2')

        sp_login = lasso.Login(sp_server)
        sp_login.initAuthnRequest()
        sp_login.request.protocolBinding = lasso.SAML2_METADATA_BINDING_POST
        sp_login.buildAuthnRequestMsg()
        idp_login = lasso.Login(idp_server)
        idp_login.setSignatureVerifyHint(lasso.PROFILE_SIGNATURE_VERIFY_HINT_FORCE)
        idp_login.processAuthnRequestMsg(sp_login.msgUrl.split('?')[1])
        idp_login.validateRequestMsg(True, True)
        idp_login.buildAssertion("None", "None", "None", "None", "None")
        idp_login.buildAuthnResponseMsg()
        sp_login.setSignatureVerifyHint(lasso.PROFILE_SIGNATURE_VERIFY_HINT_FORCE)
        # insert comment inside NameID
        msg = base64.b64encode(base64.b64decode(idp_login.msgBody).decode().replace(idp_login.assertion.subject.nameId.content, '<![CDATA[%s]]>' % idp_login.assertion.subject.nameId.content).replace('NameID>', 'NameID><![CDATA[]]>').encode())
        sp_login.processAuthnResponseMsg(msg.decode())
        sp_login.acceptSso()
        assert sp_login.assertion.subject.nameId.content == idp_login.assertion.subject.nameId.content


class LogoutTestCase(unittest.TestCase):
    def test01(self):
        """SP logout without session and identity; testing initRequest."""

        lassoServer = lasso.Server(
            os.path.join(dataDir, 'sp1-la/metadata.xml'),
            os.path.join(dataDir, 'sp1-la/private-key-raw.pem'),
            None,
            os.path.join(dataDir, 'sp1-la/certificate.pem'))
        lassoServer.addProvider(
            lasso.PROVIDER_ROLE_IDP,
            os.path.join(dataDir, 'idp1-la/metadata.xml'),
            os.path.join(dataDir, 'idp1-la/public-key.pem'),
            os.path.join(dataDir, 'idp1-la/certificate.pem'))
        logout = lasso.Logout(lassoServer)
        try:
            logout.initRequest()
        except lasso.Error as error:
            if error[0] != lasso.PROFILE_ERROR_SESSION_NOT_FOUND:
                raise
        else:
            self.fail('logout.initRequest without having set identity before should fail')

    def test02(self):
        """IDP logout without session and identity; testing logout.getNextProviderId."""

        lassoServer = lasso.Server(
            os.path.join(dataDir, 'idp1-la/metadata.xml'),
            os.path.join(dataDir, 'idp1-la/private-key-raw.pem'),
            None,
            os.path.join(dataDir, 'idp1-la/certificate.pem'))
        lassoServer.addProvider(
            lasso.PROVIDER_ROLE_SP,
            os.path.join(dataDir, 'sp1-la/metadata.xml'),
            os.path.join(dataDir, 'sp1-la/public-key.pem'),
            os.path.join(dataDir, 'sp1-la/certificate.pem'))
        logout = lasso.Logout(lassoServer)
        self.assertFalse(logout.getNextProviderId())

    def test03(self):
        """IDP logout; testing processRequestMsg with non Liberty query."""

        lassoServer = lasso.Server(
            os.path.join(dataDir, 'idp1-la/metadata.xml'),
            os.path.join(dataDir, 'idp1-la/private-key-raw.pem'),
            None,
            os.path.join(dataDir, 'idp1-la/certificate.pem'))
        lassoServer.addProvider(
            lasso.PROVIDER_ROLE_SP,
            os.path.join(dataDir, 'sp1-la/metadata.xml'),
            os.path.join(dataDir, 'sp1-la/public-key.pem'),
            os.path.join(dataDir, 'sp1-la/certificate.pem'))
        logout = lasso.Logout(lassoServer)
        # The processRequestMsg should fail but not abort.
        try:
            logout.processRequestMsg('passport=0&lasso=1')
        except lasso.Error as error:
            if error[0] != lasso.PROFILE_ERROR_INVALID_MSG:
                raise
        else:
            self.fail('Logout processRequestMsg should have failed.')

    def test04(self):
        """IDP logout; testing processResponseMsg with non Liberty query."""

        lassoServer = lasso.Server(
            os.path.join(dataDir, 'idp1-la/metadata.xml'),
            os.path.join(dataDir, 'idp1-la/private-key-raw.pem'),
            None,
            os.path.join(dataDir, 'idp1-la/certificate.pem'))
        lassoServer.addProvider(
            lasso.PROVIDER_ROLE_SP,
            os.path.join(dataDir, 'sp1-la/metadata.xml'),
            os.path.join(dataDir, 'sp1-la/public-key.pem'),
            os.path.join(dataDir, 'sp1-la/certificate.pem'))
        logout = lasso.Logout(lassoServer)
        # The processResponseMsg should fail but not abort.
        try:
            logout.processResponseMsg('liberty=&alliance')
        except lasso.Error as error:
            if error[0] != lasso.PROFILE_ERROR_INVALID_MSG:
                raise
        else:
            self.fail('Logout processResponseMsg should have failed.')

    def test05(self):
        '''Test parsing of a logout request with more than one session index'''
        content = '''<samlp:LogoutRequest xmlns:samlp="urn:oasis:names:tc:SAML:2.0:protocol" \
xmlns:saml="urn:oasis:names:tc:SAML:2.0:assertion" ID="xxxx" Version="2.0" IssueInstant="2010-06-14T22:00:00">
        <saml:Issuer>me</saml:Issuer>
        <saml:NameID>coin</saml:NameID>
        <samlp:SessionIndex>id1</samlp:SessionIndex>
        <samlp:SessionIndex>id2</samlp:SessionIndex>
        <samlp:SessionIndex>id3</samlp:SessionIndex>
        </samlp:LogoutRequest>'''

        node = lasso.Samlp2LogoutRequest.newFromXmlNode(content)
        assert isinstance(node, lasso.Samlp2LogoutRequest)
        assert node.sessionIndex == 'id1'
        assert node.sessionIndexes == ('id1', 'id2', 'id3')


class DefederationTestCase(unittest.TestCase):
    def test01(self):
        """IDP initiated defederation; testing processNotificationMsg with non Liberty query."""

        lassoServer = lasso.Server(
            os.path.join(dataDir, 'idp1-la/metadata.xml'),
            os.path.join(dataDir, 'idp1-la/private-key-raw.pem'),
            None,
            os.path.join(dataDir, 'idp1-la/certificate.pem'))
        lassoServer.addProvider(
            lasso.PROVIDER_ROLE_SP,
            os.path.join(dataDir, 'sp1-la/metadata.xml'),
            os.path.join(dataDir, 'sp1-la/public-key.pem'),
            os.path.join(dataDir, 'sp1-la/certificate.pem'))
        defederation = lasso.Defederation(lassoServer)
        # The processNotificationMsg should fail but not abort.
        try:
            defederation.processNotificationMsg('nonLibertyQuery=1')
        except lasso.Error as error:
            if error[0] != lasso.PROFILE_ERROR_INVALID_MSG:
                raise
        else:
            self.fail('Defederation processNotificationMsg should have failed.')


class IdentityTestCase(unittest.TestCase):
    """Dump / newFromDump round-trip check for lasso.Identity (currently skipped)."""

    # Marked as skipped rather than returning early, so that the runner reports
    # the test as skipped instead of counting it as a pass.
    @unittest.skip('test disabled since dump format changed')
    def test01(self):
        """Identity newFromDump & dump."""
        identityDump = """<Identity xmlns="http://www.entrouvert.org/namespaces/lasso/0.0" Version="1"><Federations><Federation xmlns="http://www.entrouvert.org/namespaces/lasso/0.0" Version="1" RemoteProviderID="https://sp1.entrouvert.lan/metadata"><LocalNameIdentifier><saml:NameIdentifier xmlns:saml="urn:oasis:names:tc:SAML:1.0:assertion" NameQualifier="https://proxy2.entrouvert.lan/metadata" Format="urn:liberty:iff:nameid:federated">_CD739B41C602EAEA93626EBD1751CB46</saml:NameIdentifier></LocalNameIdentifier></Federation><Federation xmlns="http://www.entrouvert.org/namespaces/lasso/0.0" Version="1" RemoteProviderID="https://idp1.entrouvert.lan/metadata"><RemoteNameIdentifier><saml:NameIdentifier xmlns:saml="urn:oasis:names:tc:SAML:1.0:assertion" NameQualifier="https://idp1.entrouvert.lan/metadata" Format="urn:liberty:iff:nameid:federated">_11EA77A4FED32C41824AC5DE87298E65</saml:NameIdentifier></RemoteNameIdentifier></Federation></Federations></Identity>"""
        identity = lasso.Identity.newFromDump(identityDump)
        newIdentityDump = identity.dump()
        self.assertEqual(identityDump, newIdentityDump)

class AttributeAuthorityTestCase(unittest.TestCase):
    def test01(self):
        '''Attribute request and response test between sp5 and idp6'''
        s = lasso.Server(
            os.path.join(dataDir, 'sp5-saml2/metadata.xml'),
            os.path.join(dataDir, 'sp5-saml2/private-key.pem'))
        s.addProvider(lasso.PROVIDER_ROLE_ATTRIBUTE_AUTHORITY, os.path.join(dataDir, 'idp6-saml2/metadata.xml'))

        s2 = lasso.Server(
            os.path.join(dataDir, 'idp6-saml2/metadata.xml'),
            os.path.join(dataDir, 'idp6-saml2/private-key.pem'))
        s2.addProvider(lasso.PROVIDER_ROLE_SP, os.path.join(dataDir, 'sp5-saml2/metadata.xml'))

        aq = lasso.AssertionQuery(s)
        rpid = list(s.providers.keys())[0]
        aq.initRequest(rpid, lasso.HTTP_METHOD_SOAP, lasso.ASSERTION_QUERY_REQUEST_TYPE_ATTRIBUTE)
        assert aq.request
        assert aq.remoteProviderId == rpid
        nid = lasso.Saml2NameID.newWithPersistentFormat(
            lasso.buildUniqueId(32),
            s.providerId, s2.providerId)
        aq.nameIdentifier = nid
        aq.addAttributeRequest(lasso.SAML2_ATTRIBUTE_NAME_FORMAT_BASIC, 'testAttribute')
        aq.buildRequestMsg()
        assert aq.msgUrl
        assert aq.msgBody

        aq2 = lasso.AssertionQuery(s2)
        aq2.processRequestMsg(aq.msgBody)
        assert aq.request
        aq2.validateRequest()
        assert aq2.response
        assertion = lasso.Saml2Assertion()
        aq2.response.assertion = (assertion, )
        for attribute in aq2.request.attribute:
            content = lasso.MiscTextNode.newWithString("xxx")
            content.textChild = True
            assertion.addAttributeWithNode(attribute.name, attribute.nameFormat, content)
            assertion.addAttributeWithNode(attribute.name, attribute.nameFormat, content)
        assertion.subject = aq.request.subject
        s2.saml2AssertionSetupSignature(assertion)
        aq2.buildResponseMsg()
        aq.processResponseMsg(aq2.msgBody)
        assert aq.response
        assert aq.response.assertion[0]
        assert aq.response.assertion[0].attributeStatement[0]
        assert aq.response.assertion[0].attributeStatement[0].attribute[0]
        assert aq.response.assertion[0].attributeStatement[0].attribute[0].attributeValue[0]

# One suite per test case class.  Each suite keeps its own module-level name
# (presumably so that other modules can pick individual suites -- verify
# against callers) and allTests aggregates them in the same order.
_load_suite = unittest.defaultTestLoader.loadTestsFromTestCase
serverSuite = _load_suite(ServerTestCase)
loginSuite = _load_suite(LoginTestCase)
logoutSuite = _load_suite(LogoutTestCase)
defederationSuite = _load_suite(DefederationTestCase)
identitySuite = _load_suite(IdentityTestCase)
attributeSuite = _load_suite(AttributeAuthorityTestCase)

allTests = unittest.TestSuite([
    serverSuite,
    loginSuite,
    logoutSuite,
    defederationSuite,
    identitySuite,
    attributeSuite,
])

if __name__ == '__main__':
    # Exit status 0 when every test succeeded, 1 otherwise.
    outcome = unittest.TextTestRunner(verbosity=2).run(allTests)
    sys.exit(0 if outcome.wasSuccessful() else 1)

