1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
|
#--
# =============================================================================
# Copyright (c) 2004,2005 Jamis Buck (jamis@37signals.com)
# All rights reserved.
#
# This source file is distributed as part of the Net::SSH Secure Shell Client
# library for Ruby. This file (and the library as a whole) may be used only as
# allowed by either the BSD license, or the Ruby license (or, by association
# with the Ruby license, the GPL). See the "doc" subdirectory of the Net::SSH
# distribution for the texts of these licenses.
# -----------------------------------------------------------------------------
# net-ssh website : http://net-ssh.rubyforge.org
# project website: http://rubyforge.org/projects/net-ssh
# =============================================================================
#++
$:.unshift "#{File.dirname(__FILE__)}/../../lib"
require 'net/ssh/transport/algorithm-negotiator'
require 'net/ssh/transport/constants'
require 'net/ssh/util/buffer'
require 'test/unit'
# Unit tests for Net::SSH::Transport::AlgorithmNegotiator.
#
# Every test hands the negotiator a ScriptedSession that replays one canned
# server message, then checks either the negotiated result or the exception
# that is raised.
class TC_AlgorithmNegotiator < Test::Unit::TestCase
  include Net::SSH::Transport::Constants

  # Logger stand-in that always reports debug output as disabled.
  class MockLogger
    def debug?
      false
    end
  end

  # Buffer factory stand-in; every call hands out a fresh WriterBuffer.
  class Buffers
    def writer
      Net::SSH::Util::WriterBuffer.new
    end
  end

  # Session stand-in driven by a fixed script. Each scripted entry is returned,
  # in order, by #wait_for_message; everything passed to #send_message is
  # recorded (as a string) in #messages.
  class ScriptedSession
    attr_reader :messages

    # Never assigned by this class, so both readers return nil.
    attr_reader :host, :port

    def initialize( *script )
      @script = script
      @messages = []
    end

    # Returns the next scripted entry, or nil once the script is exhausted.
    def wait_for_message
      @script.shift
    end

    def send_message( msg )
      @messages << msg.to_s
    end
  end

  # Algorithm lists the negotiator under test is constructed with (see #setup).
  ALGORITHMS = {
    :host_key    => [ "A", "B" ],
    :kex         => [ "C", "D" ],
    :encryption  => [ "E", "F" ],
    :hmac        => [ "G", "H" ],
    :compression => [ "I", "J" ],
    :languages   => [ "K", "L" ],
  }

  # The 16 bytes that open every scripted server packet.
  SERVER_COOKIE = "1234567890123456"

  # Wraps +text+ in a ReaderBuffer.
  def reader( text )
    Net::SSH::Util::ReaderBuffer.new( text )
  end

  def setup
    logger = MockLogger.new
    buffers = Buffers.new
    @negotiator = Net::SSH::Transport::AlgorithmNegotiator.new( logger, ALGORITHMS, buffers )
  end

  # A first message whose type is -1 instead of KEXINIT must be rejected.
  def test_no_kexinit
    session = ScriptedSession.new( [ -1, reader("") ] )

    assert_raise( Net::SSH::Exception ) do
      @negotiator.negotiate( session, {} )
    end
  end

  # With no options, the first entry of each list is selected.
  def test_simple_exchange
    session = kexinit_session

    result = @negotiator.negotiate( session, {} )

    assert_equal "C", result.kex
    assert_equal "A", result.host_key
    assert_equal "E", result.encryption_c2s
    assert_equal "E", result.encryption_s2c
    assert_equal "G", result.mac_c2s
    assert_equal "G", result.mac_s2c
    assert_equal "I", result.compression_c2s
    assert_equal "I", result.compression_s2c
    assert_equal "", result.language_c2s
    assert_equal "", result.language_s2c

    # The expected packets are deliberately spelled out as literals so they
    # check the bytes independently of the payload helper below.
    assert_equal "1234567890123456\0\0\0\3C,D\0\0\0\3A,B\0\0\0\3E,F\0\0\0\3E,F\0\0\0\3G,H\0\0\0\3G,H\0\0\0\3I,J\0\0\0\3I,J\0\0\0\0\0\0\0\0\0\0\0\0\0", result.server_packet

    # The first 17 bytes of the client packet are skipped -- presumably the
    # message type byte plus a 16-byte cookie that is not predictable here.
    assert_equal "\0\0\0\3C,D\0\0\0\3A,B\0\0\0\3E,F\0\0\0\3E,F\0\0\0\3G,H\0\0\0\3G,H\0\0\0\3I,J\0\0\0\3I,J\0\0\0\3K,L\0\0\0\3K,L\0\0\0\0\0", result.client_packet[17..-1]
  end

  # Caller-supplied preferences (a single name or an ordered list) take
  # priority over the default ordering in ALGORITHMS.
  def test_custom_exchange
    session = kexinit_session

    result = @negotiator.negotiate( session,
      :kex => "D",
      :host_key => [ "B", "A" ] )

    assert_equal "D", result.kex
    assert_equal "B", result.host_key
    assert_equal "E", result.encryption_c2s
    assert_equal "E", result.encryption_s2c
    assert_equal "G", result.mac_c2s
    assert_equal "G", result.mac_s2c
    assert_equal "I", result.compression_c2s
    assert_equal "I", result.compression_s2c
    assert_equal "", result.language_c2s
    assert_equal "", result.language_s2c
  end

  # "K" appears in ALGORITHMS only under :languages, so requesting it as the
  # kex algorithm must raise NotImplementedError.
  def test_bad_algorithm
    session = kexinit_session

    assert_raise( NotImplementedError ) do
      @negotiator.negotiate( session, :kex => "K" )
    end
  end

  # The server offers host keys "M,N", none of which is in
  # ALGORITHMS[:host_key], so negotiation must fail.
  def test_no_agree
    session = kexinit_session( "M,N" )

    assert_raise( Net::SSH::Exception ) do
      @negotiator.negotiate( session, {} )
    end
  end

  private

  # Encodes +names+ as a length-prefixed string: four length bytes followed by
  # the text. Only the low length byte is filled in, so +names+ must be
  # shorter than 256 bytes.
  def name_list( names )
    "\0\0\0" + names.length.chr + names
  end

  # Builds the body of the scripted server KEXINIT packet as a fresh string:
  # the cookie, then the kex, host-key, encryption (x2), hmac (x2) and
  # compression (x2) name-lists, then 13 NUL bytes. Following the KEXINIT
  # layout in RFC 4253, those 13 bytes are two empty language lists (4 bytes
  # each), the first_kex_packet_follows flag (1 byte) and the reserved word
  # (4 bytes).
  #
  # +host_keys+ is the comma-separated host key list the server advertises.
  def server_kexinit_payload( host_keys = "A,B" )
    SERVER_COOKIE +
      name_list( "C,D" ) +
      name_list( host_keys ) +
      name_list( "E,F" ) * 2 +
      name_list( "G,H" ) * 2 +
      name_list( "I,J" ) * 2 +
      "\0" * 13
  end

  # Returns a ScriptedSession whose only scripted message is a KEXINIT
  # carrying the payload built by #server_kexinit_payload.
  def kexinit_session( host_keys = "A,B" )
    ScriptedSession.new( [ KEXINIT, reader( server_kexinit_payload( host_keys ) ) ] )
  end
end
|