1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
|
# Copyright (c) 2010-2022 Belledonne Communications SARL.
#
# This file is part of Liblinphone
# (see https://gitlab.linphone.org/BC/public/liblinphone).
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from nose.tools import assert_equals
import linphone
from linphonetester import *
import os
import time
class RegisterCoreManager(CoreManager):
@classmethod
def authentication_requested(cls, lc, auth_info, method):
info = linphone.Factory.get().create_auth_info(test_username, None, test_password, None, auth_info.realm, auth_info.domain) # Create authentication structure from identity
lc.add_auth_info(info) # Add authentication info to LinphoneCore
def __init__(self, with_auth = False):
additional_cbs = None
if with_auth:
additional_cbs = linphone.Factory.get().create_core_cbs()
additional_cbs.authentication_requested = RegisterCoreManager.authentication_requested
CoreManager.__init__(self, additional_cbs=additional_cbs)
def __del__(self):
linphonetester_logger.info("deleting" + str(self))
def register_with_refresh_base(self, refresh, domain, route, late_auth_info = False, transport = linphone.SipTransports(5070, 5070, 5071, 0), expected_final_state = linphone.RegistrationState.Ok):
assert self.lc is not None
self.stats.reset()
self.lc.sip_transports = transport
proxy_cfg = self.lc.create_proxy_config()
from_address = create_address(domain)
proxy_cfg.identity_address = from_address
server_addr = from_address.domain
proxy_cfg.register_enabled = True
proxy_cfg.expires = 1
if route is None:
proxy_cfg.server_addr = server_addr
else:
proxy_cfg.route = route
proxy_cfg.server_addr = route
self.lc.add_proxy_config(proxy_cfg)
self.lc.default_proxy_config = proxy_cfg
retry = 0
expected_count = 1
if refresh:
expected_count += 1
max_retry = 110
if expected_final_state == linphone.RegistrationState.Progress:
max_retry += 200
while self.stats.number_of_LinphoneRegistrationOk < expected_count and retry < max_retry:
retry += 1
self.lc.iterate()
if self.stats.number_of_auth_info_requested > 0 and proxy_cfg.state == linphone.RegistrationState.Failed and late_auth_info:
if len(self.lc.auth_info_list) == 0:
assert_equals(proxy_cfg.error, linphone.Reason.Unauthorized)
info = linphone.Factory.get().create_auth_info(test_username, None, test_password, None, None, None) # Create authentication structure from identity
self.lc.add_auth_info(info)
if proxy_cfg.error == linphone.Reason.Forbidden or \
(self.stats.number_of_auth_info_requested > 2 and proxy_cfg.error == linphone.Reason.Unauthorized):
break
time.sleep(0.1)
assert_equals(proxy_cfg.state, expected_final_state)
assert_equals(self.stats.number_of_LinphoneRegistrationNone, 0)
assert self.stats.number_of_LinphoneRegistrationProgress >= 1
if expected_final_state == linphone.RegistrationState.Ok:
assert_equals(self.stats.number_of_LinphoneRegistrationOk, expected_count)
expected_failed = 0
if late_auth_info:
expected_failed = 1
assert_equals(self.stats.number_of_LinphoneRegistrationFailed, expected_failed)
else:
assert_equals(self.stats.number_of_LinphoneRegistrationCleared, 0)
def register_with_refresh(self, refresh, domain, route, late_auth_info = False, transport = linphone.SipTransports(5070, 5070, 5071, 0), expected_final_state = linphone.RegistrationState.Ok):
self.register_with_refresh_base(refresh, domain, route, late_auth_info, expected_final_state = expected_final_state)
# Not testable as the callbacks can not be called once the core destruction has started
#assert_equals(self.stats.number_of_LinphoneRegistrationCleared, 1)
class TestRegister:
def teardown(self):
linphone.Factory.clean()
def test_simple_register(self):
cm = RegisterCoreManager()
cm.register_with_refresh(False, None, None)
assert_equals(cm.stats.number_of_auth_info_requested, 0)
def test_simple_unregister(self):
cm = RegisterCoreManager()
cm.register_with_refresh_base(False, None, None)
pc = cm.lc.default_proxy_config
pc.edit()
cm.stats.reset() # clear stats
# nothing is supposed to arrive until done
assert_equals(CoreManager.wait_for_until(cm, cm, lambda cm1, cm2: cm1.stats.number_of_LinphoneRegistrationCleared == 1, 3000), False)
pc.register_enabled = False
pc.done()
assert_equals(CoreManager.wait_for(cm, cm, lambda cm1, cm2: cm1.stats.number_of_LinphoneRegistrationCleared == 1), True)
def test_simple_tcp_register(self):
cm = RegisterCoreManager()
cm.register_with_refresh(False, test_domain, "sip:{route};transport=tcp".format(route=test_route))
def test_simple_tcp_register_compatibility_mode(self):
cm = RegisterCoreManager()
cm.register_with_refresh(False, test_domain, "sip:{route}".format(route=test_route), transport=linphone.SipTransports(0, 5070, 0, 0))
def test_simple_tls_register(self):
cm = RegisterCoreManager()
cm.register_with_refresh(False, test_domain, "sip:{route};transport=tls".format(route=test_route))
def test_tls_register_with_alt_name(self):
cm = CoreManager('pauline_alt_rc', False)
cm.lc.root_ca = os.path.join(tester_resources_path, 'certificates', 'cn', 'cafile.pem')
cm.lc.refresh_registers()
assert_equals(CoreManager.wait_for(cm, cm, lambda cm1, cm2: cm1.stats.number_of_LinphoneRegistrationOk == 1), True)
assert_equals(cm.stats.number_of_LinphoneRegistrationFailed, 0)
def test_tls_wildcard_register(self):
cm = CoreManager('pauline_wild_rc', False)
cm.lc.root_ca = os.path.join(tester_resources_path, 'certificates', 'cn', 'cafile.pem')
cm.lc.refresh_registers()
assert_equals(CoreManager.wait_for(cm, cm, lambda cm1, cm2: cm1.stats.number_of_LinphoneRegistrationOk == 2), True)
assert_equals(cm.stats.number_of_LinphoneRegistrationFailed, 0)
def test_tls_certificate_failure(self):
cm = CoreManager('pauline_rc', False)
cm.lc.root_ca = os.path.join(tester_resources_path, 'certificates', 'cn', 'agent.pem') # bad root ca
cm.lc.network_reachable = True
assert_equals(CoreManager.wait_for(cm, cm, lambda cm1, cm2: cm1.stats.number_of_LinphoneRegistrationFailed == 1), True)
cm.lc.root_ca = None # no root ca
cm.lc.refresh_registers()
assert_equals(CoreManager.wait_for(cm, cm, lambda cm1, cm2: cm1.stats.number_of_LinphoneRegistrationFailed == 2), True)
cm.lc.root_ca = os.path.join(tester_resources_path, 'certificates', 'cn', 'cafile.pem') # good root ca
cm.lc.refresh_registers()
assert_equals(CoreManager.wait_for(cm, cm, lambda cm1, cm2: cm1.stats.number_of_LinphoneRegistrationOk == 1), True)
assert_equals(cm.stats.number_of_LinphoneRegistrationFailed, 2)
def test_tls_with_non_tls_server(self):
cm = CoreManager('marie_rc', False)
cm.lc.sip_transport_timeout = 3000
pc = cm.lc.default_proxy_config
pc.edit()
addr = linphone.Factory.get().create_address(pc.server_addr)
port = addr.port
if port <= 0:
port = 5060
pc.server_addr = "sip:{domain}:{port};transport=tls".format(domain=addr.domain, port=port)
pc.done()
assert_equals(CoreManager.wait_for_until(cm, cm, lambda cm1, cm2: cm1.stats.number_of_LinphoneRegistrationFailed == 1, 10000), True)
def test_simple_authenticated_register(self):
cm = RegisterCoreManager()
info = linphone.Factory.get().create_auth_info(test_username, None, test_password, None, auth_domain, None) # Create authentication structure from identity
cm.lc.add_auth_info(info)
cm.register_with_refresh(False, auth_domain, "sip:{route}".format(route=test_route))
assert_equals(cm.stats.number_of_auth_info_requested, 0)
def test_digest_auth_without_initial_credentials(self):
cm = RegisterCoreManager(with_auth=True)
cm.register_with_refresh(False, auth_domain, "sip:{route}".format(route=test_route))
assert_equals(cm.stats.number_of_auth_info_requested, 1)
def test_authenticated_register_with_late_credentials(self):
cm = RegisterCoreManager()
cm.register_with_refresh(False, auth_domain, "sip:{route}".format(route=test_route), True, linphone.SipTransports(5070, 5070, 5071, 0))
assert_equals(cm.stats.number_of_auth_info_requested, 1)
def test_simple_register_with_refresh(self):
cm = RegisterCoreManager()
cm.register_with_refresh(True, None, None)
assert_equals(cm.stats.number_of_auth_info_requested, 0)
def test_simple_auth_register_with_refresh(self):
cm = RegisterCoreManager(with_auth=True)
cm.register_with_refresh(True, auth_domain, "sip:{route}".format(route=test_route))
assert_equals(cm.stats.number_of_auth_info_requested, 1)
def test_multiple_accounts(self):
CoreManager('multi_account_rc', False)
def test_transport_change(self):
cm = CoreManager('multi_account_rc', True)
number_of_udp_proxies = 0
for p in cm.lc.proxy_config_list:
if p.transport == "udp":
number_of_udp_proxies += 1
total_number_of_proxies = len(cm.lc.proxy_config_list)
register_ok = cm.stats.number_of_LinphoneRegistrationOk
# Keep only UDP
tr = linphone.SipTransports(0, 0, 0, 0)
tr.udp_port = cm.lc.sip_transports.udp_port
cm.lc.sip_transports = tr
assert_equals(CoreManager.wait_for(cm, cm, lambda cm1, cm2: cm1.stats.number_of_LinphoneRegistrationOk == (register_ok + number_of_udp_proxies)), True)
assert_equals(CoreManager.wait_for(cm, cm, lambda cm1, cm2: cm1.stats.number_of_LinphoneRegistrationFailed == (total_number_of_proxies - number_of_udp_proxies)), True)
|