#!/usr/bin/env python3
# This is a demonstration example of how to write a
# keyboard-interactive authentication helper plugin using PuTTY's
# protocol for involving it in SSH connection setup.
# The protocol, and the purpose of an authentication plugin, is
# fully documented in an appendix to the PuTTY manual.
import io
import os
import struct
import sys
class PluginEOF(Exception):
    """Raised when an input stream runs out of data.

    Used to unwind out of the protocol code so that the script can exit
    cleanly on EOF.
    """
# ----------------------------------------------------------------------
#
# Marshalling and unmarshalling routines to write and read the
# necessary SSH data types to/from a binary file handle (which can
# include an io.BytesIO if you need to encode/decode in-process).
#
# Error handling is a totally ad-hoc mixture of 'assert' and just
# assuming things will have the right type, or be the right length of
# tuple, or be valid UTF-8. So it should be _robust_, in the sense
# that you'll get a Python exception if anything fails. But no
# sensible error reporting or recovery is implemented.
#
# That should be good enough, because PuTTY will log the plugin's
# standard error in its Event Log, so if the plugin crashes, you'll be
# able to retrieve the traceback.
def wr_byte(fh, b):
    """Write the integer b, which must be in the range 0..255, to fh as
    a single byte."""
    assert 0 <= b < 0x100
    encoded = bytes((b,))
    fh.write(encoded)
def wr_boolean(fh, b):
    """Write the truth value of b to fh as a single byte: 1 if b is
    true, 0 otherwise."""
    wr_byte(fh, int(bool(b)))
def wr_uint32(fh, u):
    """Write the integer u, which must fit in 32 unsigned bits, to fh
    as four bytes in big-endian order."""
    assert 0 <= u < 0x100000000
    encoded = struct.pack(">I", u)
    fh.write(encoded)
def wr_string(fh, s):
    """Write the bytes object s to fh as a length-prefixed string: a
    uint32 byte count followed by the bytes themselves."""
    length = len(s)
    wr_uint32(fh, length)
    fh.write(s)
def wr_string_utf8(fh, s):
    """Encode the text string s as UTF-8 and write it to fh as a
    length-prefixed string."""
    encoded = s.encode("UTF-8")
    wr_string(fh, encoded)
def rd_n(fh, n):
    """Read exactly n bytes from fh and return them as a bytes object.

    A single read() call is allowed to return fewer bytes than were
    asked for without the stream being at EOF (raw and unbuffered
    streams do this), so we keep reading until we have collected all n
    bytes. Only a read that returns no data at all is treated as
    end-of-file.

    Raises PluginEOF if the stream ends before n bytes have arrived.
    """
    chunks = []
    remaining = n
    while remaining > 0:
        chunk = fh.read(remaining)
        if not chunk:
            # Nothing more to come: the data we wanted is truncated.
            raise PluginEOF()
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)
def rd_byte(fh):
    """Read one byte from fh and return it as an integer."""
    (value,) = rd_n(fh, 1)
    return value
def rd_boolean(fh):
    """Read one byte from fh and return True if it is nonzero, False if
    it is zero."""
    return bool(rd_byte(fh))
def rd_uint32(fh):
    """Read four bytes from fh and return them decoded as a big-endian
    unsigned 32-bit integer."""
    raw = rd_n(fh, 4)
    (value,) = struct.unpack(">I", raw)
    return value
def rd_string(fh):
    """Read a length-prefixed string from fh (a uint32 byte count, then
    that many bytes) and return the bytes."""
    return rd_n(fh, rd_uint32(fh))
def rd_string_utf8(fh):
    """Read a length-prefixed string from fh and return it decoded from
    UTF-8 into a text string."""
    raw = rd_string(fh)
    return raw.decode("UTF-8")
# ----------------------------------------------------------------------
#
# Protocol definitions.
# The highest protocol version this plugin will agree to speak. The
# version actually used is the smaller of this and the version
# offered in PLUGIN_INIT.
our_max_version = 2

