1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
|
# Copyright (c) str4d <str4d@mail.i2p>
# See COPYING for details.
from builtins import object
try:
# Python 3
from unittest import mock
except:
# Python 2 (library)
import mock
from twisted.internet import defer, protocol
from twisted.test import proto_helpers
TEST_B64 = "2wDRF5nDfeTNgM4X-TI5xEk3R-WiaTABvkMQ2eYpvEzayUZQJgr9E2T6Y2m9HHn3xHYGEOg-RLisjW9AubTaUTx-v66AsEEtv745qPcuWuV1SP~w1bdzYEn8MSoK7Zh4mwHBg1uHq8z17TUNvWz19q76vHNth-2PDuBToD7ySBn3cGBFDUU83wJJXPD6OueLY8yosWWtksk7WZk60~6z~nVePPSEY8JDry3myLDe11szAVER4A8eX1sFpw247cXGGJK9wQhV-TXFj~m76GPVcFKh7u79zwTwZnZ1GXXKqqyRoj1c4-U69CvvJsQRLmdLFwFEpRkxwV8z6LIFclYJk443YpTnPXC7vNdFOzqqS4FLR1ra~DNfN5foMtR2~2VxuR5m2dYiOS6GzHDxA4acJJSGqnasJjcEIFNVSQKxMnFu9PvGLNJHZ83EraHCErENcOGkPlnVgcJCtPGNGiirwCbBz38jE0lfjkrNrWabc6uWeU559CobG8F8KUDx1irpAAAA"
TEST_B32 = "tv5iv4i5roywnv2rg6rjqufniqbogn4rokjkooa7n4jht3lex3ga.b32.i2p"
def fakeSession(nickname, **kwargs):
m = mock.Mock()
m.nickname = nickname
m.kwargs = kwargs
return m
class FakeFactory(protocol.ClientFactory):
protocol = proto_helpers.AccumulatingProtocol
def __init__(self, returnNoProtocol=False):
self.returnNoProtocol = returnNoProtocol
self.protocolConnectionMade = defer.Deferred()
def buildProtocol(self, addr):
if self.returnNoProtocol:
return None
self.proto = protocol.ClientFactory.buildProtocol(self, addr)
return self.proto
class FakeEndpoint(object):
def __init__(self, failure=None):
self.failure = failure
self.deferred = None
def connect(self, fac):
self.factory = fac
if self.deferred:
return self.deferred
if self.failure:
return defer.fail(self.failure)
self.proto = fac.buildProtocol(None)
transport = proto_helpers.StringTransport()
self.aborted = []
transport.abortConnection = lambda: self.aborted.append(True)
self.tlsStarts = []
transport.startTLS = lambda ctx: self.tlsStarts.append(ctx)
self.proto.makeConnection(transport)
self.transport = transport
return defer.succeed(self.proto)
|