1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192
|
# -*- coding: utf-8 -*-
import json
from flask import current_app
from ldap3.utils.dn import safe_dn
from ldap3.utils.conv import check_json_dict, format_json
from ldap3.core.exceptions import LDAPAttributeError
from .query import BaseQuery
from .attribute import LDAPAttribute, LdapField
# Names exported by a star-import of this module.
__all__ = ('LDAPEntry',)
class LDAPEntryMeta(type):
    '''Metaclass that gathers the ``LdapField`` declarations of a class.

    For every class created with this metaclass it builds:

    * ``cls._fields`` -- a dict mapping attribute names to the
      ``LdapField`` instances declared on the class or inherited from
      bases that were themselves created by this metaclass.
    * ``cls.object_classes`` -- when at least one such base exists, the
      class's own list merged with the lists of those bases, without
      duplicates.
    '''

    def __init__(cls, name, bases, attr):
        '''Collect fields and merge object classes for the new class.

        Args:
            name: Name of the class being created.
            bases: Tuple of base classes.
            attr: Namespace dict of the class body.
        '''
        super().__init__(name, bases, attr)

        entry_bases = [base for base in bases
                       if isinstance(base, LDAPEntryMeta)]

        fields = {}
        # Walk the bases right-to-left so that, as with normal attribute
        # lookup, an earlier base takes precedence over a later one.
        for base in reversed(entry_bases):
            fields.update(base._fields)
        # Fields declared in the class body are applied last so that a
        # subclass can redefine an inherited field.
        for key, value in attr.items():
            if isinstance(value, LdapField):
                fields[key] = value
        cls._fields = fields

        # Only touch ``object_classes`` when there is something to merge;
        # a root class keeps whatever its body declared.
        if entry_bases:
            merged = list(cls.object_classes)
            for base in entry_bases:
                merged.extend(base.object_classes)
            # dict.fromkeys() removes duplicates while keeping first-seen
            # order, so the result is stable between interpreter runs.
            cls.object_classes = list(dict.fromkeys(merged))

    @property
    def query(cls):
        '''Return a new ``BaseQuery`` constructed with this class.

        Defined on the metaclass, so it is reached as ``SomeEntry.query``
        on the class rather than on instances.
        '''
        return BaseQuery(cls)