# Message type codes. Each message's first byte is one of these.
#
# Startup: we receive PLUGIN_INIT, and reply with either
# PLUGIN_INIT_RESPONSE or (defined further down) PLUGIN_INIT_FAILURE.
PLUGIN_INIT = 1
PLUGIN_INIT_RESPONSE = 2
# Choice of authentication method: we receive PLUGIN_PROTOCOL naming
# the method, and reply with ACCEPT or REJECT.
PLUGIN_PROTOCOL = 3
PLUGIN_PROTOCOL_ACCEPT = 4
PLUGIN_PROTOCOL_REJECT = 5
# Received to report the outcome of an authentication attempt.
PLUGIN_AUTH_SUCCESS = 6
PLUGIN_AUTH_FAILURE = 7
# Sent instead of PLUGIN_INIT_RESPONSE if we fail at startup.
PLUGIN_INIT_FAILURE = 8
# Keyboard-interactive: we receive the SSH server's prompts in
# KI_SERVER_REQUEST and send our answers in KI_SERVER_RESPONSE.
PLUGIN_KI_SERVER_REQUEST = 20
PLUGIN_KI_SERVER_RESPONSE = 21
# Keyboard-interactive: we send prompts of our own for the user in
# KI_USER_REQUEST and receive the user's answers in KI_USER_RESPONSE.
PLUGIN_KI_USER_REQUEST = 22
PLUGIN_KI_USER_RESPONSE = 23
# ----------------------------------------------------------------------
#
# Classes to make it easy to construct and receive messages.
#
# OutMessage is constructed with the message type; then you use the
# wr_foo() routines to add fields to it, and finally call its send()
# method.
#
# InMessage is constructed via the expect() class method, to which you
# give a list of message types you expect to see one of at this stage.
# Once you've got one, you can rd_foo() fields from it.
class OutMessage:
    """An outgoing message under construction.

    The message body is accumulated in an in-memory buffer, starting
    with the message type byte. The object exposes a write() method,
    so it can be passed directly as the file handle argument of the
    wr_foo() routines to append further fields.
    """

    def __init__(self, msgtype):
        """Start a new message whose first byte is msgtype."""
        payload = io.BytesIO()
        wr_byte(payload, msgtype)
        self.buf = payload
        # Let the wr_foo() routines write straight into our buffer.
        self.write = payload.write

    def send(self, fh=sys.stdout.buffer):
        """Write the finished message to fh as a length-prefixed
        string, and flush fh so that it is delivered immediately."""
        body = self.buf.getvalue()
        wr_string(fh, body)
        fh.flush()
class InMessage:
    """A received message.

    Instances are made by the expect() class method. The object
    exposes a read() method over the message body, so it can be passed
    directly as the file handle argument of the rd_foo() routines to
    extract the fields following the message type byte.
    """

    @classmethod
    def expect(cls, expected_types, fh=sys.stdin.buffer):
        """Read one length-prefixed message from fh and return it.

        expected_types is a collection of the message type codes that
        are acceptable at this point. The type actually received is
        stored in the msgtype attribute of the returned object.

        Raises ValueError if the received type is not one of
        expected_types, and PluginEOF if fh (or the message body) runs
        out of data.
        """
        message = cls()
        payload = rd_string(fh)
        message.buf = io.BytesIO(payload)
        message.msgtype = rd_byte(message.buf)
        # Let the rd_foo() routines read straight out of our buffer.
        message.read = message.buf.read
        if message.msgtype in expected_types:
            return message
        wanted = ",".join("{:d}".format(t) for t in sorted(expected_types))
        raise ValueError(
            "received packet type {:d}, expected {}".format(
                message.msgtype, wanted))
