1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182
|
# Copyright (c) 2016-2024 by Ron Frederick <ronf@timeheart.net> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
# distribution and is available at:
#
# http://www.eclipse.org/legal/epl-2.0/
#
# This program may also be made available under the following secondary
# licenses when the conditions for such availability set forth in the
# Eclipse Public License v2.0 are satisfied:
#
# GNU General Public License, Version 2.0, or any later versions of
# that license
#
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
#
# Contributors:
# Ron Frederick - initial implementation, API, and documentation
"""SSH agent support code for Windows"""
# Some of the imports below won't be found when running pylint on UNIX
# pylint: disable=import-error
import asyncio
import ctypes
import ctypes.wintypes
import errno
from typing import TYPE_CHECKING, Tuple, Union, cast
from .misc import open_file
if TYPE_CHECKING:
# pylint: disable=cyclic-import
from .agent import AgentReader, AgentWriter
try:
import mmapfile
import win32api
import win32con
import win32ui
_pywin32_available = True
except ImportError:
_pywin32_available = False
_AGENT_COPYDATA_ID = 0x804e50ba
_AGENT_MAX_MSGLEN = 8192
_AGENT_NAME = 'Pageant'
_DEFAULT_OPENSSH_PATH = r'\\.\pipe\openssh-ssh-agent'
def _find_agent_window() -> 'win32ui.PyCWnd':
"""Find and return the Pageant window"""
if _pywin32_available:
try:
return win32ui.FindWindow(_AGENT_NAME, _AGENT_NAME)
except win32ui.error:
raise OSError(errno.ENOENT, 'Agent not found') from None
else:
raise OSError(errno.ENOENT, 'PyWin32 not installed') from None
class _CopyDataStruct(ctypes.Structure):
"""Windows COPYDATASTRUCT argument for WM_COPYDATA message"""
_fields_ = (('dwData', ctypes.wintypes.LPARAM),
('cbData', ctypes.wintypes.DWORD),
('lpData', ctypes.c_char_p))
class _PageantTransport:
"""Transport to connect to Pageant agent on Windows"""
def __init__(self) -> None:
self._mapname = f'{_AGENT_NAME}{win32api.GetCurrentThreadId():08x}'
try:
self._mapfile = mmapfile.mmapfile('', self._mapname,
_AGENT_MAX_MSGLEN, 0, 0)
except mmapfile.error as exc:
raise OSError(errno.EIO, str(exc)) from None
self._cds = _CopyDataStruct(_AGENT_COPYDATA_ID, len(self._mapname) + 1,
self._mapname.encode())
self._writing = False
def write(self, data: bytes) -> None:
"""Write request data to Pageant agent"""
if not self._writing:
self._mapfile.seek(0)
self._writing = True
try:
self._mapfile.write(data)
except ValueError as exc:
raise OSError(errno.EIO, str(exc)) from None
async def readexactly(self, n: int) -> bytes:
"""Read response data from Pageant agent"""
if self._writing:
cwnd = _find_agent_window()
if not cwnd.SendMessage(win32con.WM_COPYDATA, 0,
cast(int, self._cds)):
raise OSError(errno.EIO, 'Unable to send agent request')
self._writing = False
self._mapfile.seek(0)
result = self._mapfile.read(n)
if len(result) != n:
raise asyncio.IncompleteReadError(result, n)
return result
def close(self) -> None:
"""Close the connection to Pageant"""
if self._mapfile:
self._mapfile.close()
async def wait_closed(self) -> None:
"""Wait for the transport to close"""
class _W10OpenSSHTransport:
"""Transport to connect to OpenSSH agent on Windows 10"""
def __init__(self, agent_path: str):
self._agentfile = open_file(agent_path, 'r+b')
async def readexactly(self, n: int) -> bytes:
"""Read response data from OpenSSH agent"""
result = self._agentfile.read(n)
if len(result) != n:
raise asyncio.IncompleteReadError(result, n)
return result
def write(self, data: bytes) -> None:
"""Write request data to OpenSSH agent"""
self._agentfile.write(data)
def close(self) -> None:
"""Close the connection to OpenSSH"""
if self._agentfile:
self._agentfile.close()
async def wait_closed(self) -> None:
"""Wait for the transport to close"""
async def open_agent(agent_path: str) -> Tuple['AgentReader', 'AgentWriter']:
"""Open a connection to the Pageant or Windows 10 OpenSSH agent"""
transport: Union[None, _PageantTransport, _W10OpenSSHTransport] = None
if not agent_path:
try:
_find_agent_window()
transport = _PageantTransport()
except OSError:
agent_path = _DEFAULT_OPENSSH_PATH
if not transport:
transport = _W10OpenSSHTransport(agent_path)
return transport, transport
|