File: util.py

package info (click to toggle)
python-txi2p-tahoe 0.3.7-5
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 448 kB
  • sloc: python: 3,757; makefile: 163; sh: 3
file content (58 lines) | stat: -rw-r--r-- 2,163 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# Copyright (c) str4d <str4d@mail.i2p>
# See COPYING for details.

from builtins import object
try:
    # Python 3: mock ships with the standard library.
    from unittest import mock
except ImportError:
    # Python 2: fall back to the standalone "mock" library.  Only ImportError
    # is caught so that unrelated problems (KeyboardInterrupt, SystemExit,
    # errors raised inside the imported module) are not silently swallowed.
    import mock
from twisted.internet import defer, protocol
from twisted.test import proto_helpers

# Sample address fixtures.
# TEST_B64 is a long string that uses '-' and '~' alongside alphanumerics;
# presumably an I2P Destination in I2P's Base64 alphabet -- TODO confirm.
# TEST_B32 is a ".b32.i2p" hostname; presumably the Base32 address derived
# from the same Destination -- verify before relying on that relationship.
TEST_B64 = "2wDRF5nDfeTNgM4X-TI5xEk3R-WiaTABvkMQ2eYpvEzayUZQJgr9E2T6Y2m9HHn3xHYGEOg-RLisjW9AubTaUTx-v66AsEEtv745qPcuWuV1SP~w1bdzYEn8MSoK7Zh4mwHBg1uHq8z17TUNvWz19q76vHNth-2PDuBToD7ySBn3cGBFDUU83wJJXPD6OueLY8yosWWtksk7WZk60~6z~nVePPSEY8JDry3myLDe11szAVER4A8eX1sFpw247cXGGJK9wQhV-TXFj~m76GPVcFKh7u79zwTwZnZ1GXXKqqyRoj1c4-U69CvvJsQRLmdLFwFEpRkxwV8z6LIFclYJk443YpTnPXC7vNdFOzqqS4FLR1ra~DNfN5foMtR2~2VxuR5m2dYiOS6GzHDxA4acJJSGqnasJjcEIFNVSQKxMnFu9PvGLNJHZ83EraHCErENcOGkPlnVgcJCtPGNGiirwCbBz38jE0lfjkrNrWabc6uWeU559CobG8F8KUDx1irpAAAA"
TEST_B32 = "tv5iv4i5roywnv2rg6rjqufniqbogn4rokjkooa7n4jht3lex3ga.b32.i2p"


def fakeSession(nickname, **kwargs):
    """Build a stand-in session object for tests.

    :param nickname: value exposed as the ``nickname`` attribute.
    :param kwargs: any extra keyword arguments; the resulting dict is exposed
        unchanged as the ``kwargs`` attribute.
    :return: a ``mock.Mock`` carrying those two attributes.
    """
    session = mock.Mock()
    # configure_mock assigns each keyword as an attribute on the mock.
    session.configure_mock(nickname=nickname, kwargs=kwargs)
    return session


class FakeFactory(protocol.ClientFactory):
    """Client factory for tests that builds ``AccumulatingProtocol`` instances.

    The most recently built protocol is kept on ``self.proto`` so a test can
    inspect it afterwards.
    """

    protocol = proto_helpers.AccumulatingProtocol

    def __init__(self, returnNoProtocol=False):
        """Create the factory.

        :param returnNoProtocol: when true, ``buildProtocol`` returns ``None``
            instead of building a protocol.
        """
        # NOTE(review): presumably fired by AccumulatingProtocol once its
        # connection is made -- confirm against the Twisted documentation.
        self.protocolConnectionMade = defer.Deferred()
        self.returnNoProtocol = returnNoProtocol

    def buildProtocol(self, addr):
        """Build a protocol for ``addr`` and remember it as ``self.proto``.

        Returns ``None`` (leaving ``self.proto`` untouched) when the factory
        was created with ``returnNoProtocol`` set.
        """
        if not self.returnNoProtocol:
            built = protocol.ClientFactory.buildProtocol(self, addr)
            self.proto = built
            return built
        return None


class FakeEndpoint(object):
    """Endpoint stand-in that "connects" a factory to an in-memory transport.

    Attributes set by ``connect`` (``factory``, ``proto``, ``transport``,
    ``aborted``, ``tlsStarts``) let a test inspect what happened.
    """

    def __init__(self, failure=None):
        """Create the endpoint.

        :param failure: if given (truthy), ``connect`` returns a Deferred that
            has already failed with it.
        """
        # A test may assign a Deferred here; connect() then returns it as-is.
        self.deferred = None
        self.failure = failure

    def connect(self, fac):
        """Pretend to connect ``fac`` and return a Deferred.

        The factory is always recorded on ``self.factory``.  Then, in order:

        * if ``self.deferred`` is set, it is returned unchanged;
        * if ``self.failure`` is set, an already-failed Deferred is returned;
        * otherwise a protocol is built (with address ``None``), attached to
          a ``StringTransport``, and an already-fired Deferred carrying the
          protocol is returned.
        """
        self.factory = fac
        if self.deferred:
            return self.deferred
        if self.failure:
            return defer.fail(self.failure)

        self.proto = fac.buildProtocol(None)
        fakeTransport = proto_helpers.StringTransport()

        # Record calls to abortConnection()/startTLS() instead of acting on
        # them, so tests can assert on self.aborted / self.tlsStarts.
        self.aborted = []
        self.tlsStarts = []

        def recordAbort():
            self.aborted.append(True)

        def recordStartTLS(ctx):
            self.tlsStarts.append(ctx)

        fakeTransport.abortConnection = recordAbort
        fakeTransport.startTLS = recordStartTLS

        self.proto.makeConnection(fakeTransport)
        # Published only after the protocol has been connected, as before.
        self.transport = fakeTransport
        return defer.succeed(self.proto)