# ----------------------------------------------------------------------
#
# The main implementation of the protocol.
def _ask_user_test_questions():
    """Ask the user two rounds of made-up questions via PuTTY.

    Each round sends a PLUGIN_KI_USER_REQUEST containing two prompts
    (one echoed, one not) and then reads back the matching
    PLUGIN_KI_USER_RESPONSE. The responses are decoded but otherwise
    ignored.
    """
    for request_index in range(2):
        # Make up some questions to ask.
        msg = OutMessage(PLUGIN_KI_USER_REQUEST)
        wr_string_utf8(
            msg, "Plugin request #{:d} (name)".format(request_index))
        wr_string_utf8(
            msg, "Plugin request #{:d} (instructions)"
            .format(request_index))
        # Empty third field, in the position where server requests
        # carry their language string.
        wr_string(msg, b"")
        wr_uint32(msg, 2)
        wr_string_utf8(msg, "Prompt 1 of 2 (echo): ")
        wr_boolean(msg, True)
        wr_string_utf8(msg, "Prompt 2 of 2 (no echo): ")
        wr_boolean(msg, False)
        msg.send()

        # Expect the answers.
        msg = InMessage.expect({PLUGIN_KI_USER_RESPONSE})
        user_nprompts = rd_uint32(msg)
        assert user_nprompts == 2, (
            "Should match what we just sent")
        # Read as many responses as this message says it contains
        # (not the number of prompts in the server's request).
        for _ in range(user_nprompts):
            user_response = rd_string_utf8(msg)
            # We don't actually check these
            # responses for anything.


def _run_keyboard_interactive():
    """Handle keyboard-interactive exchanges until authentication ends.

    Repeatedly reads a set of prompts from the SSH server (relayed in
    PLUGIN_KI_SERVER_REQUEST) and sends back answers, returning when
    PLUGIN_AUTH_SUCCESS or PLUGIN_AUTH_FAILURE arrives instead.
    """
    while True:
        # Expect a set of prompts from the server, or
        # terminate the loop on SUCCESS or FAILURE.
        #
        # (We could also respond to SUCCESS or FAILURE by
        # updating caches of our own, if we had any that were
        # useful.)
        msg = InMessage.expect({PLUGIN_KI_SERVER_REQUEST,
                                PLUGIN_AUTH_SUCCESS,
                                PLUGIN_AUTH_FAILURE})
        if msg.msgtype in (PLUGIN_AUTH_SUCCESS, PLUGIN_AUTH_FAILURE):
            return

        # If we didn't just return, we're sitting on a
        # PLUGIN_KI_SERVER_REQUEST message. Get all its bits
        # and pieces out. (This example only makes use of the
        # prompt count, but a real plugin would want the rest.)
        name = rd_string_utf8(msg)
        instructions = rd_string_utf8(msg)
        language = rd_string(msg)
        nprompts = rd_uint32(msg)
        prompts = []
        for _ in range(nprompts):
            prompt = rd_string_utf8(msg)
            echo = rd_boolean(msg)
            prompts.append((prompt, echo))

        # Actually make up some answers for the prompts. This
        # is the part that a non-example implementation would
        # do very differently, of course!
        #
        # Here, we answer "foo" to every prompt, except that
        # if there are exactly two prompts in the packet then
        # we answer "stoat" to the first and "weasel" to the
        # second.
        #
        # (These answers are consistent with the ones required
        # by PuTTY's test SSH server Uppity in its own
        # keyboard-interactive test implementation: that
        # presents a two-prompt packet and expects
        # "stoat","weasel" as the answers, and then presents a
        # zero-prompt packet. So this test plugin will get you
        # through Uppity's k-i in a one-touch manner. The
        # "foo" in this code isn't used by Uppity at all; I
        # just include it because I had to have _some_
        # handling for the else clause.)
        #
        # If TESTPLUGIN_PROMPTS is set in the environment, we
        # ask the user questions of our own by sending them
        # back to PuTTY as USER_REQUEST messages.
        if nprompts == 2:
            if "TESTPLUGIN_PROMPTS" in os.environ:
                _ask_user_test_questions()
            answers = ["stoat", "weasel"]
        else:
            answers = ["foo"] * nprompts

        # Send the answers to the SSH server's questions.
        msg = OutMessage(PLUGIN_KI_SERVER_RESPONSE)
        wr_uint32(msg, len(answers))
        for answer in answers:
            wr_string_utf8(msg, answer)
        msg.send()


