File: dms.pyx

package info (click to toggle)
obitools 3.0.1~b26%2Bdfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 26,756 kB
  • sloc: ansic: 24,299; python: 657; sh: 27; makefile: 21
file content (374 lines) | stat: -rwxr-xr-x 10,855 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
#cython: language_level=3


from libc.stdlib  cimport free, atexit
from cpython.list cimport PyList_Size

from .capi.obidms cimport obi_open_dms, \
                          obi_create_dms, \
                          obi_close_dms, \
                          obi_dms_exists, \
                          obi_dms_get_full_path, \
                          obi_close_atexit, \
                          obi_dms_write_comments, \
                          obi_dms_formatted_infos
                                                      
from .capi.obitypes cimport const_char_p
                            
from obitools3.utils cimport bytes2str, \
                             str2bytes, \
                             tobytes, \
                             tostr, \
                             bytes2str_object, \
                             str2bytes_object, \
                             clean_empty_values_from_object
                             
from .object cimport OBIDeactivatedInstanceError

from pathlib import Path

from .view import view
from .object import OBIWrapper

import json
import time

from libc.stdlib cimport free


cdef class DMS(OBIWrapper):    
    
    cdef inline OBIDMS_p pointer(self) :
        return <OBIDMS_p>(self._pointer)
    
    
    @staticmethod
    def obi_atexit() :
        atexit(obi_close_atexit)
    
    
    @staticmethod
    def open_or_new(object dms_name) :
        cdef OBIDMS_p pointer
        cdef DMS dms
        cdef bytes dms_name_b = tobytes(dms_name)
        if DMS.exists(dms_name_b) :
            pointer = obi_open_dms(<const_char_p> dms_name_b, False)      
        else :
            pointer = obi_create_dms(<const_char_p> dms_name_b)  
        if pointer == NULL :
            raise Exception("Failed opening or creating an OBIDMS")
        dms = OBIWrapper.new_wrapper(DMS, pointer)
        return dms
    
    
    @staticmethod
    def exists(object dms_name) :
        cdef bytes dms_name_b = tobytes(dms_name)
        cdef int rep
        rep = obi_dms_exists(dms_name_b)
        if rep < 0 :
            raise RuntimeError("Error checking if a DMS exists")
        else :
            return bool(rep)
        
    
    @staticmethod
    def new(object dms_name) :
        cdef OBIDMS_p pointer
        cdef DMS dms
        cdef bytes dms_name_b = tobytes(dms_name)
        pointer = obi_create_dms(<const_char_p> dms_name_b)      
        if pointer == NULL :
            raise Exception("Failed creating an OBIDMS")
        dms = OBIWrapper.new_wrapper(DMS, pointer)
        return dms


    @staticmethod
    def open(object dms_name) :
        cdef OBIDMS_p pointer
        cdef DMS dms
        cdef bytes dms_name_b = tobytes(dms_name)
        pointer = obi_open_dms(<const_char_p> dms_name_b, False)      
        if pointer == NULL :
            raise Exception("Failed opening an OBIDMS")
        dms = OBIWrapper.new_wrapper(DMS, pointer)
        return dms
    
    
    def close(self, force=False) :
        '''
        Closes the DMS instance and free the associated memory (no counter, closing is final)
        
        The `close` method is automatically called by the object destructor.
        '''
        cdef OBIDMS_p pointer = self.pointer()
        if self.active() :
            OBIWrapper.close(self)
            if (obi_close_dms(pointer, force=force)) < 0 :
                raise Exception("Problem closing an OBIDMS")


    # name property getter
    @property
    def name(self) :
        '''
        Returns the name of the DMS instance
        
            @rtype: bytes
        '''
        return <bytes> self.pointer().dms_name


    # DMS full path property getter
    @property
    def full_path(self) :
        '''
        Returns the full path including the name with the extension of the DMS directory
        
            @rtype: bytes
        '''
        return <bytes> self.pointer().directory_path


    # DMS name with full path property getter
    @property
    def name_with_full_path(self) :
        '''
        Returns the full path with the name (meaning without the '.obidms' extension) of the DMS directory
        
            @rtype: bytes
        '''
        return <bytes> self.full_path[:-7]


    # command history DOT graph property getter in the form of a bytes string
    @property
    def dot_history_graph(self):
        complete_graph = []
        for view_name in self:
            complete_graph.extend(self[view_name].dot_history_graph_list)
        uniq_graph = []
        for elt in complete_graph:
            if elt not in uniq_graph:
                uniq_graph.append(elt)
        uniq_graph.insert(0, b"digraph \""+self.name+b"\" {\n")
        uniq_graph.append(b"}")
        return b"".join(uniq_graph)


    @OBIWrapper.checkIsActive
    def keys(self) :
        
        cdef const_char_p path = obi_dms_get_full_path(self.pointer(), b"VIEWS")
        
        if path == NULL:
            raise RuntimeError("Cannot retrieve the view database path")
        
        p = Path(bytes2str(path))
        
        free(path)
        
        for v in p.glob("*.obiview") :
            yield str2bytes(v.stem)


    @OBIWrapper.checkIsActive
    def values(self) :
        cdef bytes view_name
        for view_name in self.keys():
            yield self.get_view(view_name)
    
    
    @OBIWrapper.checkIsActive
    def items(self) :
        cdef bytes view_name
        for view_name in self.keys():
            yield (view_name, self.get_view(view_name))
    
    
    @OBIWrapper.checkIsActive
    def __contains__(self, key) :
        
        cdef str key_s = tostr(key)
        
        cdef const_char_p path = obi_dms_get_full_path(self.pointer(), b"VIEWS")
        p = Path(bytes2str(path),key_s)

        free(path)
        
        return p.with_suffix(".obiview").is_file()
    
    
    cpdef int view_count(self) :
        return PyList_Size(list(self.keys()))
    
    
    @OBIWrapper.checkIsActive
    def __len__(self) :
        return self.view_count()
    
    
    @OBIWrapper.checkIsActive
    def __getitem__(self, object view_name):
        return self.get_view(view_name)
    
    
    @OBIWrapper.checkIsActive
    def __iter__(self) :
        return self.keys()
    
    
    @OBIWrapper.checkIsActive
    def get_view(self, object view_name) :
        return view.View.open(self, view_name)


    @OBIWrapper.checkIsActive
    def __repr__(self) :
        cdef str s
        cdef char* sc
        cdef OBIDMS_p pointer = self.pointer()
        sc = obi_dms_formatted_infos(pointer, False)
        s = bytes2str(sc)
        free(sc)
        return s


    @OBIWrapper.checkIsActive
    def repr_longformat(self) :
        cdef str s
        cdef char* sc
        cdef OBIDMS_p pointer = self.pointer()
        sc = obi_dms_formatted_infos(pointer, True)
        s = bytes2str(sc)
        free(sc)
        return s
    

    @OBIWrapper.checkIsActive    
    def record_command_line(self, command_line):
        t = time.asctime(time.localtime(time.time()))
        if "command_line_history" not in self.comments:
            l = []
        else:
            l = self.comments["command_line_history"]
        l.append({"command":command_line, "time":t})
        self.comments["command_line_history"] = l
        

    # comments property getter
    @property
    def comments(self):
        return DMS_comments(self)
    @comments.setter
    def comments(self, object value):
        DMS_comments(self, value)


    # bash command history property getter
    @property
    def bash_history(self):
        #s = b"#!${bash}/bin/bash\n\n"
        s = b""
        first = True
        for command in self.command_line_history:
            s+=b"#"
            s+=command[b"time"]
            s+=b"\nobi "
            s+=command[b"command"]
            s+=b"\n"
        return s


    # command line history property getter
    @property
    def command_line_history(self):
        return self.comments[b"command_line_history"]

          
