import os
import re
import signal
import tempfile
import threading
from datetime import datetime

import pytest
import requests

import docker
from .. import helpers
from ..helpers import assert_cat_socket_detached_with_keys
from ..helpers import ctrl_with
from ..helpers import requires_api_version
from .base import BaseAPIIntegrationTest
from .base import TEST_IMG
from docker.constants import IS_WINDOWS_PLATFORM
from docker.utils.socket import next_frame_header
from docker.utils.socket import read_exactly


class ListContainersTest(BaseAPIIntegrationTest):
    def test_list_containers(self):
        res0 = self.client.containers(all=True)
        size = len(res0)
        res1 = self.client.create_container(TEST_IMG, 'true')
        assert 'Id' in res1
        self.client.start(res1['Id'])
        self.tmp_containers.append(res1['Id'])
        res2 = self.client.containers(all=True)
        assert size + 1 == len(res2)
        retrieved = [x for x in res2 if x['Id'].startswith(res1['Id'])]
        assert len(retrieved) == 1
        retrieved = retrieved[0]
        assert 'Command' in retrieved
        assert retrieved['Command'] == 'true'
        assert 'Image' in retrieved
        assert re.search(r'alpine:.*', retrieved['Image'])
        assert 'Status' in retrieved


class CreateContainerTest(BaseAPIIntegrationTest):

    def test_create(self):
        res = self.client.create_container(TEST_IMG, 'true')
        assert 'Id' in res
        self.tmp_containers.append(res['Id'])

    def test_create_with_host_pid_mode(self):
        ctnr = self.client.create_container(
            TEST_IMG, 'true', host_config=self.client.create_host_config(
                pid_mode='host', network_mode='none'
            )
        )
        assert 'Id' in ctnr
        self.tmp_containers.append(ctnr['Id'])
        self.client.start(ctnr)
        inspect = self.client.inspect_container(ctnr)
        assert 'HostConfig' in inspect
        host_config = inspect['HostConfig']
        assert 'PidMode' in host_config
        assert host_config['PidMode'] == 'host'

    def test_create_with_links(self):
        res0 = self.client.create_container(
            TEST_IMG, 'cat',
            detach=True, stdin_open=True,
            environment={'FOO': '1'})

        container1_id = res0['Id']
        self.tmp_containers.append(container1_id)

        self.client.start(container1_id)

        res1 = self.client.create_container(
            TEST_IMG, 'cat',
            detach=True, stdin_open=True,
            environment={'FOO': '1'})

        container2_id = res1['Id']
        self.tmp_containers.append(container2_id)

        self.client.start(container2_id)

        # we don't want the first /
        link_path1 = self.client.inspect_container(container1_id)['Name'][1:]
        link_alias1 = 'mylink1'
        link_env_prefix1 = link_alias1.upper()

        link_path2 = self.client.inspect_container(container2_id)['Name'][1:]
        link_alias2 = 'mylink2'
        link_env_prefix2 = link_alias2.upper()

        res2 = self.client.create_container(
            TEST_IMG, 'env', host_config=self.client.create_host_config(
                links={link_path1: link_alias1, link_path2: link_alias2},
                network_mode='bridge'
            )
        )
        container3_id = res2['Id']
        self.tmp_containers.append(container3_id)
        self.client.start(container3_id)
        assert self.client.wait(container3_id)['StatusCode'] == 0

        logs = self.client.logs(container3_id).decode('utf-8')
        assert f'{link_env_prefix1}_NAME=' in logs
        assert f'{link_env_prefix1}_ENV_FOO=1' in logs
        assert f'{link_env_prefix2}_NAME=' in logs
        assert f'{link_env_prefix2}_ENV_FOO=1' in logs

    def test_create_with_restart_policy(self):
        container = self.client.create_container(
            TEST_IMG, ['sleep', '2'],
            host_config=self.client.create_host_config(
                restart_policy={"Name": "always", "MaximumRetryCount": 0},
                network_mode='none'
            )
        )
        id = container['Id']
        self.client.start(id)
        self.client.wait(id)
        with pytest.raises(docker.errors.APIError) as exc:
            self.client.remove_container(id)
        err = exc.value.explanation
        assert 'You cannot remove ' in err
        self.client.remove_container(id, force=True)

    def test_create_container_with_volumes_from(self):
        vol_names = ['foobar_vol0', 'foobar_vol1']

        res0 = self.client.create_container(
            TEST_IMG, 'true', name=vol_names[0]
        )
        container1_id = res0['Id']
        self.tmp_containers.append(container1_id)
        self.client.start(container1_id)

        res1 = self.client.create_container(
            TEST_IMG, 'true', name=vol_names[1]
        )
        container2_id = res1['Id']
        self.tmp_containers.append(container2_id)
        self.client.start(container2_id)

        res = self.client.create_container(
            TEST_IMG, 'cat', detach=True, stdin_open=True,
            host_config=self.client.create_host_config(
                volumes_from=vol_names, network_mode='none'
            )
        )
        container3_id = res['Id']
        self.tmp_containers.append(container3_id)
        self.client.start(container3_id)

        info = self.client.inspect_container(res['Id'])
        assert len(info['HostConfig']['VolumesFrom']) == len(vol_names)

    def create_container_readonly_fs(self):
        ctnr = self.client.create_container(
            TEST_IMG, ['mkdir', '/shrine'],
            host_config=self.client.create_host_config(
                read_only=True, network_mode='none'
            )
        )
        assert 'Id' in ctnr
        self.tmp_containers.append(ctnr['Id'])
        self.client.start(ctnr)
        res = self.client.wait(ctnr)['StatusCode']
        assert res != 0

    def create_container_with_name(self):
        res = self.client.create_container(TEST_IMG, 'true', name='foobar')
        assert 'Id' in res
        self.tmp_containers.append(res['Id'])
        inspect = self.client.inspect_container(res['Id'])
        assert 'Name' in inspect
        assert '/foobar' == inspect['Name']

    def create_container_privileged(self):
        res = self.client.create_container(
            TEST_IMG, 'true', host_config=self.client.create_host_config(
                privileged=True, network_mode='none'
            )
        )
        assert 'Id' in res
        self.tmp_containers.append(res['Id'])
        self.client.start(res['Id'])
        inspect = self.client.inspect_container(res['Id'])
        assert 'Config' in inspect
        assert 'Id' in inspect
        assert inspect['Id'].startswith(res['Id'])
        assert 'Image' in inspect
        assert 'State' in inspect
        assert 'Running' in inspect['State']
        if not inspect['State']['Running']:
            assert 'ExitCode' in inspect['State']
            assert inspect['State']['ExitCode'] == 0
        # Since Nov 2013, the Privileged flag is no longer part of the
        # container's config exposed via the API (safety concerns?).
        #
        if 'Privileged' in inspect['Config']:
            assert inspect['Config']['Privileged'] is True

    def test_create_with_mac_address(self):
        mac_address_expected = "02:42:ac:11:00:0a"
        container = self.client.create_container(
            TEST_IMG, ['sleep', '60'], mac_address=mac_address_expected)

        id = container['Id']

        self.client.start(container)
        res = self.client.inspect_container(container['Id'])
        assert mac_address_expected == res['NetworkSettings']['MacAddress']

        self.client.kill(id)

    def test_group_id_ints(self):
        container = self.client.create_container(
            TEST_IMG, 'id -G',
            host_config=self.client.create_host_config(group_add=[1000, 1001])
        )
        self.tmp_containers.append(container)
        self.client.start(container)
        self.client.wait(container)

        logs = self.client.logs(container).decode('utf-8')
        groups = logs.strip().split(' ')
        assert '1000' in groups
        assert '1001' in groups

    def test_group_id_strings(self):
        container = self.client.create_container(
            TEST_IMG, 'id -G', host_config=self.client.create_host_config(
                group_add=['1000', '1001']
            )
        )
        self.tmp_containers.append(container)
        self.client.start(container)
        self.client.wait(container)

        logs = self.client.logs(container).decode('utf-8')

        groups = logs.strip().split(' ')
        assert '1000' in groups
        assert '1001' in groups

    def test_valid_log_driver_and_log_opt(self):
        log_config = docker.types.LogConfig(
            type='json-file',
            config={'max-file': '100'}
        )

        container = self.client.create_container(
            TEST_IMG, ['true'],
            host_config=self.client.create_host_config(log_config=log_config)
        )
        self.tmp_containers.append(container['Id'])
        self.client.start(container)

        info = self.client.inspect_container(container)
        container_log_config = info['HostConfig']['LogConfig']

        assert container_log_config['Type'] == log_config.type
        assert container_log_config['Config'] == log_config.config

    def test_invalid_log_driver_raises_exception(self):
        log_config = docker.types.LogConfig(
            type='asdf',
            config={}
        )

        expected_msgs = [
            "logger: no log driver named 'asdf' is registered",
            "error looking up logging plugin asdf: plugin \"asdf\" not found",
        ]
        with pytest.raises(docker.errors.APIError) as excinfo:
            # raises an internal server error 500
            container = self.client.create_container(
                TEST_IMG, ['true'], host_config=self.client.create_host_config(
                    log_config=log_config
                )
            )
            self.client.start(container)

        assert excinfo.value.explanation in expected_msgs

    def test_valid_no_log_driver_specified(self):
        log_config = docker.types.LogConfig(
            type="",
            config={'max-file': '100'}
        )

        container = self.client.create_container(
            TEST_IMG, ['true'],
            host_config=self.client.create_host_config(log_config=log_config)
        )
        self.tmp_containers.append(container['Id'])
        self.client.start(container)

        info = self.client.inspect_container(container)
        container_log_config = info['HostConfig']['LogConfig']

        assert container_log_config['Type'] == "json-file"
        assert container_log_config['Config'] == log_config.config

    def test_valid_no_config_specified(self):
        log_config = docker.types.LogConfig(
            type="json-file",
            config=None
        )

        container = self.client.create_container(
            TEST_IMG, ['true'],
            host_config=self.client.create_host_config(log_config=log_config)
        )
        self.tmp_containers.append(container['Id'])
        self.client.start(container)

        info = self.client.inspect_container(container)
        container_log_config = info['HostConfig']['LogConfig']

        assert container_log_config['Type'] == "json-file"
        assert container_log_config['Config'] == {}

    def test_create_with_memory_constraints_with_str(self):
        ctnr = self.client.create_container(
            TEST_IMG, 'true',
            host_config=self.client.create_host_config(
                memswap_limit='1G',
                mem_limit='700M'
            )
        )
        assert 'Id' in ctnr
        self.tmp_containers.append(ctnr['Id'])
        self.client.start(ctnr)
        inspect = self.client.inspect_container(ctnr)

        assert 'HostConfig' in inspect
        host_config = inspect['HostConfig']
        for limit in ['Memory', 'MemorySwap']:
            assert limit in host_config

    def test_create_with_memory_constraints_with_int(self):
        ctnr = self.client.create_container(
            TEST_IMG, 'true',
            host_config=self.client.create_host_config(mem_swappiness=40)
        )
        assert 'Id' in ctnr
        self.tmp_containers.append(ctnr['Id'])
        self.client.start(ctnr)
        inspect = self.client.inspect_container(ctnr)

        assert 'HostConfig' in inspect
        host_config = inspect['HostConfig']
        assert 'MemorySwappiness' in host_config

    def test_create_with_environment_variable_no_value(self):
        container = self.client.create_container(
            TEST_IMG,
            ['echo'],
            environment={'Foo': None, 'Other': 'one', 'Blank': ''},
        )
        self.tmp_containers.append(container['Id'])
        config = self.client.inspect_container(container['Id'])
        assert 'Foo' in config['Config']['Env']
        assert 'Other=one' in config['Config']['Env']
        assert 'Blank=' in config['Config']['Env']

    @requires_api_version('1.22')
    def test_create_with_tmpfs(self):
        tmpfs = {
            '/tmp1': 'size=3M'
        }

        container = self.client.create_container(
            TEST_IMG,
            ['echo'],
            host_config=self.client.create_host_config(
                tmpfs=tmpfs))

        self.tmp_containers.append(container['Id'])
        config = self.client.inspect_container(container)
        assert config['HostConfig']['Tmpfs'] == tmpfs

    @requires_api_version('1.24')
    def test_create_with_isolation(self):
        container = self.client.create_container(
            TEST_IMG, ['echo'], host_config=self.client.create_host_config(
                isolation='default'
            )
        )
        self.tmp_containers.append(container['Id'])
        config = self.client.inspect_container(container)
        assert config['HostConfig']['Isolation'] == 'default'

    @requires_api_version('1.25')
    def test_create_with_auto_remove(self):
        host_config = self.client.create_host_config(
            auto_remove=True
        )
        container = self.client.create_container(
            TEST_IMG, ['echo', 'test'], host_config=host_config
        )
        self.tmp_containers.append(container['Id'])
        config = self.client.inspect_container(container)
        assert config['HostConfig']['AutoRemove'] is True

    @requires_api_version('1.25')
    def test_create_with_stop_timeout(self):
        container = self.client.create_container(
            TEST_IMG, ['echo', 'test'], stop_timeout=25
        )
        self.tmp_containers.append(container['Id'])
        config = self.client.inspect_container(container)
        assert config['Config']['StopTimeout'] == 25

    @requires_api_version('1.24')
    @pytest.mark.xfail(True, reason='Not supported on most drivers')
    def test_create_with_storage_opt(self):
        host_config = self.client.create_host_config(
            storage_opt={'size': '120G'}
        )
        container = self.client.create_container(
            TEST_IMG, ['echo', 'test'], host_config=host_config
        )
        self.tmp_containers.append(container)
        config = self.client.inspect_container(container)
        assert config['HostConfig']['StorageOpt'] == {
            'size': '120G'
        }

    @requires_api_version('1.25')
    def test_create_with_init(self):
        ctnr = self.client.create_container(
            TEST_IMG, 'true',
            host_config=self.client.create_host_config(
                init=True
            )
        )
        self.tmp_containers.append(ctnr['Id'])
        config = self.client.inspect_container(ctnr)
        assert config['HostConfig']['Init'] is True

    @requires_api_version('1.24')
    @pytest.mark.xfail(not os.path.exists('/sys/fs/cgroup/cpu.rt_runtime_us'),
                       reason='CONFIG_RT_GROUP_SCHED isn\'t enabled')
    def test_create_with_cpu_rt_options(self):
        ctnr = self.client.create_container(
            TEST_IMG, 'true', host_config=self.client.create_host_config(
                cpu_rt_period=1000, cpu_rt_runtime=500
            )
        )
        self.tmp_containers.append(ctnr)
        config = self.client.inspect_container(ctnr)
        assert config['HostConfig']['CpuRealtimeRuntime'] == 500
        assert config['HostConfig']['CpuRealtimePeriod'] == 1000

    @requires_api_version('1.28')
    def test_create_with_device_cgroup_rules(self):
        rule = 'c 7:128 rwm'
        ctnr = self.client.create_container(
            TEST_IMG, 'cat /sys/fs/cgroup/devices/devices.list',
            host_config=self.client.create_host_config(
                device_cgroup_rules=[rule]
            )
        )
        self.tmp_containers.append(ctnr)
        config = self.client.inspect_container(ctnr)
        assert config['HostConfig']['DeviceCgroupRules'] == [rule]
        self.client.start(ctnr)
        assert rule in self.client.logs(ctnr).decode('utf-8')

    def test_create_with_uts_mode(self):
        container = self.client.create_container(
            TEST_IMG, ['echo'], host_config=self.client.create_host_config(
                uts_mode='host'
            )
        )
        self.tmp_containers.append(container)
        config = self.client.inspect_container(container)
        assert config['HostConfig']['UTSMode'] == 'host'


