#!/usr/bin/env python3
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4


import libtorrent as lt

import unittest
import time
import datetime
import os
import shutil
import binascii
import subprocess as sub
import sys
import pickle
import threading
import tempfile
import socket
import select
import logging
import ssl
import http.server
import functools

import dummy_data

# include terminal interface for travis parallel executions of scripts which use
# terminal features: fix multiple stdin assignment at termios.tcgetattr
if os.name != 'nt':
    import pty

settings = {
    'alert_mask': lt.alert.category_t.all_categories,
    'enable_dht': False, 'enable_lsd': False, 'enable_natpmp': False,
    'enable_upnp': False, 'listen_interfaces': '0.0.0.0:0', 'file_pool_size': 1}


def has_deprecated():
    return hasattr(lt, 'version')


class test_create_torrent(unittest.TestCase):

    def test_from_torrent_info(self):
        ti = lt.torrent_info('unordered.torrent')
        print(ti.ssl_cert())
        ct = lt.create_torrent(ti)
        entry = ct.generate()
        content = lt.bencode(entry).strip()
        with open('unordered.torrent', 'rb') as f:
            file_content = bytearray(f.read().strip())
            print(content)
            print(file_content)
            print(entry)
            self.assertEqual(content, file_content)

    def test_from_scratch(self):
        fs = lt.file_storage()
        fs.add_file('test/file1', 1000)
        fs.add_file('test/file2', 2000)
        self.assertEqual(fs.file_name(0), 'file1')
        self.assertEqual(fs.file_name(1), 'file2')
        ct = lt.create_torrent(fs, 0, lt.create_torrent.canonical_files_no_tail_padding)
        ct.add_url_seed('foo')
        ct.add_http_seed('bar')
        ct.add_tracker('bar')
        ct.set_root_cert('1234567890')
        ct.add_collection('1337')
        for i in range(ct.num_pieces()):
            ct.set_hash(i, b'abababababababababab')
        entry = ct.generate()
        encoded = lt.bencode(entry)
        print(encoded)

        # zero out the creation date:
        encoded = encoded.split(b'13:creation datei', 1)
        encoded[1] = b'0e' + encoded[1].split(b'e', 1)[1]
        encoded = b'13:creation datei'.join(encoded)

        self.assertEqual(encoded, b'd8:announce3:bar13:creation datei0e9:httpseeds3:bar4:infod11:collectionsl4:1337e5:filesld6:lengthi1000e4:pathl5:file1eed4:attr1:p6:lengthi15384e4:pathl4:.pad5:15384eed6:lengthi2000e4:pathl5:file2eee4:name4:test12:piece lengthi16384e6:pieces40:abababababababababababababababababababab8:ssl-cert10:1234567890e8:url-list3:fooe')


class test_session_stats(unittest.TestCase):

    def test_add_torrent_params(self):
        atp = lt.add_torrent_params()

        for field_name in dir(atp):
            field = getattr(atp, field_name)
            print(field_name, field)

        atp.renamed_files = {}
        atp.merkle_tree = []
        atp.unfinished_pieces = {}
        atp.have_pieces = []
        atp.banned_peers = []
        atp.verified_pieces = []
        atp.piece_priorities = []
        atp.url_seeds = []

    def test_unique(self):
        metrics = lt.session_stats_metrics()
        self.assertTrue(len(metrics) > 40)
        idx = set()
        for m in metrics:
            self.assertTrue(m.value_index not in idx)
            idx.add(m.value_index)

    def test_find_idx(self):
        self.assertEqual(lt.find_metric_idx("peer.error_peers"), 0)


