File: format_bcif.py

package info (click to toggle)
python-ihm 2.7-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 3,368 kB
  • sloc: python: 30,422; ansic: 5,990; sh: 24; makefile: 20
file content (655 lines) | stat: -rw-r--r-- 22,229 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
"""Utility classes to handle BinaryCIF format.

   See https://github.com/molstar/BinaryCIF for a description of the
   BinaryCIF file format.

   This module provides classes to read in and write out BinaryCIF files. It is
   only concerned with handling syntactically correct BinaryCIF -
   it does not know the set of tables or the mapping to ihm objects. For that,
   see :mod:`ihm.reader`.
"""

import struct
import sys
import inspect
import ihm.format
import ihm
try:
    from . import _format
except ImportError:
    _format = None

# ByteArray types: integer codes identifying the element type of an array.
# They are stored in the 'type' field of a ByteArray encoding dict and in
# the 'srcType' field of Delta and RunLength encoding dicts.
_Int8 = 1
_Int16 = 2
_Int32 = 3
_Uint8 = 4
_Uint16 = 5
_Uint32 = 6
_Float32 = 32
_Float64 = 33


class _Decoder:
    """Common parent of every BinaryCIF decoder.

       Subclasses set `_kind` to the name of the encoding that they undo
       and override :meth:`__call__` to do the actual work."""

    # Name of the handled encoding (the 'kind' value of an encoding dict)
    _kind = None

    def __call__(self, enc, data):
        """Undo a single encoding step.

           :param enc: Encoding information for this step.
           :param data: The raw (still encoded) data.
           :return: the decoded data; subclasses may return a generator.
                    This base implementation does nothing and returns None.
        """
        return None


class _StringArrayDecoder(_Decoder):
    """Decoder for string arrays.

       The encoded form consists of every unique string concatenated into
       a single string (``stringData``), an (encoded) array of offsets
       marking where each unique string starts and ends, and (as the data
       itself) an encoded array of indices selecting one unique string per
       output element. A negative index yields None."""
    _kind = 'StringArray'

    def __call__(self, enc, data):
        """Generate the decoded strings (or None) one at a time."""
        bounds = list(_decode(enc['offsets'], enc['offsetEncoding']))
        lookup = _decode(data, enc['dataEncoding'])
        text = enc['stringData']
        # Each unique string lies between two consecutive offsets
        pieces = [text[start:end]
                  for start, end in zip(bounds, bounds[1:])]
        # todo: return a listlike class instead?
        for idx in lookup:
            if idx < 0:
                yield None
            else:
                yield pieces[idx]


class _ByteArrayDecoder(_Decoder):
    """Decode an array of numbers of specified type stored as raw bytes"""

    _kind = 'ByteArray'

    # Map integer/float type to struct format string
    _struct_map = {
        _Int8: 'b',
        _Int16: 'h',
        _Int32: 'i',
        _Uint8: 'B',
        _Uint16: 'H',
        _Uint32: 'I',
        _Float32: 'f',
        _Float64: 'd',
    }

    def __call__(self, enc, data):
        """Unpack `data` into a tuple of numbers.

           :param dict enc: Encoding information; ``enc['type']`` is one of
                  the ByteArray type codes (keys of `_struct_map`).
           :param bytes data: The raw bytes to unpack.
           :return: a tuple of Python ints or floats.
           :raises KeyError: if the type code is not recognized.
           :raises struct.error: if the length of `data` is not a whole
                   number of elements.
        """
        fmt = self._struct_map[enc['type']]
        # All data is encoded little-endian in bcif. Size each element with
        # the same ('<', i.e. standard rather than native) sizing rules
        # that are used to unpack it.
        count = len(data) // struct.calcsize('<' + fmt)
        # Use a repeat count rather than repeating the format character,
        # so the format string stays short however large the array is
        return struct.unpack('<%d%s' % (count, fmt), data)


