File: functions.py

package info (click to toggle)
python-ldap 3.1.0-2
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 2,248 kB
  • sloc: python: 9,465; ansic: 2,828; makefile: 132; sh: 68
file content (125 lines) | stat: -rw-r--r-- 3,132 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
"""
functions.py - wraps functions of module _ldap

See https://www.python-ldap.org/ for details.
"""

from ldap import __version__

# Names exported by ``from ldap.functions import *``.
# Only names that are defined in or imported into this module may be listed:
# a star-import raises AttributeError for every entry missing from the module
# namespace.  The wrappers 'open' and 'init' do not exist in this module and
# therefore must not appear here.
__all__ = [
  'initialize',
  'explode_dn','explode_rdn',
  'get_option','set_option',
  'escape_str',
  'strf_secs','strp_secs',
]

import sys,pprint,time,_ldap,ldap
from calendar import timegm

from ldap import LDAPError

from ldap.dn import explode_dn,explode_rdn

from ldap.ldapobject import LDAPObject

if __debug__:
  # Tracing is only supported in debugging mode, i.e. when Python is not
  # started with -O.  The traceback module is needed solely for the stack
  # dump written by _ldap_function_call() at high trace levels.
  import traceback

# See _raise_byteswarning in ldapobject.py
# NOTE(review): presumably marks this module's frames as ones to skip when
# the warning's stack level is determined -- confirm in ldapobject.py.
_LDAP_WARN_SKIP_FRAME = True


def _ldap_function_call(lock,func,*args,**kwargs):
  """
  Wrapper function which locks and logs calls to function

  lock
      Instance of threading.Lock or compatible; a false value (e.g. None)
      disables locking
  func
      Function to call with arguments passed in via *args and **kwargs

  Returns whatever func returns.  An LDAPError raised by func is written
  to the trace file (trace level 2 or higher) and re-raised unchanged.
  The lock, if any, is always released before this function returns or
  raises.
  """
  if lock:
    lock.acquire()
  try:
    try:
      # The call trace is written while the lock is held, so it has to be
      # covered by the try/finally as well: a failing write to the trace
      # file (or a callable without __name__) must not leave the lock
      # acquired forever.
      if __debug__ and ldap._trace_level>=1:
        ldap._trace_file.write('*** %s.%s %s\n' % (
          '_ldap',func.__name__,
          pprint.pformat((args,kwargs))
        ))
        if ldap._trace_level>=9:
          traceback.print_stack(limit=ldap._trace_stack_limit,file=ldap._trace_file)
      result = func(*args,**kwargs)
    finally:
      if lock:
        lock.release()
  except LDAPError as e:
    # Result tracing happens after the lock has been released.
    if __debug__ and ldap._trace_level>=2:
      ldap._trace_file.write('=> LDAPError: %s\n' % (str(e)))
    raise
  if __debug__ and ldap._trace_level>=2:
    ldap._trace_file.write('=> result:\n%s\n' % (pprint.pformat(result)))
  return result


def initialize(uri,trace_level=0,trace_file=sys.stdout,trace_stack_limit=None, bytes_mode=None):
  """
  Create an LDAPObject instance for the LDAP host addressed by an
  LDAP URL and return it

  Parameters:
  uri
        LDAP URL which has to contain at least the connection scheme
        and the hostport, e.g. ldap://localhost:389
  trace_level
        Any non-zero value switches on trace output of LDAP calls.
  trace_file
        File object receiving the trace output.
        Defaults to stdout.
  trace_stack_limit
        Handed to LDAPObject unchanged.
  bytes_mode
        Whether to enable :ref:`bytes_mode` for backwards compatibility under Py2.
  """
  # All arguments are forwarded positionally, in signature order.
  ldap_object = LDAPObject(
    uri,
    trace_level,
    trace_file,
    trace_stack_limit,
    bytes_mode,
  )
  return ldap_object


def get_option(option):
  """
  get_option(name) -> value

  Look up an LDAP global option and return its current value.
  """
  # No lock object is passed (None), so the call is only traced, not locked.
  value = _ldap_function_call(None,_ldap.get_option,option)
  return value


def set_option(option,invalue):
  """
  set_option(name, value)

  Assign a new value to an LDAP global option.

  Returns the result of the wrapped _ldap.set_option() call.
  """
  # No lock object is passed (None), so the call is only traced, not locked.
  outcome = _ldap_function_call(None,_ldap.set_option,option,invalue)
  return outcome


def escape_str(escape_func,s,*args):
  """
  Build a string from format string `s' after passing every item of
  `args' through escape_func().

  The escaped items are substituted into `s' with the % operator, in the
  order in which they were given.
  """
  escaped_items = tuple(map(escape_func,args))
  return s % escaped_items


def strf_secs(secs):
    """
    Format seconds since epoch as a string in LDAP syntax GeneralizedTime

    The time is expressed in UTC, hence the trailing 'Z'.
    """
    generalized_time_format = '%Y%m%d%H%M%SZ'
    utc_struct = time.gmtime(secs)
    return time.strftime(generalized_time_format, utc_struct)


def strp_secs(dt_str):
    """
    Parse a string in LDAP syntax GeneralizedTime into seconds since epoch

    Only the form YYYYmmddHHMMSSZ is accepted; time.strptime() raises
    ValueError for anything else.
    """
    generalized_time_format = '%Y%m%d%H%M%SZ'
    parsed = time.strptime(dt_str, generalized_time_format)
    # timegm() interprets the parsed fields as UTC.
    return timegm(parsed)