File: mockSync.py

package info (click to toggle)
python-ldap3 2.9.1-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm, forky, sid, trixie
  • size: 3,236 kB
  • sloc: python: 30,487; makefile: 3
file content (133 lines) | stat: -rw-r--r-- 7,315 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
"""
"""

# Created on 2014.11.17
#
# Author: Giovanni Cannata
#
# Copyright 2014 - 2020 Giovanni Cannata
#
# This file is part of ldap3.
#
# ldap3 is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# ldap3 is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with ldap3 in the COPYING and COPYING.LESSER files.
# If not, see <http://www.gnu.org/licenses/>.

from ..core.results import DO_NOT_RAISE_EXCEPTIONS
from .mockBase import MockBaseStrategy
from .. import ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES, NO_ATTRIBUTES
from .sync import SyncStrategy
from ..operation.bind import bind_response_to_dict
from ..operation.delete import delete_response_to_dict
from ..operation.add import add_response_to_dict
from ..operation.compare import compare_response_to_dict
from ..operation.modifyDn import modify_dn_response_to_dict
from ..operation.modify import modify_response_to_dict
from ..operation.search import search_result_done_response_to_dict, search_result_entry_response_to_dict
from ..operation.extended import extended_response_to_dict
from ..core.exceptions import LDAPSocketOpenError, LDAPOperationResult
from ..utils.log import log, log_enabled, ERROR, PROTOCOL


class MockSyncStrategy(MockBaseStrategy, SyncStrategy):  # class inheritance sequence is important, MockBaseStrategy must be the first one
    """
    This strategy creates a mock LDAP server with synchronous access.
    It can be useful to test LDAP without accessing a real server.

    The ``post_send_*`` hooks hand each request to the matching inherited
    ``mock_*`` method and convert what it returns with the regular
    ``*_response_to_dict`` helpers.
    """
    def __init__(self, ldap_connection):
        """Initialize both base strategies; only SyncStrategy receives the connection."""
        SyncStrategy.__init__(self, ldap_connection)
        MockBaseStrategy.__init__(self)

    def post_send_search(self, payload):
        """
        Run a search request against the mock and store the outcome on the connection.

        :param payload: tuple of (message_id, message_type, request, controls)
        :return: ``self.connection.response``, a list with one dict per entry
                 (empty when ``message_type`` is not ``'searchRequest'``)
        :raises LDAPOperationResult: when the connection has ``raise_exceptions``
                 set and the result code is not in DO_NOT_RAISE_EXCEPTIONS
        """
        message_id, message_type, request, controls = payload
        # start from a clean state so data from a previous operation is never returned
        self.connection.response = []
        self.connection.result = dict()
        if message_type == 'searchRequest':
            responses, result = self.mock_search(request, controls)
            for entry in responses:
                response = search_result_entry_response_to_dict(entry, self.connection.server.schema, self.connection.server.custom_formatter, self.connection.check_names)
                response['type'] = 'searchResEntry'
                ###
                if self.connection.empty_attributes:
                    # requested attributes missing from the entry are reported as empty lists
                    for attribute_type in request['attributes']:
                        attribute_name = str(attribute_type)
                        if attribute_name not in response['raw_attributes'] and attribute_name not in (ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES, NO_ATTRIBUTES):
                            response['raw_attributes'][attribute_name] = list()
                            response['attributes'][attribute_name] = list()
                            if log_enabled(PROTOCOL):
                                log(PROTOCOL, 'attribute set to empty list for missing attribute <%s> in <%s>',
                                    attribute_type, self)
                    if not self.connection.auto_range:
                        attrs_to_remove = []
                        # removes original empty attribute in case a range tag is returned
                        for attribute_type in response['attributes']:
                            attribute_name = str(attribute_type)
                            if ';range' in attribute_name.lower():
                                orig_attr, _, _ = attribute_name.partition(';')
                                attrs_to_remove.append(orig_attr)
                        for attribute_type in attrs_to_remove:
                            # The base attribute can be absent (only the ranged form was
                            # requested and returned) or already removed (several range
                            # tags sharing one base), so delete only what is really there
                            # instead of failing with KeyError.
                            removed = False
                            for section in ('raw_attributes', 'attributes'):
                                if attribute_type in response[section]:
                                    del response[section][attribute_type]
                                    removed = True
                            if removed and log_enabled(PROTOCOL):
                                log(PROTOCOL,
                                    'attribute type <%s> removed in response because of same attribute returned as range by the server in <%s>',
                                    attribute_type, self)
                ###
                self.connection.response.append(response)
            result = search_result_done_response_to_dict(result)
            result['type'] = 'searchResDone'
            self.connection.result = result
            if self.connection.raise_exceptions and result and result['result'] not in DO_NOT_RAISE_EXCEPTIONS:
                if log_enabled(PROTOCOL):
                    log(PROTOCOL, 'operation result <%s> for <%s>', result, self.connection)
                raise LDAPOperationResult(result=result['result'], description=result['description'], dn=result['dn'], message=result['message'], response_type=result['type'])

        return self.connection.response

    def post_send_single_response(self, payload):  # payload is a tuple sent by self.send() made of message_id, message_type, request, controls
        """
        Run a single-response operation against the mock and store its result.

        :param payload: tuple of (message_id, message_type, request, controls)
        :return: a one-element list holding the result dict; the element is None
                 for unbind, abandon and any message type not handled here
        :raises LDAPOperationResult: when the connection has ``raise_exceptions``
                 set and the result code is not in DO_NOT_RAISE_EXCEPTIONS
        """
        message_id, message_type, request, controls = payload
        responses = []
        result = None
        if message_type == 'bindRequest':
            result = bind_response_to_dict(self.mock_bind(request, controls))
            result['type'] = 'bindResponse'
        elif message_type == 'unbindRequest':
            # unbind produces no response, only the bound state is cleared
            self.bound = None
        elif message_type == 'abandonRequest':
            # abandon produces no response and nothing is done for it here
            pass
        elif message_type == 'delRequest':
            result = delete_response_to_dict(self.mock_delete(request, controls))
            result['type'] = 'delResponse'
        elif message_type == 'addRequest':
            result = add_response_to_dict(self.mock_add(request, controls))
            result['type'] = 'addResponse'
        elif message_type == 'compareRequest':
            result = compare_response_to_dict(self.mock_compare(request, controls))
            result['type'] = 'compareResponse'
        elif message_type == 'modDNRequest':
            result = modify_dn_response_to_dict(self.mock_modify_dn(request, controls))
            result['type'] = 'modDNResponse'
        elif message_type == 'modifyRequest':
            result = modify_response_to_dict(self.mock_modify(request, controls))
            result['type'] = 'modifyResponse'
        elif message_type == 'extendedReq':
            result = extended_response_to_dict(self.mock_extended(request, controls))
            result['type'] = 'extendedResp'
        self.connection.result = result
        responses.append(result)
        # "result and ..." skips the check when no result dict was produced (unbind/abandon)
        if self.connection.raise_exceptions and result and result['result'] not in DO_NOT_RAISE_EXCEPTIONS:
            if log_enabled(PROTOCOL):
                log(PROTOCOL, 'operation result <%s> for <%s>', result, self.connection)
            raise LDAPOperationResult(result=result['result'], description=result['description'], dn=result['dn'], message=result['message'], response_type=result['type'])
        return responses