File: rforward.py

package info (click to toggle)
python-x2go 0.6.0.2-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 1,632 kB
  • sloc: python: 9,758; makefile: 216
file content (377 lines) | stat: -rw-r--r-- 17,002 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
# Copyright (C) 2010-2018 by Mike Gabriel <mike.gabriel@das-netzwerkteam.de>
#
# Python X2Go is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# Python X2Go is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program; if not, write to the
# Free Software Foundation, Inc.,
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.

"""\
X2Go reverse SSH/Paramiko tunneling provides X2Go sound, X2Go printing and
X2Go sshfs for folder sharing and mounting remote devices in X2Go terminal
server sessions.

"""
__NAME__ = 'x2gorevtunnel-pylib'

__package__ = 'x2go'
__name__    = 'x2go.rforward'

# modules
import copy
import threading
import gevent
import paramiko

# gevent/greenlet
from gevent import select, socket, Timeout

# Python X2Go modules
from . import log


def x2go_transport_tcp_handler(chan, origin, server):
    """\
    An X2Go customized TCP handler for the Paramiko/SSH ``Transport()`` class.

    Incoming channels will be put into Paramiko's default accept queue. This corresponds to
    the default behaviour of Paramiko's ``Transport`` class.

    However, additionally this handler function checks the server port of the incoming channel
    and detects if there are Paramiko/SSH reverse forwarding tunnels waiting for the incoming
    channels. The Paramiko/SSH reverse forwarding tunnels are initiated by an :class:`x2go.session.X2GoSession` instance
    (currently supported: reverse tunneling auf audio data, reverse tunneling of SSH requests).

    If the server port of an incoming Paramiko/SSH channel matches the configured port of an :class:`x2go.rforward.X2GoRevFwTunnel`
    instance, this instance gets notified of the incoming channel and a new :class:`x2go.rforward.X2GoRevFwChannelThread` is
    started. This :class:`x2go.rforward.X2GoRevFwChannelThread` then takes care of the new channel's incoming data stream.

    :param chan: a Paramiko channel object
    :type chan: ``paramiko.Channel`` object
    :param origin: host/port tuple where a connection originates from
    :type origin: ``tuple``
    :param server: host/port tuple where to connect to
    :type server: ``tuple``

    """
    (origin_addr, origin_port) = origin
    (server_addr, server_port) = server
    transport = chan.get_transport()
    transport._queue_incoming_channel(chan)
    rev_tuns = transport.reverse_tunnels

    for session_name in list(rev_tuns.keys()):

        if int(server_port) in [ int(tunnel[0]) for tunnel in list(rev_tuns[session_name].values()) ]:

            if rev_tuns[session_name]['snd'] is not None and int(server_port) == int(rev_tuns[session_name]['snd'][0]):
                rev_tuns[session_name]['snd'][1].notify()

            elif rev_tuns[session_name]['sshfs'] is not None and int(server_port) == int(rev_tuns[session_name]['sshfs'][0]):
                rev_tuns[session_name]['sshfs'][1].notify()