def protocol():
    """Run the whole plugin protocol on standard input and output.

    Performs the PLUGIN_INIT handshake, then loops answering
    PLUGIN_PROTOCOL messages, handling keyboard-interactive
    authentication and rejecting every other method. The loop has no
    exit of its own: the function returns early only if
    TESTPLUGIN_INIT_FAIL is set, and otherwise ends when a read hits
    end-of-file and PluginEOF propagates to the caller.

    Test hooks read from the environment: TESTPLUGIN_INIT_FAIL,
    TESTPLUGIN_USERNAME, TESTPLUGIN_PROTO_REJECT and
    TESTPLUGIN_PROMPTS.
    """
    # Start by expecting PLUGIN_INIT.
    msg = InMessage.expect({PLUGIN_INIT})
    their_version = rd_uint32(msg)
    hostname = rd_string_utf8(msg)
    port = rd_uint32(msg)
    username = rd_string_utf8(msg)
    print(f"Got hostname {hostname!r}, port {port!r}", file=sys.stderr,
          flush=True)

    # Decide which protocol version we're speaking.
    version = min(their_version, our_max_version)
    assert version != 0, "Protocol version 0 does not exist"

    if "TESTPLUGIN_INIT_FAIL" in os.environ:
        # Test the plugin failing at startup time.
        msg = OutMessage(PLUGIN_INIT_FAILURE)
        wr_string_utf8(msg, os.environ["TESTPLUGIN_INIT_FAIL"])
        msg.send()
        return

    # Send INIT_RESPONSE, with our protocol version and an overridden
    # username.
    #
    # By default this test plugin doesn't override the username, but
    # you can make it do so by setting TESTPLUGIN_USERNAME in the
    # environment.
    msg = OutMessage(PLUGIN_INIT_RESPONSE)
    wr_uint32(msg, version)
    wr_string_utf8(msg, os.environ.get("TESTPLUGIN_USERNAME", ""))
    msg.send()

    # Outer loop run once per authentication protocol.
    while True:
        # Expect a message telling us what the protocol is.
        msg = InMessage.expect({PLUGIN_PROTOCOL})
        method = rd_string(msg)

        if "TESTPLUGIN_PROTO_REJECT" in os.environ:
            # Test the plugin failing at PLUGIN_PROTOCOL time.
            msg = OutMessage(PLUGIN_PROTOCOL_REJECT)
            wr_string_utf8(msg, os.environ["TESTPLUGIN_PROTO_REJECT"])
            msg.send()
            continue

        # We only support keyboard-interactive. If we supported other
        # auth methods, this would be the place to add further clauses
        # to this if statement for them.
        if method == b"keyboard-interactive":
            msg = OutMessage(PLUGIN_PROTOCOL_ACCEPT)
            msg.send()

            # Inner loop run once per keyboard-interactive exchange
            # with the SSH server.
            _run_keyboard_interactive()
        else:
            # Default handler if we don't speak the offered protocol
            # at all.
            msg = OutMessage(PLUGIN_PROTOCOL_REJECT)
            wr_string_utf8(msg, "")
            msg.send()
# Write a demonstration message to stderr, to prove that the plugin's
# standard error shows up in PuTTY's Event Log.
print(
    "Hello from test plugin's stderr",
    file=sys.stderr,
    flush=True,
)

# Run the protocol until our input runs out.
try:
    protocol()
except PluginEOF:
    # End of input is the normal way for the plugin to finish, so exit
    # quietly rather than with a traceback.
    pass