1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257
|
from __future__ import absolute_import, print_function, division
import abc
import logging
import sys
import time
from petl.compat import PY3
from petl.util.base import Table
from petl.util.statistics import onlinestats
def progress(table, batchsize=1000, prefix="", out=None):
    """
    Wrap `table` so that, as rows are pulled through, progress messages are
    written to `out` (a file or file-like object, defaulting to sys.stderr).
    One message is emitted every `batchsize` rows, each optionally prefixed
    with `prefix`, followed by a summary message when iteration ends. E.g.::

        >>> import petl as etl
        >>> table = etl.dummytable(100000)
        >>> table.progress(10000).tocsv('example.csv') # doctest: +SKIP
        10000 rows in 0.13s (78363 row/s); batch in 0.13s (78363 row/s)
        20000 rows in 0.22s (91679 row/s); batch in 0.09s (110448 row/s)
        30000 rows in 0.31s (96573 row/s); batch in 0.09s (108114 row/s)
        40000 rows in 0.40s (99535 row/s); batch in 0.09s (109625 row/s)
        50000 rows in 0.49s (101396 row/s); batch in 0.09s (109591 row/s)
        60000 rows in 0.59s (102245 row/s); batch in 0.09s (106709 row/s)
        70000 rows in 0.68s (103221 row/s); batch in 0.09s (109498 row/s)
        80000 rows in 0.77s (103810 row/s); batch in 0.09s (108126 row/s)
        90000 rows in 0.90s (99465 row/s); batch in 0.13s (74516 row/s)
        100000 rows in 1.02s (98409 row/s); batch in 0.11s (89821 row/s)
        100000 rows in 1.02s (98402 row/s); batches in 0.10 +/- 0.02s [0.09-0.13] (100481 +/- 13340 rows/s [74516-110448])

    See also :func:`petl.util.timing.clock`.

    """
    view = ProgressView(table, batchsize, prefix, out)
    return view
def log_progress(table, batchsize=1000, prefix="", logger=None, level=logging.INFO):
    """
    Wrap `table` so that, as rows are pulled through, progress messages are
    sent to a python logger at the given `level`. If `logger` is None, a new
    logger is created that, by default, streams to stdout. One message is
    logged every `batchsize` rows, each optionally prefixed with `prefix`,
    followed by a summary message when iteration ends. E.g.::

        >>> import petl as etl
        >>> table = etl.dummytable(100000)
        >>> table.log_progress(10000).tocsv('example.csv') # doctest: +SKIP
        10000 rows in 0.13s (78363 row/s); batch in 0.13s (78363 row/s)
        20000 rows in 0.22s (91679 row/s); batch in 0.09s (110448 row/s)
        30000 rows in 0.31s (96573 row/s); batch in 0.09s (108114 row/s)
        40000 rows in 0.40s (99535 row/s); batch in 0.09s (109625 row/s)
        50000 rows in 0.49s (101396 row/s); batch in 0.09s (109591 row/s)
        60000 rows in 0.59s (102245 row/s); batch in 0.09s (106709 row/s)
        70000 rows in 0.68s (103221 row/s); batch in 0.09s (109498 row/s)
        80000 rows in 0.77s (103810 row/s); batch in 0.09s (108126 row/s)
        90000 rows in 0.90s (99465 row/s); batch in 0.13s (74516 row/s)
        100000 rows in 1.02s (98409 row/s); batch in 0.11s (89821 row/s)
        100000 rows in 1.02s (98402 row/s); batches in 0.10 +/- 0.02s [0.09-0.13] (100481 +/- 13340 rows/s [74516-110448])

    See also :func:`petl.util.timing.clock`.

    """
    view = LoggingProgressView(table, batchsize, prefix, logger, level=level)
    return view