@pytest.mark.xfail(
    IS_WINDOWS_PLATFORM, reason='Test not designed for Windows platform'
)
class VolumeBindTest(BaseAPIIntegrationTest):
    def setUp(self):
        super().setUp()

        self.mount_dest = '/mnt'

        # Get a random pathname - we don't need it to exist locally
        self.mount_origin = tempfile.mkdtemp()
        self.filename = 'shared.txt'

        self.run_with_volume(
            False,
            TEST_IMG,
            ['touch', os.path.join(self.mount_dest, self.filename)],
        )

    def test_create_with_binds_rw(self):

        container = self.run_with_volume(
            False,
            TEST_IMG,
            ['ls', self.mount_dest],
        )
        logs = self.client.logs(container).decode('utf-8')
        assert self.filename in logs
        inspect_data = self.client.inspect_container(container)
        self.check_container_data(inspect_data, True)

    def test_create_with_binds_ro(self):
        self.run_with_volume(
            False,
            TEST_IMG,
            ['touch', os.path.join(self.mount_dest, self.filename)],
        )
        container = self.run_with_volume(
            True,
            TEST_IMG,
            ['ls', self.mount_dest],
        )
        logs = self.client.logs(container).decode('utf-8')

        assert self.filename in logs

        inspect_data = self.client.inspect_container(container)
        self.check_container_data(inspect_data, False)

    @requires_api_version('1.30')
    def test_create_with_mounts(self):
        mount = docker.types.Mount(
            type="bind", source=self.mount_origin, target=self.mount_dest
        )
        host_config = self.client.create_host_config(mounts=[mount])
        container = self.run_container(
            TEST_IMG, ['ls', self.mount_dest],
            host_config=host_config
        )
        assert container
        logs = self.client.logs(container).decode('utf-8')
        assert self.filename in logs
        inspect_data = self.client.inspect_container(container)
        self.check_container_data(inspect_data, True)

    @requires_api_version('1.30')
    def test_create_with_mounts_ro(self):
        mount = docker.types.Mount(
            type="bind", source=self.mount_origin, target=self.mount_dest,
            read_only=True
        )
        host_config = self.client.create_host_config(mounts=[mount])
        container = self.run_container(
            TEST_IMG, ['ls', self.mount_dest],
            host_config=host_config
        )
        assert container
        logs = self.client.logs(container).decode('utf-8')
        assert self.filename in logs
        inspect_data = self.client.inspect_container(container)
        self.check_container_data(inspect_data, False)

    @requires_api_version('1.30')
    def test_create_with_volume_mount(self):
        mount = docker.types.Mount(
            type="volume", source=helpers.random_name(),
            target=self.mount_dest, labels={'com.dockerpy.test': 'true'}
        )
        host_config = self.client.create_host_config(mounts=[mount])
        container = self.client.create_container(
            TEST_IMG, ['true'], host_config=host_config,
        )
        assert container
        inspect_data = self.client.inspect_container(container)
        assert 'Mounts' in inspect_data
        filtered = list(filter(
            lambda x: x['Destination'] == self.mount_dest,
            inspect_data['Mounts']
        ))
        assert len(filtered) == 1
        mount_data = filtered[0]
        assert mount['Source'] == mount_data['Name']
        assert mount_data['RW'] is True

    def check_container_data(self, inspect_data, rw):
        assert 'Mounts' in inspect_data
        filtered = list(filter(
            lambda x: x['Destination'] == self.mount_dest,
            inspect_data['Mounts']
        ))
        assert len(filtered) == 1
        mount_data = filtered[0]
        assert mount_data['Source'] == self.mount_origin
        assert mount_data['RW'] == rw

    def run_with_volume(self, ro, *args, **kwargs):
        return self.run_container(
            *args,
            volumes={self.mount_dest: {}},
            host_config=self.client.create_host_config(
                binds={
                    self.mount_origin: {
                        'bind': self.mount_dest,
                        'ro': ro,
                    },
                },
                network_mode='none'
            ),
            **kwargs
        )