class test_torrent_handle(unittest.TestCase):

    def setup(self):
        self.ses = lt.session(settings)
        self.ti = lt.torrent_info('url_seed_multi.torrent')
        self.h = self.ses.add_torrent({
            'ti': self.ti, 'save_path': os.getcwd(),
            'flags': lt.torrent_flags.default_flags})

    def test_add_torrent_error(self):
        self.ses = lt.session(settings)
        self.ti = lt.torrent_info('url_seed_multi.torrent')
        with self.assertRaises(RuntimeError):
            self.ses.add_torrent({'ti': self.ti, 'save_path': os.getcwd(), 'info_hashes': b'abababababababababab'})

    def test_move_storage(self):
        self.setup()
        self.h.move_storage(u'test-dir')
        self.h.move_storage(b'test-dir2')
        self.h.move_storage('test-dir3')
        self.h.move_storage(u'test-dir', flags=lt.move_flags_t.dont_replace)
        self.h.move_storage(u'test-dir', flags=2)
        self.h.move_storage(b'test-dir2', flags=2)
        self.h.move_storage('test-dir3', flags=2)

    def test_torrent_handle(self):
        self.setup()
        self.assertEqual(self.h.get_file_priorities(), [4, 4])
        self.assertEqual(self.h.get_piece_priorities(), [4])

        self.h.prioritize_files([0, 1])
        # workaround for asynchronous priority update
        time.sleep(1)
        self.assertEqual(self.h.get_file_priorities(), [0, 1])

        self.h.prioritize_pieces([0])
        self.assertEqual(self.h.get_piece_priorities(), [0])

        # also test the overload that takes a list of piece->priority mappings
        self.h.prioritize_pieces([(0, 1)])
        self.assertEqual(self.h.get_piece_priorities(), [1])
        self.h.connect_peer(('127.0.0.1', 6881))
        self.h.connect_peer(('127.0.0.2', 6881), source=4)
        self.h.connect_peer(('127.0.0.3', 6881), flags=2)
        self.h.connect_peer(('127.0.0.4', 6881), flags=2, source=4)

        torrent_files = self.h.torrent_file()
        print(torrent_files.map_file(0, 0, 0).piece)

        print(self.h.queue_position())

    def test_torrent_handle_in_set(self):
        self.setup()
        torrents = set()
        torrents.add(self.h)

        # get another instance of a torrent_handle that represents the same
        # torrent. Make sure that when we add it to a set, it just replaces the
        # existing object
        t = self.ses.get_torrents()
        self.assertEqual(len(t), 1)
        for h in t:
            torrents.add(h)

        self.assertEqual(len(torrents), 1)

    def test_torrent_handle_in_dict(self):
        self.setup()
        torrents = {}
        torrents[self.h] = 'foo'

        # get another instance of a torrent_handle that represents the same
        # torrent. Make sure that when we add it to a dict, it just replaces the
        # existing object
        t = self.ses.get_torrents()
        self.assertEqual(len(t), 1)
        for h in t:
            torrents[h] = 'bar'

        self.assertEqual(len(torrents), 1)
        self.assertEqual(torrents[self.h], 'bar')

    def test_replace_trackers(self):
        self.setup()
        trackers = []
        for idx, tracker_url in enumerate(('udp://tracker1.com', 'udp://tracker2.com')):
            tracker = lt.announce_entry(tracker_url)
            tracker.tier = idx
            tracker.fail_limit = 2
            trackers.append(tracker)
            self.assertEqual(tracker.url, tracker_url)
        self.h.replace_trackers(trackers)
        new_trackers = self.h.trackers()
        self.assertEqual(new_trackers[0]['url'], 'udp://tracker1.com')
        self.assertEqual(new_trackers[1]['tier'], 1)
        self.assertEqual(new_trackers[1]['fail_limit'], 2)

    def test_pickle_trackers(self):
        """Test lt objects converters are working and trackers can be pickled"""
        self.setup()
        tracker = lt.announce_entry('udp://tracker1.com')
        tracker.tier = 0
        tracker.fail_limit = 1
        trackers = [tracker]
        self.h.replace_trackers(trackers)
        # wait a bit until the endpoints list gets populated
        while len(self.h.trackers()[0]['endpoints']) == 0:
            time.sleep(0.1)

        trackers = self.h.trackers()
        self.assertEqual(trackers[0]['url'], 'udp://tracker1.com')
        # this is not necessarily 0, it could also be (EHOSTUNREACH) if the
        # local machine doesn't support the address family
        expect_value = trackers[0]['endpoints'][0]['info_hashes'][0]['last_error']['value']
        pickled_trackers = pickle.dumps(trackers)
        unpickled_trackers = pickle.loads(pickled_trackers)
        self.assertEqual(unpickled_trackers[0]['url'], 'udp://tracker1.com')
        self.assertEqual(unpickled_trackers[0]['endpoints'][0]['info_hashes'][0]['last_error']['value'], expect_value)

    def test_file_status(self):
        self.setup()
        status = self.h.file_status()
        print(status)

    def test_piece_deadlines(self):
        self.setup()
        self.h.clear_piece_deadlines()

    def test_status_last_uploaded_downloaded(self):
        # we want to check at seconds precision but can't control session
        # time, wait for next full second to prevent second increment
        time.sleep(1 - datetime.datetime.now().microsecond / 1000000.0)

        self.setup()
        st = self.h.status()
        for attr in dir(st):
            print('%s: %s' % (attr, getattr(st, attr)))
        # last upload and download times are at session start time
        self.assertEqual(st.last_upload, None)
        self.assertEqual(st.last_download, None)

    def test_serialize_trackers(self):
        """Test to ensure the dict contains only python built-in types"""
        self.setup()
        self.h.add_tracker({'url': 'udp://tracker1.com'})
        tr = self.h.trackers()[0]
        # wait a bit until the endpoints list gets populated
        while len(tr['endpoints']) == 0:
            time.sleep(0.1)
            tr = self.h.trackers()[0]
        import json
        print(json.dumps(self.h.trackers()[0]))

    def test_torrent_status(self):
        self.setup()
        st = self.h.status()
        ti = st.handle
        self.assertEqual(ti.info_hashes(), self.ti.info_hashes())
        # make sure we can compare torrent_status objects
        st2 = self.h.status()
        self.assertEqual(st2, st)
        print(st2)

    def test_read_resume_data(self):

        resume_data = lt.bencode({
            'file-format': 'libtorrent resume file',
            'info-hash': 'abababababababababab',
            'name': 'test',
            'save_path': '.',
            'peers': '\x01\x01\x01\x01\x00\x01\x02\x02\x02\x02\x00\x02',
            'file_priority': [0, 1, 1]})
        tp = lt.read_resume_data(resume_data)

        self.assertEqual(tp.name, 'test')
        self.assertEqual(tp.info_hashes.v1, lt.sha1_hash('abababababababababab'))
        self.assertEqual(tp.file_priorities, [0, 1, 1])
        self.assertEqual(tp.peers, [('1.1.1.1', 1), ('2.2.2.2', 2)])

        ses = lt.session(settings)
        h = ses.add_torrent(tp)
        for attr in dir(tp):
            print('%s: %s' % (attr, getattr(tp, attr)))

        h.connect_peer(('3.3.3.3', 3))

        for i in range(0, 10):
            alerts = ses.pop_alerts()
            for a in alerts:
                print(a.message())
            time.sleep(0.1)

    def test_scrape(self):
        self.setup()
        # this is just to make sure this function can be called like this
        # from python
        self.h.scrape_tracker()

    def test_unknown_torrent_parameter(self):
        self.ses = lt.session(settings)
        try:
            self.h = self.ses.add_torrent({'unexpected-key-name': ''})
            self.assertFalse('should have thrown an exception')
        except KeyError as e:
            print(e)

    def test_torrent_parameter(self):
        self.ses = lt.session(settings)
        self.ti = lt.torrent_info('url_seed_multi.torrent')
        self.h = self.ses.add_torrent({
            'ti': self.ti,
            'save_path': os.getcwd(),
            'trackers': ['http://test.com/announce'],
            'dht_nodes': [('1.2.3.4', 6881), ('4.3.2.1', 6881)],
            'file_priorities': [1, 1],
            'http_seeds': ['http://test.com/file3'],
            'url_seeds': ['http://test.com/announce-url'],
            'peers': [('5.6.7.8', 6881)],
            'banned_peers': [('8.7.6.5', 6881)],
            'renamed_files': {0: 'test.txt', 2: 'test.txt'}
        })
        self.st = self.h.status()
        self.assertEqual(self.st.save_path, os.getcwd())
        trackers = self.h.trackers()
        self.assertEqual(len(trackers), 1)
        self.assertEqual(trackers[0].get('url'), 'http://test.com/announce')
        self.assertEqual(trackers[0].get('tier'), 0)
        self.assertEqual(self.h.get_file_priorities(), [1, 1])
        self.assertEqual(self.h.http_seeds(), ['http://test.com/file3'])
        # url_seeds was already set, test that it did not get overwritten
        self.assertEqual(self.h.url_seeds(),
                         ['http://test.com/announce-url/', 'http://test.com/file/'])
        # piece priorities weren't set explicitly, but they were updated by the
        # file priorities being set
        self.assertEqual(self.h.get_piece_priorities(), [1])
        self.assertEqual(self.st.verified_pieces, [])