class _IntegerPackingDecoder(_Decoder):
    """Decoder for (32-bit) integer arrays that were packed into 8- or
       16-bit values.

       A packed value that sits at the limit of the small type is a
       continuation marker: it is added to the values that follow it, up to
       and including the first value that is not at the limit, to give a
       single output number."""
    _kind = 'IntegerPacking'

    def _unpack(self, enc, data, signed):
        """Generate unpacked integers from the indexable sequence `data`.

           ``enc['byteCount']`` selects between the 8-bit (1) and 16-bit
           (anything else) limits. If `data` ends with a value at the
           limit, IndexError is raised."""
        one_byte = enc['byteCount'] == 1
        if signed:
            top = 0x7F if one_byte else 0x7FFF
            saturated = (top, -top - 1)
        else:
            saturated = (0xFF if one_byte else 0xFFFF,)
        pos = 0
        while pos < len(data):
            total = 0
            # Accumulate every leading value that is pinned at a limit
            while data[pos] in saturated:
                total += data[pos]
                pos += 1
            yield total + data[pos]
            pos += 1

    def _unsigned_decode(self, enc, data):
        """Unpack values that were stored in an unsigned type."""
        return self._unpack(enc, data, False)

    def _signed_decode(self, enc, data):
        """Unpack values that were stored in a signed type."""
        return self._unpack(enc, data, True)

    def __call__(self, enc, data):
        """Return a generator of unpacked integers, picking the signed or
           unsigned scheme from ``enc['isUnsigned']``."""
        if not enc['isUnsigned']:
            return self._signed_decode(enc, data)
        return self._unsigned_decode(enc, data)


class _DeltaDecoder(_Decoder):
    """Decoder for integer arrays stored as differences between
       consecutive elements, relative to a starting ``origin``."""
    _kind = 'Delta'

    def __call__(self, enc, data):
        """Generate the running sum of `data`, starting from
           ``enc['origin']``."""
        running = enc['origin']
        for step in data:
            running += step
            yield running


class _RunLengthDecoder(_Decoder):
    """Decoder for integer arrays stored as a flat sequence of
       (value, number of repeats) pairs."""
    _kind = 'RunLength'

    def __call__(self, enc, data):
        """Generate each value as many times as its repeat count says."""
        pairs = list(data)
        for pos in range(0, len(pairs), 2):
            count = pairs[pos + 1]
            for _ in range(count):
                yield pairs[pos]


class _FixedPointDecoder(_Decoder):
    """Decoder for floating point arrays that were stored as integers
       after multiplication by a fixed factor."""
    _kind = 'FixedPoint'

    def __call__(self, enc, data):
        """Generate each stored integer divided by ``enc['factor']``."""
        scale = float(enc['factor'])
        for stored in data:
            yield float(stored) / scale


class _IntervalQuantizationDecoder(_Decoder):
    """Decoder for floating point arrays that were quantized into a number
       of evenly spaced steps spanning a given interval and stored as the
       integer step index."""
    _kind = 'IntervalQuantization'

    def __call__(self, enc, data):
        """Generate the float value corresponding to each step index.

           The interval runs from ``enc['min']`` to ``enc['max']`` and is
           divided into ``enc['numSteps']`` steps (``numSteps - 1`` gaps)."""
        lower = float(enc['min'])
        upper = float(enc['max'])
        steps = int(enc['numSteps'])
        step_size = (upper - lower) / (steps - 1)
        for index in data:
            yield lower + step_size * index


def _get_decoder_map():
    """Build a dict mapping encoding kind to a decoder instance.

       Every class visible in this module that is a subclass of _Decoder
       is instantiated once and stored under its `_kind`."""
    this_module = sys.modules[__name__]
    decoders = {}
    for _, cls in inspect.getmembers(this_module, inspect.isclass):
        if issubclass(cls, _Decoder):
            decoders[cls._kind] = cls()
    return decoders


# Mapping from BinaryCIF encoding names (the 'kind' of an encoding dict) to
# _Decoder objects. Built once, when the module is imported, and used by
# _decode() to find the decoder for each encoding step.
_decoder_map = _get_decoder_map()