class ArchiveTest(BaseAPIIntegrationTest):
    def test_get_file_archive_from_container(self):
        data = 'The Maid and the Pocket Watch of Blood'
        ctnr = self.client.create_container(
            TEST_IMG, f'sh -c "echo {data} > /vol1/data.txt"',
            volumes=['/vol1']
        )
        self.tmp_containers.append(ctnr)
        self.client.start(ctnr)
        self.client.wait(ctnr)
        with tempfile.NamedTemporaryFile() as destination:
            strm, stat = self.client.get_archive(ctnr, '/vol1/data.txt')
            for d in strm:
                destination.write(d)
            destination.seek(0)
            retrieved_data = helpers.untar_file(destination, 'data.txt')\
                .decode('utf-8')
            assert data == retrieved_data.strip()

    def test_get_file_stat_from_container(self):
        data = 'The Maid and the Pocket Watch of Blood'
        ctnr = self.client.create_container(
            TEST_IMG, f'sh -c "echo -n {data} > /vol1/data.txt"',
            volumes=['/vol1']
        )
        self.tmp_containers.append(ctnr)
        self.client.start(ctnr)
        self.client.wait(ctnr)
        strm, stat = self.client.get_archive(ctnr, '/vol1/data.txt')
        assert 'name' in stat
        assert stat['name'] == 'data.txt'
        assert 'size' in stat
        assert stat['size'] == len(data)

    def test_copy_file_to_container(self):
        data = b'Deaf To All But The Song'
        with tempfile.NamedTemporaryFile(delete=False) as test_file:
            test_file.write(data)
            test_file.seek(0)
            ctnr = self.client.create_container(
                TEST_IMG,
                'cat {}'.format(
                    os.path.join('/vol1/', os.path.basename(test_file.name))
                ),
                volumes=['/vol1']
            )
            self.tmp_containers.append(ctnr)
            with helpers.simple_tar(test_file.name) as test_tar:
                self.client.put_archive(ctnr, '/vol1', test_tar)
        self.client.start(ctnr)
        self.client.wait(ctnr)
        logs = self.client.logs(ctnr)
        assert logs.strip() == data

    def test_copy_directory_to_container(self):
        files = ['a.py', 'b.py', 'foo/b.py']
        dirs = ['foo', 'bar']
        base = helpers.make_tree(dirs, files)
        ctnr = self.client.create_container(
            TEST_IMG, 'ls -p /vol1', volumes=['/vol1']
        )
        self.tmp_containers.append(ctnr)
        with docker.utils.tar(base) as test_tar:
            self.client.put_archive(ctnr, '/vol1', test_tar)
        self.client.start(ctnr)
        self.client.wait(ctnr)
        logs = self.client.logs(ctnr).decode('utf-8')
        results = logs.strip().split()
        assert 'a.py' in results
        assert 'b.py' in results
        assert 'foo/' in results
        assert 'bar/' in results