class TestAddPiece(unittest.TestCase):

    def setUp(self):
        self.dir = tempfile.TemporaryDirectory()
        self.session = lt.session(settings)
        self.ti = lt.torrent_info(dummy_data.DICT)
        self.atp = lt.add_torrent_params()
        self.atp.ti = self.ti
        self.atp.save_path = self.dir.name
        self.handle = self.session.add_torrent(self.atp)
        self.wait_for(lambda: self.handle.status().state != lt.torrent_status.checking_files
                      and self.handle.status().state != lt.torrent_status.checking_resume_data, msg="checking")

    def wait_for(self, condition, msg="condition", timeout=5):
        deadline = time.time() + timeout
        while not condition():
            self.assertLess(time.time(), deadline, msg="%s timed out" % msg)
            time.sleep(0.1)

    def wait_until_torrent_finished(self):
        self.wait_for(lambda: self.handle.status().progress == 1.0, msg="progress")

        def file_written():
            with open(os.path.join(self.dir.name.encode(), dummy_data.NAME), mode="rb") as f:
                return f.read() == dummy_data.DATA

        self.wait_for(file_written, msg="file write")

    def test_with_str(self):
        for i, data in enumerate(dummy_data.PIECES):
            self.handle.add_piece(i, data.decode(), 0)

        self.wait_until_torrent_finished()

    def test_with_bytes(self):
        for i, data in enumerate(dummy_data.PIECES):
            self.handle.add_piece(i, data, 0)

        self.wait_until_torrent_finished()


class test_load_torrent(unittest.TestCase):

    def test_bytearray(self):
        # a bytearray object is interpreted as a bencoded buffer
        atp = lt.load_torrent_buffer(bytearray(lt.bencode({'info': {
            'name': 'test_torrent', 'length': 1234,
            'piece length': 16 * 1024,
            'pieces': 'aaaaaaaaaaaaaaaaaaaa'}})))
        self.assertEqual(atp.ti.num_files(), 1)

    def test_bytes(self):
        # a bytes object is interpreted as a bencoded buffer
        atp = lt.load_torrent_buffer(bytes(lt.bencode({'info': {
            'name': 'test_torrent', 'length': 1234,
            'piece length': 16 * 1024,
            'pieces': 'aaaaaaaaaaaaaaaaaaaa'}})))
        self.assertEqual(atp.ti.num_files(), 1)

    def test_info_section(self):
        atp = lt.load_torrent_file('base.torrent')

        self.assertTrue(len(atp.ti.info_section()) != 0)
        self.assertTrue(len(atp.ti.hash_for_piece(0)) != 0)

class test_torrent_info(unittest.TestCase):

    def test_non_ascii_file(self):
        try:
            shutil.copy('base.torrent', 'base-\u745E\u5177.torrent')
        except shutil.SameFileError:
            pass
        ti = lt.torrent_info('base-\u745E\u5177.torrent')

        self.assertTrue(len(ti.info_section()) != 0)
        self.assertTrue(len(ti.hash_for_piece(0)) != 0)

    def test_bencoded_constructor(self):
        # things that can be converted to a bencoded entry, will be interpreted
        # as such and encoded
        info = lt.torrent_info({'info': {
            'name': 'test_torrent', 'length': 1234,
            'piece length': 16 * 1024,
            'pieces': 'aaaaaaaaaaaaaaaaaaaa'}})

        self.assertEqual(info.num_files(), 1)

        f = info.files()
        self.assertEqual(f.file_path(0), 'test_torrent')
        self.assertEqual(f.file_name(0), 'test_torrent')
        self.assertEqual(f.file_size(0), 1234)
        self.assertEqual(info.total_size(), 1234)
        self.assertEqual(info.creation_date(), 0)

    def test_bytearray(self):
        # a bytearray object is interpreted as a bencoded buffer
        info = lt.torrent_info(bytearray(lt.bencode({'info': {
            'name': 'test_torrent', 'length': 1234,
            'piece length': 16 * 1024,
            'pieces': 'aaaaaaaaaaaaaaaaaaaa'}})))
        self.assertEqual(info.num_files(), 1)

    def test_bytes(self):
        # a bytes object is interpreted as a bencoded buffer
        info = lt.torrent_info(bytes(lt.bencode({'info': {
            'name': 'test_torrent', 'length': 1234,
            'piece length': 16 * 1024,
            'pieces': 'aaaaaaaaaaaaaaaaaaaa'}})))
        self.assertEqual(info.num_files(), 1)

    def test_load_decode_depth_limit(self):
        self.assertRaises(RuntimeError, lambda: lt.torrent_info(
            {'test': {'test': {'test': {'test': {'test': {}}}}}, 'info': {
                'name': 'test_torrent', 'length': 1234,
                'piece length': 16 * 1024,
                'pieces': 'aaaaaaaaaaaaaaaaaaaa'}}, {'max_decode_depth': 1}))

    def test_load_max_pieces_limit(self):
        self.assertRaises(RuntimeError, lambda: lt.torrent_info(
            {'info': {
                'name': 'test_torrent', 'length': 1234000,
                'piece length': 16 * 1024,
                'pieces': 'aaaaaaaaaaaaaaaaaaaa'}}, {'max_pieces': 1}))

    def test_load_max_buffer_size_limit(self):
        self.assertRaises(RuntimeError, lambda: lt.torrent_info(
            {'info': {
                'name': 'test_torrent', 'length': 1234000,
                'piece length': 16 * 1024,
                'pieces': 'aaaaaaaaaaaaaaaaaaaa'}}, {'max_buffer_size': 1}))

    def test_info_section(self):
        ti = lt.torrent_info('base.torrent')

        self.assertTrue(len(ti.info_section()) != 0)
        self.assertTrue(len(ti.hash_for_piece(0)) != 0)

    def test_torrent_info_bytes_overload(self):
        # bytes will never be interpreted as a file name. It's interpreted as a
        # bencoded buffer
        with self.assertRaises(RuntimeError):
            ti = lt.torrent_info(b'base.torrent')

    def test_web_seeds(self):
        ti = lt.torrent_info('base.torrent')

        ws = [{'url': 'http://foo/test', 'auth': '', 'type': 0},
              {'url': 'http://bar/test', 'auth': '', 'type': 1}]
        ti.set_web_seeds(ws)
        web_seeds = ti.web_seeds()
        self.assertEqual(len(ws), len(web_seeds))
        for i in range(len(web_seeds)):
            self.assertEqual(web_seeds[i]["url"], ws[i]["url"])
            self.assertEqual(web_seeds[i]["auth"], ws[i]["auth"])
            self.assertEqual(web_seeds[i]["type"], ws[i]["type"])

    def test_announce_entry(self):
        ae = lt.announce_entry('test')
        self.assertEqual(ae.url, 'test')
        self.assertEqual(ae.tier, 0)
        self.assertEqual(ae.verified, False)
        self.assertEqual(ae.source, 0)

    def test_torrent_info_sha1_overload(self):
        ti = lt.torrent_info(lt.info_hash_t(lt.sha1_hash(b'a' * 20)))
        self.assertEqual(ti.info_hash(), lt.sha1_hash(b'a' * 20))
        self.assertEqual(ti.info_hashes().v1, lt.sha1_hash(b'a' * 20))

        ti_copy = lt.torrent_info(ti)
        self.assertEqual(ti_copy.info_hash(), lt.sha1_hash(b'a' * 20))
        self.assertEqual(ti_copy.info_hashes().v1, lt.sha1_hash(b'a' * 20))

    def test_torrent_info_sha256_overload(self):
        ti = lt.torrent_info(lt.info_hash_t(lt.sha256_hash(b'a' * 32)))
        self.assertEqual(ti.info_hashes().v2, lt.sha256_hash(b'a' * 32))

        ti_copy = lt.torrent_info(ti)
        self.assertEqual(ti_copy.info_hashes().v2, lt.sha256_hash(b'a' * 32))

    def test_url_seed(self):
        ti = lt.torrent_info('base.torrent')

        ti.add_tracker('foobar1')
        ti.add_url_seed('foobar2')
        ti.add_url_seed('foobar3', 'username:password')
        ti.add_url_seed('foobar4', 'username:password', [])

        seeds = ti.web_seeds()
        self.assertEqual(seeds, [
            {'url': 'foobar2', 'type': 0, 'auth': ''},
            {'url': 'foobar3', 'type': 0, 'auth': 'username:password'},
            {'url': 'foobar4', 'type': 0, 'auth': 'username:password'},
        ])

    def test_http_seed(self):
        ti = lt.torrent_info('base.torrent')

        ti.add_http_seed('foobar2')
        ti.add_http_seed('foobar3', 'username:password')
        ti.add_http_seed('foobar4', 'username:password', [])

        seeds = ti.web_seeds()
        self.assertEqual(seeds, [
            {'url': 'foobar2', 'type': 1, 'auth': ''},
            {'url': 'foobar3', 'type': 1, 'auth': 'username:password'},
            {'url': 'foobar4', 'type': 1, 'auth': 'username:password'},
        ])