def _decode(data, encoding):
    """Undo every encoding step that was applied to `data`.

       :param data: The encoded data.
       :param encoding: Sequence of encoding dicts, in the order in which
              they were applied; they are undone last-to-first.
       :return: the fully decoded data (which may be a generator).
    """
    for step in reversed(encoding):
        decoder = _decoder_map[step['kind']]
        data = decoder(step, data)
    return data


class _BoolTypeHandler:
    """Callable that turns a YES/NO value (matched case insensitively)
       into a Python bool. Anything else is mapped to the `omitted` value
       given at construction time."""
    _bool_map = {'YES': True, 'NO': False}

    def __init__(self, omitted):
        self.omitted = omitted

    def __call__(self, txt):
        key = str(txt).upper()
        try:
            return self._bool_map[key]
        except KeyError:
            return self.omitted


class BinaryCifReader(ihm.format._Reader):
    """Class to read a BinaryCIF file and extract some or all of its data.

       Use :meth:`read_file` to actually read the file.
       See :class:`ihm.format.CifReader` for a description of the parameters.
    """
    def __init__(self, fh, category_handler, unknown_category_handler=None,
                 unknown_keyword_handler=None):
        # Only set up the C reader if the _format extension could be
        # imported; the rest of the class detects it with
        # hasattr(self, '_c_format')
        if _format is not None:
            c_file = _format.ihm_file_new_from_python(fh, True)
            self._c_format = _format.ihm_reader_new(c_file, True)
        self.category_handler = category_handler
        self.unknown_category_handler = unknown_category_handler
        self.unknown_keyword_handler = unknown_keyword_handler
        self.fh = fh
        # Data blocks not yet handled by the pure Python reader
        # (None until the file has been read)
        self._file_blocks = None

    def __del__(self):
        # Release the C reader, if one was created
        if hasattr(self, '_c_format'):
            _format.ihm_reader_free(self._c_format)

    def read_file(self):
        """Read the file and extract data.

           If the C-accelerated _format module is available, then it is used
           instead of the (much slower) Python reader.

           Each call handles a single data block; with the Python reader
           the whole file is unpacked on the first call.

           :return: True iff more data blocks are available to be read.
        """
        self._add_category_keys()
        if hasattr(self, '_c_format'):
            return self._read_file_c()

        if self._file_blocks is None:
            self._file_blocks = self._read_msgpack()
        if len(self._file_blocks) > 0:
            for category in self._file_blocks[0]['categories']:
                cat_name = category['name'].lower()
                handler = self.category_handler.get(cat_name, None)
                if handler:
                    self._handle_category(handler, category, cat_name)
                elif self.unknown_category_handler is not None:
                    self.unknown_category_handler(cat_name, 0)
            # This block is done; the next call will handle the next one
            del self._file_blocks[0]
        return len(self._file_blocks) > 0

    def _read_file_c(self):
        """Read the file using the C parser"""
        # (Re)register all handlers with the C reader before each read
        _format.ihm_reader_remove_all_categories(self._c_format)
        for category, handler in self.category_handler.items():
            # A handler may provide its own registration function
            func = getattr(handler, '_add_c_handler', None) \
                or _format.add_category_handler
            func(self._c_format, category, handler._keys,
                 frozenset(handler._int_keys), frozenset(handler._float_keys),
                 frozenset(handler._bool_keys), handler)
        if self.unknown_category_handler is not None:
            _format.add_unknown_category_handler(self._c_format,
                                                 self.unknown_category_handler)
        if self.unknown_keyword_handler is not None:
            _format.add_unknown_keyword_handler(self._c_format,
                                                self.unknown_keyword_handler)
        ret_ok, more_data = _format.ihm_read_file(self._c_format)
        return more_data

    def _get_type_handler(self, category_handler, keyword):
        """Return a function that converts keyword string into desired type"""
        if keyword in category_handler._int_keys:
            return int
        elif keyword in category_handler._bool_keys:
            return _BoolTypeHandler(category_handler.omitted)
        elif keyword in category_handler._float_keys:
            return float
        else:
            return str

    def _handle_category(self, handler, category, cat_name):
        """Extract data for the given category and call `handler` once
           per row, with one argument per handler key."""
        num_cols = len(handler._keys)
        type_handlers = [self._get_type_handler(handler, k)
                         for k in handler._keys]
        # Read all data for the category;
        # category_data[col][row]
        category_data = [None] * num_cols
        num_rows = 0
        # Only read columns that match a handler key (case insensitive)
        key_index = {}
        for i, key in enumerate(handler._keys):
            key_index[key] = i
        column_indices = []
        for c in category['columns']:
            key_name = c['name'].lower()
            ki = key_index.get(key_name, None)
            if ki is not None:
                column_indices.append(ki)
                r = self._read_column(c, handler, type_handlers[ki])
                num_rows = len(r)
                category_data[ki] = r
            elif self.unknown_keyword_handler is not None:
                self.unknown_keyword_handler(cat_name, key_name, 0)
        row_data = [handler.not_in_file] * num_cols
        for row in range(num_rows):
            # Only update data for columns that we read (others will
            # remain set to handler.not_in_file)
            for i in column_indices:
                row_data[i] = category_data[i][row]
            handler(*row_data)

    def _read_column(self, column, handler, type_handler):
        """Read a single category column data.

           :return: a list with one entry per row: `handler.unknown` or
                    `handler.omitted` for masked values, or the decoded
                    value converted with `type_handler` otherwise.
        """
        data = _decode(column['data']['data'], column['data']['encoding'])
        # The mask is optional, so cope with both a missing 'mask' key
        # and an explicit None
        mask_info = column.get('mask')
        # Handle 'unknown' values (mask==2) or 'omitted' (mask==1)
        if mask_info is not None:
            mask = _decode(mask_info['data'], mask_info['encoding'])
            return [handler.unknown if m == 2 else handler.omitted if m == 1
                    else type_handler(d) for d, m in zip(data, mask)]
        else:
            return [type_handler(d) for d in data]

    def _read_msgpack(self):
        """Read the msgpack data from the file and return data blocks"""
        # Imported here so that msgpack is only needed by the Python reader
        import msgpack
        d = msgpack.unpack(self.fh, raw=False)
        return d['dataBlocks']