class RenameContainerTest(BaseAPIIntegrationTest):
    def test_rename_container(self):
        version = self.client.version()['Version']
        name = 'hong_meiling'
        res = self.client.create_container(TEST_IMG, 'true')
        assert 'Id' in res
        self.tmp_containers.append(res['Id'])
        self.client.rename(res, name)
        inspect = self.client.inspect_container(res['Id'])
        assert 'Name' in inspect
        if version == '1.5.0':
            assert name == inspect['Name']
        else:
            assert f'/{name}' == inspect['Name']


class StartContainerTest(BaseAPIIntegrationTest):
    def test_start_container(self):
        res = self.client.create_container(TEST_IMG, 'true')
        assert 'Id' in res
        self.tmp_containers.append(res['Id'])
        self.client.start(res['Id'])
        inspect = self.client.inspect_container(res['Id'])
        assert 'Config' in inspect
        assert 'Id' in inspect
        assert inspect['Id'].startswith(res['Id'])
        assert 'Image' in inspect
        assert 'State' in inspect
        assert 'Running' in inspect['State']
        if not inspect['State']['Running']:
            assert 'ExitCode' in inspect['State']
            assert inspect['State']['ExitCode'] == 0

    def test_start_container_with_dict_instead_of_id(self):
        res = self.client.create_container(TEST_IMG, 'true')
        assert 'Id' in res
        self.tmp_containers.append(res['Id'])
        self.client.start(res)
        inspect = self.client.inspect_container(res['Id'])
        assert 'Config' in inspect
        assert 'Id' in inspect
        assert inspect['Id'].startswith(res['Id'])
        assert 'Image' in inspect
        assert 'State' in inspect
        assert 'Running' in inspect['State']
        if not inspect['State']['Running']:
            assert 'ExitCode' in inspect['State']
            assert inspect['State']['ExitCode'] == 0

    def test_run_shlex_commands(self):
        commands = [
            'true',
            'echo "The Young Descendant of Tepes & Septette for the '
            'Dead Princess"',
            'echo -n "The Young Descendant of Tepes & Septette for the '
            'Dead Princess"',
            '/bin/sh -c "echo Hello World"',
            '/bin/sh -c \'echo "Hello World"\'',
            'echo "\"Night of Nights\""',
            'true && echo "Night of Nights"'
        ]
        for cmd in commands:
            container = self.client.create_container(TEST_IMG, cmd)
            id = container['Id']
            self.client.start(id)
            self.tmp_containers.append(id)
            exitcode = self.client.wait(id)['StatusCode']
            assert exitcode == 0, cmd