class LDAPEntry(object, metaclass=LDAPEntryMeta):
    '''Base class for objects that represent a single LDAP entry.

    Attribute names found in ``_fields`` (populated by ``LDAPEntryMeta``)
    are intercepted on read and write and kept as ``LDAPAttribute``
    objects in ``self._attributes``; every other attribute behaves like
    a normal Python attribute.

    Class attributes:
        base_dn: Base DN appended to the generated RDN.
        entry_rdn: Attribute names used to build the RDN.
        object_classes: Object classes sent to the server on ``add``.
        sub_tree, operational_attributes: Not read inside this class.
    '''

    base_dn = None
    entry_rdn = ['cn']
    object_classes = ['top']
    sub_tree = True
    operational_attributes = False
    _changetype = 'add'

    def __init__(self, dn=None, changetype='add', **kwargs):
        '''Create an entry and initialise its attributes.

        Args:
            dn: Distinguished name; when ``None`` it is generated from
                the entry on first access of ``dn``.
            changetype: ``'add'`` or ``'modify'``; selects what
                ``save()`` does.
            **kwargs: Initial attribute values keyed by field name.

        Raises:
            LDAPAttributeError: If a keyword is not a declared field.
        '''
        self._attributes = {}
        self._dn = dn
        self._changetype = changetype

        for key, value in kwargs.items():
            self._store_attr(key, value, init=True)

        # Make sure every declared field has an attribute object.
        for key in self._fields:
            if not self._isstored(key):
                self._store_attr(key, [])

    @property
    def dn(self):
        '''Distinguished name, generated lazily when none was given.'''
        if self._dn is None:
            self.generate_dn_from_entry()
        return self._dn

    def generate_dn_from_entry(self):
        '''Build ``self._dn`` from the RDN attributes and ``base_dn``.

        Only attributes whose name is listed in ``entry_rdn`` and that
        hold exactly one value contribute; several RDN parts are joined
        with ``+``.
        '''
        # NOTE(review): if no attribute qualifies, the string handed to
        # safe_dn() starts with a comma -- confirm how callers expect
        # that case to be reported.
        rdn_list = []
        for attribute in self._attributes.values():
            if attribute.name in self.entry_rdn and len(attribute) == 1:
                rdn_list.append('{attr}={value}'.format(
                    attr=attribute.name,
                    value=attribute.value
                ))

        dn = '{rdn},{base_dn}'.format(rdn='+'.join(rdn_list),
                                      base_dn=self.base_dn)
        self._dn = safe_dn(dn)

    @classmethod
    def _get_field(cls, attr):
        '''Return the field registered under ``attr`` or ``None``.'''
        return cls._fields.get(attr)

    @classmethod
    def _get_field_name(cls, attr):
        '''Return the ``name`` of the field ``attr`` or ``None``.'''
        field = cls._get_field(attr)
        if field:
            return field.name
        return None

    def _store_attr(self, attr, value=None, init=False):
        '''Store ``value`` for the field ``attr``.

        Args:
            attr: Field name as declared on the class.
            value: New value; ``None`` (the default) stores a fresh
                empty list.
            init: When true, the attribute's ``changetype`` is reset to
                ``None`` after the value has been assigned.

        Raises:
            LDAPAttributeError: If ``attr`` is not a declared field.
        '''
        if not self._get_field(attr):
            raise LDAPAttributeError('attribute not found')

        # A new list per call: a shared default list would be aliased by
        # every attribute stored through the default.
        if value is None:
            value = []

        # NOTE(review): this is a truthiness test on the stored object.
        # LDAPAttribute supports len() (see generate_dn_from_entry), so
        # unless it also defines __bool__ an empty attribute is replaced
        # by a new LDAPAttribute here -- confirm this is intended.
        if not self._attributes.get(attr):
            self._attributes[attr] = LDAPAttribute(
                self._get_field_name(attr))

        self._attributes[attr].value = value

        if init:
            # Written through __dict__ rather than by normal assignment.
            self._attributes[attr].__dict__['changetype'] = None

    def _isstored(self, attr):
        '''Return the stored attribute object for ``attr`` or ``None``.

        Callers use the result in a boolean context.
        '''
        return self._attributes.get(attr)

    def _get_attr(self, attr):
        '''Return the value of ``attr``, or ``None`` if ``_isstored``
        yields a falsy result.'''
        if self._isstored(attr):
            return self._attributes[attr].value
        return None

    def __getattribute__(self, item):
        '''Route reads of declared fields to ``_get_attr``.'''
        # '_fields' itself must bypass the check, otherwise the lookup
        # of self._fields below would recurse forever.
        if item != '_fields' and item in self._fields:
            return self._get_attr(item)
        return super().__getattribute__(item)

    def __setattr__(self, key, value):
        '''Route writes of declared fields to ``_store_attr``.'''
        if key != '_fields' and key in self._fields:
            self._store_attr(key, value)
        else:
            return super().__setattr__(key, value)

    def get_attributes_dict(self):
        '''Return ``{field key: attribute.values}`` for all attributes.'''
        return {key: attribute.values
                for key, attribute in self._attributes.items()}

    def get_entry_add_dict(self, attr_dict):
        '''Build the attribute dict passed to the ``add`` operation.

        Args:
            attr_dict: Mapping of field keys to values, as returned by
                ``get_attributes_dict()``.

        Returns:
            Dict keyed by field ``name`` holding the values of every
            attribute whose current ``value`` is not ``[]``.
        '''
        return {
            self._get_field_name(key): values
            for key, values in attr_dict.items()
            if self._attributes[key].value != []
        }

    def get_entry_modify_dict(self, attr_dict):
        '''Build the changes dict passed to the ``modify`` operation.

        Args:
            attr_dict: Mapping whose keys are field keys; the values
                are not used.

        Returns:
            Dict keyed by field ``name`` holding ``get_changes_tuple()``
            of every attribute whose ``changetype`` is not ``None``.
        '''
        return {
            self._get_field_name(key):
                self._attributes[key].get_changes_tuple()
            for key in attr_dict
            if self._attributes[key].changetype is not None
        }

    @property
    def connection(self):
        '''The ``'ldap_conn'`` entry of ``current_app.extensions``
        (``None`` when it is not registered).'''
        return current_app.extensions.get('ldap_conn')

    def delete(self):
        '''Delete this entry from LDAP server'''
        return self.connection.connection.delete(self.dn)

    def save(self):
        '''Save the current instance

        Issues an ``add`` when the change type is ``'add'`` and a
        ``modify`` when it is ``'modify'``, returning the result of
        that call. Any other change type returns ``False``.
        '''
        attrs = self.get_attributes_dict()

        if self._changetype == 'add':
            changes = self.get_entry_add_dict(attrs)
            return self.connection.connection.add(self.dn,
                                                  self.object_classes,
                                                  changes)

        if self._changetype == 'modify':
            changes = self.get_entry_modify_dict(attrs)
            return self.connection.connection.modify(self.dn, changes)

        return False

    def authenticate(self, password):
        '''Authenticate this entry's DN with the given password.

        Args:
            password (str): The user password.

        Returns:
            Whatever ``self.connection.authenticate`` returns.
        '''
        return self.connection.authenticate(self.dn, password)

    def to_json(self, indent=2, sort=True, str_values=False):
        '''Serialise the entry to a JSON string.

        Args:
            indent: Indentation passed to ``json.dumps``.
            sort: Whether keys are sorted in the output.
            str_values: See the comment in the body.

        Returns:
            JSON text with the keys ``dn`` and ``attributes``.
        '''
        json_entry = dict()
        json_entry['dn'] = self.dn

        # Get "single values" from attributes as str instead list if
        # `str_values=True` else get all attributes as list. This only
        # works if `FORCE_ATTRIBUTE_VALUE_AS_LIST` is False (default).
        if str_values is True:
            json_entry['attributes'] = {
                key: attribute.value
                for key, attribute in self._attributes.items()
            }
        else:
            json_entry['attributes'] = self.get_attributes_dict()

        # Only true on interpreters where ``str`` is ``bytes``.
        if str == bytes:
            check_json_dict(json_entry)

        json_output = json.dumps(json_entry,
                                 ensure_ascii=True,
                                 sort_keys=sort,
                                 indent=indent,
                                 check_circular=True,
                                 default=format_json,
                                 separators=(',', ': '))
        return json_output
# Alias: ``LDAPModel`` is a second module-level name for ``LDAPEntry``.
LDAPModel = LDAPEntry
|