class _CategoryWriter:
    """Context manager that collects keyword/value pairs for a single-row
       category and hands them to the writer when the context exits."""

    def __init__(self, writer, category):
        self.writer = writer
        self.category = category
        self._data = {}

    def write(self, **kwargs):
        """Record the given keyword=value pairs (later calls override
           earlier values for the same keyword)."""
        self._data.update(kwargs)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # The writer expects a list of values per keyword, so turn each
        # single value into a one-element column
        for key, value in self._data.items():
            self._data[key] = [value]
        self.writer._add_category(self.category, self._data)


class _LoopWriter:
    """Context manager that collects rows for a multi-row (loop) category
       and hands all of the columns to the writer when the context exits."""

    def __init__(self, writer, category, keys):
        self.writer = writer
        self.category = category
        self.keys = keys
        # Remove characters that we can't use in Python identifiers
        strip_brackets = str.maketrans('', '', '[]')
        self.python_keys = [k.translate(strip_brackets) for k in keys]
        # One (initially empty) column of values per key
        self._values = [[] for _ in keys]

    def write(self, **kwargs):
        """Add a single row; any key not given is stored as None."""
        for column, name in zip(self._values, self.python_keys):
            column.append(kwargs.get(name))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        columns = dict(zip(self.keys, self._values))
        self.writer._add_category(self.category, columns)


class EncodeError(Exception):
    """Error raised when the input data cannot be encoded."""


class _Encoder:
    """Common parent of every encoder."""

    # Name of the encoding produced (in BinaryCIF specification)
    _kind = None

    def __call__(self, data):
        """Encode the raw data `data`.

           Subclasses return the encoded data together with a BinaryCIF
           encoding information dict; this base implementation does
           nothing and returns None."""
        return None