cdef class DMS_comments(dict):   # Not thread safe
    def __init__(self, DMS dms, value=None) :
        if not dms.active() :
            raise OBIDeactivatedInstanceError()
        self._dms = dms
        if value is not None:
            self.update(value)     # TODO test and discuss not overwriting (could use replace bool)
        self._update_from_file()

    def _update_from_file(self):
        cdef bytes comments_json
        cdef str comments_json_str
        cdef OBIDMS_p dms_p
        cdef DMS dms
        if not self._dms.active() :
            raise OBIDeactivatedInstanceError()
        dms = self._dms
        dms_p = <OBIDMS_p>(dms.pointer())
        comments_json = <bytes> dms_p.infos.comments
        comments_json_str = bytes2str(comments_json)
        comments_dict = json.loads(comments_json_str)
        str2bytes_object(comments_dict)
        super(DMS_comments, self).update(comments_dict)
        
    def __getitem__(self, object key):
        if not self._dms.active() :
            raise OBIDeactivatedInstanceError()
        if type(key) == str:
            key = str2bytes(key)
        self._update_from_file()
        return super(DMS_comments, self).__getitem__(key)

    def __setitem__(self, object key, object value):
        cdef OBIDMS_p dms_p
        cdef DMS dms

        if not self._dms.active() :
            raise OBIDeactivatedInstanceError()

        dms = self._dms
        dms_p = <OBIDMS_p>(dms.pointer())

        # Remove virtually empty values from the object    # TODO discuss
        clean_empty_values_from_object(value)
        
        # If value is virtually empty, don't add it    # TODO discuss
        if value is None or len(value) == 0:
            return

        # Convert to bytes
        if type(key) == str:
            key = str2bytes(key)
        value_bytes = str2bytes_object(value)

       # Update dict with comments already written in file
        self._update_from_file()
        
        # Add new element  # TODO don't overwrite?
        super(DMS_comments, self).__setitem__(key, value_bytes)

        # Convert to str because json library doens't like bytes
        dict_str = {key:item for key,item in self.items()}
        dict_str = bytes2str_object(dict_str)
        
        # Convert to json string
        comments_json = json.dumps(dict_str)
        
        # Write new comments
        if obi_dms_write_comments(dms_p, tobytes(comments_json)) < 0:
            raise Exception("Could not write DMS comments, DMS:", dms.name, "comments:", comments_json)

    def update(self, value):
        for k,v in value.items():
            self[k] = v
      
    def __contains__(self, key):
        return super(DMS_comments, self).__contains__(tobytes(key))

    def __str__(self):
        return bytes2str(self._dms.pointer().infos.comments)