class WaitTest(BaseAPIIntegrationTest):
    def test_wait(self):
        res = self.client.create_container(TEST_IMG, ['sleep', '3'])
        id = res['Id']
        self.tmp_containers.append(id)
        self.client.start(id)
        exitcode = self.client.wait(id)['StatusCode']
        assert exitcode == 0
        inspect = self.client.inspect_container(id)
        assert 'Running' in inspect['State']
        assert inspect['State']['Running'] is False
        assert 'ExitCode' in inspect['State']
        assert inspect['State']['ExitCode'] == exitcode

    def test_wait_with_dict_instead_of_id(self):
        res = self.client.create_container(TEST_IMG, ['sleep', '3'])
        id = res['Id']
        self.tmp_containers.append(id)
        self.client.start(res)
        exitcode = self.client.wait(res)['StatusCode']
        assert exitcode == 0
        inspect = self.client.inspect_container(res)
        assert 'Running' in inspect['State']
        assert inspect['State']['Running'] is False
        assert 'ExitCode' in inspect['State']
        assert inspect['State']['ExitCode'] == exitcode

    @requires_api_version('1.30')
    def test_wait_with_condition(self):
        ctnr = self.client.create_container(TEST_IMG, 'true')
        self.tmp_containers.append(ctnr)
        with pytest.raises(requests.exceptions.ConnectionError):
            self.client.wait(ctnr, condition='removed', timeout=1)

        ctnr = self.client.create_container(
            TEST_IMG, ['sleep', '3'],
            host_config=self.client.create_host_config(auto_remove=True)
        )
        self.tmp_containers.append(ctnr)
        self.client.start(ctnr)
        assert self.client.wait(
            ctnr, condition='removed', timeout=5
        )['StatusCode'] == 0


class LogsTest(BaseAPIIntegrationTest):
    def test_logs(self):
        snippet = 'Flowering Nights (Sakuya Iyazoi)'
        container = self.client.create_container(
            TEST_IMG, f'echo {snippet}'
        )
        id = container['Id']
        self.tmp_containers.append(id)
        self.client.start(id)
        exitcode = self.client.wait(id)['StatusCode']
        assert exitcode == 0
        logs = self.client.logs(id)
        assert logs == (snippet + '\n').encode(encoding='ascii')

    def test_logs_tail_option(self):
        snippet = '''Line1
Line2'''
        container = self.client.create_container(
            TEST_IMG, f'echo "{snippet}"'
        )
        id = container['Id']
        self.tmp_containers.append(id)
        self.client.start(id)
        exitcode = self.client.wait(id)['StatusCode']
        assert exitcode == 0
        logs = self.client.logs(id, tail=1)
        assert logs == 'Line2\n'.encode(encoding='ascii')

    def test_logs_streaming_and_follow(self):
        snippet = 'Flowering Nights (Sakuya Iyazoi)'
        container = self.client.create_container(
            TEST_IMG, f'echo {snippet}'
        )
        id = container['Id']
        self.tmp_containers.append(id)
        self.client.start(id)
        logs = b''
        for chunk in self.client.logs(id, stream=True, follow=True):
            logs += chunk

        exitcode = self.client.wait(id)['StatusCode']
        assert exitcode == 0

        assert logs == (snippet + '\n').encode(encoding='ascii')

    @pytest.mark.timeout(5)
    @pytest.mark.skipif(os.environ.get('DOCKER_HOST', '').startswith('ssh://'),
                        reason='No cancellable streams over SSH')
    def test_logs_streaming_and_follow_and_cancel(self):
        snippet = 'Flowering Nights (Sakuya Iyazoi)'
        container = self.client.create_container(
            TEST_IMG, f'sh -c "echo \\"{snippet}\\" && sleep 3"'
        )
        id = container['Id']
        self.tmp_containers.append(id)
        self.client.start(id)
        logs = b''

        generator = self.client.logs(id, stream=True, follow=True)
        threading.Timer(1, generator.close).start()

        for chunk in generator:
            logs += chunk

        assert logs == (snippet + '\n').encode(encoding='ascii')

    def test_logs_with_dict_instead_of_id(self):
        snippet = 'Flowering Nights (Sakuya Iyazoi)'
        container = self.client.create_container(
            TEST_IMG, f'echo {snippet}'
        )
        id = container['Id']
        self.tmp_containers.append(id)
        self.client.start(id)
        exitcode = self.client.wait(id)['StatusCode']
        assert exitcode == 0
        logs = self.client.logs(container)
        assert logs == (snippet + '\n').encode(encoding='ascii')

    def test_logs_with_tail_0(self):
        snippet = 'Flowering Nights (Sakuya Iyazoi)'
        container = self.client.create_container(
            TEST_IMG, f'echo "{snippet}"'
        )
        id = container['Id']
        self.tmp_containers.append(id)
        self.client.start(id)
        exitcode = self.client.wait(id)['StatusCode']
        assert exitcode == 0
        logs = self.client.logs(id, tail=0)
        assert logs == ''.encode(encoding='ascii')

    @requires_api_version('1.35')
    def test_logs_with_until(self):
        snippet = 'Shanghai Teahouse (Hong Meiling)'
        container = self.client.create_container(
            TEST_IMG, f'echo "{snippet}"'
        )

        self.tmp_containers.append(container)
        self.client.start(container)
        exitcode = self.client.wait(container)['StatusCode']
        assert exitcode == 0
        logs_until_1 = self.client.logs(container, until=1)
        assert logs_until_1 == b''
        logs_until_now = self.client.logs(container, datetime.now())
        assert logs_until_now == (snippet + '\n').encode(encoding='ascii')


class DiffTest(BaseAPIIntegrationTest):
    def test_diff(self):
        container = self.client.create_container(TEST_IMG, ['touch', '/test'])
        id = container['Id']
        self.client.start(id)
        self.tmp_containers.append(id)
        exitcode = self.client.wait(id)['StatusCode']
        assert exitcode == 0
        diff = self.client.diff(id)
        test_diff = [x for x in diff if x.get('Path', None) == '/test']
        assert len(test_diff) == 1
        assert 'Kind' in test_diff[0]
        assert test_diff[0]['Kind'] == 1

    def test_diff_with_dict_instead_of_id(self):
        container = self.client.create_container(TEST_IMG, ['touch', '/test'])
        id = container['Id']
        self.client.start(id)
        self.tmp_containers.append(id)
        exitcode = self.client.wait(id)['StatusCode']
        assert exitcode == 0
        diff = self.client.diff(container)
        test_diff = [x for x in diff if x.get('Path', None) == '/test']
        assert len(test_diff) == 1
        assert 'Kind' in test_diff[0]
        assert test_diff[0]['Kind'] == 1