def _get_int_float_type(data):
    """Determine the int/float type of the given data.

       :param data: Sequence of int and/or float values.
       :return: one of the ByteArray type codes: `_Float32` if any value
                is a float; otherwise the smallest unsigned type that can
                hold every value if none is negative, or the smallest
                signed type if any is negative. Empty data is reported
                as `_Uint8`.
       :raises TypeError: if the values do not fit in a 32-bit integer.
    """
    # If anything is float, treat everything as single-precision float
    if any(isinstance(d, float) for d in data):
        return _Float32
    # There are no values to inspect in an empty array (and min()/max()
    # would raise ValueError), so use the smallest type; such an array
    # occupies zero bytes whichever type is chosen
    if len(data) == 0:
        return _Uint8
    # Otherwise, figure out the most appropriate int type
    min_val = min(data)
    max_val = max(data)
    if min_val >= 0:
        # Unsigned types
        for typ, limit in [(_Uint8, 0xFF), (_Uint16, 0xFFFF),
                           (_Uint32, 0xFFFFFFFF)]:
            if max_val <= limit:
                return typ
    else:
        # Signed types
        for typ, up_limit in [(_Int8, 0x7F), (_Int16, 0x7FFF),
                              (_Int32, 0x7FFFFFFF)]:
            low_limit = -up_limit - 1
            if min_val >= low_limit and max_val <= up_limit:
                return typ
    raise TypeError("Cannot represent data as BinaryCIF")


class _ByteArrayEncoder(_Encoder):
    """Encode an array of numbers as raw bytes, using the smallest
       suitable type (as determined by `_get_int_float_type`)."""

    # Map integer/float type to struct format string
    _struct_map = _ByteArrayDecoder._struct_map

    def __call__(self, data):
        """Pack `data` into bytes.

           :param data: Sequence of int and/or float values.
           :return: the packed bytes, and a 'ByteArray' encoding dict
                    recording the type that was used.
        """
        ba_type = _get_int_float_type(data)
        encdict = {'kind': 'ByteArray', 'type': ba_type}
        fmt = self._struct_map[ba_type]
        # All data is encoded little-endian in bcif. Use a repeat count
        # rather than repeating the format character, so the format string
        # stays short however large the array is.
        return struct.pack('<%d%s' % (len(data), fmt), *data), encdict


class _DeltaEncoder(_Encoder):
    """Encoder that stores an integer array as the differences between
       consecutive elements."""

    def __call__(self, data):
        """Return the differences and a 'Delta' encoding dict, or the
           data unchanged and None if the array is too short."""
        # Small arrays are left alone: the bookkeeping needed to describe
        # the compression would likely outweigh whatever space is saved
        if len(data) <= 40:
            return data, None
        src_type = _get_int_float_type(data)
        # The first element is carried by 'origin', so its difference is 0
        diffs = [0]
        diffs.extend(cur - prev for prev, cur in zip(data, data[1:]))
        return diffs, {'kind': 'Delta', 'origin': data[0],
                       'srcType': src_type}


class _RunLengthEncoder(_Encoder):
    """Encoder that stores an integer array as a flat list of
       (value, number of repeats) pairs."""

    def __call__(self, data):
        """Return the run-length pairs and a 'RunLength' encoding dict,
           or the data unchanged and None if the array is too short or
           the encoded form would be longer than the original."""
        # Small arrays are left alone: the bookkeeping needed to describe
        # the compression would likely outweigh whatever space is saved
        if len(data) <= 40:
            return data, None
        encdict = {'kind': 'RunLength',
                   'srcType': _get_int_float_type(data),
                   'srcSize': len(data)}
        runs = []
        current = None  # value of the run in progress (None: no run yet)
        count = 0
        for item in data:
            if item != current:
                # A new run starts here; store the one that just ended
                if current is not None:
                    runs.extend((current, count))
                current = item
                count = 1
            else:
                count += 1
        # Store the final run
        runs.extend((current, count))
        # Only use the encoded form if it is no longer than the original
        if len(runs) > len(data):
            return data, None
        return runs, encdict


