1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
|
import struct
import pytest
from ssh_audit.auditconf import AuditConf
from ssh_audit.fingerprint import Fingerprint
from ssh_audit.outputbuffer import OutputBuffer
from ssh_audit.protocol import Protocol
from ssh_audit.readbuf import ReadBuf
from ssh_audit.ssh1 import SSH1
from ssh_audit.ssh1_publickeymessage import SSH1_PublicKeyMessage
from ssh_audit.ssh_audit import audit
from ssh_audit.writebuf import WriteBuf
# pylint: disable=line-too-long,attribute-defined-outside-init
class TestSSH1:
@pytest.fixture(autouse=True)
def init(self, ssh_audit):
self.OutputBuffer = OutputBuffer
self.protocol = Protocol
self.ssh1 = SSH1
self.PublicKeyMessage = SSH1_PublicKeyMessage
self.rbuf = ReadBuf
self.wbuf = WriteBuf
self.audit = audit
self.AuditConf = AuditConf
self.fingerprint = Fingerprint
def _conf(self):
conf = self.AuditConf('localhost', 22)
conf.colors = False
conf.batch = True
conf.verbose = True
conf.ssh1 = True
conf.ssh2 = False
conf.skip_rate_test = True
return conf
def _create_ssh1_packet(self, payload, valid_crc=True):
padding = -(len(payload) + 4) % 8
plen = len(payload) + 4
pad_bytes = b'\x00' * padding
cksum = self.ssh1.crc32(pad_bytes + payload) if valid_crc else 0
data = struct.pack('>I', plen) + pad_bytes + payload + struct.pack('>I', cksum)
return data
@classmethod
def _server_key(cls):
return (1024, 0x10001, 0xee6552da432e0ac2c422df1a51287507748bfe3b5e3e4fa989a8f49fdc163a17754939ef18ef8a667ea3b71036a151fcd7f5e01ceef1e4439864baf3ac569047582c69d6c128212e0980dcb3168f00d371004039983f6033cd785b8b8f85096c7d9405cbfdc664e27c966356a6b4eb6ee20ad43414b50de18b22829c1880b551)
@classmethod
def _host_key(cls):
return (2048, 0x10001, 0xdfa20cd2a530ccc8c870aa60d9feb3b35deeab81c3215a96557abbd683d21f4600f38e475d87100da9a4404220eeb3bb5584e5a2b5b48ffda58530ea19104a32577d7459d91e76aa711b241050f4cc6d5327ccce254f371acad3be56d46eb5919b73f20dbdb1177b700f00891c5bf4ed128bb90ed541b778288285bcfa28432ab5cbcb8321b6e24760e998e0daa519f093a631e44276d7dd252ce0c08c75e2ab28a7349ead779f97d0f20a6d413bf3623cd216dc35375f6366690bcc41e3b2d5465840ec7ee0dc7e3f1c101d674a0c7dbccbc3942788b111396add2f8153b46a0e4b50d66e57ee92958f1c860dd97cc0e40e32febff915343ed53573142bdf4b)
def _pkm_payload(self):
w = self.wbuf()
w.write(b'\x88\x99\xaa\xbb\xcc\xdd\xee\xff')
b, e, m = self._server_key()
w.write_int(b).write_mpint1(e).write_mpint1(m)
b, e, m = self._host_key()
w.write_int(b).write_mpint1(e).write_mpint1(m)
w.write_int(2)
w.write_int(72)
w.write_int(36)
return w.write_flush()
def test_crc32(self):
assert self.ssh1.crc32(b'') == 0x00
assert self.ssh1.crc32(b'The quick brown fox jumps over the lazy dog') == 0xb9c60808
def test_fingerprint(self):
# pylint: disable=protected-access
b, e, m = self._host_key()
fpd = self.wbuf._create_mpint(m, False)
fpd += self.wbuf._create_mpint(e, False)
fp = self.fingerprint(fpd)
assert b == 2048
assert fp.md5 == 'MD5:9d:26:f8:39:fc:20:9d:9b:ca:cc:4a:0f:e1:93:f5:96'
assert fp.sha256 == 'SHA256:vZdx3mhzbvVJmn08t/ruv8WDhJ9jfKYsCTuSzot+QIs'
def _assert_pkm_keys(self, pkm, skey, hkey):
b, e, m = skey
assert pkm.server_key_bits == b
assert pkm.server_key_public_exponent == e
assert pkm.server_key_public_modulus == m
b, e, m = hkey
assert pkm.host_key_bits == b
assert pkm.host_key_public_exponent == e
assert pkm.host_key_public_modulus == m
def _assert_pkm_fields(self, pkm, skey, hkey):
assert pkm is not None
assert pkm.cookie == b'\x88\x99\xaa\xbb\xcc\xdd\xee\xff'
self._assert_pkm_keys(pkm, skey, hkey)
assert pkm.protocol_flags == 2
assert pkm.supported_ciphers_mask == 72
assert pkm.supported_ciphers == ['3des', 'blowfish']
assert pkm.supported_authentications_mask == 36
assert pkm.supported_authentications == ['rsa', 'tis']
fp = self.fingerprint(pkm.host_key_fingerprint_data)
assert fp.md5 == 'MD5:9d:26:f8:39:fc:20:9d:9b:ca:cc:4a:0f:e1:93:f5:96'
assert fp.sha256 == 'SHA256:vZdx3mhzbvVJmn08t/ruv8WDhJ9jfKYsCTuSzot+QIs'
def test_pkm_init(self):
cookie = b'\x88\x99\xaa\xbb\xcc\xdd\xee\xff'
pflags, cmask, amask = 2, 72, 36
skey, hkey = self._server_key(), self._host_key()
pkm = self.PublicKeyMessage(cookie, skey, hkey, pflags, cmask, amask)
self._assert_pkm_fields(pkm, skey, hkey)
for skey2 in ([], [0], [0, 1], [0, 1, 2, 3]):
with pytest.raises(ValueError):
pkm = self.PublicKeyMessage(cookie, skey2, hkey, pflags, cmask, amask)
for hkey2 in ([], [0], [0, 1], [0, 1, 2, 3]):
with pytest.raises(ValueError):
print(hkey2)
pkm = self.PublicKeyMessage(cookie, skey, hkey2, pflags, cmask, amask)
def test_pkm_read(self):
pkm = self.PublicKeyMessage.parse(self._pkm_payload())
self._assert_pkm_fields(pkm, self._server_key(), self._host_key())
def test_pkm_payload(self):
cookie = b'\x88\x99\xaa\xbb\xcc\xdd\xee\xff'
skey, hkey = self._server_key(), self._host_key()
pflags, cmask, amask = 2, 72, 36
pkm1 = self.PublicKeyMessage(cookie, skey, hkey, pflags, cmask, amask)
pkm2 = self.PublicKeyMessage.parse(self._pkm_payload())
assert pkm1.payload == pkm2.payload
def test_ssh1_server_simple(self, output_spy, virtual_socket):
vsocket = virtual_socket
w = self.wbuf()
w.write_byte(self.protocol.SMSG_PUBLIC_KEY)
w.write(self._pkm_payload())
vsocket.rdata.append(b'SSH-1.5-OpenSSH_7.2 ssh-audit-test\r\n')
vsocket.rdata.append(self._create_ssh1_packet(w.write_flush()))
output_spy.begin()
out = self.OutputBuffer()
self.audit(out, self._conf())
out.write()
lines = output_spy.flush()
assert len(lines) == 13
def test_ssh1_server_invalid_first_packet(self, output_spy, virtual_socket):
vsocket = virtual_socket
w = self.wbuf()
w.write_byte(self.protocol.SMSG_PUBLIC_KEY + 1)
w.write(self._pkm_payload())
vsocket.rdata.append(b'SSH-1.5-OpenSSH_7.2 ssh-audit-test\r\n')
vsocket.rdata.append(self._create_ssh1_packet(w.write_flush()))
output_spy.begin()
out = self.OutputBuffer()
ret = self.audit(out, self._conf())
out.write()
assert ret != 0
lines = output_spy.flush()
assert len(lines) == 6
assert 'unknown message' in lines[-1]
def test_ssh1_server_invalid_checksum(self, output_spy, virtual_socket):
vsocket = virtual_socket
w = self.wbuf()
w.write_byte(self.protocol.SMSG_PUBLIC_KEY + 1)
w.write(self._pkm_payload())
vsocket.rdata.append(b'SSH-1.5-OpenSSH_7.2 ssh-audit-test\r\n')
vsocket.rdata.append(self._create_ssh1_packet(w.write_flush(), False))
output_spy.begin()
out = self.OutputBuffer()
with pytest.raises(SystemExit):
self.audit(out, self._conf())
out.write()
lines = output_spy.flush()
assert len(lines) == 3
assert ('checksum' in lines[0]) or ('checksum' in lines[1]) or ('checksum' in lines[2])
|