1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
|
import pytest
from .. import helpers
# Module-level alias of helpers.BUSYBOX; passed as the image argument to
# create_container by the tests in this module.
BUSYBOX = helpers.BUSYBOX
class ExecTest(helpers.BaseTestCase):
    """Integration tests for ``exec_create``, ``exec_start`` and
    ``exec_inspect`` on the client.

    Every test is skipped when ``helpers.exec_driver_is_native()`` is false.
    """

    def _start_cat_container(self):
        """Create and start a BUSYBOX container running ``cat``.

        Skips the calling test when the exec driver is not native.

        The container id is appended to ``self.tmp_containers`` *before*
        ``start`` is called, so the container stays tracked even if starting
        it raises (presumably the base class removes tracked containers on
        teardown -- confirm in helpers.BaseTestCase).

        Returns:
            The id of the started container.
        """
        if not helpers.exec_driver_is_native():
            pytest.skip('Exec driver not native')
        container = self.client.create_container(
            BUSYBOX, 'cat', detach=True, stdin_open=True)
        container_id = container['Id']
        self.tmp_containers.append(container_id)
        self.client.start(container_id)
        return container_id

    def _create_exec(self, container_id, cmd, **kwargs):
        """Call ``exec_create`` and check that the response carries an 'Id'.

        Args:
            container_id: id of the container to run ``cmd`` in.
            cmd: command as a string or as a list of arguments.
            **kwargs: forwarded unchanged to ``exec_create``.

        Returns:
            The ``exec_create`` response, suitable for passing to
            ``exec_start`` / ``exec_inspect``.
        """
        exec_res = self.client.exec_create(container_id, cmd, **kwargs)
        self.assertIn('Id', exec_res)
        return exec_res

    def test_execute_command(self):
        """A command given as an argument list returns its output."""
        container_id = self._start_cat_container()
        exec_res = self._create_exec(container_id, ['echo', 'hello'])
        exec_log = self.client.exec_start(exec_res)
        self.assertEqual(exec_log, b'hello\n')

    def test_exec_command_string(self):
        """A command given as a single string returns its output."""
        container_id = self._start_cat_container()
        exec_res = self._create_exec(container_id, 'echo hello world')
        exec_log = self.client.exec_start(exec_res)
        self.assertEqual(exec_log, b'hello world\n')

    def test_exec_command_as_user(self):
        """``user='default'`` makes ``whoami`` report that user."""
        container_id = self._start_cat_container()
        exec_res = self._create_exec(container_id, 'whoami', user='default')
        exec_log = self.client.exec_start(exec_res)
        self.assertEqual(exec_log, b'default\n')

    def test_exec_command_as_root(self):
        """Without a ``user`` argument ``whoami`` reports root."""
        container_id = self._start_cat_container()
        exec_res = self._create_exec(container_id, 'whoami')
        exec_log = self.client.exec_start(exec_res)
        self.assertEqual(exec_log, b'root\n')

    def test_exec_command_streaming(self):
        """With ``stream=True`` the chunks concatenate to the full output."""
        container_id = self._start_cat_container()
        exec_res = self._create_exec(container_id, ['echo', 'hello\nworld'])
        # Join the streamed chunks in one pass instead of repeated `+=`.
        output = b''.join(self.client.exec_start(exec_res, stream=True))
        self.assertEqual(output, b'hello\nworld\n')

    def test_exec_start_socket(self):
        """With ``socket=True`` the output can be read from the socket."""
        container_id = self._start_cat_container()
        line = 'yay, interactive exec!'
        # `echo` appends CRLF, `printf` doesn't
        exec_res = self._create_exec(
            container_id, ['printf', line], tty=True)

        sock = self.client.exec_start(exec_res, socket=True)
        # Close the socket even when one of the assertions below fails.
        self.addCleanup(sock.close)

        next_size = helpers.next_packet_size(sock)
        self.assertEqual(next_size, len(line))
        data = helpers.read_data(sock, next_size)
        self.assertEqual(data.decode('utf-8'), line)

    def test_exec_inspect(self):
        """``exec_inspect`` reports a non-zero ExitCode for a failed command."""
        container_id = self._start_cat_container()
        # mkdir without -p on a path whose parent is missing is expected to
        # fail, which is what yields the non-zero exit code asserted below.
        exec_res = self._create_exec(
            container_id, ['mkdir', '/does/not/exist'])
        self.client.exec_start(exec_res)
        exec_info = self.client.exec_inspect(exec_res)
        self.assertIn('ExitCode', exec_info)
        self.assertNotEqual(exec_info['ExitCode'], 0)
|