class test_alerts(unittest.TestCase):

    def test_alert(self):

        ses = lt.session(settings)
        ti = lt.torrent_info('base.torrent')
        h = ses.add_torrent({'ti': ti, 'save_path': os.getcwd()})
        st = h.status()
        time.sleep(1)
        ses.remove_torrent(h)
        ses.wait_for_alert(1000)  # milliseconds
        alerts = ses.pop_alerts()
        for a in alerts:
            if a.what() == 'add_torrent_alert':
                self.assertEqual(a.torrent_name, 'temp')
            print(a.message())
            for field_name in dir(a):
                if field_name.startswith('__'):
                    continue
                field = getattr(a, field_name)
                if callable(field):
                    print('  ', field_name, ' = ', field())
                else:
                    print('  ', field_name, ' = ', field)

        print(st.next_announce)
        self.assertEqual(st.name, 'temp')
        print(st.errc.message())
        print(st.pieces)
        print(st.last_seen_complete)
        print(st.completed_time)
        print(st.progress)
        print(st.num_pieces)
        print(st.distributed_copies)
        print(st.info_hashes)
        print(st.seeding_duration)
        print(st.last_upload)
        print(st.last_download)
        self.assertEqual(st.save_path, os.getcwd())

    def test_alert_fs(self):
        ses = lt.session(settings)
        s1, s2 = socket.socketpair()
        ses.set_alert_fd(s2.fileno())

        ses.pop_alerts()

        # make sure there's an alert to wake us up
        ses.post_session_stats()

        read_sockets, write_sockets, error_sockets = select.select([s1], [], [])

        self.assertEqual(len(read_sockets), 1)
        for s in read_sockets:
            s.recv(10)

    def test_pop_alerts(self):
        ses = lt.session(settings)
        ses.async_add_torrent(
            {"ti": lt.torrent_info("base.torrent"), "save_path": "."})

# this will cause an error (because of duplicate torrents) and the
# torrent_info object created here will be deleted once the alert goes out
# of scope. When that happens, it will decrement the python object, to allow
# it to release the object.
# we're trying to catch the error described in this post, with regards to
# torrent_info.
# https://mail.python.org/pipermail/cplusplus-sig/2007-June/012130.html
        ses.async_add_torrent(
            {"ti": lt.torrent_info("base.torrent"), "save_path": "."})
        time.sleep(1)
        for i in range(0, 10):
            alerts = ses.pop_alerts()
            for a in alerts:
                print(a.message())
            time.sleep(0.1)

    def test_alert_notify(self):
        ses = lt.session(settings)
        event = threading.Event()

        def callback():
            event.set()

        ses.set_alert_notify(callback)
        ses.async_add_torrent(
            {"ti": lt.torrent_info("base.torrent"), "save_path": "."})
        event.wait()


class test_bencoder(unittest.TestCase):

    def test_bencode(self):
        encoded = lt.bencode({'a': 1, 'b': [1, 2, 3], 'c': 'foo'})
        self.assertEqual(encoded, b'd1:ai1e1:bli1ei2ei3ee1:c3:fooe')

    def test_bdecode(self):
        encoded = b'd1:ai1e1:bli1ei2ei3ee1:c3:fooe'
        decoded = lt.bdecode(encoded)
        self.assertEqual(decoded, {b'a': 1, b'b': [1, 2, 3], b'c': b'foo'})

    def test_string(self):
        encoded = lt.bencode('foo\u00e5\u00e4\u00f6')
        self.assertEqual(encoded, b'9:foo\xc3\xa5\xc3\xa4\xc3\xb6')

    def test_bytes(self):
        encoded = lt.bencode(b'foo')
        self.assertEqual(encoded, b'3:foo')

    def test_float(self):
        # TODO: this should throw a TypeError in the future
        with self.assertWarns(DeprecationWarning):
            encoded = lt.bencode(1.337)
            self.assertEqual(encoded, b'0:')

    def test_object(self):
        class FooBar:
            dummy = 1

        # TODO: this should throw a TypeError in the future
        with self.assertWarns(DeprecationWarning):
            encoded = lt.bencode(FooBar())
            self.assertEqual(encoded, b'0:')

    def test_preformatted(self):
        encoded = lt.bencode((1, 2, 3, 4, 5))
        self.assertEqual(encoded, b'\x01\x02\x03\x04\x05')

