1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
|
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
import sys
from zope.interface import implements, Interface
from twisted.protocols import basic
from twisted.internet import protocol
from twisted.python import log
from twisted.cred import error
from twisted.cred import portal
from twisted.cred import checkers
from twisted.cred import credentials
class IProtocolUser(Interface):
    """Interface for the avatars this example hands out after a login.

    Methods are declared without ``self``, as is conventional for
    zope.interface declarations: they describe the call signature seen by
    users of a providing object.
    """

    def getPrivileges():
        """Return a list of the privileges held by this user."""

    def logout():
        """Clean up the per-login resources allocated to this avatar."""
class AnonymousUser:
    """Avatar with the smallest privilege set in this example."""

    implements(IProtocolUser)

    def logout(self):
        """Report on stdout that this avatar's resources are being released."""
        # Single-argument parenthesised form: prints the same text under the
        # Python 2 print statement.
        print("Cleaning up anonymous user resources")

    def getPrivileges(self):
        """Return the privilege numbers granted to this avatar."""
        privileges = [1, 2, 3]
        return privileges
class RegularUser:
    """Avatar for an ordinary authenticated user."""

    implements(IProtocolUser)

    def logout(self):
        """Report on stdout that this avatar's resources are being released."""
        # Single-argument parenthesised form: prints the same text under the
        # Python 2 print statement.
        print("Cleaning up regular user resources")

    def getPrivileges(self):
        """Return the privilege numbers granted to this avatar."""
        privileges = [1, 2, 3, 5, 6]
        return privileges
class Administrator:
    """Avatar holding every privilege number from 0 through 49."""

    implements(IProtocolUser)

    def logout(self):
        """Report on stdout that this avatar's resources are being released."""
        # Single-argument parenthesised form: prints the same text under the
        # Python 2 print statement.
        print("Cleaning up administrator resources")

    def getPrivileges(self):
        """Return the privilege numbers granted to this avatar as a list."""
        return list(range(50))
class Protocol(basic.LineReceiver):
    """Line-based demo protocol that logs clients in through a cred portal.

    Every received line is split on whitespace. The first word selects a
    ``cmd_<WORD>`` method (matched case-insensitively) and the remaining
    words are passed to that method as positional arguments.
    """

    # Name supplied by the most recent USER command.
    user = None
    # Portal used to authenticate; ServerFactory.buildProtocol assigns it.
    portal = None
    # Avatar obtained from the most recent successful login.
    avatar = None
    # Zero-argument callable that releases the current avatar's resources.
    logout = None

    def connectionMade(self):
        """Greet the client with brief usage instructions."""
        self.sendLine("Login with USER <name> followed by PASS <password> or ANON")
        self.sendLine("Check privileges with PRIVS")

    def connectionLost(self, reason):
        """Release the logged-in avatar, if any, when the connection ends."""
        if self.logout:
            self.logout()
            self.avatar = None
            self.logout = None

    def lineReceived(self, line):
        """Dispatch one received line to the matching ``cmd_*`` method.

        Blank lines are ignored and unknown commands are reported to the
        client instead of raising out of the protocol.
        """
        words = line.split()
        if not words:
            # Nothing to dispatch; indexing words[0] would raise IndexError.
            return

        # The None default keeps an unknown command from raising
        # AttributeError before any error handling is in place.
        handler = getattr(self, 'cmd_' + words[0].upper(), None)
        if handler is None:
            self.sendLine("Unknown command.")
            return

        try:
            handler(*words[1:])
        except TypeError:
            self.sendLine("Wrong number of arguments.")
        except Exception:
            # Record the traceback in the log rather than discarding it, and
            # let SystemExit / KeyboardInterrupt propagate.
            log.err()
            self.sendLine("Server error (probably your fault)")

    def _login(self, creds):
        """Submit *creds* to the portal, or refuse if no portal is set."""
        if not self.portal:
            self.sendLine("DENIED")
            return
        d = self.portal.login(creds, None, IProtocolUser)
        d.addCallbacks(self._cbLogin, self._ebLogin)

    def cmd_ANON(self):
        """Handle ``ANON``: attempt an anonymous login."""
        self._login(credentials.Anonymous())

    def cmd_USER(self, name):
        """Handle ``USER <name>``: remember the name for a following PASS."""
        self.user = name
        self.sendLine("Alright. Now PASS?")

    def cmd_PASS(self, password):
        """Handle ``PASS <password>``: log in as the remembered user name."""
        if not self.user:
            self.sendLine("USER required before PASS")
            return
        self._login(credentials.UsernamePassword(self.user, password))

    def cmd_PRIVS(self):
        """Handle ``PRIVS``: list the privileges of the logged-in avatar."""
        if self.avatar is None:
            # Without this guard the header line would be sent and the call
            # on None would then surface as a generic server error.
            self.sendLine("Login required before PRIVS")
            return
        self.sendLine("You have the following privileges: ")
        self.sendLine(" ".join(map(str, self.avatar.getPrivileges())))

    def _cbLogin(self, result):
        """Store the avatar and logout callable from a successful login.

        *result* is the ``(interface, avatar, logout)`` tuple produced by the
        portal login. It is unpacked in the body because tuple parameters in
        the signature are not valid syntax on Python 3.
        """
        interface, avatar, logout = result
        assert interface is IProtocolUser
        if self.logout:
            # A repeated login replaces the avatar; release the previous
            # one first so its logout callable is not silently dropped.
            self.logout()
        self.avatar = avatar
        self.logout = logout
        self.sendLine("Login successful. Available commands: PRIVS")

    def _ebLogin(self, failure):
        """Tell the client a login was refused.

        Failures other than ``UnauthorizedLogin`` are re-raised by ``trap``.
        """
        failure.trap(error.UnauthorizedLogin)
        self.sendLine("Login denied! Go away.")