def _encode(data, encoders):
    """Run `data` through each of the given encoder objects in turn.

       Each encoder is called with the output of the previous one, and
       returns the new data plus either a BinaryCIF encoding dict or None
       (meaning that it left the data alone).

       :return: the final encoded data, and the list of encoding dicts
                for those encoders that were actually applied, in order.
    """
    applied = []
    for encoder in encoders:
        data, info = encoder(data)
        if info is None:
            continue
        applied.append(info)
    return data, applied


class _MaskedEncoder:
    """Common parent of every encoder that copes with data in which some
       values may be masked."""

    def __call__(self, data, mask):
        """Encode the raw data `data`, taking `mask` into account.

           Subclasses return the encoded data; this base implementation
           does nothing and returns None."""
        return None


class _StringArrayMaskedEncoder(_MaskedEncoder):
    """Encoder for arrays of strings (or values stored as strings).

       The encoded form is all unique strings concatenated together, the
       offsets of each unique string within that concatenation, and one
       index per element selecting a unique string (-1 for masked
       elements)."""

    # Encoders applied, in order, to the offset and index arrays
    _int_encoders = [_DeltaEncoder(), _RunLengthEncoder(),
                     _ByteArrayEncoder()]

    def __call__(self, data, mask):
        """Return the encoded indices and a one-element list holding the
           'StringArray' encoding dict."""
        index_of = {}  # unique string -> its position in `unique`
        unique = []    # unique strings, in order of first appearance
        indices = []
        for pos, value in enumerate(data):
            if mask is not None and mask[pos]:
                indices.append(-1)
                continue
            if isinstance(value, bool):
                # Map bool to YES/NO strings
                text = ihm.format._Writer._boolmap[value]
            else:
                # coerce non-str data to str
                text = str(value)
            if text not in index_of:
                index_of[text] = len(index_of)
                unique.append(text)
            indices.append(index_of[text])

        # Each offset is where a unique string ends and the next begins
        offsets = [0]
        for text in unique:
            offsets.append(offsets[-1] + len(text))

        data_offsets, enc_offsets = _encode(offsets, self._int_encoders)
        data_indices, enc_indices = _encode(indices, self._int_encoders)

        enc_dict = {'kind': 'StringArray',
                    'dataEncoding': enc_indices,
                    'stringData': ''.join(unique),
                    'offsetEncoding': enc_offsets,
                    'offsets': data_offsets}
        return data_indices, [enc_dict]


class _IntArrayMaskedEncoder(_MaskedEncoder):
    """Encoder for arrays of integers; masked elements are stored
       as -1."""

    # Encoders applied, in order, to the (masked) data
    _encoders = [_DeltaEncoder(), _RunLengthEncoder(), _ByteArrayEncoder()]

    def __call__(self, data, mask):
        """Return the encoded data and the list of encoding dicts."""
        values = data
        if mask:
            values = [-1 if flagged else item
                      for flagged, item in zip(mask, data)]
        return _encode(values, self._encoders)


class _FloatArrayMaskedEncoder(_MaskedEncoder):
    """Encoder for arrays of floating point values; masked elements are
       stored as 0."""

    # Encoders applied, in order, to the (masked) data
    _encoders = [_ByteArrayEncoder()]

    def __call__(self, data, mask):
        """Return the encoded data and the list of encoding dicts."""
        values = data
        if mask:
            values = [0. if flagged else item
                      for flagged, item in zip(mask, data)]
        return _encode(values, self._encoders)


def _get_mask_and_type(data):
    """Find missing values in `data` and work out how to store the rest.

       :return: a (mask, type) pair. mask is None if nothing is missing;
                otherwise it is a list with one entry per value: 1 for
                None, 2 for ihm.unknown, and 0 for everything else.
                type is str, float or int.
       :raises ValueError: if the type cannot be determined.
    """
    mask = None
    seen_types = set()
    for pos, val in enumerate(data):
        if val is None:
            code = 1
        elif val == ihm.unknown:
            code = 2
        else:
            seen_types.add(type(val))
            continue
        # Only make the mask once a missing value is actually found
        if mask is None:
            mask = [0] * len(data)
        mask[pos] = code
    # If a mix of types, coerce to that of the highest precedence
    # (mixed int/float can be represented as float; mix int/float/str can
    # be represented as str; bool is represented as str)
    if not seen_types or seen_types & {bool, str}:
        return mask, str
    for typ in (float, int):
        if typ in seen_types:
            return mask, typ
    # Handle numpy float types like Python float
    # todo: this is a hack
    if any('numpy.float' in str(t) for t in seen_types):
        return mask, float
    raise ValueError("Cannot determine type of data %s" % data)