class StopTest(BaseAPIIntegrationTest):
    def test_stop(self):
        container = self.client.create_container(TEST_IMG, ['sleep', '9999'])
        id = container['Id']
        self.client.start(id)
        self.tmp_containers.append(id)
        self.client.stop(id, timeout=2)
        container_info = self.client.inspect_container(id)
        assert 'State' in container_info
        state = container_info['State']
        assert 'Running' in state
        assert state['Running'] is False

    def test_stop_with_dict_instead_of_id(self):
        container = self.client.create_container(TEST_IMG, ['sleep', '9999'])
        assert 'Id' in container
        id = container['Id']
        self.client.start(container)
        self.tmp_containers.append(id)
        self.client.stop(container, timeout=2)
        container_info = self.client.inspect_container(id)
        assert 'State' in container_info
        state = container_info['State']
        assert 'Running' in state
        assert state['Running'] is False


class KillTest(BaseAPIIntegrationTest):
    def test_kill(self):
        container = self.client.create_container(TEST_IMG, ['sleep', '9999'])
        id = container['Id']
        self.client.start(id)
        self.tmp_containers.append(id)
        self.client.kill(id)
        container_info = self.client.inspect_container(id)
        assert 'State' in container_info
        state = container_info['State']
        assert 'ExitCode' in state
        assert state['ExitCode'] != 0
        assert 'Running' in state
        assert state['Running'] is False

    def test_kill_with_dict_instead_of_id(self):
        container = self.client.create_container(TEST_IMG, ['sleep', '9999'])
        id = container['Id']
        self.client.start(id)
        self.tmp_containers.append(id)
        self.client.kill(container)
        container_info = self.client.inspect_container(id)
        assert 'State' in container_info
        state = container_info['State']
        assert 'ExitCode' in state
        assert state['ExitCode'] != 0
        assert 'Running' in state
        assert state['Running'] is False

    def test_kill_with_signal(self):
        id = self.client.create_container(TEST_IMG, ['sleep', '60'])
        self.tmp_containers.append(id)
        self.client.start(id)
        self.client.kill(
            id, signal=signal.SIGKILL if not IS_WINDOWS_PLATFORM else 9
        )
        exitcode = self.client.wait(id)['StatusCode']
        assert exitcode != 0
        container_info = self.client.inspect_container(id)
        assert 'State' in container_info
        state = container_info['State']
        assert 'ExitCode' in state
        assert state['ExitCode'] != 0
        assert 'Running' in state
        assert state['Running'] is False, state

    def test_kill_with_signal_name(self):
        id = self.client.create_container(TEST_IMG, ['sleep', '60'])
        self.client.start(id)
        self.tmp_containers.append(id)
        self.client.kill(id, signal='SIGKILL')
        exitcode = self.client.wait(id)['StatusCode']
        assert exitcode != 0
        container_info = self.client.inspect_container(id)
        assert 'State' in container_info
        state = container_info['State']
        assert 'ExitCode' in state
        assert state['ExitCode'] != 0
        assert 'Running' in state
        assert state['Running'] is False, state

    def test_kill_with_signal_integer(self):
        id = self.client.create_container(TEST_IMG, ['sleep', '60'])
        self.client.start(id)
        self.tmp_containers.append(id)
        self.client.kill(id, signal=9)
        exitcode = self.client.wait(id)['StatusCode']
        assert exitcode != 0
        container_info = self.client.inspect_container(id)
        assert 'State' in container_info
        state = container_info['State']
        assert 'ExitCode' in state
        assert state['ExitCode'] != 0
        assert 'Running' in state
        assert state['Running'] is False, state


class PortTest(BaseAPIIntegrationTest):
    def test_port(self):
        port_bindings = {
            '1111': ('127.0.0.1', '4567'),
            '2222': ('127.0.0.1', '4568'),
            '3333/udp': ('127.0.0.1', '4569'),
        }
        ports = [
            1111,
            2222,
            (3333, 'udp'),
        ]

        container = self.client.create_container(
            TEST_IMG, ['sleep', '60'], ports=ports,
            host_config=self.client.create_host_config(
                port_bindings=port_bindings, network_mode='bridge'
            )
        )
        id = container['Id']

        self.client.start(container)

        # Call the port function on each biding and compare expected vs actual
        for port in port_bindings:
            port, _, protocol = port.partition('/')
            actual_bindings = self.client.port(container, port)
            port_binding = actual_bindings.pop()

            ip, host_port = port_binding['HostIp'], port_binding['HostPort']

            port_binding = port if not protocol else port + "/" + protocol
            assert ip == port_bindings[port_binding][0]
            assert host_port == port_bindings[port_binding][1]

        self.client.kill(id)


class ContainerTopTest(BaseAPIIntegrationTest):
    @pytest.mark.xfail(reason='Output of docker top depends on host distro, '
                              'and is not formalized.')
    def test_top(self):
        container = self.client.create_container(
            TEST_IMG, ['sleep', '60']
        )

        self.tmp_containers.append(container)

        self.client.start(container)
        res = self.client.top(container)
        if not IS_WINDOWS_PLATFORM:
            assert res['Titles'] == ['PID', 'USER', 'TIME', 'COMMAND']
        assert len(res['Processes']) == 1
        assert res['Processes'][0][-1] == 'sleep 60'
        self.client.kill(container)

    @pytest.mark.skipif(
        IS_WINDOWS_PLATFORM, reason='No psargs support on windows'
    )
    @pytest.mark.xfail(reason='Output of docker top depends on host distro, '
                              'and is not formalized.')
    def test_top_with_psargs(self):
        container = self.client.create_container(
            TEST_IMG, ['sleep', '60'])

        self.tmp_containers.append(container)

        self.client.start(container)
        res = self.client.top(container, '-eopid,user')
        assert res['Titles'] == ['PID', 'USER']
        assert len(res['Processes']) == 1
        assert res['Processes'][0][10] == 'sleep 60'


