File: telnet.py

package info (click to toggle)
circuits 3.2.3-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,980 kB
  • sloc: python: 17,583; javascript: 3,226; makefile: 100
file content (181 lines) | stat: -rwxr-xr-x 4,347 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
#!/usr/bin/env python
"""
Telnet Example

A basic telnet-like clone that connects to remote hosts
via tcp and allows the user to send data to the remote
server.

This example demonstrates:
    * Basic Component creation.
    * Basic Event handling.
    * Basic Networking
    * Basic Request/Response Networking

This example makes use of:
    * Component
    * Event
    * net.sockets.TCPClient
"""

import os
from optparse import OptionParser

import circuits
from circuits import Component, handler
from circuits.io import stdin
from circuits.net.events import connect, write
from circuits.net.sockets import TCPClient, UDPClient, UNIXClient
from circuits.tools import graph


# Templates handed to OptionParser; optparse substitutes the program name
# for ``%prog`` when it renders the usage and version text.
VERSION = f'%prog v{circuits.__version__}'
USAGE = '%prog [options] host [port]'


def parse_options():
    """
    Parse the command line into options and positional arguments.

    Three boolean switches are recognised, each defaulting to ``False``:
    ``-s/--secure``, ``-u/--udp`` and ``-v/--verbose``. At least one
    positional argument is required; when none is supplied the help text
    is printed and the process exits with status 1.

    Returns the ``(opts, args)`` pair produced by
    ``OptionParser.parse_args``.
    """
    # (short flag, long flag, destination attribute, help text)
    switches = (
        ('-s', '--secure', 'secure', 'Enable secure mode'),
        ('-u', '--udp', 'udp', 'Use UDP transport'),
        ('-v', '--verbose', 'verbose', 'Enable verbose debugging'),
    )

    parser = OptionParser(usage=USAGE, version=VERSION)
    for short_flag, long_flag, dest, help_text in switches:
        parser.add_option(
            short_flag,
            long_flag,
            action='store_true',
            default=False,
            dest=dest,
            help=help_text,
        )

    opts, args = parser.parse_args()

    # A destination is mandatory; bail out with the usage text otherwise.
    if not args:
        parser.print_help()
        raise SystemExit(1)

    return opts, args


class Telnet(Component):
    """
    Minimal telnet-like client component.

    On construction it registers one client transport as a child component
    and fires the initial event towards the destination:

    * one positional argument: it must be an existing filesystem path and
      a ``UNIXClient`` is used;
    * otherwise: the arguments are unpacked as ``host, port`` and either a
      ``TCPClient`` or, when the ``udp`` option is true, a ``UDPClient``
      is used.

    Data received from the transport is printed; data read from the
    ``stdin`` channel is written to the transport.
    """

    # Define a separate channel for this component so we don't clash with
    # the ``read`` event of the ``stdin`` component.
    channel = 'telnet'

    def __init__(self, *args, **opts):
        """
        Set up the transport and fire the initial connect/write event.

        ``args`` is either ``(path,)`` or ``(host, port)``. ``opts`` must
        contain the ``udp`` key, and also the ``secure`` key when ``udp``
        is false.

        Raises ``OSError`` when a single argument is given that is not an
        existing path, and ``ValueError`` when the arguments cannot be
        unpacked as ``host, port`` or the port is not an integer.
        """
        super().__init__()

        self.args = args
        self.opts = opts

        if len(args) == 1:
            # A lone argument is treated as the path of a UNIX socket.
            if os.path.exists(args[0]):
                UNIXClient(channel=self.channel).register(self)
                host = dest = port = args[0]
                dest = (dest,)
            else:
                raise OSError('Path %s not found' % args[0])
        else:
            if not opts['udp']:
                TCPClient(channel=self.channel).register(self)
            else:
                # NOTE(review): the 0 is presumably the local bind port
                # (chosen by the OS) -- confirm against UDPClient.
                UDPClient(0, channel=self.channel).register(self)

            host, port = args
            port = int(port)
            dest = host, port

        self.host = host
        self.port = port

        print('Trying %s ...' % host)

        if not opts['udp']:
            self.fire(connect(*dest, secure=opts['secure']))
        else:
            # No connect event is fired for UDP; fire an initial write of a
            # single NUL byte at the destination instead.
            self.fire(write((host, port), b'\x00'))

    def ready(self, *args):
        """
        ready Event Handler

        Passes the root of the component tree to ``circuits.tools.graph``.
        """
        graph(self.root)

    def connected(self, host, port=None):
        """
        connected Event Handler

        This event is fired by the TCPClient Component to indicate a
        successful connection.
        """
        print(f'connected to {host}')

    def error(self, *args, **kwargs):
        """
        error Event Handler

        If any exception/error occurs in the system this event is triggered.
        With exactly three positional arguments the second one is reported;
        otherwise the first one is.
        """
        if len(args) == 3:
            print(f'ERROR: {args[1]}')
        else:
            print(f'ERROR: {args[0]}')

    def read(self, *args):
        """
        read Event Handler

        This event is fired by the underlying client Component when there
        is data to be read from the connection. It is called either with
        ``(data,)`` or with ``(peer, data)``; the peer is ignored. The
        payload is stripped of surrounding whitespace, decoded and printed.
        """
        if len(args) == 1:
            data = args[0]
        else:
            _peer, data = args

        # Decode leniently: a remote peer may send bytes that are not valid
        # UTF-8 (e.g. telnet IAC option negotiation starts with 0xFF), and a
        # strict decode would raise UnicodeDecodeError inside the handler.
        data = data.strip().decode('utf-8', errors='replace')

        print(data)

    # Setup an Event Handler for "read" events on the "stdin" channel.
    @handler('read', channel='stdin')
    def _on_stdin_read(self, data):
        """
        read Event Handler for stdin

        This event is triggered by the connected ``stdin`` component when
        there is new data to be read in from standard input. The data is
        forwarded to the transport; in UDP mode the stored destination is
        passed along with every write.
        """
        if not self.opts['udp']:
            self.fire(write(data))
        else:
            self.fire(write((self.host, self.port), data))


def main():
    """
    Script entry point.

    Parses the command line, builds the ``Telnet`` component from the
    positional arguments and parsed options, registers a ``Debugger`` on
    it when ``--verbose`` was given, registers ``stdin`` on it and then
    runs it.
    """
    options, positional = parse_options()

    # Configure and "run" the System.
    client = Telnet(*positional, **vars(options))

    if options.verbose:
        # Imported here so the Debugger is only loaded when requested.
        from circuits import Debugger

        Debugger().register(client)

    stdin.register(client)
    client.run()


# Run the client only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    main()