1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
|
# Copyright (c) str4d <str4d@mail.i2p>
# See COPYING for details.
from builtins import object
import base64
import hashlib
from twisted.internet.interfaces import IAddress, ITransport
from twisted.internet.protocol import Protocol
from twisted.python.util import FancyEqMixin
from zope.interface import implementer
@implementer(IAddress)
class I2PAddress(FancyEqMixin, object):
"""An :class:`IAddress` that represents the address of an I2P Destination.
Args:
destination (str): An I2P Destination string in I2P-style B64 format, or
an :class:`I2PAddress`. In the latter case, the default host is also
taken from the provided address.
host (str): An I2P host string; for example, ``'example.i2p'``.
port (int): An integer representing the port number.
Attributes:
destination (str): An I2P Destination string in I2P-style B64 format.
host (str): An I2P host string; for example, ``'example.i2p'`` or
``'fiftytwocharacters.b32.i2p'``. If looked up, it is guaranteed to
resolve to ``destination``.
port (int): An integer representing the port number. Will be ``None`` if
no port is configured.
"""
compareAttributes = ('destination', 'port')
def __init__(self, destination, host=None, port=None):
if hasattr(destination, 'destination'):
self.destination = destination.destination
else:
self.destination = destination
self.port = int(port) if port else None
if host:
self.host = host
elif hasattr(destination, 'host'):
self.host = destination.host
else:
raw_key = base64.b64decode(destination.encode('utf-8'), b'-~')
hash = hashlib.sha256(raw_key)
base32_hash = base64.b32encode(hash.digest()).decode('utf-8')
self.host = base32_hash.lower().replace('=', '')+'.b32.i2p'
def __repr__(self):
if self.port:
return '%s(%s, %d)' % (
self.__class__.__name__, self.host, self.port)
return '%s(%s)' % (
self.__class__.__name__, self.host)
def __hash__(self):
return hash((self.host, self.port))
@implementer(ITransport)
class I2PTunnelTransport(object):
def __init__(self, wrappedTransport, localAddr, peerAddr=None, invertTLS=False):
self.t = wrappedTransport
self._localAddr = localAddr
self.peerAddr = peerAddr
# Workaround for https://tahoe-lafs.org/trac/tahoe-lafs/ticket/2861
if invertTLS and hasattr(self.t, 'startTLS'):
import types
def startTLSWrapper(self, ctx, normal=True):
self.t.startTLS(ctx, not normal)
self.startTLS = types.MethodType(startTLSWrapper, self)
def __getattr__(self, attr):
return getattr(self.t, attr)
def getPeer(self):
return self.peerAddr
def getHost(self):
return self._localAddr
class I2PServerTunnelProtocol(Protocol):
def __init__(self, wrappedProto, serverAddr):
self.wrappedProto = wrappedProto
self._serverAddr = serverAddr
self.peer = None
def connectionMade(self):
# Substitute transport for an I2P wrapper
self.transport = I2PTunnelTransport(self.transport, self._serverAddr)
self.wrappedProto.makeConnection(self.transport)
def dataReceived(self, data):
if self.peer:
# Pass all other data to the wrapped Protocol.
self.wrappedProto.dataReceived(data.encode('utf-8'))
else:
# First line is the peer's Destination.
self.setPeer(data)
def setPeer(self, data):
self.peer = I2PAddress(data.split('\n')[0])
self.transport.peerAddr = self.peer
def connectionLost(self, reason):
self.wrappedProto.connectionLost(reason)
|