1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
|
from test_dropbear import *
import signal
import queue
import socket
# Tests for various edge cases of SSH channels and connection service
def test_exitcode(request, dropbear):
    """Run `exit 44` through dbclient and expect 44 as the return code."""
    result = dbclient(request, "exit 44")
    status = result.returncode
    assert status == 44
@pytest.mark.xfail(reason="Not yet implemented", strict=True)
def test_signal(request, dropbear):
    """Kill the remote shell with SIGFPE and expect a matching negative return code.

    Marked as a strict expected failure: the feature is not yet implemented.
    """
    expected = -signal.SIGFPE
    result = dbclient(request, "kill -FPE $$")
    assert result.returncode == expected
@pytest.mark.parametrize("size", [0, 1, 2, 100, 5000, 200_000])
def test_roundtrip(request, dropbear, size):
    """Send `size` random bytes to a remote `cat` and expect them back unchanged."""
    payload = os.urandom(size)
    result = dbclient(request, "cat", input=payload, capture_output=True)
    result.check_returncode()
    echoed = result.stdout
    assert echoed == payload
@pytest.mark.parametrize("size", [0, 1, 2, 100, 20001, 41234])
def test_read_pty(request, dropbear, size):
    """Check that output read through a pty session is not truncated.

    A temporary file holding `size` alphanumeric characters is written on
    the remote side, then read back with `dbclient -t`; the captured output
    must equal what was written.
    """
    # testcase for
    # https://bugs.openwrt.org/index.php?do=details&task_id=1814
    # https://github.com/mkj/dropbear/pull/85
    # From Yousong Zhou
    # Fixed Oct 2021
    #
    #$ ssh -t my.router cat /tmp/bigfile | wc
    #Connection to my.router closed.
    # 0 1 14335 <- should be 20001

    # Write the file. No newlines etc which could confuse ptys
    dat = random_alnum(size)
    r = dbclient(request, "tmpf=`mktemp`; echo $tmpf; cat > $tmpf", input=dat, capture_output=True, text=True)
    # Fail on a bad exit status before trusting stdout as a file name.
    r.check_returncode()
    tmpf = r.stdout.rstrip()

    # Read with a pty, this is what is being tested.
    # Timing/buffering is subtle, we seem to need to cat a file from disk to hit it.
    m, s = pty.openpty()
    try:
        r = dbclient(request, "-t", f"cat {tmpf}; rm {tmpf}", stdin=s, capture_output=True)
    finally:
        # Release both ends of the pty so descriptors do not accumulate
        # across the parametrized cases.
        os.close(s)
        os.close(m)
    r.check_returncode()
    assert r.stdout.decode() == dat
@pytest.mark.parametrize("fd", [1, 2])
def test_bg_sleep(request, fd, dropbear):
    """Check that dbclient returns once "hello" is printed despite a background sleep.

    The echo goes to stdout (fd 1) or stderr (fd 2). The call uses a
    2 second timeout, so waiting for the 10 second sleep shows up as
    TimeoutExpired.
    """
    # https://lists.ucc.asn.au/pipermail/dropbear/2006q1/000362.html
    # Rob Landley "Is this a bug?" 24 Mar 2006
    # dbclient user@system "sleep 10& echo hello"
    #
    # It should return right after printing hello, but it doesn't. It waits until
    # the child process exits.
    if fd == 1:
        redirect = ""
    else:
        redirect = " >&2 "
    command = f"sleep 10& echo hello {redirect}"

    result = dbclient(request, command, capture_output=True, timeout=2, text=True)
    result.check_returncode()

    output = result.stdout if fd == 1 else result.stderr
    # A hostkey warning may precede the echo on stderr; tolerating it is a bit of a hack.
    has_hostkey_warning = fd == 2 and 'accepted unconditionally' in output
    if not has_hostkey_warning:
        assert output.rstrip() == "hello"
    else:
        assert output.endswith("\n\nhello\n")
def test_idle(request, dropbear):
    """Check that `-I 1` makes dbclient return before the 5 second call timeout."""
    client_args = ("-I", "1", "echo zong; sleep 10")
    result = dbclient(request, *client_args, capture_output=True, timeout=5, text=True)
    result.check_returncode()
    printed = result.stdout.rstrip()
    assert printed == "zong"
@pytest.mark.parametrize("size", [1, 4000, 40000])
def test_netcat(request, dropbear, size):
    """Exchange `size` random bytes each way through `dbclient -B localhost:3344`.

    Marked xfail when the tests run against a remote server.
    """
    if request.config.option.remote:
        pytest.xfail("don't know netcat address for remote")

    to_server = os.urandom(size)
    from_server = os.urandom(size)
    with HandleTcp(3344, 1, from_server) as listener:
        result = dbclient(request, "-B", "localhost:3344", input=to_server, capture_output=True)
        result.check_returncode()
        # Data must arrive intact in both directions.
        assert result.stdout == from_server
        assert listener.inbound() == to_server
@pytest.mark.parametrize("size", [1, 4000, 40000])
@pytest.mark.parametrize("fwd_flag", "LR")
def test_tcpflushout(request, dropbear, size, fwd_flag):
    """Check that an open forwarded TCP connection keeps the SSH session alive.

    dbclient must not exit while the forwarded connection is still
    transferring data, and must exit with status 0 once it has finished.
    Runs for both -L and -R forwarding; marked xfail against a remote server.
    """
    opt = request.config.option
    if opt.remote:
        pytest.xfail("don't know address for remote")

    dat1 = os.urandom(size)
    dat2 = os.urandom(size)
    q = queue.Queue()
    with HandleTcp(3344, timeout=1, response=q) as tcp:
        # NOTE(review): the background dbclient is not reaped if an assertion
        # below fails - confirm whether the test harness cleans it up.
        r = dbclient(request, f"-{fwd_flag}", "7788:localhost:3344", "sleep 1; echo -n done",
                     text=True, background=True, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
        # time to let the listener start
        time.sleep(1)
        # open a tcp connection; the with-block closes it even when an
        # assertion fails part-way through
        with socket.create_connection(("localhost", 7788)) as c:
            # wait for the shell to finish. sleep a bit longer in case it exits.
            assert r.stdout.read(4) == "done"
            time.sleep(0.1)

            # now the shell has finished, we can write on the tcp socket
            c.sendall(dat2)
            c.shutdown(socket.SHUT_WR)
            q.put(dat1)

            # return a tcp response
            q.put(None)
            # check hasn't exited
            assert r.poll() is None

            # read the response
            assert readall_socket(c) == dat1

        assert tcp.inbound() == dat2
        # check has exited, allow time for dbclient to exit
        time.sleep(0.1)
        assert r.poll() == 0
|