1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181
|
#!/usr/bin/env python
"""
Telnet Example
A basic telnet-like clone that connects to remote hosts
via tcp and allows the user to send data to the remote
server.
This example demonstrates:
* Basic Component creation.
* Basic Event handling.
* Basiv Networking
* Basic Request/Response Networking
This example makes use of:
* Component
* Event
* net.sockets.TCPClient
"""
import os
from optparse import OptionParser
import circuits
from circuits import Component, handler
from circuits.io import stdin
from circuits.net.events import connect, write
from circuits.net.sockets import TCPClient, UDPClient, UNIXClient
from circuits.tools import graph
USAGE = '%prog [options] host [port]'
VERSION = '%prog v' + circuits.__version__
def parse_options():
parser = OptionParser(usage=USAGE, version=VERSION)
parser.add_option(
'-s',
'--secure',
action='store_true',
default=False,
dest='secure',
help='Enable secure mode',
)
parser.add_option(
'-u',
'--udp',
action='store_true',
default=False,
dest='udp',
help='Use UDP transport',
)
parser.add_option(
'-v',
'--verbose',
action='store_true',
default=False,
dest='verbose',
help='Enable verbose debugging',
)
opts, args = parser.parse_args()
if len(args) < 1:
parser.print_help()
raise SystemExit(1)
return opts, args
class Telnet(Component):
# Define a separate channel for this component so we don't clash with
# the ``read`` event of the ``stdin`` component.
channel = 'telnet'
def __init__(self, *args, **opts):
super().__init__()
self.args = args
self.opts = opts
if len(args) == 1:
if os.path.exists(args[0]):
UNIXClient(channel=self.channel).register(self)
host = dest = port = args[0]
dest = (dest,)
else:
raise OSError('Path %s not found' % args[0])
else:
if not opts['udp']:
TCPClient(channel=self.channel).register(self)
else:
UDPClient(0, channel=self.channel).register(self)
host, port = args
port = int(port)
dest = host, port
self.host = host
self.port = port
print('Trying %s ...' % host)
if not opts['udp']:
self.fire(connect(*dest, secure=opts['secure']))
else:
self.fire(write((host, port), b'\x00'))
def ready(self, *args):
graph(self.root)
def connected(self, host, port=None):
"""
connected Event Handler
This event is fired by the TCPClient Componentt to indicate a
successful connection.
"""
print(f'connected to {host}')
def error(self, *args, **kwargs):
"""
error Event Handler
If any exception/error occurs in the system this event is triggered.
"""
if len(args) == 3:
print(f'ERROR: {args[1]}')
else:
print(f'ERROR: {args[0]}')
def read(self, *args):
"""
read Event Handler
This event is fired by the underlying TCPClient Component when there
is data to be read from the connection.
"""
if len(args) == 1:
data = args[0]
else:
_peer, data = args
data = data.strip().decode('utf-8')
print(data)
# Setup an Event Handler for "read" events on the "stdin" channel.
@handler('read', channel='stdin')
def _on_stdin_read(self, data):
"""
read Event Handler for stdin
This event is triggered by the connected ``stdin`` component when
there is new data to be read in from standard input.
"""
if not self.opts['udp']:
self.fire(write(data))
else:
self.fire(write((self.host, self.port), data))
def main():
opts, args = parse_options()
# Configure and "run" the System.
app = Telnet(*args, **opts.__dict__)
if opts.verbose:
from circuits import Debugger
Debugger().register(app)
stdin.register(app)
app.run()
if __name__ == '__main__':
main()
|