class test_sha1hash(unittest.TestCase):

    def test_sha1hash(self):
        h = 'a0' * 20
        s = lt.sha1_hash(binascii.unhexlify(h))
        self.assertEqual(h, str(s))

    def test_hash(self):
        self.assertNotEqual(hash(lt.sha1_hash(b'b' * 20)), hash(lt.sha1_hash(b'a' * 20)))
        self.assertEqual(hash(lt.sha1_hash(b'b' * 20)), hash(lt.sha1_hash(b'b' * 20)))

class test_sha256hash(unittest.TestCase):

    def test_sha1hash(self):
        h = 'a0' * 32
        s = lt.sha256_hash(binascii.unhexlify(h))
        self.assertEqual(h, str(s))

    def test_hash(self):
        self.assertNotEqual(hash(lt.sha256_hash(b'b' * 32)), hash(lt.sha256_hash(b'a' * 32)))
        self.assertEqual(hash(lt.sha256_hash(b'b' * 32)), hash(lt.sha256_hash(b'b' * 32)))

class test_info_hash(unittest.TestCase):

    def test_info_hash(self):
        s1 = lt.sha1_hash(b'a' * 20)
        s2 = lt.sha256_hash(b'b' * 32)

        ih1 = lt.info_hash_t(s1);
        self.assertTrue(ih1.has_v1())
        self.assertFalse(ih1.has_v2())
        self.assertEqual(ih1.v1, s1)

        ih2 = lt.info_hash_t(s2);
        self.assertFalse(ih2.has_v1())
        self.assertTrue(ih2.has_v2())
        self.assertEqual(ih2.v2, s2)

        ih12 = lt.info_hash_t(s1, s2);
        self.assertTrue(ih12.has_v1())
        self.assertTrue(ih12.has_v2())
        self.assertEqual(ih12.v1, s1)
        self.assertEqual(ih12.v2, s2)

        self.assertNotEqual(hash(ih1), hash(ih2))
        self.assertNotEqual(hash(ih1), hash(ih12))
        self.assertEqual(hash(ih1), hash(lt.info_hash_t(s1)))
        self.assertEqual(hash(ih2), hash(lt.info_hash_t(s2)))
        self.assertEqual(hash(ih12), hash(lt.info_hash_t(s1, s2)))

class test_magnet_link(unittest.TestCase):

    def test_parse_magnet_uri(self):
        ses = lt.session({})
        magnet = 'magnet:?xt=urn:btih:C6EIF4CCYDBTIJVG3APAGM7M4NDONCTI'
        p = lt.parse_magnet_uri(magnet)
        self.assertEqual(str(p.info_hashes.v1), '178882f042c0c33426a6d81e0333ece346e68a68')
        p.save_path = '.'
        h = ses.add_torrent(p)
        self.assertEqual(str(h.info_hash()), '178882f042c0c33426a6d81e0333ece346e68a68')
        self.assertEqual(str(h.info_hashes().v1), '178882f042c0c33426a6d81e0333ece346e68a68')

    def test_parse_magnet_uri_dict(self):
        ses = lt.session({})
        magnet = 'magnet:?xt=urn:btih:C6EIF4CCYDBTIJVG3APAGM7M4NDONCTI'
        p = lt.parse_magnet_uri_dict(magnet)
        self.assertEqual(binascii.hexlify(p['info_hashes']), b'178882f042c0c33426a6d81e0333ece346e68a68')
        p['save_path'] = '.'
        h = ses.add_torrent(p)
        self.assertEqual(str(h.info_hash()), '178882f042c0c33426a6d81e0333ece346e68a68')
        self.assertEqual(str(h.info_hashes().v1), '178882f042c0c33426a6d81e0333ece346e68a68')

    def test_add_deprecated_magnet_link(self):
        ses = lt.session()
        atp = lt.add_torrent_params()
        atp.info_hashes = lt.info_hash_t(lt.sha1_hash(b"a" * 20))
        atp.save_path = "."
        h = ses.add_torrent(atp)

        self.assertTrue(h.status().info_hashes == lt.info_hash_t(lt.sha1_hash(b"a" * 20)))

    def test_add_magnet_link(self):
        ses = lt.session()
        atp = lt.add_torrent_params()
        atp.save_path = "."
        atp.info_hash = lt.sha1_hash(b"a" * 20)
        h = ses.add_torrent(atp)

        self.assertTrue(h.status().info_hashes == lt.info_hash_t(lt.sha1_hash(b"a" * 20)))


class test_peer_class(unittest.TestCase):

    def test_peer_class_ids(self):
        s = lt.session(settings)

        print('global_peer_class_id:', lt.session.global_peer_class_id)
        print('tcp_peer_class_id:', lt.session.tcp_peer_class_id)
        print('local_peer_class_id:', lt.session.local_peer_class_id)

        print('global: ', s.get_peer_class(s.global_peer_class_id))
        print('tcp: ', s.get_peer_class(s.local_peer_class_id))
        print('local: ', s.get_peer_class(s.local_peer_class_id))

    def test_peer_class(self):
        s = lt.session(settings)

        c = s.create_peer_class('test class')
        print('new class: ', s.get_peer_class(c))

        nfo = s.get_peer_class(c)
        self.assertEqual(nfo['download_limit'], 0)
        self.assertEqual(nfo['upload_limit'], 0)
        self.assertEqual(nfo['ignore_unchoke_slots'], False)
        self.assertEqual(nfo['connection_limit_factor'], 100)
        self.assertEqual(nfo['download_priority'], 1)
        self.assertEqual(nfo['upload_priority'], 1)
        self.assertEqual(nfo['label'], 'test class')

        nfo['download_limit'] = 1337
        nfo['upload_limit'] = 1338
        nfo['ignore_unchoke_slots'] = True
        nfo['connection_limit_factor'] = 42
        nfo['download_priority'] = 2
        nfo['upload_priority'] = 3

        s.set_peer_class(c, nfo)

        nfo2 = s.get_peer_class(c)
        self.assertEqual(nfo, nfo2)

    def test_peer_class_filter(self):
        filt = lt.peer_class_type_filter()
        filt.add(lt.peer_class_type_filter.tcp_socket, lt.session.global_peer_class_id)
        filt.remove(lt.peer_class_type_filter.utp_socket, lt.session.local_peer_class_id)

        filt.disallow(lt.peer_class_type_filter.tcp_socket, lt.session.global_peer_class_id)
        filt.allow(lt.peer_class_type_filter.utp_socket, lt.session.local_peer_class_id)

    def test_peer_class_ip_filter(self):
        s = lt.session(settings)
        s.set_peer_class_type_filter(lt.peer_class_type_filter())
        s.set_peer_class_filter(lt.ip_filter())