class RestartContainerTest(BaseAPIIntegrationTest):
    def test_restart(self):
        container = self.client.create_container(TEST_IMG, ['sleep', '9999'])
        id = container['Id']
        self.client.start(id)
        self.tmp_containers.append(id)
        info = self.client.inspect_container(id)
        assert 'State' in info
        assert 'StartedAt' in info['State']
        start_time1 = info['State']['StartedAt']
        self.client.restart(id, timeout=2)
        info2 = self.client.inspect_container(id)
        assert 'State' in info2
        assert 'StartedAt' in info2['State']
        start_time2 = info2['State']['StartedAt']
        assert start_time1 != start_time2
        assert 'Running' in info2['State']
        assert info2['State']['Running'] is True
        self.client.kill(id)

    def test_restart_with_low_timeout(self):
        container = self.client.create_container(TEST_IMG, ['sleep', '9999'])
        self.client.start(container)
        self.client.timeout = 3
        self.client.restart(container, timeout=1)
        self.client.timeout = None
        self.client.restart(container, timeout=1)
        self.client.kill(container)

    def test_restart_with_dict_instead_of_id(self):
        container = self.client.create_container(TEST_IMG, ['sleep', '9999'])
        assert 'Id' in container
        id = container['Id']
        self.client.start(container)
        self.tmp_containers.append(id)
        info = self.client.inspect_container(id)
        assert 'State' in info
        assert 'StartedAt' in info['State']
        start_time1 = info['State']['StartedAt']
        self.client.restart(container, timeout=2)
        info2 = self.client.inspect_container(id)
        assert 'State' in info2
        assert 'StartedAt' in info2['State']
        start_time2 = info2['State']['StartedAt']
        assert start_time1 != start_time2
        assert 'Running' in info2['State']
        assert info2['State']['Running'] is True
        self.client.kill(id)


class RemoveContainerTest(BaseAPIIntegrationTest):
    def test_remove(self):
        container = self.client.create_container(TEST_IMG, ['true'])
        id = container['Id']
        self.client.start(id)
        self.client.wait(id)
        self.client.remove_container(id)
        containers = self.client.containers(all=True)
        res = [x for x in containers if 'Id' in x and x['Id'].startswith(id)]
        assert len(res) == 0

    def test_remove_with_dict_instead_of_id(self):
        container = self.client.create_container(TEST_IMG, ['true'])
        id = container['Id']
        self.client.start(id)
        self.client.wait(id)
        self.client.remove_container(container)
        containers = self.client.containers(all=True)
        res = [x for x in containers if 'Id' in x and x['Id'].startswith(id)]
        assert len(res) == 0


class AttachContainerTest(BaseAPIIntegrationTest):
    def test_run_container_streaming(self):
        container = self.client.create_container(TEST_IMG, '/bin/sh',
                                                 detach=True, stdin_open=True)
        id = container['Id']
        self.tmp_containers.append(id)
        self.client.start(id)
        sock = self.client.attach_socket(container, ws=False)
        assert sock.fileno() > -1

    def test_run_container_reading_socket(self):
        line = 'hi there and stuff and things, words!'
        # `echo` appends CRLF, `printf` doesn't
        command = f"printf '{line}'"
        container = self.client.create_container(TEST_IMG, command,
                                                 detach=True, tty=False)
        self.tmp_containers.append(container)

        opts = {"stdout": 1, "stream": 1, "logs": 1}
        pty_stdout = self.client.attach_socket(container, opts)
        self.addCleanup(pty_stdout.close)

        self.client.start(container)

        (stream, next_size) = next_frame_header(pty_stdout)
        assert stream == 1  # correspond to stdout
        assert next_size == len(line)
        data = read_exactly(pty_stdout, next_size)
        assert data.decode('utf-8') == line

    def test_attach_no_stream(self):
        container = self.client.create_container(
            TEST_IMG, 'echo hello'
        )
        self.tmp_containers.append(container)
        self.client.start(container)
        output = self.client.attach(container, stream=False, logs=True)
        assert output == 'hello\n'.encode(encoding='ascii')

    @pytest.mark.timeout(10)
    @pytest.mark.skipif(os.environ.get('DOCKER_HOST', '').startswith('ssh://'),
                        reason='No cancellable streams over SSH')
    @pytest.mark.xfail(condition=os.environ.get('DOCKER_TLS_VERIFY') or
                       os.environ.get('DOCKER_CERT_PATH'),
                       reason='Flaky test on TLS')
    def test_attach_stream_and_cancel(self):
        container = self.client.create_container(
            TEST_IMG, 'sh -c "sleep 2 && echo hello && sleep 60"',
            tty=True
        )
        self.tmp_containers.append(container)
        self.client.start(container)
        output = self.client.attach(container, stream=True, logs=True)

        threading.Timer(3, output.close).start()

        lines = []
        for line in output:
            lines.append(line)

        assert len(lines) == 1
        assert lines[0] == 'hello\r\n'.encode(encoding='ascii')

    def test_detach_with_default(self):
        container = self.client.create_container(
            TEST_IMG, 'cat',
            detach=True, stdin_open=True, tty=True
        )
        self.tmp_containers.append(container)
        self.client.start(container)

        sock = self.client.attach_socket(
            container,
            {'stdin': True, 'stream': True}
        )

        assert_cat_socket_detached_with_keys(
            sock, [ctrl_with('p'), ctrl_with('q')]
        )

    def test_detach_with_config_file(self):
        self.client._general_configs['detachKeys'] = 'ctrl-p'

        container = self.client.create_container(
            TEST_IMG, 'cat',
            detach=True, stdin_open=True, tty=True
        )
        self.tmp_containers.append(container)
        self.client.start(container)

        sock = self.client.attach_socket(
            container,
            {'stdin': True, 'stream': True}
        )

        assert_cat_socket_detached_with_keys(sock, [ctrl_with('p')])

    def test_detach_with_arg(self):
        self.client._general_configs['detachKeys'] = 'ctrl-p'

        container = self.client.create_container(
            TEST_IMG, 'cat',
            detach=True, stdin_open=True, tty=True
        )
        self.tmp_containers.append(container)
        self.client.start(container)

        sock = self.client.attach_socket(
            container,
            {'stdin': True, 'stream': True, 'detachKeys': 'ctrl-x'}
        )

        assert_cat_socket_detached_with_keys(sock, [ctrl_with('x')])


