File: cred.py

package info (click to toggle)
twisted 12.0.0-1
  • links: PTS
  • area: main
  • in suites: wheezy
  • size: 9,720 kB
  • sloc: python: 78,364; ansic: 179; makefile: 113; sh: 34
file content (163 lines) | stat: -rw-r--r-- 4,546 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163

# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.



import sys
from zope.interface import implements, Interface

from twisted.protocols import basic
from twisted.internet import protocol
from twisted.python import log

from twisted.cred import error
from twisted.cred import portal
from twisted.cred import checkers
from twisted.cred import credentials

class IProtocolUser(Interface):
    def getPrivileges():
        """Return a list of privileges this user has."""

    def logout():
        """Cleanup per-login resources allocated to this avatar"""

class AnonymousUser:
    implements(IProtocolUser)
    
    def getPrivileges(self):
        return [1, 2, 3]

    def logout(self):
        print "Cleaning up anonymous user resources"

class RegularUser:
    implements(IProtocolUser)
    
    def getPrivileges(self):
        return [1, 2, 3, 5, 6]

    def logout(self):
        print "Cleaning up regular user resources"

class Administrator:
    implements(IProtocolUser)
    
    def getPrivileges(self):
        return range(50)

    def logout(self):
        print "Cleaning up administrator resources"

class Protocol(basic.LineReceiver):
    user = None
    portal = None
    avatar = None
    logout = None

    def connectionMade(self):
        self.sendLine("Login with USER <name> followed by PASS <password> or ANON")
        self.sendLine("Check privileges with PRIVS")

    def connectionLost(self, reason):
        if self.logout:
            self.logout()
            self.avatar = None
            self.logout = None
    
    def lineReceived(self, line):
        f = getattr(self, 'cmd_' + line.upper().split()[0])
        if f:
            try:
                f(*line.split()[1:])
            except TypeError:
                self.sendLine("Wrong number of arguments.")
            except:
                self.sendLine("Server error (probably your fault)")

    def cmd_ANON(self):
        if self.portal:
            self.portal.login(credentials.Anonymous(), None, IProtocolUser
                ).addCallbacks(self._cbLogin, self._ebLogin
                )
        else:
            self.sendLine("DENIED")
    
    def cmd_USER(self, name):
        self.user = name
        self.sendLine("Alright.  Now PASS?")
    
    def cmd_PASS(self, password):
        if not self.user:
            self.sendLine("USER required before PASS")
        else:
            if self.portal:
                self.portal.login(
                    credentials.UsernamePassword(self.user, password),
                    None,
                    IProtocolUser
                ).addCallbacks(self._cbLogin, self._ebLogin
                )
            else:
                self.sendLine("DENIED")

    def cmd_PRIVS(self):
        self.sendLine("You have the following privileges: ")
        self.sendLine(" ".join(map(str, self.avatar.getPrivileges())))

    def _cbLogin(self, (interface, avatar, logout)):
        assert interface is IProtocolUser
        self.avatar = avatar
        self.logout = logout
        self.sendLine("Login successful.  Available commands: PRIVS")
    
    def _ebLogin(self, failure):
        failure.trap(error.UnauthorizedLogin)
        self.sendLine("Login denied!  Go away.")

class ServerFactory(protocol.ServerFactory):
    protocol = Protocol
    
    def __init__(self, portal):
        self.portal = portal
    
    def buildProtocol(self, addr):
        p = protocol.ServerFactory.buildProtocol(self, addr)
        p.portal = self.portal
        return p

class Realm:
    implements(portal.IRealm)

    def requestAvatar(self, avatarId, mind, *interfaces):
        if IProtocolUser in interfaces:
            if avatarId == checkers.ANONYMOUS:
                av = AnonymousUser()
            elif avatarId.isupper():
                # Capitalized usernames are administrators.
                av = Administrator()
            else:
                av = RegularUser()
            return IProtocolUser, av, av.logout
        raise NotImplementedError("Only IProtocolUser interface is supported by this realm")

def main():
    r = Realm()
    p = portal.Portal(r)
    c = checkers.InMemoryUsernamePasswordDatabaseDontUse()
    c.addUser("auser", "thepass")
    c.addUser("SECONDUSER", "secret")
    p.registerChecker(c)
    p.registerChecker(checkers.AllowAnonymousAccess())
    
    f = ServerFactory(p)

    log.startLogging(sys.stdout)

    from twisted.internet import reactor
    reactor.listenTCP(4738, f)
    reactor.run()

if __name__ == '__main__':
    main()