# expose the wrappers as Table methods to support fluent-style chaining,
# e.g. table.progress(10000).tocsv(...)
Table.progress = progress
Table.log_progress = log_progress
class ProgressViewBase(Table):
    """
    Abstract base class for reporting on processing status.  Concrete
    subclasses implement :meth:`print_message` to direct the per-batch and
    final summary messages somewhere (a stream, a logger, ...).
    """

    def __init__(self, inner, batchsize, prefix):
        self.inner = inner  # wrapped row container
        self.batchsize = batchsize  # number of rows per reporting batch
        self.prefix = prefix  # text prepended to every message

    @abc.abstractmethod
    def print_message(self, message):
        """Emit a single progress message (implemented by subclasses)."""
        # NOTE(review): no ABCMeta metaclass is set on this class, so this
        # decorator is advisory only and does not prevent instantiation --
        # confirm whether Table supplies a suitable metaclass.
        pass

    def __iter__(self):
        # Pass rows through unchanged, timing each batch of self.batchsize
        # rows and accumulating online statistics over batch times/rates.
        start = time.time()
        batchstart = start
        batchn = 0  # number of completed batches
        batchtimemin, batchtimemax = None, None
        batchtimemean, batchtimevar = 0, 0
        batchratemean, batchratevar = 0, 0
        # initialise n so the final summary below does not raise
        # UnboundLocalError when self.inner yields no rows at all
        n = 0
        for n, r in enumerate(self.inner):
            # report at the start of every batch boundary (but not at row 0)
            if n % self.batchsize == 0 and n > 0:
                batchn += 1
                batchend = time.time()
                batchtime = batchend - batchstart
                if batchtimemin is None or batchtime < batchtimemin:
                    batchtimemin = batchtime
                if batchtimemax is None or batchtime > batchtimemax:
                    batchtimemax = batchtime
                elapsedtime = batchend - start
                try:
                    rate = int(n / elapsedtime)
                except ZeroDivisionError:
                    rate = 0
                try:
                    batchrate = int(self.batchsize / batchtime)
                except ZeroDivisionError:
                    batchrate = 0
                v = (n, elapsedtime, rate, batchtime, batchrate)
                message = self.prefix + \
                          '%s rows in %.2fs (%s row/s); ' \
                          'batch in %.2fs (%s row/s)' % v
                self.print_message(message)
                batchstart = batchend
                # update running mean/variance of batch times and rates
                batchtimemean, batchtimevar = \
                    onlinestats(batchtime, batchn, mean=batchtimemean,
                                variance=batchtimevar)
                batchratemean, batchratevar = \
                    onlinestats(batchrate, batchn, mean=batchratemean,
                                variance=batchratevar)
            yield r

        # compute total elapsed time and rate
        end = time.time()
        elapsedtime = end - start
        try:
            rate = int(n / elapsedtime)
        except ZeroDivisionError:
            rate = 0

        # construct the final message; include batch statistics only when
        # more than one batch completed (otherwise they are meaningless)
        if batchn > 1:
            if batchtimemin is None:
                batchtimemin = 0
            if batchtimemax is None:
                batchtimemax = 0
            # fastest batch time gives the max rate, and vice versa
            try:
                batchratemin = int(self.batchsize / batchtimemax)
            except ZeroDivisionError:
                batchratemin = 0
            try:
                batchratemax = int(self.batchsize / batchtimemin)
            except ZeroDivisionError:
                batchratemax = 0
            v = (n, elapsedtime, rate, batchtimemean, batchtimevar**.5,
                 batchtimemin, batchtimemax, int(batchratemean),
                 int(batchratevar**.5), int(batchratemin), int(batchratemax))
            message = self.prefix + '%s rows in %.2fs (%s row/s); batches in ' \
                                    '%.2f +/- %.2fs [%.2f-%.2f] ' \
                                    '(%s +/- %s rows/s [%s-%s])' % v
        else:
            v = (n, elapsedtime, rate)
            message = self.prefix + '%s rows in %.2fs (%s row/s)' % v
        self.print_message(message)
class ProgressView(ProgressViewBase):
    """
    Progress reporter that writes messages to a file-like object, e.g.
    sys.stdout or an open file handle.
    """

    def __init__(self, inner, batchsize, prefix, out):
        # fall back to stderr when no destination is given
        self.file_object = sys.stderr if out is None else out
        super(ProgressView, self).__init__(inner, batchsize, prefix)

    def print_message(self, message):
        print(message, file=self.file_object)
        # flush eagerly (when supported) so progress appears promptly
        flush = getattr(self.file_object, 'flush', None)
        if flush is not None:
            flush()