class PauseTest(BaseAPIIntegrationTest):
    def test_pause_unpause(self):
        container = self.client.create_container(TEST_IMG, ['sleep', '9999'])
        id = container['Id']
        self.tmp_containers.append(id)
        self.client.start(container)
        self.client.pause(id)
        container_info = self.client.inspect_container(id)
        assert 'State' in container_info
        state = container_info['State']
        assert 'ExitCode' in state
        assert state['ExitCode'] == 0
        assert 'Running' in state
        assert state['Running'] is True
        assert 'Paused' in state
        assert state['Paused'] is True

        self.client.unpause(id)
        container_info = self.client.inspect_container(id)
        assert 'State' in container_info
        state = container_info['State']
        assert 'ExitCode' in state
        assert state['ExitCode'] == 0
        assert 'Running' in state
        assert state['Running'] is True
        assert 'Paused' in state
        assert state['Paused'] is False


class PruneTest(BaseAPIIntegrationTest):
    @requires_api_version('1.25')
    def test_prune_containers(self):
        container1 = self.client.create_container(
            TEST_IMG, ['sh', '-c', 'echo hello > /data.txt']
        )
        container2 = self.client.create_container(TEST_IMG, ['sleep', '9999'])
        self.client.start(container1)
        self.client.start(container2)
        self.client.wait(container1)
        result = self.client.prune_containers()
        assert container1['Id'] in result['ContainersDeleted']
        assert result['SpaceReclaimed'] > 0
        assert container2['Id'] not in result['ContainersDeleted']


class GetContainerStatsTest(BaseAPIIntegrationTest):
    def test_get_container_stats_no_stream(self):
        container = self.client.create_container(
            TEST_IMG, ['sleep', '60'],
        )
        self.tmp_containers.append(container)
        self.client.start(container)
        response = self.client.stats(container, stream=0)
        self.client.kill(container)

        assert type(response) == dict
        for key in ['read', 'networks', 'precpu_stats', 'cpu_stats',
                    'memory_stats', 'blkio_stats']:
            assert key in response

        def test_get_container_stats_stream(self):
            container = self.client.create_container(
                TEST_IMG, ['sleep', '60'],
            )
            self.tmp_containers.append(container)
            self.client.start(container)
            stream = self.client.stats(container)
            for chunk in stream:
                assert type(chunk) == dict
                for key in ['read', 'network', 'precpu_stats', 'cpu_stats',
                            'memory_stats', 'blkio_stats']:
                    assert key in chunk


class ContainerUpdateTest(BaseAPIIntegrationTest):
    @requires_api_version('1.22')
    def test_update_container(self):
        old_mem_limit = 400 * 1024 * 1024
        new_mem_limit = 300 * 1024 * 1024
        container = self.client.create_container(
            TEST_IMG, 'top', host_config=self.client.create_host_config(
                mem_limit=old_mem_limit
            )
        )
        self.tmp_containers.append(container)
        self.client.start(container)
        self.client.update_container(container, mem_limit=new_mem_limit)
        inspect_data = self.client.inspect_container(container)
        assert inspect_data['HostConfig']['Memory'] == new_mem_limit

    @requires_api_version('1.23')
    def test_restart_policy_update(self):
        old_restart_policy = {
            'MaximumRetryCount': 0,
            'Name': 'always'
        }
        new_restart_policy = {
            'MaximumRetryCount': 42,
            'Name': 'on-failure'
        }
        container = self.client.create_container(
            TEST_IMG, ['sleep', '60'],
            host_config=self.client.create_host_config(
                restart_policy=old_restart_policy
            )
        )
        self.tmp_containers.append(container)
        self.client.start(container)
        self.client.update_container(container,
                                     restart_policy=new_restart_policy)
        inspect_data = self.client.inspect_container(container)
        assert (
            inspect_data['HostConfig']['RestartPolicy']['MaximumRetryCount'] ==
            new_restart_policy['MaximumRetryCount']
        )
        assert (
            inspect_data['HostConfig']['RestartPolicy']['Name'] ==
            new_restart_policy['Name']
        )


class ContainerCPUTest(BaseAPIIntegrationTest):
    def test_container_cpu_shares(self):
        cpu_shares = 512
        container = self.client.create_container(
            TEST_IMG, 'ls', host_config=self.client.create_host_config(
                cpu_shares=cpu_shares
            )
        )
        self.tmp_containers.append(container)
        self.client.start(container)
        inspect_data = self.client.inspect_container(container)
        assert inspect_data['HostConfig']['CpuShares'] == 512

    def test_container_cpuset(self):
        cpuset_cpus = "0,1"
        container = self.client.create_container(
            TEST_IMG, 'ls', host_config=self.client.create_host_config(
                cpuset_cpus=cpuset_cpus
            )
        )
        self.tmp_containers.append(container)
        self.client.start(container)
        inspect_data = self.client.inspect_container(container)
        assert inspect_data['HostConfig']['CpusetCpus'] == cpuset_cpus

    @requires_api_version('1.25')
    def test_create_with_runtime(self):
        container = self.client.create_container(
            TEST_IMG, ['echo', 'test'], runtime='runc'
        )
        self.tmp_containers.append(container['Id'])
        config = self.client.inspect_container(container)
        assert config['HostConfig']['Runtime'] == 'runc'


class LinkTest(BaseAPIIntegrationTest):
    def test_remove_link(self):
        # Create containers
        container1 = self.client.create_container(
            TEST_IMG, 'cat', detach=True, stdin_open=True
        )
        container1_id = container1['Id']
        self.tmp_containers.append(container1_id)
        self.client.start(container1_id)

        # Create Link
        # we don't want the first /
        link_path = self.client.inspect_container(container1_id)['Name'][1:]
        link_alias = 'mylink'

        container2 = self.client.create_container(
            TEST_IMG, 'cat', host_config=self.client.create_host_config(
                links={link_path: link_alias}
            )
        )
        container2_id = container2['Id']
        self.tmp_containers.append(container2_id)
        self.client.start(container2_id)

        # Remove link
        linked_name = self.client.inspect_container(container2_id)['Name'][1:]
        link_name = f'{linked_name}/{link_alias}'
        self.client.remove_container(link_name, link=True)

        # Link is gone
        containers = self.client.containers(all=True)
        retrieved = [x for x in containers if link_name in x['Names']]
        assert len(retrieved) == 0

        # Containers are still there
        retrieved = [
            x for x in containers if x['Id'].startswith(container1_id) or
            x['Id'].startswith(container2_id)
        ]
        assert len(retrieved) == 2
