1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205
|
##############################################################################
#
# LDAPPluginBase Base class for LDAP-based PAS-Plugins
#
##############################################################################
# Module docstring, assigned explicitly.
__doc__ = """ LDAPPluginBase module """
# Slice off the leading '$Revision: ' (11 characters) and the trailing
# ' $' (2 characters) of the keyword string, leaving only the number.
__version__ = '$Revision: 1347 $'[11:-2]
# General Python imports
import copy
import logging
import os
from urllib import quote_plus
# Zope imports
from Acquisition import aq_base
from OFS.Folder import Folder
from OFS.Cache import Cacheable
from Globals import InitializeClass
from AccessControl import ClassSecurityInfo
from AccessControl.SecurityManagement import getSecurityManager
from Products.PluggableAuthService.plugins.BasePlugin import BasePlugin
from Products.PluggableAuthService.interfaces.plugins import \
IAuthenticationPlugin, IRolesPlugin, ICredentialsUpdatePlugin, \
ICredentialsResetPlugin, IPropertiesPlugin
from Products.PluggableAuthService.utils import classImplements
# Module-level logger, registered under the name 'event.LDAPMultiPlugin'.
logger = logging.getLogger('event.LDAPMultiPlugin')
class LDAPPluginBase(Folder, BasePlugin, Cacheable):
    """ Base class for LDAP-based PAS plugins

    Implements the authentication, credentials update/reset, properties
    and roles plugin methods by delegating to the LDAPUserFolder stored
    as the ``acl_users`` attribute of the plugin itself, and provides
    small helpers for caching group information through the
    ``ZCacheable_get`` / ``ZCacheable_set`` methods.

    ``getRolesForPrincipal`` calls ``self.getGroupsForPrincipal`` and
    ``_demangle`` reads ``self.prefix``. Neither is defined in this
    class, so both must be supplied by a base class or a subclass.
    """

    security = ClassSecurityInfo()

    # First BasePlugin tab, followed by the Folder and Cacheable tabs.
    manage_options = ( BasePlugin.manage_options[:1]
                     + Folder.manage_options
                     + Cacheable.manage_options
                     )

    _properties = BasePlugin._properties + Folder._properties

    # default 'id' attribute for groups
    groupid_attr = 'cn'

    def __init__(self, id, title=''):
        """ Initialize a new instance """
        self.id = id
        self.title = title

    security.declarePrivate('_getLDAPUserFolder')
    def _getLDAPUserFolder(self):
        """ Safely retrieve a LDAPUserFolder to work with

        The lookup is done on ``aq_base(self)``, i.e. on the plugin
        without its acquisition wrapper, so only an ``acl_users``
        attribute of the plugin itself is considered. Returns None if
        the plugin has no such attribute.
        """
        return getattr(aq_base(self), 'acl_users', None)

    security.declarePrivate('authenticateCredentials')
    def authenticateCredentials(self, credentials):
        """ Fulfill AuthenticationPlugin requirements

        o ``credentials`` is a mapping; its 'login' and 'password'
          entries are used.

        o Returns a (user id, user name) tuple when the user folder
          returns a user for the login/password pair.

        o Returns (None, None) when there is no user folder, when login
          or password is missing/empty, or when no user is found.
        """
        acl = self._getLDAPUserFolder()
        login = credentials.get('login')
        password = credentials.get('password')

        # Test the folder against None, like every other method here,
        # instead of relying on the truth value of the folder object.
        if acl is None or not login or not password:
            return None, None

        user = acl.getUser(login, pwd=password)

        if user is None:
            return None, None

        return (user.getId(), user.getUserName())

    security.declarePrivate('updateCredentials')
    def updateCredentials(self, request, response, login, new_password):
        """ Fulfill CredentialsUpdatePlugin requirements

        Sets ``new_password`` for the user found under ``login``. Does
        nothing when there is no user folder or no such user.
        ``request`` and ``response`` are not used.
        """
        acl = self._getLDAPUserFolder()

        if acl is None:
            return

        user = acl.getUser(login)

        if user is None:
            return

        # The password is changed by DN, not by login name.
        acl.manage_editUserPassword(user.getUserDN(), new_password)

    security.declarePrivate('resetCredentials')
    def resetCredentials(self, request, response):
        """ Fulfill CredentialsResetPlugin requirements

        Expires the currently authenticated user in the user folder.
        Does nothing when there is no user folder or no current user.
        ``request`` and ``response`` are not used.
        """
        user = getSecurityManager().getUser()
        acl = self._getLDAPUserFolder()

        # Without the check on acl a plugin that has no embedded user
        # folder would fail with an AttributeError on None.
        if user and acl is not None:
            acl._expireUser(user)

    security.declarePrivate('getPropertiesForUser')
    def getPropertiesForUser(self, user, request=None):
        """ Fulfill PropertiesPlugin requirements

        Returns a deep copy of the LDAP user's property mapping in
        which every None value is replaced by an empty string. Returns
        an empty dictionary when there is no user folder, when the user
        id does not carry this plugin's prefix, or when the user folder
        does not know the user. ``request`` is not used.
        """
        acl = self._getLDAPUserFolder()

        if acl is None:
            return {}

        unmangled_userid = self._demangle(user.getId())

        if unmangled_userid is None:
            return {}

        ldap_user = acl.getUserById(unmangled_userid)

        if ldap_user is None:
            return {}

        # XXX Direct attribute access. Waaa!
        # Work on a copy so the user object's own data stays untouched.
        properties = copy.deepcopy(ldap_user._properties)

        # Need to clean up: The propertysheet mechanism will
        # blow up if "None" is encountered
        for key, val in properties.items():
            if val is None:
                properties[key] = ''

        return properties

    security.declarePrivate('getRolesForPrincipal')
    def getRolesForPrincipal(self, user, request=None):
        """ Fulfill RolesPlugin requirements

        Returns a tuple made of the roles the user folder maps from the
        principal's groups, followed by the user folder's ``_roles``.
        Returns an empty tuple when there is no user folder, when the
        user id does not carry this plugin's prefix, or when the user
        folder does not know the user.
        """
        acl = self._getLDAPUserFolder()

        if acl is None:
            return ()

        unmangled_userid = self._demangle(user.getId())

        if unmangled_userid is None:
            return ()

        # Only used as an existence check for the user.
        ldap_user = acl.getUserById(unmangled_userid)

        if ldap_user is None:
            return ()

        # getGroupsForPrincipal is not defined in this class.
        groups = self.getGroupsForPrincipal(user, request)

        roles = list(acl._mapRoles(groups))
        roles.extend(acl._roles)

        return tuple(roles)

    security.declarePrivate('_demangle')
    def _demangle(self, princid):
        """ Strip this plugin's prefix from a principal id

        Returns the id without the leading ``self.prefix``, or None if
        the id does not start with that prefix.
        """
        # User must start with our prefix (which is likely to be blank anyway)
        if not princid.startswith(self.prefix):
            return None

        return princid[len(self.prefix):]

    # Helper methods for simple group caching
    security.declarePrivate('_getGroupInfoCacheKey')
    def _getGroupInfoCacheKey(self, gid):
        """_getGroupInfoCacheKey(id) -> (view_name, keywords)

        given a group id, return view_name and keywords to be used when
        querying and storing into the group cache
        """
        # The view name contains the plugin id; the group id is passed
        # separately as the 'id' keyword.
        view_name = self.getId() + '__GroupInfoCache'
        keywords = { 'id' : gid }

        return view_name, keywords

    security.declarePrivate('_setGroupInfoCache')
    def _setGroupInfoCache(self, info):
        """Cache a group info

        ``info`` must provide the group id under the key 'id'.
        """
        gid = info['id']
        view_name, keywords = self._getGroupInfoCacheKey(gid)
        self.ZCacheable_set(info, view_name=view_name, keywords=keywords)

    security.declarePrivate('_getGroupInfoCache')
    def _getGroupInfoCache(self, gid, default=None):
        """Retrieve a group info from cache, given its group id.

        Returns None or the passed-in default if the cache
        has no group with such id
        """
        view_name, keywords = self._getGroupInfoCacheKey(gid)

        return self.ZCacheable_get( view_name=view_name
                                  , keywords=keywords
                                  , default=default
                                  )
# Declare the PAS plugin interfaces that LDAPPluginBase implements.
classImplements(
    LDAPPluginBase,
    IAuthenticationPlugin,
    ICredentialsUpdatePlugin,
    ICredentialsResetPlugin,
    IPropertiesPlugin,
    IRolesPlugin
)

# Run Zope's class initialisation on the plugin class.
InitializeClass(LDAPPluginBase)
|