File: cbf2eiger.py

package info (click to toggle)
python-fabio 2024.9.0-8
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 2,844 kB
  • sloc: python: 21,160; ansic: 1,126; lisp: 450; makefile: 253; sh: 244
file content (176 lines) | stat: -rwxr-xr-x 6,635 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#########################################
# Command line converter for a bunch of files from CBF to Eiger
# European Synchrotron Radiation Facility
#
#########################################
"""Converter a bunch of files from any format to an eiger-data
"""

__author__ = "Jérôme Kieffer"
__copyright__ = "2016 ESRF"
__date__ = "06/04/2020"
__licence__ = "MIT"

import logging
logging.basicConfig()

import os

import numpy
import fabio
from fabio import nexus

import argparse
from threading import Thread, Event

from queue import Queue

logger = logging.getLogger("to_eiger")


class Reader(Thread):
    """Reader thread with an input and an output queue.

    Pulls ``(index, filename)`` work items from the input queue, opens each
    file with fabio and pushes ``(index, FabioImage)`` — or ``(index, None)``
    on failure — to the output queue.  A ``None`` work item is the sentinel
    telling the thread to stop.
    """

    def __init__(self, queue_in, queue_out, quit_event):
        """Constructor of the class

        :param queue_in: input queue with (index, filename to read) as input
        :param queue_out: output queue with (index, FabioImage) as output
        :param quit_event: the event which tells the thread to end
        """
        Thread.__init__(self)
        self._queue_in = queue_in
        self._queue_out = queue_out
        self._quit_event = quit_event

    def run(self):
        """Consume work items until the quit event is set or a sentinel arrives."""
        while not self._quit_event.is_set():
            item = self._queue_in.get()
            if item is None:
                # Acknowledge the sentinel too, otherwise queue_in.join()
                # could block forever on unfinished sentinel items.
                self._queue_in.task_done()
                break
            idx, fname = item
            try:
                fimg = fabio.open(fname)
            except Exception as err:
                # Name the failing file; a None image signals the failure
                # downstream to the consumer.
                logger.error("Unable to read %s: %s", fname, err)
                fimg = None
            self._queue_out.put((idx, fimg))
            self._queue_in.task_done()

    @classmethod
    def build_pool(cls, args, size=1):
        """Create and start a pool of workers of a given size.

        :param args: arguments passed to the constructor of each worker
        :param size: number of workers in the pool
        :return: the list of started workers
        """
        workers = []
        for _ in range(size):
            w = cls(*args)
            w.start()
            workers.append(w)
        return workers


def save_eiger(input_files, output_file, filter_=None, nbthreads=None):
    """Save a bunch of files in Eiger-like format

    :param input_files: list of input files
    :param output_file: name of the HDF5 file
    :param filter_: Type of compression filter: "gzip", "lz4" or "bitshuffle"
    :param nbthreads: number of parallel reader threads
    :raises ValueError: if the input file list is empty
    """
    if not input_files:
        # Explicit raise: the former `assert` disappears under `python -O`.
        raise ValueError("Input file list is empty")
    first_image = input_files[0]
    # The first frame is read synchronously to probe shape and dtype.
    fimg = fabio.open(first_image)
    shape = fimg.data.shape
    stack_shape = (len(input_files),) + shape
    first_frame_timestamp = os.stat(first_image).st_ctime
    # Map the symbolic filter name onto h5py dataset-creation keywords;
    # 32004 and 32008 are the registered HDF5 filter ids of LZ4 and bitshuffle.
    kwfilter = {}
    if filter_ == "gzip":
        kwfilter = {"compression": "gzip", "shuffle": True}
    elif filter_ == "lz4":
        kwfilter = {"compression": 32004, "shuffle": True}
    elif filter_ == "bitshuffle":
        kwfilter = {"compression": 32008, "compression_opts": (0, 2)}  # enforce lz4 compression

    if nbthreads:
        # Spawn the reader pool and feed it every file except the first one.
        queue_in = Queue()
        queue_out = Queue()
        quit_event = Event()
        pool = Reader.build_pool((queue_in, queue_out, quit_event), nbthreads)
        for idx, fname in enumerate(input_files[1:]):
            queue_in.put((idx, fname))
    try:
        with nexus.Nexus(output_file) as nxs:
            entry = nxs.new_entry(entry='entry', program_name='fabio',
                                  title='converted from single-frame files',
                                  force_time=first_frame_timestamp,
                                  force_name=True)
            data = nxs.new_class(grp=entry, name="data", class_type="NXdata")
            try:
                ds = data.require_dataset(name="data", shape=stack_shape,
                                          dtype=fimg.data.dtype,
                                          chunks=(1,) + shape,
                                          **kwfilter)
            except Exception as error:
                # The compression plugin may be unavailable: retry uncompressed.
                logger.error("Error in creating dataset, disabling compression:%s", error)
                ds = data.require_dataset(name="data", shape=stack_shape,
                                          dtype=fimg.data.dtype,
                                          chunks=(1,) + shape)

            ds[0] = fimg.data
            # numpy.bytes_ is the non-deprecated spelling of numpy.string_
            # (the latter was removed in NumPy 2.0).
            data["sources"] = [numpy.bytes_(i) for i in input_files]
            if nbthreads:
                for _ in range(len(input_files) - 1):
                    idx, fimg = queue_out.get()
                    # Acknowledge unconditionally: skipping task_done() on a
                    # failed read used to make queue_out.join() hang forever.
                    queue_out.task_done()
                    if fimg is None or fimg.data is None:
                        # fimg is None when the reader could not open the file.
                        logger.error("Failed reading file: %s", input_files[idx + 1])
                        continue
                    ds[idx + 1] = fimg.data
                queue_in.join()
                queue_out.join()
            else:  # don't use the pool of readers
                for idx, fname in enumerate(input_files[1:]):
                    ds[idx + 1] = fabio.open(fname).data
    finally:
        if nbthreads:
            # Shut the pool down even when writing failed, so no thread leaks.
            quit_event.set()
            for _ in pool:
                queue_in.put(None)


if __name__ == "__main__":
    epilog = "plop"
    parser = argparse.ArgumentParser(prog="cbf2eiger",
                                     description=__doc__,
                                     epilog=epilog)
    parser.add_argument("IMAGE", nargs="*",
                        help="Input file images")
    parser.add_argument("-V", "--version", action='version', version=fabio.version,
                        help="output version and exit")
    parser.add_argument("-v", "--verbose", action='store_true', dest="verbose", default=False,
                        help="show information for each conversions")
    parser.add_argument("--debug", action='store_true', dest="debug", default=False,
                        help="show debug information")

    group = parser.add_argument_group("main arguments")
    group.add_argument("-l", "--list", action="store_true", dest="list", default=None,
                       help="show the list of available formats and exit")
    group.add_argument("-o", "--output", dest='output', type=str,
                       help="output file or directory")
    group.add_argument("-f", "--filter", dest='filter', type=str, default=None,
                       help="Compression filter, may be lz4, bitshuffle or gzip")
    group.add_argument("-n", "--nbthreads", dest='nbthreads', type=int, default=None,
                       help="Numbre of reader threads in parallel")

    opts = parser.parse_args()
    input_files = [os.path.abspath(i) for i in opts.IMAGE if os.path.exists(i)]
    input_files.sort()
    save_eiger(input_files, opts.output, filter_=opts.filter, nbthreads=opts.nbthreads)