class ServerFactory(protocol.ServerFactory):
    """Server factory that gives each protocol it builds the same portal."""

    # Class attribute naming the protocol class to build. It does not hide
    # the ``protocol`` module inside the methods below, because class-level
    # names are not part of a method's scope.
    protocol = Protocol

    def __init__(self, portal):
        """Remember *portal* so it can be handed to each new protocol."""
        self.portal = portal

    def buildProtocol(self, addr):
        """Build a protocol via the base class and attach the portal to it."""
        instance = protocol.ServerFactory.buildProtocol(self, addr)
        instance.portal = self.portal
        return instance
class Realm:
    """Realm that maps an avatar id onto one of the three example avatars."""

    implements(portal.IRealm)

    def requestAvatar(self, avatarId, mind, *interfaces):
        """Return ``(IProtocolUser, avatar, logout)`` for *avatarId*.

        Raises NotImplementedError when IProtocolUser is not among the
        requested *interfaces*. *mind* is not used.
        """
        if IProtocolUser not in interfaces:
            raise NotImplementedError("Only IProtocolUser interface is supported by this realm")

        if avatarId == checkers.ANONYMOUS:
            avatarClass = AnonymousUser
        elif avatarId.isupper():
            # All-uppercase usernames are administrators.
            avatarClass = Administrator
        else:
            avatarClass = RegularUser

        avatar = avatarClass()
        return IProtocolUser, avatar, avatar.logout
def main():
    """Wire up the realm, portal and checkers, then serve on TCP port 4738.

    Blocks in ``reactor.run()`` until the reactor is stopped.
    """
    # In-memory credentials database holding the two demo accounts.
    userDb = checkers.InMemoryUsernamePasswordDatabaseDontUse()
    for name, password in (("auser", "thepass"), ("SECONDUSER", "secret")):
        userDb.addUser(name, password)

    loginPortal = portal.Portal(Realm())
    loginPortal.registerChecker(userDb)
    loginPortal.registerChecker(checkers.AllowAnonymousAccess())

    factory = ServerFactory(loginPortal)
    log.startLogging(sys.stdout)

    # Imported here rather than at module level -- presumably so that merely
    # importing this module does not install a reactor; confirm before moving.
    from twisted.internet import reactor
    reactor.listenTCP(4738, factory)
    reactor.run()
# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|