class X2GoRevFwTunnel(threading.Thread):
    """\
    :class:`x2go.rforward.X2GoRevFwTunnel` class objects are used to reversely tunnel
    X2Go audio, X2Go printing and X2Go folder sharing / device mounting
    through Paramiko/SSH.


    """
    def __init__(self, server_port, remote_host, remote_port, ssh_transport, session_instance=None, logger=None, loglevel=log.loglevel_DEFAULT):
        """\
        Setup a reverse tunnel through Paramiko/SSH.

        After the reverse tunnel has been setup up with :func:`X2GoRevFwTunnel.start() <x2go.rforward.X2GoRevFwTunnel.start()>` it waits
        for notification from :func:`X2GoRevFwTunnel.notify() <x2go.rforward.X2GoRevFwTunnel.notify()>` to accept incoming channels. This
        notification (:func:`X2GoRevFwTunnel.notify() <x2go.rforward.X2GoRevFwTunnel.notify()>` gets called from within the transport's
        TCP handler function :func:`x2go_transport_tcp_handler()` of the :class:`x2go.session.X2GoSession` instance.

        :param server_port: the TCP/IP port on the X2Go server (starting point of the tunnel),
            normally some number above 30000
        :type server_port: int
        :param remote_host: the target address for reversely tunneled traffic. With X2Go this should
            always be set to the localhost (IPv4) address.
        :type remote_host: str
        :param remote_port: the TCP/IP port on the X2Go client (end point of the tunnel),
            normally an application's standard port (22 for SSH, 4713 for pulse audio, etc.)
        :type remote_port: int
        :param ssh_transport: the :class:`x2go.session.X2GoSession`'s Paramiko/SSH transport instance
        :type ssh_transport: ``paramiko.Transport`` instance
        :param logger: you can pass an :class:`x2go.log.X2GoLogger` object to the
            :class:`x2go.rforward.X2GoRevFwTunnel` constructor
        :type logger: :class:`x2go.log.X2GoLogger` instance
        :param loglevel: if no :class:`x2go.log.X2GoLogger` object has been supplied a new one will be
            constructed with the given loglevel
        :type loglevel: int

        """
        if logger is None:
            self.logger = log.X2GoLogger(loglevel=loglevel)
        else:
            self.logger = copy.deepcopy(logger)
        self.logger.tag = __NAME__

        self.server_port = server_port
        self.remote_host = remote_host
        self.remote_port = remote_port
        self.ssh_transport = ssh_transport
        self.session_instance = session_instance

        self.open_channels = {}
        self.incoming_channel = threading.Condition()

        threading.Thread.__init__(self)
        self.daemon = True
        self._accept_channels = True

    def __del__(self):
        """\
        Class destructor.

        """
        self.stop_thread()
        self.cancel_port_forward('', self.server_port)

    def cancel_port_forward(self, address, port):
        """\
        Cancel a port forwarding request. This cancellation request is sent to the server and
        on the server the port forwarding should be unregistered.

        :param address: remote server address
        :type address: ``str``
        :param port: remote port
        :type port: ``int``

        """
        timeout = Timeout(10)
        timeout.start()
        try:
            self.ssh_transport.global_request('cancel-tcpip-forward', (address, port), wait=True)
        except:
            pass
        finally:
            timeout.cancel()

    def pause(self):
        """\
        Prevent acceptance of new incoming connections through the Paramiko/SSH
        reverse forwarding tunnel. Also, any active connection on this :class:`x2go.rforward.X2GoRevFwTunnel`
        instance will be closed immediately, if this method is called.


        """
        if self._accept_channels == True:
            self.cancel_port_forward('', self.server_port)
            self._accept_channels = False
            self.logger('paused thread: %s' % repr(self), loglevel=log.loglevel_DEBUG)

    def resume(self):
        """\
        Resume operation of the Paramiko/SSH reverse forwarding tunnel
        and continue accepting new incoming connections.


        """
        if self._accept_channels == False:
            self._accept_channels = True
            self._requested_port = self.ssh_transport.request_port_forward('127.0.0.1', self.server_port, handler=x2go_transport_tcp_handler)
            self.logger('resumed thread: %s' % repr(self), loglevel=log.loglevel_DEBUG)

    def notify(self):
        """\
        Notify an :class:`x2go.rforward.X2GoRevFwTunnel` instance of an incoming Paramiko/SSH channel.

        If an incoming reverse tunnel channel appropriate for this instance has
        been detected, this method gets called from the :class:`x2go.session.X2GoSession`'s transport
        TCP handler.

        The sent notification will trigger a ``thread.Condition()`` waiting for notification
        in :func:`X2GoRevFwTunnel.run() <x2go.rforward.X2GoRevFwTunnel.run()>`.


        """
        self.incoming_channel.acquire()
        self.logger('notifying thread of incoming channel: %s' % repr(self), loglevel=log.loglevel_DEBUG)
        self.incoming_channel.notify()
        self.incoming_channel.release()

    def stop_thread(self):
        """\
        Stops this :class:`x2go.rforward.X2GoRevFwTunnel` thread completely.


        """
        self.pause()
        self._keepalive = False
        self.logger('stopping thread: %s' % repr(self), loglevel=log.loglevel_DEBUG)
        self.notify()

    def _request_port_forwarding(self):
        try:
            self._requested_port = self.ssh_transport.request_port_forward('127.0.0.1', self.server_port, handler=x2go_transport_tcp_handler)
        except paramiko.SSHException:
            # if port forward request fails, we try to tell the server to cancel all foregoing port forward requests on
            # self.server_port
            self.cancel_port_forward('', self.server_port)
            gevent.sleep(1)
            try:
                self._requested_port = self.ssh_transport.request_port_forward('127.0.0.1', self.server_port, handler=x2go_transport_tcp_handler)
            except paramiko.SSHException as e:
                if self.session_instance:
                    self.session_instance.HOOK_rforward_request_denied(server_port=self.server_port)
                else:
                    self.logger('Encountered SSHException: %s (for reverse TCP port forward with local destination port %s' % (str(e), self.server_port), loglevel=log.loglevel_WARN)

    def run(self):
        """\
        This method gets run once an :class:`x2go.rforward.X2GoRevFwTunnel` has been started with its
        :func:`start()` method. Use :class:`x2go.rforward.X2GoRevFwTunnel`.stop_thread() to stop the
        reverse forwarding tunnel again. You can also temporarily lock the tunnel
        down with :func:`X2GoRevFwTunnel.pause() <x2go.rforward.X2GoRevFwTunnel.pause()>` and :func:`X2GoRevFwTunnel.resume() <x2go.rforward.X2GoRevFwTunnel.resume()>`).

        :func:`X2GoRevFwTunnel.run() <x2go.rforward.X2GoRevFwTunnel.run()>` waits for notifications of an appropriate incoming
        Paramiko/SSH channel (issued by :func:`X2GoRevFwTunnel.notify() <x2go.rforward.X2GoRevFwTunnel.notify()>`). Appropriate in
        this context means, that its start point on the X2Go server matches the class's
        property ``server_port``.

        Once a new incoming channel gets announced by the :func:`notify()` method, a new
        :class:`x2go.rforward.X2GoRevFwChannelThread` instance will be initialized. As a data stream handler,
        the function :func:`x2go_rev_forward_channel_handler()` will be used.

        The channel will last till the connection gets dropped on the X2Go server side or
        until the tunnel gets paused by an :func:`X2GoRevFwTunnel.pause() <x2go.rforward.X2GoRevFwTunnel.pause()>` call or stopped via the
        :func:`X2GoRevFwTunnel.stop_thread() <x2go.rforward.X2GoRevFwTunnel.stop_thread()>` method.


        """
        self._request_port_forwarding()
        self._keepalive = True
        while self._keepalive:

            self.incoming_channel.acquire()

            self.logger('waiting for incoming data channel on X2Go server port: [127.0.0.1]:%s' % self.server_port, loglevel=log.loglevel_DEBUG)
            self.incoming_channel.wait()

            if self._keepalive:
                self.logger('detected incoming data channel on X2Go server port: [127.0.0.1]:%s' % self.server_port, loglevel=log.loglevel_DEBUG)
                _chan = self.ssh_transport.accept()
                self.logger('data channel %s for server port [127.0.0.1]:%s is up' % (_chan, self.server_port), loglevel=log.loglevel_DEBUG)
            else:
                self.logger('closing down rev forwarding tunnel on remote end [127.0.0.1]:%s' % self.server_port, loglevel=log.loglevel_DEBUG)

            self.incoming_channel.release()
            if self._accept_channels and self._keepalive:
                _new_chan_thread = X2GoRevFwChannelThread(_chan, (self.remote_host, self.remote_port),
                                                          target=x2go_rev_forward_channel_handler,
                                                          kwargs={
                                                            'chan': _chan,
                                                            'addr': self.remote_host,
                                                            'port': self.remote_port,
                                                            'parent_thread': self,
                                                            'logger': self.logger,
                                                          }
                                                  )
                _new_chan_thread.start()
                self.open_channels['[%s]:%s' % _chan.origin_addr] = _new_chan_thread


