1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202
|
import os
import struct
import pytest
from ssh_audit.auditconf import AuditConf
from ssh_audit.outputbuffer import OutputBuffer
from ssh_audit.protocol import Protocol
from ssh_audit.readbuf import ReadBuf
from ssh_audit.ssh2_kex import SSH2_Kex
from ssh_audit.ssh2_kexparty import SSH2_KexParty
from ssh_audit.ssh_audit import audit
from ssh_audit.writebuf import WriteBuf
# pylint: disable=line-too-long,attribute-defined-outside-init
class TestSSH2:
@pytest.fixture(autouse=True)
def init(self, ssh_audit):
self.OutputBuffer = OutputBuffer
self.protocol = Protocol
self.ssh2_kex = SSH2_Kex
self.ssh2_kexparty = SSH2_KexParty
self.rbuf = ReadBuf
self.wbuf = WriteBuf
self.audit = audit
self.AuditConf = AuditConf
def _conf(self):
conf = self.AuditConf('localhost', 22)
conf.colors = False
conf.batch = True
conf.verbose = True
conf.ssh1 = False
conf.ssh2 = True
conf.skip_rate_test = True
return conf
@classmethod
def _create_ssh2_packet(cls, payload):
padding = -(len(payload) + 5) % 8
if padding < 4:
padding += 8
plen = len(payload) + padding + 1
pad_bytes = b'\x00' * padding
data = struct.pack('>Ib', plen, padding) + payload + pad_bytes
return data
def _kex_payload(self):
w = self.wbuf()
w.write(b'\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff')
w.write_list(['bogus_kex1', 'bogus_kex2']) # We use a bogus kex, otherwise the host key tests will kick off and fail.
w.write_list(['ssh-rsa', 'rsa-sha2-512', 'rsa-sha2-256', 'ssh-ed25519'])
w.write_list(['chacha20-poly1305@openssh.com', 'aes128-ctr', 'aes192-ctr', 'aes256-ctr', 'aes128-gcm@openssh.com', 'aes256-gcm@openssh.com', 'aes128-cbc', 'aes192-cbc', 'aes256-cbc'])
w.write_list(['chacha20-poly1305@openssh.com', 'aes128-ctr', 'aes192-ctr', 'aes256-ctr', 'aes128-gcm@openssh.com', 'aes256-gcm@openssh.com', 'aes128-cbc', 'aes192-cbc', 'aes256-cbc'])
w.write_list(['umac-64-etm@openssh.com', 'umac-128-etm@openssh.com', 'hmac-sha2-256-etm@openssh.com', 'hmac-sha2-512-etm@openssh.com', 'hmac-sha1-etm@openssh.com', 'umac-64@openssh.com', 'umac-128@openssh.com', 'hmac-sha2-256', 'hmac-sha2-512', 'hmac-sha1'])
w.write_list(['umac-64-etm@openssh.com', 'umac-128-etm@openssh.com', 'hmac-sha2-256-etm@openssh.com', 'hmac-sha2-512-etm@openssh.com', 'hmac-sha1-etm@openssh.com', 'umac-64@openssh.com', 'umac-128@openssh.com', 'hmac-sha2-256', 'hmac-sha2-512', 'hmac-sha1'])
w.write_list(['none', 'zlib@openssh.com'])
w.write_list(['none', 'zlib@openssh.com'])
w.write_list([''])
w.write_list([''])
w.write_byte(False)
w.write_int(0)
return w.write_flush()
def _kex_payload_with_gss(self):
w = self.wbuf()
w.write(b'\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff')
w.write_list(['gss-gex-sha1-dZuIebMjgUqaxvbF7hDbAw==', 'gss-gex-sha1-vz8J1E9PzLr8b1K+0remTg==', 'gss-group14-sha1-dZuIebMjgUqaxvbF7hDbAw==', 'gss-group14-sha1-vz8J1E9PzLr8b1K+0remTg==', 'gss-group14-sha256-dZuIebMjgUqaxvbF7hDbAw==', 'gss-group14-sha256-vz8J1E9PzLr8b1K+0remTg==', 'gss-group16-sha512-dZuIebMjgUqaxvbF7hDbAw==', 'gss-group16-sha512-vz8J1E9PzLr8b1K+0remTg==', 'gss-group18-sha512-dZuIebMjgUqaxvbF7hDbAw==', 'gss-group18-sha512-vz8J1E9PzLr8b1K+0remTg==', 'gss-group1-sha1-dZuIebMjgUqaxvbF7hDbAw==', 'gss-group1-sha1-vz8J1E9PzLr8b1K+0remTg==', 'gss-curve448-sha512-XXX', 'gss-nistp256-sha256-RANDOMCHARSTOTESTWILDCARDMATCHING'])
w.write_list(['ssh-ed25519'])
w.write_list(['chacha20-poly1305@openssh.com'])
w.write_list(['chacha20-poly1305@openssh.com'])
w.write_list(['hmac-sha2-512-etm@openssh.com'])
w.write_list(['hmac-sha2-512-etm@openssh.com'])
w.write_list(['none', 'zlib@openssh.com'])
w.write_list(['none', 'zlib@openssh.com'])
w.write_list([''])
w.write_list([''])
w.write_byte(False)
w.write_int(0)
return w.write_flush()
def test_kex_read(self):
kex = self.ssh2_kex.parse(self.OutputBuffer, self._kex_payload())
assert kex is not None
assert kex.cookie == b'\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff'
assert kex.kex_algorithms == ['bogus_kex1', 'bogus_kex2']
assert kex.key_algorithms == ['ssh-rsa', 'rsa-sha2-512', 'rsa-sha2-256', 'ssh-ed25519']
assert kex.client is not None
assert kex.server is not None
assert kex.client.encryption == ['chacha20-poly1305@openssh.com', 'aes128-ctr', 'aes192-ctr', 'aes256-ctr', 'aes128-gcm@openssh.com', 'aes256-gcm@openssh.com', 'aes128-cbc', 'aes192-cbc', 'aes256-cbc']
assert kex.server.encryption == ['chacha20-poly1305@openssh.com', 'aes128-ctr', 'aes192-ctr', 'aes256-ctr', 'aes128-gcm@openssh.com', 'aes256-gcm@openssh.com', 'aes128-cbc', 'aes192-cbc', 'aes256-cbc']
assert kex.client.mac == ['umac-64-etm@openssh.com', 'umac-128-etm@openssh.com', 'hmac-sha2-256-etm@openssh.com', 'hmac-sha2-512-etm@openssh.com', 'hmac-sha1-etm@openssh.com', 'umac-64@openssh.com', 'umac-128@openssh.com', 'hmac-sha2-256', 'hmac-sha2-512', 'hmac-sha1']
assert kex.server.mac == ['umac-64-etm@openssh.com', 'umac-128-etm@openssh.com', 'hmac-sha2-256-etm@openssh.com', 'hmac-sha2-512-etm@openssh.com', 'hmac-sha1-etm@openssh.com', 'umac-64@openssh.com', 'umac-128@openssh.com', 'hmac-sha2-256', 'hmac-sha2-512', 'hmac-sha1']
assert kex.client.compression == ['none', 'zlib@openssh.com']
assert kex.server.compression == ['none', 'zlib@openssh.com']
assert kex.client.languages == ['']
assert kex.server.languages == ['']
assert kex.follows is False
assert kex.unused == 0
def _get_empty_kex(self, cookie=None):
kex_algs, key_algs = [], []
enc, mac, compression, languages = [], [], ['none'], []
cli = self.ssh2_kexparty(enc, mac, compression, languages)
enc, mac, compression, languages = [], [], ['none'], []
srv = self.ssh2_kexparty(enc, mac, compression, languages)
if cookie is None:
cookie = os.urandom(16)
kex = self.ssh2_kex(self.OutputBuffer, cookie, kex_algs, key_algs, cli, srv, 0)
return kex
def _get_kex_variat1(self):
cookie = b'\x00\x11\x22\x33\x44\x55\x66\x77\x88\x99\xaa\xbb\xcc\xdd\xee\xff'
kex = self._get_empty_kex(cookie)
kex.kex_algorithms.append('bogus_kex1')
kex.kex_algorithms.append('bogus_kex2')
kex.key_algorithms.append('ssh-rsa')
kex.key_algorithms.append('rsa-sha2-512')
kex.key_algorithms.append('rsa-sha2-256')
kex.key_algorithms.append('ssh-ed25519')
kex.server.encryption.append('chacha20-poly1305@openssh.com')
kex.server.encryption.append('aes128-ctr')
kex.server.encryption.append('aes192-ctr')
kex.server.encryption.append('aes256-ctr')
kex.server.encryption.append('aes128-gcm@openssh.com')
kex.server.encryption.append('aes256-gcm@openssh.com')
kex.server.encryption.append('aes128-cbc')
kex.server.encryption.append('aes192-cbc')
kex.server.encryption.append('aes256-cbc')
kex.server.mac.append('umac-64-etm@openssh.com')
kex.server.mac.append('umac-128-etm@openssh.com')
kex.server.mac.append('hmac-sha2-256-etm@openssh.com')
kex.server.mac.append('hmac-sha2-512-etm@openssh.com')
kex.server.mac.append('hmac-sha1-etm@openssh.com')
kex.server.mac.append('umac-64@openssh.com')
kex.server.mac.append('umac-128@openssh.com')
kex.server.mac.append('hmac-sha2-256')
kex.server.mac.append('hmac-sha2-512')
kex.server.mac.append('hmac-sha1')
kex.server.compression.append('zlib@openssh.com')
for a in kex.server.encryption:
kex.client.encryption.append(a)
for a in kex.server.mac:
kex.client.mac.append(a)
for a in kex.server.compression:
if a == 'none':
continue
kex.client.compression.append(a)
return kex
def test_key_payload(self):
kex1 = self._get_kex_variat1()
kex2 = self.ssh2_kex.parse(self.OutputBuffer, self._kex_payload())
assert kex1.payload == kex2.payload
def test_ssh2_server_simple(self, output_spy, virtual_socket):
vsocket = virtual_socket
w = self.wbuf()
w.write_byte(self.protocol.MSG_KEXINIT)
w.write(self._kex_payload())
vsocket.rdata.append(b'SSH-2.0-OpenSSH_7.3 ssh-audit-test\r\n')
vsocket.rdata.append(self._create_ssh2_packet(w.write_flush()))
output_spy.begin()
out = self.OutputBuffer()
self.audit(out, self._conf())
out.write()
lines = output_spy.flush()
assert len(lines) == 78
def test_ssh2_server_invalid_first_packet(self, output_spy, virtual_socket):
vsocket = virtual_socket
w = self.wbuf()
w.write_byte(self.protocol.MSG_KEXINIT + 1)
vsocket.rdata.append(b'SSH-2.0-OpenSSH_7.3 ssh-audit-test\r\n')
vsocket.rdata.append(self._create_ssh2_packet(w.write_flush()))
output_spy.begin()
out = self.OutputBuffer()
ret = self.audit(out, self._conf())
out.write()
assert ret != 0
lines = output_spy.flush()
assert len(lines) == 4
assert 'unknown message' in lines[-1]
def test_ssh2_gss_kex(self, output_spy, virtual_socket):
'''Ensure that GSS kex algorithms are properly parsed.'''
vsocket = virtual_socket
w = self.wbuf()
w.write_byte(self.protocol.MSG_KEXINIT)
w.write(self._kex_payload_with_gss()) # Use the kex with GSS algorithms.
vsocket.rdata.append(b'SSH-2.0-OpenSSH_7.3 ssh-audit-test\r\n')
vsocket.rdata.append(self._create_ssh2_packet(w.write_flush()))
output_spy.begin()
out = self.OutputBuffer()
self.audit(out, self._conf())
out.write()
lines = output_spy.flush()
# Ensure that none of the lines are reported as "unknown algorithm".
for line in lines:
assert line.find('unknown algorithm') == -1
|