File: oids.pyx

package info (click to toggle)
python-gssapi 1.10.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 876 kB
  • sloc: python: 3,707; sh: 198; makefile: 154; ansic: 60
file content (133 lines) | stat: -rw-r--r-- 4,535 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# Module-level marker assignment. It is not read anywhere in the visible part
# of this file; NOTE(review): the exact spelling may matter to build tooling
# outside this file -- confirm before reformatting.
GSSAPI="BASE"  # This ensures that a full module is generated by Cython

from libc.string cimport memcmp, memcpy
from libc.stdlib cimport free, malloc

from gssapi.raw.cython_types cimport gss_OID

cdef inline bint c_compare_oids(gss_OID a, gss_OID b):
    # Two OIDs are considered equal when they have the same length and
    # their element buffers are byte-for-byte identical.
    if a.length != b.length:
        # Different sizes can never match; skip the memory comparison.
        return False
    return memcmp(a.elements, b.elements, a.length) == 0


cdef class OID:
    """
    A GSSAPI object identifier wrapping a raw ``gss_OID_desc``.

    The OID value is stored as its ASN.1 BER content bytes in ``raw_oid``.
    ``_free_on_dealloc`` records whether this object owns the ``elements``
    buffer and must ``free()`` it when it is deallocated.
    """
    # defined in pxd
    # cdef gss_OID_desc raw_oid = NULL
    # cdef bint _free_on_dealloc = NULL

    def __cinit__(OID self, OID cpy=None, elements=None):
        """
        Note: cpy is named such for historical reasons. To perform a deep
        copy, specify the elements parameter; this will copy the value of the
        OID. To perform a shallow copy and take ownership of an existing OID,
        use the cpy (default) argument.

        When ``cpy`` owned its element buffer, that ownership moves to the new
        object: ``cpy`` keeps pointing at the same buffer but no longer frees
        it, so ``cpy`` must not be used after the new object is destroyed.

        Raises:
            TypeError: if both ``cpy`` and ``elements`` are given.
            MemoryError: if the element buffer cannot be allocated.
        """
        if cpy is not None and elements is not None:
            raise TypeError("Cannot instantiate a OID from both a copy and "
                            " a new set of elements")

        if cpy is not None:
            self.raw_oid = cpy.raw_oid
            # take ownership of this OID (for dynamic cases); the flag must
            # survive past this branch, otherwise the buffer is never freed
            self._free_on_dealloc = cpy._free_on_dealloc
            cpy._free_on_dealloc = False
        elif elements is not None:
            self._from_bytes(elements)
        else:
            # empty OID, to be filled in later (e.g. via _copy_from)
            self._free_on_dealloc = False

    cdef int _copy_from(OID self, gss_OID_desc base) except -1:
        # Deep-copy ``base`` into this object, which then owns the new buffer.
        # Returns 0 on success; raises MemoryError if allocation fails, in
        # which case the object is left unmodified.
        cdef size_t nbytes = base.length
        cdef size_t alloc_size = nbytes
        cdef void *buf

        # malloc(0) may legitimately return NULL; always request at least one
        # byte so that NULL unambiguously means "out of memory"
        if alloc_size == 0:
            alloc_size = 1

        buf = malloc(alloc_size)
        if buf is NULL:
            raise MemoryError("Could not allocate memory for OID elements!")
        if nbytes > 0:
            memcpy(buf, base.elements, nbytes)

        # release a buffer we already own only after the copy has been made,
        # so copying from our own raw_oid stays safe
        if self._free_on_dealloc:
            free(self.raw_oid.elements)

        self.raw_oid.length = base.length
        self.raw_oid.elements = buf
        self._free_on_dealloc = True
        return 0

    cdef int _from_bytes(OID self, object base) except -1:
        # Set this OID from ``bytes(base)`` (the BER content bytes), copying
        # them into a buffer owned by this object.  Returns 0 on success.
        cdef gss_OID_desc src
        cdef char* byte_str

        # keep the bytes object referenced while its buffer is being copied
        base_bytes = bytes(base)
        byte_str = base_bytes

        src.length = len(base_bytes)
        src.elements = byte_str
        return self._copy_from(src)

    @classmethod
    def from_int_seq(cls, integer_sequence):
        """
        Create an OID from a sequence of integers.

        Args:
            integer_sequence: either a dotted string such as ``"1.2.840"`` or
                an iterable of values convertible with ``int``.

        Returns:
            A new instance of ``cls`` holding the BER-encoded OID.

        Raises:
            ValueError: if a component is not an integer or the sequence is
                not a valid OID (see ``_encode_asn1ber``).
        """
        if isinstance(integer_sequence, str):
            integer_sequence = integer_sequence.split('.')

        oid_seq = [int(x) for x in integer_sequence]

        elements = cls._encode_asn1ber(oid_seq)

        return cls(elements=elements)

    @staticmethod
    def _encode_asn1ber(oid_seq):
        """
        Encode a sequence of integer arcs as ASN.1 BER OID content bytes.

        The first two arcs are combined into a single subidentifier
        (``first * 40 + second``); every subidentifier is then written in
        base 128, most significant group first, with the high bit set on all
        but the last byte.

        Raises:
            ValueError: if fewer than two arcs are given, any arc is
                negative, the first arc is not 0, 1 or 2, or the second arc
                exceeds 39 while the first arc is 0 or 1.
        """
        if len(oid_seq) < 2:
            raise ValueError("Sequence must be 2 or more elements long.")

        for arc in oid_seq:
            if arc < 0:
                raise ValueError("OID arcs must be non-negative integers.")

        first = oid_seq[0]
        second = oid_seq[1]
        if first > 2:
            raise ValueError("The first OID arc must be 0, 1, or 2.")
        if first < 2 and second > 39:
            raise ValueError("The second OID arc must be at most 39 when "
                             "the first arc is 0 or 1.")

        # the combined first subidentifier may itself need several bytes
        # (e.g. 2.999), so it goes through the same base-128 encoding
        subidentifiers = [first * 40 + second] + list(oid_seq[2:])

        byte_seq = bytearray()
        for element in subidentifiers:
            # build least-significant group first, then reverse
            element_seq = [element & 0x7f]

            while element > 127:
                element >>= 7
                element_seq.append((element & 0x7f) | 0x80)

            element_seq.reverse()
            byte_seq.extend(element_seq)

        return bytes(byte_seq)

    def __dealloc__(self):
        # NB(directxman12): MIT Kerberos has gss_release_oid
        #                   for this purpose, but it's not in the RFC
        if self._free_on_dealloc:
            free(self.raw_oid.elements)

    def __bytes__(self):
        """Return the BER content bytes of this OID."""
        return (<char*>self.raw_oid.elements)[:self.raw_oid.length]

    def _decode_asn1ber(self):
        """
        Decode the BER content bytes into a list of integer arcs.

        Returns an empty list for an empty OID.  A trailing subidentifier
        that is missing its final byte is ignored.
        """
        ber_encoding = self.__bytes__()

        subidentifiers = []
        value = 0
        pos = 0
        while pos < len(ber_encoding):
            byte = ber_encoding[pos]
            value = (value << 7) | (byte & 0x7f)
            if not (byte & 0x80):
                # This is the last byte of this value
                subidentifiers.append(value)
                value = 0
            pos += 1

        if not subidentifiers:
            return []

        # the first subidentifier packs the first two arcs; only arcs 0 and 1
        # limit the second arc to 0..39, everything from 80 up belongs to 2
        combined = subidentifiers[0]
        if combined < 40:
            decoded = [0, combined]
        elif combined < 80:
            decoded = [1, combined - 40]
        else:
            decoded = [2, combined - 80]

        decoded.extend(subidentifiers[1:])
        return decoded

    @property
    def dotted_form(self):
        """The OID as a dotted-decimal string, e.g. ``"1.2.840.113554"``."""
        return '.'.join(str(x) for x in self._decode_asn1ber())

    def __repr__(self):
        return "<OID {0}>".format(self.dotted_form)

    def __hash__(self):
        # hash on the encoded bytes so that equal OIDs hash equally
        return hash(self.__bytes__())

    def __richcmp__(OID self, other, op):
        """
        Support ``==`` and ``!=`` against other OIDs by comparing the encoded
        bytes.  Ordering comparisons and comparisons against anything that is
        not an OID (including ``None``) return ``NotImplemented``.
        """
        cdef OID rhs
        cdef bint equal

        if op != 2 and op != 3:  # neither == nor !=
            return NotImplemented
        # check before touching raw_oid: reading it from a non-OID object
        # (None passes a typed-argument check) would be undefined behaviour
        if not isinstance(other, OID):
            return NotImplemented

        rhs = other
        equal = c_compare_oids(&self.raw_oid, &rhs.raw_oid)
        if op == 2:  # ==
            return equal
        return not equal  # !=