class test_ip_filter(unittest.TestCase):

    def test_export(self):

        f = lt.ip_filter()
        self.assertEqual(f.access('1.1.1.1'), 0)
        f.add_rule('1.1.1.1', '1.1.1.2', 1)
        self.assertEqual(f.access('1.1.1.0'), 0)
        self.assertEqual(f.access('1.1.1.1'), 1)
        self.assertEqual(f.access('1.1.1.2'), 1)
        self.assertEqual(f.access('1.1.1.3'), 0)
        exp = f.export_filter()
        self.assertEqual(exp, ([('0.0.0.0', '1.1.1.0'), ('1.1.1.1', '1.1.1.2'), ('1.1.1.3', '255.255.255.255')], [('::', 'ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff')]))

class test_session(unittest.TestCase):

    def test_settings(self):
        sett = { 'alert_mask': lt.alert.category_t.all_categories }
        s = lt.session(sett)
        sett = s.get_settings()
        self.assertEqual(sett['alert_mask'] & 0x7fffffff, 0x7fffffff)

    def test_session_params(self):
        sp = lt.session_params()
        sp.settings = { 'alert_mask': lt.alert.category_t.all_categories }
        s = lt.session(sp)
        sett = s.get_settings()
        self.assertEqual(sett['alert_mask'] & 0x7fffffff, 0x7fffffff)

    def test_session_params_constructor(self):
        sp = lt.session_params({ 'alert_mask': lt.alert.category_t.all_categories })
        s = lt.session(sp)
        sett = s.get_settings()
        self.assertEqual(sett['alert_mask'] & 0x7fffffff, 0x7fffffff)

    def test_session_params_ip_filter(self):
        sp = lt.session_params()
        sp.ip_filter.add_rule("1.1.1.1", "1.1.1.2", 1337)
        self.assertEqual(sp.ip_filter.access("1.1.1.1"), 1337)
        self.assertEqual(sp.ip_filter.access("1.1.1.2"), 1337)
        self.assertEqual(sp.ip_filter.access("1.1.1.3"), 0)

    def test_session_params_roundtrip_buf(self):

        sp = lt.session_params()
        sp.settings = { 'alert_mask': lt.alert.category_t.all_categories }

        buf = lt.write_session_params_buf(sp)
        sp2 = lt.read_session_params(buf)
        self.assertEqual(sp2.settings['alert_mask'] & 0x7fffffff, 0x7fffffff)

    def test_session_params_roundtrip_entry(self):

        sp = lt.session_params()
        sp.settings = { 'alert_mask': lt.alert.category_t.all_categories }

        ent = lt.write_session_params(sp)
        print(ent)
        sp2 = lt.read_session_params(ent)
        self.assertEqual(sp2.settings['alert_mask'] & 0x7fffffff, 0x7fffffff)

    def test_add_torrent(self):
        s = lt.session(settings)
        h = s.add_torrent({'ti': lt.torrent_info('base.torrent'),
                       'save_path': '.',
                       'dht_nodes': [('1.2.3.4', 6881), ('4.3.2.1', 6881)],
                       'http_seeds': ['http://test.com/seed'],
                       'peers': [('5.6.7.8', 6881)],
                       'banned_peers': [('8.7.6.5', 6881)],
                       'file_priorities': [1, 1, 1, 2, 0]})

    def test_find_torrent(self):
        s = lt.session(settings)
        h = s.add_torrent({'info_hash': b"a" * 20,
                           'save_path': '.'})
        self.assertTrue(h.is_valid())

        h2 = s.find_torrent(lt.sha1_hash(b"a" * 20))
        self.assertTrue(h2.is_valid())
        h3 = s.find_torrent(lt.sha1_hash(b"b" * 20))
        self.assertFalse(h3.is_valid())

        self.assertEqual(h, h2)
        self.assertNotEqual(h, h3)

    def test_add_torrent_info_hash(self):
        s = lt.session(settings)
        h = s.add_torrent({
                           'info_hash': b'a' * 20,
                           'info_hashes': b'a' * 32,
                           'save_path': '.'})

        time.sleep(1)
        alerts = s.pop_alerts()

        while len(alerts) > 0:
            a = alerts.pop(0)
            print(a)

        self.assertTrue(h.is_valid())
        self.assertEqual(h.status().info_hashes, lt.info_hash_t(lt.sha256_hash(b'a' * 32)))

    def test_session_status(self):
        if not has_deprecated():
            return

        s = lt.session()
        st = s.status()
        print(st)
        print(st.active_requests)
        print(st.dht_nodes)
        print(st.dht_node_cache)
        print(st.dht_torrents)
        print(st.dht_global_nodes)
        print(st.dht_total_allocations)

    def test_apply_settings(self):

        s = lt.session(settings)
        s.apply_settings({'num_want': 66, 'user_agent': 'test123'})
        self.assertEqual(s.get_settings()['num_want'], 66)
        self.assertEqual(s.get_settings()['user_agent'], 'test123')

    def test_post_session_stats(self):
        s = lt.session({'alert_mask': 0, 'enable_dht': False})
        s.post_session_stats()
        alerts = []
        # first the stats headers log line. but not if logging is disabled
        while len(alerts) == 0:
            s.wait_for_alert(1000)
            alerts = s.pop_alerts()

        while len(alerts) > 0:
            a = alerts.pop(0)
            print(a)
            if isinstance(a, lt.session_stats_header_alert):
                break
        self.assertTrue(isinstance(a, lt.session_stats_header_alert))
        # then the actual stats values
        while len(alerts) == 0:
            s.wait_for_alert(1000)
            alerts = s.pop_alerts()
        a = alerts.pop(0)
        print(a)
        self.assertTrue(isinstance(a, lt.session_stats_alert))
        self.assertTrue(isinstance(a.values, dict))
        self.assertTrue(len(a.values) > 0)

    def test_post_dht_stats(self):
        s = lt.session({'alert_mask': 0, 'enable_dht': False})
        s.post_dht_stats()
        alerts = []
        cnt = 0
        while len(alerts) == 0:
            s.wait_for_alert(1000)
            alerts = s.pop_alerts()
            cnt += 1
            if cnt > 60:
                print('no dht_stats_alert in 1 minute!')
                sys.exit(1)
        a = alerts.pop(0)
        self.assertTrue(isinstance(a, lt.dht_stats_alert))
        self.assertTrue(isinstance(a.active_requests, list))
        self.assertTrue(isinstance(a.routing_table, list))

    def test_unknown_settings(self):
        try:
            lt.session({'unexpected-key-name': 42})
            self.assertFalse('should have thrown an exception')
        except KeyError as e:
            print(e)

    def test_fingerprint(self):
        self.assertEqual(lt.generate_fingerprint('LT', 0, 1, 2, 3), '-LT0123-')
        self.assertEqual(lt.generate_fingerprint('..', 10, 1, 2, 3), '-..A123-')

    def test_min_memory_preset(self):
        min_mem = lt.min_memory_usage()
        print(min_mem)

        self.assertTrue('allow_idna' not in min_mem)
        self.assertTrue('connection_speed' in min_mem)
        self.assertTrue('file_pool_size' in min_mem)

    def test_seed_mode_preset(self):
        seed_mode = lt.high_performance_seed()
        print(seed_mode)

        self.assertTrue('alert_queue_size' in seed_mode)
        self.assertTrue('connection_speed' in seed_mode)
        self.assertTrue('file_pool_size' in seed_mode)

    def test_default_settings(self):

        default = lt.default_settings()
        print(default)