class BinaryCifWriter(ihm.format._Writer):
    """Write information to a BinaryCIF file. See :class:`ihm.format.CifWriter`
       for more information. The constructor takes a single argument - a Python
       filelike object, open for writing in binary mode."""

    # Encoders applied, in order, to the mask of each column that has one
    _mask_encoders = [_DeltaEncoder(), _RunLengthEncoder(),
                      _ByteArrayEncoder()]

    def __init__(self, fh):
        super().__init__(fh)
        # All data blocks; nothing is written to the file until flush()
        self._blocks = []
        # Encoder to use for each column type
        # (as determined by _get_mask_and_type)
        self._masked_encoder = {str: _StringArrayMaskedEncoder(),
                                int: _IntArrayMaskedEncoder(),
                                float: _FloatArrayMaskedEncoder()}

    def category(self, category):
        """See :meth:`ihm.format.CifWriter.category`."""
        return _CategoryWriter(self, category)

    def loop(self, category, keys):
        """See :meth:`ihm.format.CifWriter.loop`."""
        return _LoopWriter(self, category, keys)

    def write_comment(self, comment):
        """See :meth:`ihm.format.CifWriter.write_comment`.

           .. note::

              BinaryCIF does not support comments, so this is a noop.
        """
        pass

    def _encode_data(self, data):
        """Encode a single column of values.

           :return: the encoded mask (a dict with 'data' and 'encoding'
                    keys, or None if no value is missing), the encoded
                    data, and the list of encoding dicts for the data.
        """
        mask, typ = _get_mask_and_type(data)
        enc = self._masked_encoder[typ]
        encdata, encs = enc(data, mask)
        if mask:
            data_mask, enc_mask = _encode(mask, self._mask_encoders)
            mask = {'data': data_mask, 'encoding': enc_mask}
        return mask, encdata, encs

    def _encode_column(self, name, data):
        """Return the BinaryCIF column dict for the named column."""
        mask, encdata, encs = self._encode_data(data)
        return {'name': name, 'mask': mask,
                'data': {'data': encdata, 'encoding': encs}}

    def start_block(self, name):
        """See :meth:`ihm.format.CifWriter.start_block`."""
        block = {'header': name, 'categories': []}
        # Categories added from now on go into this block
        self._categories = block['categories']
        self._blocks.append(block)

    def end_block(self):
        # noop - end-of-block is handled by start_block() and flush()
        pass

    def _add_category(self, category, data):
        """Encode a category and add it to the current block.

           :param str category: The category name.
           :param dict data: Mapping from keyword to list of values
                  (one per row).

           Nothing is added if the category has no columns, or if a
           column has no rows.
        """
        row_count = 0
        cols = []
        for k, v in data.items():
            row_count = len(v)
            # Do nothing if the category has no data
            if row_count == 0:
                return
            cols.append(self._encode_column(k, v))
        # A category with no columns at all has no data either
        if not cols:
            return
        self._categories.append({'name': category,
                                 'columns': cols, 'rowCount': row_count})

    def flush(self):
        """Write all data blocks added so far to the file."""
        data = {'version': ihm.__version__,
                'encoder': 'python-ihm library',
                'dataBlocks': self._blocks}
        self._write_msgpack(data)

    def _write_msgpack(self, data):
        """Write the data to the file in msgpack format"""
        # Imported here so that msgpack is only needed when writing
        import msgpack
        msgpack.pack(data, self.fh, use_bin_type=True)