class LoggingProgressView(ProgressViewBase):
    """
    Progress reporter that sends messages to a logger, log handler, or log
    adapter at a configurable level.
    """

    def __init__(self, inner, batchsize, prefix, logger, level=logging.INFO):
        if logger is not None:
            self.logger = logger
        else:
            # no logger supplied: use a module-level logger, raised to the
            # requested level so the progress messages are not filtered out
            self.logger = logging.getLogger(__name__)
            self.logger.setLevel(level)
        self.level = level
        super(LoggingProgressView, self).__init__(inner, batchsize, prefix)

    def print_message(self, message):
        self.logger.log(self.level, message)
def clock(table):
    """
    Wrap `table` to measure how long is spent retrieving rows from it.  The
    accumulated time is available afterwards via the wrapper's ``time``
    attribute, enabling diagnosis of which steps in a pipeline are taking
    the most time. E.g.::

        >>> import petl as etl
        >>> t1 = etl.dummytable(100000)
        >>> c1 = etl.clock(t1)
        >>> t2 = etl.convert(c1, 'foo', lambda v: v**2)
        >>> c2 = etl.clock(t2)
        >>> p = etl.progress(c2, 10000)
        >>> etl.tocsv(p, 'example.csv') # doctest: +SKIP
        10000 rows in 0.23s (44036 row/s); batch in 0.23s (44036 row/s)
        20000 rows in 0.38s (52167 row/s); batch in 0.16s (63979 row/s)
        30000 rows in 0.54s (55749 row/s); batch in 0.15s (64624 row/s)
        40000 rows in 0.69s (57765 row/s); batch in 0.15s (64793 row/s)
        50000 rows in 0.85s (59031 row/s); batch in 0.15s (64707 row/s)
        60000 rows in 1.00s (59927 row/s); batch in 0.15s (64847 row/s)
        70000 rows in 1.16s (60483 row/s); batch in 0.16s (64051 row/s)
        80000 rows in 1.31s (61008 row/s); batch in 0.15s (64953 row/s)
        90000 rows in 1.47s (61356 row/s); batch in 0.16s (64285 row/s)
        100000 rows in 1.62s (61703 row/s); batch in 0.15s (65012 row/s)
        100000 rows in 1.62s (61700 row/s); batches in 0.16 +/- 0.02s [0.15-0.23] (62528 +/- 6173 rows/s [44036-65012])
        >>> # time consumed retrieving rows from t1
        ... c1.time # doctest: +SKIP
        0.7243089999999492
        >>> # time consumed retrieving rows from t2
        ... c2.time # doctest: +SKIP
        1.1704209999999766
        >>> # actual time consumed by the convert step
        ... c2.time - c1.time # doctest: +SKIP
        0.4461120000000274

    See also :func:`petl.util.timing.progress`.

    """
    view = ClockView(table)
    return view
# expose as a Table method to support fluent-style chaining
Table.clock = clock
class ClockView(Table):
    """
    Table wrapper that accumulates, in its ``time`` attribute, the total
    time spent retrieving rows from the wrapped container.
    """

    def __init__(self, wrapped):
        self.wrapped = wrapped
        # initialise here so reading .time before any iteration has taken
        # place returns 0 instead of raising AttributeError
        self.time = 0

    def __iter__(self):
        # reset on each pass so repeated iteration reports the latest run
        self.time = 0
        # hoist the timer selection out of the per-row loop; time.clock
        # was removed in Python 3.8, so it is only used under Python 2
        timer = time.perf_counter if PY3 else time.clock
        it = iter(self.wrapped)
        while True:
            # time only the retrieval of the next row, not consumer work
            before = timer()
            try:
                row = next(it)
            except StopIteration:
                return
            after = timer()
            self.time += (after - before)
            yield row
|