class test_example_client(unittest.TestCase):

    def test_execute_client(self):
        if os.name == 'nt':
            # TODO: fix windows includes of client.py
            return
        my_stdin = sys.stdin
        if os.name != 'nt':
            master_fd, slave_fd = pty.openpty()
            # slave_fd fix multiple stdin assignment at termios.tcgetattr
            my_stdin = slave_fd

        process = sub.Popen(
            [sys.executable, "client.py", "url_seed_multi.torrent"],
            stdin=my_stdin, stdout=sub.PIPE, stderr=sub.PIPE)
        # python2 has no Popen.wait() timeout
        time.sleep(5)
        returncode = process.poll()
        if returncode is None:
            # this is an expected use-case
            process.kill()
        err = process.stderr.read().decode("utf-8")
        self.assertEqual('', err, 'process throw errors: \n' + err)
        # check error code if process did unexpected end
        if returncode is not None:
            # in case of error return: output stdout if nothing was on stderr
            if returncode != 0:
                print("stdout:\n" + process.stdout.read().decode("utf-8"))
            self.assertEqual(returncode, 0, "returncode: " + str(returncode) + "\n"
                             + "stderr: empty\n"
                             + "some configuration does not output errors like missing module members,"
                             + "try to call it manually to get the error message\n")

    def test_execute_simple_client(self):
        process = sub.Popen(
            [sys.executable, "simple_client.py", "url_seed_multi.torrent"],
            stdout=sub.PIPE, stderr=sub.PIPE)
        # python2 has no Popen.wait() timeout
        time.sleep(5)
        returncode = process.poll()
        if returncode is None:
            # this is an expected use-case
            process.kill()
        err = process.stderr.read().decode("utf-8")
        self.assertEqual('', err, 'process throw errors: \n' + err)
        # check error code if process did unexpected end
        if returncode is not None:
            # in case of error return: output stdout if nothing was on stderr
            if returncode != 0:
                print("stdout:\n" + process.stdout.read().decode("utf-8"))
            self.assertEqual(returncode, 0, "returncode: " + str(returncode) + "\n"
                             + "stderr: empty\n"
                             + "some configuration does not output errors like missing module members,"
                             + "try to call it manually to get the error message\n")

    def test_execute_make_torrent(self):
        process = sub.Popen(
            [sys.executable, "make_torrent.py", "url_seed_multi.torrent",
             "http://test.com/test"], stdout=sub.PIPE, stderr=sub.PIPE)
        returncode = process.wait()
        # python2 has no Popen.wait() timeout
        err = process.stderr.read().decode("utf-8")
        self.assertEqual('', err, 'process throw errors: \n' + err)
        # in case of error return: output stdout if nothing was on stderr
        if returncode != 0:
            print("stdout:\n" + process.stdout.read().decode("utf-8"))
        self.assertEqual(returncode, 0, "returncode: " + str(returncode) + "\n"
                         + "stderr: empty\n"
                         + "some configuration does not output errors like missing module members,"
                         + "try to call it manually to get the error message\n")

    def test_default_settings(self):

        default = lt.default_settings()
        self.assertNotIn('', default)
        print(default)


class test_operation_t(unittest.TestCase):

    def test_enum(self):
        self.assertEqual(lt.operation_name(lt.operation_t.sock_accept), "sock_accept")
        self.assertEqual(lt.operation_name(lt.operation_t.unknown), "unknown")
        self.assertEqual(lt.operation_name(lt.operation_t.mkdir), "mkdir")
        self.assertEqual(lt.operation_name(lt.operation_t.partfile_write), "partfile_write")
        self.assertEqual(lt.operation_name(lt.operation_t.hostname_lookup), "hostname_lookup")


class test_error_code(unittest.TestCase):

    def test_error_code(self):

        a = lt.error_code()
        a = lt.error_code(10, lt.libtorrent_category())
        self.assertEqual(a.category().name(), 'libtorrent')

        self.assertEqual(lt.libtorrent_category().name(), 'libtorrent')
        self.assertEqual(lt.upnp_category().name(), 'upnp')
        self.assertEqual(lt.http_category().name(), 'http')
        self.assertEqual(lt.socks_category().name(), 'socks')
        self.assertEqual(lt.bdecode_category().name(), 'bdecode')
        self.assertEqual(lt.generic_category().name(), 'generic')
        self.assertEqual(lt.system_category().name(), 'system')


class test_peer_info(unittest.TestCase):

    def test_peer_info_members(self):

        p = lt.peer_info()

        print(p.client)
        print(p.pieces)
        print(p.pieces)
        print(p.last_request)
        print(p.last_active)
        print(p.flags)
        print(p.source)
        print(p.pid)
        print(p.downloading_piece_index)
        print(p.ip)
        print(p.local_endpoint)
        print(p.read_state)
        print(p.write_state)