def x2go_rev_forward_channel_handler(chan=None, addr='', port=0, parent_thread=None, logger=None, ):
    """\
    Handle the data stream of a requested channel that got set up by a :class:`x2go.rforward.X2GoRevFwTunnel` (Paramiko/SSH
    reverse forwarding tunnel).

    The channel (and the corresponding connections) close either ...

        - ... if the connecting application closes the connection and thus, drops
          the channel, or
        - ... if the :class:`x2go.rforward.X2GoRevFwTunnel` parent thread gets paused. The call
          of :func:`X2GoRevFwTunnel.pause() <x2go.rforward.X2GoRevFwTunnel.pause()>` on the instance can be used to shut down all incoming
          tunneled SSH connections associated to this :class:`x2go.rforward.X2GoRevFwTunnel` instance
          from within a Python X2Go application.

    :param chan: channel (Default value = None)
    :type chan: ``class``
    :param addr: bind address (Default value = '')
    :type addr: ``str``
    :param port: bind port (Default value = 0)
    :type port: ``int``
    :param parent_thread: the calling :class:`x2go.rforward.X2GoRevFwTunnel` instance (Default value = None)
    :type parent_thread: :class:`x2go.rforward.X2GoRevFwTunnel` instance
    :param logger: you can pass an :class:`x2go.log.X2GoLogger` object to the
        :class:`x2go.rforward.X2GoRevFwTunnel` constructor (Default value = None)
    :type logger: :class:`x2go.log.X2GoLogger` instance

    """
    fw_socket = socket.socket()
    fw_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    if logger is None:
        def _dummy_logger(msg, l):
            pass
        logger = _dummy_logger

    try:
        fw_socket.connect((addr, port))
    except Exception as e:
        logger('Reverse forwarding request to %s:%d failed: %r' % (addr, port, e), loglevel=log.loglevel_INFO)
        return

    logger('Connected! Reverse tunnel open %r -> %r -> %r' % (chan.origin_addr,
                                                              chan.getpeername(), (addr, port)),
                                                              loglevel=log.loglevel_INFO)
    while parent_thread._accept_channels:
        r, w, x = select.select([fw_socket, chan], [], [])
        try:
            if fw_socket in r:
                data = fw_socket.recv(1024)
                if len(data) == 0:
                    break
                chan.send(data)
            if chan in r:
                data = chan.recv(1024)
                if len(data) == 0:
                    break
                fw_socket.send(data)
        except socket.error as e:
            logger('Reverse tunnel %s encoutered socket error: %s' % (chan, str(e)), loglevel=log.loglevel_WARN)

    chan.close()
    fw_socket.close()
    logger('Reverse tunnel %s closed from %r' % (chan, chan.origin_addr,), loglevel=log.loglevel_INFO)


class X2GoRevFwChannelThread(threading.Thread):
    """\
    Starts a thread for each incoming Paramiko/SSH data channel trough the reverse
    forwarding tunnel.


    """
    def __init__(self, channel, remote=None, **kwargs):
        """\
        Initializes a reverse forwarding channel thread.

        :param channel: incoming Paramiko/SSH channel from the :class:`x2go.session.X2GoSession`'s transport
            accept queue
        :type channel: class
        :param remote: tuple (addr, port) that specifies the data endpoint of the channel
        :type remote: ``tuple(str, int)``

        """
        self.channel = channel
        if remote is not None:
            self.remote_host = remote[0]
            self.remote_port = remote[1]
        threading.Thread.__init__(self, **kwargs)
        self.daemon = True