File: agent_win32.py

package info (click to toggle)
python-asyncssh 2.21.0-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 3,464 kB
  • sloc: python: 40,306; makefile: 11
file content (182 lines) | stat: -rw-r--r-- 5,141 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# Copyright (c) 2016-2024 by Ron Frederick <ronf@timeheart.net> and others.
#
# This program and the accompanying materials are made available under
# the terms of the Eclipse Public License v2.0 which accompanies this
# distribution and is available at:
#
#     http://www.eclipse.org/legal/epl-2.0/
#
# This program may also be made available under the following secondary
# licenses when the conditions for such availability set forth in the
# Eclipse Public License v2.0 are satisfied:
#
#    GNU General Public License, Version 2.0, or any later versions of
#    that license
#
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
#
# Contributors:
#     Ron Frederick - initial implementation, API, and documentation

"""SSH agent support code for Windows"""

# Some of the imports below won't be found when running pylint on UNIX
# pylint: disable=import-error

import asyncio
import ctypes
import ctypes.wintypes
import errno
from typing import TYPE_CHECKING, Tuple, Union, cast

from .misc import open_file


if TYPE_CHECKING:
    # pylint: disable=cyclic-import
    from .agent import AgentReader, AgentWriter


try:
    import mmapfile
    import win32api
    import win32con
    import win32ui
    _pywin32_available = True
except ImportError:
    _pywin32_available = False


_AGENT_COPYDATA_ID = 0x804e50ba
_AGENT_MAX_MSGLEN = 8192
_AGENT_NAME = 'Pageant'

_DEFAULT_OPENSSH_PATH = r'\\.\pipe\openssh-ssh-agent'


def _find_agent_window() -> 'win32ui.PyCWnd':
    """Find and return the Pageant window"""

    if _pywin32_available:
        try:
            return win32ui.FindWindow(_AGENT_NAME, _AGENT_NAME)
        except win32ui.error:
            raise OSError(errno.ENOENT, 'Agent not found') from None
    else:
        raise OSError(errno.ENOENT, 'PyWin32 not installed') from None


class _CopyDataStruct(ctypes.Structure):
    """Windows COPYDATASTRUCT argument for WM_COPYDATA message"""

    _fields_ = (('dwData', ctypes.wintypes.LPARAM),
                ('cbData', ctypes.wintypes.DWORD),
                ('lpData', ctypes.c_char_p))


class _PageantTransport:
    """Transport to connect to Pageant agent on Windows"""

    def __init__(self) -> None:
        self._mapname = f'{_AGENT_NAME}{win32api.GetCurrentThreadId():08x}'

        try:
            self._mapfile = mmapfile.mmapfile('', self._mapname,
                                              _AGENT_MAX_MSGLEN, 0, 0)
        except mmapfile.error as exc:
            raise OSError(errno.EIO, str(exc)) from None

        self._cds = _CopyDataStruct(_AGENT_COPYDATA_ID, len(self._mapname) + 1,
                                    self._mapname.encode())

        self._writing = False

    def write(self, data: bytes) -> None:
        """Write request data to Pageant agent"""

        if not self._writing:
            self._mapfile.seek(0)
            self._writing = True

        try:
            self._mapfile.write(data)
        except ValueError as exc:
            raise OSError(errno.EIO, str(exc)) from None

    async def readexactly(self, n: int) -> bytes:
        """Read response data from Pageant agent"""

        if self._writing:
            cwnd = _find_agent_window()

            if not cwnd.SendMessage(win32con.WM_COPYDATA, 0,
                                    cast(int, self._cds)):
                raise OSError(errno.EIO, 'Unable to send agent request')

            self._writing = False
            self._mapfile.seek(0)

        result = self._mapfile.read(n)

        if len(result) != n:
            raise asyncio.IncompleteReadError(result, n)

        return result

    def close(self) -> None:
        """Close the connection to Pageant"""

        if self._mapfile:
            self._mapfile.close()

    async def wait_closed(self) -> None:
        """Wait for the transport to close"""


class _W10OpenSSHTransport:
    """Transport to connect to OpenSSH agent on Windows 10"""

    def __init__(self, agent_path: str):
        self._agentfile = open_file(agent_path, 'r+b')

    async def readexactly(self, n: int) -> bytes:
        """Read response data from OpenSSH agent"""

        result = self._agentfile.read(n)

        if len(result) != n:
            raise asyncio.IncompleteReadError(result, n)

        return result

    def write(self, data: bytes) -> None:
        """Write request data to OpenSSH agent"""

        self._agentfile.write(data)

    def close(self) -> None:
        """Close the connection to OpenSSH"""

        if self._agentfile:
            self._agentfile.close()

    async def wait_closed(self) -> None:
        """Wait for the transport to close"""


async def open_agent(agent_path: str) -> Tuple['AgentReader', 'AgentWriter']:
    """Open a connection to the Pageant or Windows 10 OpenSSH agent"""

    transport: Union[None, _PageantTransport, _W10OpenSSHTransport] = None

    if not agent_path:
        try:
            _find_agent_window()
            transport = _PageantTransport()
        except OSError:
            agent_path = _DEFAULT_OPENSSH_PATH

    if not transport:
        transport = _W10OpenSSHTransport(agent_path)

    return transport, transport