class test_dht_settings(unittest.TestCase):

    def test_dht_get_peers(self):
        session = lt.session();
        info_hash = lt.sha1_hash(b"a" * 20)
        session.dht_get_peers(info_hash);

    def test_construct(self):

        ds = lt.dht_settings()

        print(ds.max_peers_reply)
        print(ds.search_branching)
        print(ds.max_fail_count)
        print(ds.max_fail_count)
        print(ds.max_torrents)
        print(ds.max_dht_items)
        print(ds.restrict_routing_ips)
        print(ds.restrict_search_ips)
        print(ds.max_torrent_search_reply)
        print(ds.extended_routing_table)
        print(ds.aggressive_lookups)
        print(ds.privacy_lookups)
        print(ds.enforce_node_id)
        print(ds.ignore_dark_internet)
        print(ds.block_timeout)
        print(ds.block_ratelimit)
        print(ds.read_only)
        print(ds.item_lifetime)


def get_isolated_settings():
    return {
        "enable_dht": False,
        "enable_lsd": False,
        "enable_natpmp": False,
        "enable_upnp": False,
        "listen_interfaces": "127.0.0.1:0",
        "dht_bootstrap_nodes": "",
    }


def loop_until_timeout(timeout, msg="condition"):
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        yield
    raise AssertionError(f"{msg} timed out")


def unlink_all_files(path):
    for dirpath, _, filenames in os.walk(path):
        for filename in filenames:
            filepath = os.path.join(dirpath, filename)
            os.unlink(filepath)


# In test cases where libtorrent writes torrent data in a temporary directory,
# cleaning up the tempdir on Windows CI sometimes fails with a PermissionError
# having WinError 5 (Access Denied). I can't repro this WinError in any way;
# holding an open file handle results in a different WinError. Seems to be a
# race condition which only happens with very short-lived tests which write
# data. Work around by cleaning up the tempdir in a loop.

# TODO: why is this necessary?
def cleanup_with_windows_fix(tempdir, *, timeout):
    # Clean up just the files, so we don't have to bother with depth-first
    # traversal
    for _ in loop_until_timeout(timeout, msg="PermissionError clear"):
        try:
            unlink_all_files(tempdir.name)
        except PermissionError:
            if sys.platform == "win32":
                # current release of mypy doesn't know about winerror
                # if exc.winerror == 5:
                continue
            raise
        break
    # This removes directories in depth-first traversal.
    # It also marks the tempdir as explicitly cleaned so it doesn't trigger a
    # ResourceWarning.
    tempdir.cleanup()


def wait_for(session, alert_type, *, timeout, prefix=None):
    # Return the first alert of type, but log all alerts.
    result = None
    for _ in loop_until_timeout(timeout, msg=alert_type.__name__):
        for alert in session.pop_alerts():
            print(f"{alert.what()}: {alert.message()}")
            if result is None and isinstance(alert, alert_type):
                result = alert
        if result is not None:
            return result
    raise AssertionError("unreachable")


class LambdaRequestHandler(http.server.BaseHTTPRequestHandler):
    default_request_version = "HTTP/1.1"

    def __init__(self, get_data, *args, **kwargs):
        self.get_data = get_data
        super().__init__(*args, **kwargs)

    def do_GET(self):
        print(f"mock tracker request: {self.requestline}")
        data = self.get_data()
        self.send_response(200)
        self.send_header("Content-Type", "application/octet-stream")
        self.send_header("Content-Length", str(len(data)))
        self.end_headers()
        self.wfile.write(data)


class SSLTrackerAlertTest(unittest.TestCase):

    def setUp(self):
        self.cert_path = os.path.realpath(os.path.join(
            os.path.dirname(__file__), "..", "..", "test", "ssl", "server.pem"
        ))
        print(f"cert_path = {self.cert_path}")

        self.tracker_response = {
            b"external ip": b"\x01\x02\x03\x04",
        }
        self.tracker = http.server.HTTPServer(
            ("127.0.0.1", 0),
            functools.partial(
                LambdaRequestHandler, lambda: lt.bencode(self.tracker_response)
            ),
        )
        self.ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        self.ctx.load_cert_chain(self.cert_path)
        self.tracker.socket = self.ctx.wrap_socket(
            self.tracker.socket, server_side=True
        )
        self.tracker_thread = threading.Thread(target=self.tracker.serve_forever)
        self.tracker_thread.start()
        # HTTPServer.server_name seems to resolve to things like
        # "localhost.localdomain"
        port = self.tracker.server_port
        self.tracker_url = f"https://127.0.0.1:{port}/announce"
        print(f"mock tracker url = {self.tracker_url}")

        self.settings = get_isolated_settings()
        self.settings["alert_mask"] = lt.alert_category.status
        # I couldn't get validation to work on all platforms. Setting
        # SSL_CERT_FILE to our self-signed cert works on linux and mac, but
        # not on Windows.
        self.settings["validate_https_trackers"] = False
        self.session = lt.session(self.settings)
        self.dir = tempfile.TemporaryDirectory()
        self.atp = lt.add_torrent_params()
        self.atp.info_hash = dummy_data.get_sha1_hash()
        self.atp.flags &= ~lt.torrent_flags.auto_managed
        self.atp.flags &= ~lt.torrent_flags.paused
        self.atp.save_path = self.dir.name

    def tearDown(self):
        # we do this because sessions writing data can collide with
        # cleaning up temporary directories. session.abort() isn't bound
        handles = self.session.get_torrents()
        for handle in handles:
            self.session.remove_torrent(handle)
        for _ in loop_until_timeout(5, msg="clear all handles"):
            if not any(handle.is_valid() for handle in handles):
                break
        cleanup_with_windows_fix(self.dir, timeout=5)
        self.tracker.shutdown()
        # Explicitly clean up server sockets, to avoid ResourceWarning
        self.tracker.server_close()

    def test_external_ip_alert_via_ssl_tracker(self):
        handle = self.session.add_torrent(self.atp)
        handle.add_tracker({"url": self.tracker_url})

        alert = wait_for(self.session, lt.external_ip_alert, timeout=60)

        self.assertEqual(alert.category(), lt.alert_category.status)
        self.assertEqual(alert.what(), "external_ip")
        self.assertIsInstance(alert.message(), str)
        self.assertNotEqual(alert.message(), "")
        self.assertEqual(str(alert), alert.message())
        self.assertEqual(alert.external_address, "1.2.3.4")


if __name__ == '__main__':
    print(lt.__version__)
    try:
        shutil.copy(os.path.join('..', '..', 'test', 'test_torrents',
                                 'url_seed_multi.torrent'), '.')
    except shutil.SameFileError:
        pass
    try:
        shutil.copy(os.path.join('..', '..', 'test', 'test_torrents',
                                 'base.torrent'), '.')
    except shutil.SameFileError:
        pass
    try:
        shutil.copy(os.path.join('..', '..', 'test', 'test_torrents',
                                 'unordered.torrent'), '.')
    except shutil.SameFileError:
        pass
    unittest.main()
