File: progressbar.py

package info (click to toggle)
python-mne 0.19.1%2Bdfsg-1
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 100,440 kB
  • sloc: python: 120,243; pascal: 1,861; makefile: 225; sh: 15
file content (257 lines) | stat: -rw-r--r-- 9,354 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# -*- coding: utf-8 -*-
"""Some utility functions."""
# Authors: Alexandre Gramfort <alexandre.gramfort@inria.fr>
#
# License: BSD (3-clause)

from collections.abc import Iterable
import time
import logging
import tempfile
import sys
import os.path as op
import os
import shutil

import numpy as np

from ._logging import logger
from .misc import sizeof_fmt


class ProgressBar(object):
    """Generate a command-line progressbar.

    Parameters
    ----------
    max_value : int | iterable
        Maximum value of process (e.g. number of samples to process, bytes to
        download, etc.). If an iterable is given, then `max_value` will be set
        to the length of this iterable.
    initial_value : int
        Initial value of process, useful when resuming process from a specific
        value, defaults to 0.
    mesg : str
        Message to include at end of progress bar.
    max_chars : int | str
        Number of characters to use for progress bar itself.
        This does not include characters used for the message or percent
        complete. Can be "auto" (default) to try to set a sane value based
        on the terminal width.
    progress_character : char
        Character in the progress bar that indicates the portion completed.
    spinner : bool
        Show a spinner.  Useful for long-running processes that may not
        increment the progress bar very often.  This provides the user with
        feedback that the progress has not stalled.
    max_total_width : int | str
        Maximum total message width. Can use "auto" (default) to try to set
        a sane value based on the current terminal width.
    verbose_bool : bool | 'auto'
        If True, show progress. 'auto' will use the current MNE verbose level.

    Example
    -------
    >>> progress = ProgressBar(13000)
    >>> progress.update(3000) # doctest: +SKIP
    [.........                               ] 23.07692 |
    >>> progress.update(6000) # doctest: +SKIP
    [..................                      ] 46.15385 |

    >>> progress = ProgressBar(13000, spinner=True)
    >>> progress.update(3000) # doctest: +SKIP
    [.........                               ] 23.07692 |
    >>> progress.update(6000) # doctest: +SKIP
    [..................                      ] 46.15385 /
    """

    spinner_symbols = ['|', '/', '-', '\\']
    template = '\r[{0}{1}] {2} {3} {4}'

    def __init__(self, max_value, initial_value=0, mesg='', max_chars='auto',
                 progress_character='.', spinner=False,
                 max_total_width='auto', verbose_bool=True):  # noqa: D102
        self.cur_value = initial_value
        if isinstance(max_value, Iterable):
            self.max_value = len(max_value)
            self.iterable = max_value
        else:
            self.max_value = max_value
            self.iterable = None
        self.mesg = mesg
        self.progress_character = progress_character
        self.spinner = spinner
        self.spinner_index = 0
        self.n_spinner = len(self.spinner_symbols)
        if verbose_bool == 'auto':
            verbose_bool = True if logger.level <= logging.INFO else False
        self._do_print = verbose_bool
        self.cur_time = time.time()
        if max_total_width == 'auto':
            max_total_width = _get_terminal_width()
        self.max_total_width = int(max_total_width)
        if max_chars == 'auto':
            max_chars = min(max(max_total_width - 40, 10), 60)
        self.max_chars = int(max_chars)
        self.cur_rate = 0
        with tempfile.NamedTemporaryFile('wb', prefix='tmp_mne_prog') as tf:
            self._mmap_fname = tf.name
        del tf  # should remove the file
        self._mmap = None

    def update(self, cur_value, mesg=None):
        """Update progressbar with current value of process.

        Parameters
        ----------
        cur_value : number
            Current value of process.  Should be <= max_value (but this is not
            enforced).  The percent of the progressbar will be computed as
            (cur_value / max_value) * 100
        mesg : str
            Message to display to the right of the progressbar.  If None, the
            last message provided will be used.  To clear the current message,
            pass a null string, ''.
        """
        cur_time = time.time()
        cur_dt = max(float(cur_time - self.cur_time), 1e-6)
        cur_rate = (cur_value - self.cur_value) / cur_dt
        # Smooth the estimate a bit
        eps = min(cur_dt, 1.)
        cur_rate = eps * cur_rate + (1 - eps) * self.cur_rate
        # Ensure floating-point division so we can get fractions of a percent
        # for the progressbar.
        self.cur_time = cur_time
        self.cur_value = cur_value
        self.cur_rate = cur_rate
        max_value = float(self.max_value) if self.max_value else 1.
        progress = np.clip(self.cur_value / max_value, 0, 1)
        num_chars = int(progress * self.max_chars)
        num_left = self.max_chars - num_chars

        # Update the message
        if mesg is not None:
            if mesg == 'file_sizes':
                mesg = '(%s, %s/s)' % (
                    sizeof_fmt(self.cur_value).rjust(8),
                    sizeof_fmt(cur_rate).rjust(8))
            self.mesg = mesg
        del cur_rate

        # The \r tells the cursor to return to the beginning of the line rather
        # than starting a new line.  This allows us to have a progressbar-style
        # display in the console window.
        progress = '%6.02f%%' % (progress * 100,)
        progress = progress if self.cur_value <= max_value else 'unknown'
        bar = self.template.format(
            self.progress_character * num_chars, ' ' * num_left,
            progress, self.mesg, self.spinner_symbols[self.spinner_index])
        bar = bar[:self.max_total_width]
        # Force a flush because sometimes when using bash scripts and pipes,
        # the output is not printed until after the program exits.
        if self._do_print:
            sys.stdout.write(bar)
            sys.stdout.flush()
        # Increment the spinner
        if self.spinner:
            self.spinner_index = (self.spinner_index + 1) % self.n_spinner

    def update_with_increment_value(self, increment_value, mesg=None):
        """Update progressbar with an increment.

        Parameters
        ----------
        increment_value : int
            Value of the increment of process.  The percent of the progressbar
            will be computed as
            (self.cur_value + increment_value / max_value) * 100
        mesg : str
            Message to display to the right of the progressbar.  If None, the
            last message provided will be used.  To clear the current message,
            pass a null string, ''.
        """
        self.update(self.cur_value + increment_value, mesg)

    def __iter__(self):
        """Iterate to auto-increment the pbar with 1."""
        if self.iterable is None:
            raise ValueError("Must give an iterable to be used in a loop.")
        self.update(self.cur_value)
        for obj in self.iterable:
            yield obj
            self.update_with_increment_value(1)

    def __call__(self, seq):
        """Call the ProgressBar in a joblib-friendly way."""
        while True:
            try:
                yield next(seq)
            except StopIteration:
                return
            else:
                self.update_with_increment_value(1)

    def subset(self, idx):
        """Make a joblib-friendly index subset updater.

        Parameters
        ----------
        idx : ndarray
            List of indices for this subset.

        Returns
        -------
        updater : instance of PBSubsetUpdater
            Class with a ``.update(ii)`` method.
        """
        return _PBSubsetUpdater(self, idx)

    def __setitem__(self, idx, val):
        """Use alternative, mmap-based incrementing (max_value must be int)."""
        if not self._do_print:
            return
        assert val is True
        self._mmap[idx] = True
        self.update(self._mmap.sum())

    def __enter__(self):  # noqa: D105
        if op.isfile(self._mmap_fname):
            os.remove(self._mmap_fname)
        # prevent corner cases where self.max_value == 0
        self._mmap = np.memmap(self._mmap_fname, bool, 'w+',
                               shape=max(self.max_value, 1))
        self.update(0)  # must be zero as we just created the memmap
        return self

    def __exit__(self, type, value, traceback):  # noqa: D105
        """Clean up memmapped file."""
        # we can't put this in __del__ b/c then each worker will delete the
        # file, which is not so good
        self._mmap = None
        if op.isfile(self._mmap_fname):
            os.remove(self._mmap_fname)
        self.done()

    def done(self):
        """Print a newline."""
        if self._do_print:
            sys.stdout.write('\n')
            sys.stdout.flush()


class _PBSubsetUpdater(object):

    def __init__(self, pb, idx):
        self.pb = pb
        self.idx = idx

    def update(self, ii):
        self.pb[self.idx[:ii]] = True


def _get_terminal_width():
    """Get the terminal width."""
    if sys.version[0] == '2':
        return 80
    else:
        return shutil.get_terminal_size((80, 20)).columns