## @package pipeline
# Module caffe2.python.pipeline
from caffe2.python import core, queue_util
from caffe2.python.dataio import Reader, Writer
from caffe2.python.net_builder import NetBuilder, ops
from caffe2.python.schema import as_record, Field
from caffe2.python.task import Node, Task, TaskGroup
class Output(object):
"""
Represents the result of a processor function. A processor can either
return an Output, or it can return a record, in which case an Output will be
created for it afterwards.
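
    Example (an illustrative sketch; `proc_net` and `out_rec` stand for a
    net and a record built by the processor and are not defined here):

        def my_processor(rec):
            # ... build proc_net and out_rec from rec ...
            return Output(nets=[proc_net], record=out_rec)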
"""
def __init__(self, nets=None, record=None, should_stop=None):
builder_children = NetBuilder.current().get()
assert nets is None or len(builder_children) == 0, (
'Cannot both use `ops` syntax and return a list of nets.')
if nets is None:
nets = builder_children
if isinstance(nets, core.Net):
nets = [nets]
self.nets = [] if nets is None else list(nets)
self.record = None if record is None else as_record(record)
self.should_stop = should_stop
DEFAULT_QUEUE_CAPACITY = 100
def _init_output(output, capacity, global_init_net, global_exit_net):
if output is None:
out_queue = queue_util.Queue(
capacity=(
capacity if capacity is not None
else DEFAULT_QUEUE_CAPACITY))
writer = out_queue.writer()
elif isinstance(output, Writer):
assert capacity is None, 'capacity would not be used.'
out_queue = None
writer = output
elif hasattr(output, 'writer'):
assert capacity is None, 'capacity would not be used.'
out_queue = output
writer = output.writer()
else:
        raise ValueError('output must be a writer, queue or stream.')
writer.setup_ex(global_init_net, global_exit_net)
return out_queue, writer
def make_processor(processor, reader=None):
    """
    Normalizes `processor` into a callable that takes a record and returns
    a record: None becomes the identity function, a core.Net is wrapped in
    a NetProcessor, and anything else is returned as-is (binding its
    `schema_func` to `reader` when both are present).
    """
    if processor is None:
        return lambda rec: rec
elif isinstance(processor, core.Net):
return NetProcessor(processor)
else:
if reader is not None and hasattr(processor, "schema_func"):
def processor_schema():
return processor.schema_func(reader)
processor.schema = processor_schema
return processor
def normalize_processor_output(output):
"""
Allow for processors to return results in several formats.
TODO(azzolini): simplify once all processors use NetBuilder API.
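
    The accepted return shapes, mirroring the branches below:

        return out                       # an Output instance
        return rec                       # a record (schema.Field)
        return (rec, stop_blob)          # (record, stop_blob)
        return (nets, rec, stop_blob)    # (nets, record, stop_blob)
        return nets                      # nets only, no record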
"""
    if isinstance(output, Output):
        # Processor returned an Output.
        return output
    elif isinstance(output, Field):
        # Processor returned a record.
        return Output(record=output)
    elif isinstance(output, tuple):
        is_record_and_blob = (
            len(output) == 2 and
            isinstance(output[0], Field) and
            isinstance(output[1], core.BlobReference))
        if is_record_and_blob:
            # Processor returned (record, stop_blob).
            return Output(None, *output)
        else:
            # Processor returned (nets, record, stop_blob).
            return Output(*output)
    else:
        # Processor returned nets only, no record.
        return Output(output)
def pipe(
input, output=None, num_threads=1, processor=None, name=None,
capacity=None, group=None, num_runtime_threads=1):
"""
Given a Reader, Queue or DataStream in `input`, and optionally, a Writer,
Queue or DataStream in `output`, creates a Task that, when run, will
pipe the input into the output, using multiple parallel threads.
Additionally, if a processor is given, it will be called between reading
and writing steps, allowing it to transform the record.
Args:
input: either a Reader, Queue or DataStream that will be read
until a stop is signaled either by the reader or the
writer.
output: either a Writer, a Queue or a DataStream that will be
written to as long as neither reader nor writer signal
a stop condition. If output is not provided or is None,
a Queue is created with given `capacity` and written to.
num_threads: number of concurrent threads used for processing and
piping. If set to 0, no Task is created, and a
reader is returned instead -- the reader returned will
read from the reader passed in and process it.
** DEPRECATED **. Use `num_runtime_threads` instead.
This option will be removed once all readers/processors
support `num_runtime_threads`.
processor: (optional) function that takes an input record and
optionally returns a record; this will be called
between read and write steps. If the processor does
not return a record, a writer will not be instantiated.
Processor can also be a core.Net with input and output
records properly set. In that case, a NetProcessor is
instantiated, cloning the net for each of the threads.
name: (optional) name of the task to be created.
capacity: when output is not passed, a queue of given `capacity`
is created and written to.
group: (optional) explicitly add the created Task to this
TaskGroup, instead of using the currently active one.
        num_runtime_threads: Similar to `num_threads`, but instead of expanding
                    the tasks with a `for` loop in python, does so at
                    runtime. This is preferable to `num_threads`, but some
                    processors/readers still need to be called multiple
                    times in python.
Returns:
Output Queue, DataStream, Reader, or None, depending on the parameters
passed.
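
    Example (a minimal sketch; `src_reader` stands for any dataio.Reader
    built elsewhere and is not defined here):

        with TaskGroup() as tg:
            # pipe the reader into a fresh queue using 3 runtime threads
            out_queue = pipe(src_reader, num_runtime_threads=3)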
"""
result, _ = _pipe_step(
input, output, num_threads, processor, name, capacity, group,
num_runtime_threads)
return result
def pipe_and_output(
input, output=None, num_threads=1, processor=None, name=None,
capacity=None, group=None, num_runtime_threads=1, final_outputs=None):
"""
Similar to `pipe`, with the additional ability for the pipe Task to
return output values to the `Session` once done.
Returns:
Tuple (out_queue, *task_outputs)
out_queue: same as return value of `pipe`.
task_outputs: TaskOutput object, fetchable from the client after
session.run() returns.
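
    Example (a sketch; `src_reader`, `my_processor` and `total_blob` are
    assumed names, not defined here):

        out_queue, outputs = pipe_and_output(
            src_reader, processor=my_processor, final_outputs=[total_blob])
        # after session.run(...), outputs[0].fetch() holds the final value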
"""
assert num_threads > 0
result, task = _pipe_step(
input, output, num_threads, processor, name, capacity, group,
num_runtime_threads, final_outputs)
output = None
if final_outputs is not None:
output = task.outputs()
if type(final_outputs) not in (list, tuple):
output = output[0]
return result, output
def processor_name(processor):
if hasattr(processor, 'name'):
return processor.name
    if hasattr(processor, '__name__'):
        if processor.__name__ == '<lambda>':
            return processor.__module__
        if hasattr(processor, '__self__'):
            return '%s.%s' % (
                type(processor.__self__).__name__, processor.__name__)
        return processor.__name__
return processor.__class__.__name__
def _runtime_threads_task(name, group, final_outputs, reader, num_threads,
output, capacity):
node_name = str(Node.current())
profiler_name = "{0}/{1}/{2}/{3}/{4}".format(
node_name,
"pipe",
name,
        processor_name(reader) if reader else "NoInput",
processor_name(output) if output else "NoOutput")
with Task(name=name, group=group, outputs=final_outputs,
num_instances=num_threads) as task:
global_exit_net = core.Net('pipe:exit')
global_init_net = core.Net('pipe:init')
reader.setup_ex(global_init_net, global_exit_net)
init_net = core.Net('pipe:instance:init')
exit_net = core.Net('pipe:instance:exit')
read_nets, status, rec = reader.read_record_ex(init_net, exit_net)
init_net.ConstantFill(
[], [status],
shape=[],
value=False,
dtype=core.DataType.BOOL
)
if rec is not None:
out_queue, writer = _init_output(
output, capacity, global_init_net, global_exit_net)
write_nets, _ = writer.write_record_ex(
rec, init_net, exit_net, status)
else:
out_queue = None
write_nets = []
with ops.task_init():
ops.net(global_init_net)
with ops.task_instance_init():
ops.net(init_net)
timer_start_net = core.Net('timer_start')
timer = timer_start_net.TimerBegin([], counter_name=profiler_name)
timer_end_net = core.Net('timer_end')
timer_end_net.TimerEnd(timer, [])
ops.net(core.execution_step(
'body',
[timer_start_net] + list(read_nets) + list(write_nets) +
[timer_end_net],
should_stop_blob=status))
ops.net(timer_end_net)
with ops.task_instance_exit():
ops.net(exit_net)
with ops.task_exit():
ops.net(global_exit_net)
return out_queue, task
def _static_threads_task(name, group, final_outputs, reader, num_threads,
output, capacity):
node_name = str(Node.current())
profiler_name = "{0}/{1}/{2}/{3}/{4}".format(
node_name,
"pipe",
name,
        processor_name(reader) if reader else "NoInput",
processor_name(output) if output else "NoOutput")
with Task(name=name, group=group, outputs=final_outputs) as task:
global_exit_net = core.Net('exit')
global_init_net = core.Net('init')
reader.setup_ex(global_init_net, global_exit_net)
out_queue = None
writer = None
steps = []
for thread_id in range(num_threads):
with NetBuilder(name='t:%d' % thread_id) as nb:
init_net = core.Net('init')
exit_net = core.Net('exit')
read_nets, status, rec = reader.read_record_ex(
init_net, exit_net)
init_net.ConstantFill(
[], [status],
shape=[],
value=False,
dtype=core.DataType.BOOL
)
if rec is not None:
if writer is None:
                        # hack so that the out queue gets the right name
                        # prefix (otherwise it would be prefixed with the
                        # thread id)
with NetBuilder(_fullname=task.name):
out_queue, writer = _init_output(
output, capacity, global_init_net,
global_exit_net)
write_nets, _ = writer.write_record_ex(
rec, init_net, exit_net, status)
else:
write_nets = []
timer_start_net = core.Net('timer_start')
timer = timer_start_net.TimerBegin([], counter_name=profiler_name)
timer_end_net = core.Net('timer_end')
timer_end_net.TimerEnd(timer, [])
ops.net(init_net)
ops.net(core.execution_step(
'body',
[timer_start_net] + list(read_nets) + list(write_nets) +
[timer_end_net],
should_stop_blob=status))
ops.net(timer_end_net)
ops.net(exit_net)
steps.append(core.to_execution_step(nb))
ops.net(global_init_net)
ops.net(core.execution_step('body', steps, concurrent_substeps=True))
ops.net(global_exit_net)
return out_queue, task
def _pipe_step(
        input, output=None, num_threads=1, processor=None, name=None,
        capacity=None, group=None, num_runtime_threads=1, final_outputs=None):
    """
    Implementation behind `pipe` and `pipe_and_output`: wraps `input` in a
    reader (composing it with `processor` if given), then builds either a
    static-thread or a runtime-thread piping Task.
    """
    assert num_threads <= 1 or num_runtime_threads <= 1, (
        'At most one of num_threads or num_runtime_threads may be set.')
if isinstance(input, Reader):
reader = input
elif hasattr(input, 'reader'):
reader = input.reader()
else:
raise ValueError(
'Input must be a reader, queue or stream. Got {}'.format(type(input)))
if processor is not None:
reader = ProcessingReader(reader, processor)
if num_threads == 0 or num_runtime_threads == 0:
assert output is None
return reader, None
if name is None and processor is not None:
name = processor_name(processor)
if name is None and output is not None:
name = 'pipe_into:%s' % processor_name(output)
if name is None:
name = 'pipe_from:%s' % processor_name(input)
if num_threads > 1:
return _static_threads_task(
name, group, final_outputs, reader, num_threads, output, capacity)
else:
return _runtime_threads_task(
name, group, final_outputs, reader, num_runtime_threads, output,
capacity)
class ProcessingReader(Reader):
"""
Reader that reads from an upstream reader, calls the processor, and returns
the processed record.
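
    Example (a sketch; `base_reader` and `my_processor` are assumed to be
    built elsewhere):

        reader = ProcessingReader(base_reader, my_processor)
        out_queue = pipe(reader, num_runtime_threads=2)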
"""
def __init__(self, reader, processor):
Reader.__init__(self)
self.reader = reader
self.processor = make_processor(processor, reader)
def schema(self):
return self.processor.schema()
def setup_ex(self, init_net, finish_net):
self.reader.setup_ex(init_net, finish_net)
def read_ex(self, init_net, exit_net):
read_nets, status, rec = self.reader.read_record_ex(init_net, exit_net)
        # We don't use `status` as the stop_blob of the NetBuilder because it
        # is not guaranteed to end up being the true stop blob. For example,
        # ReaderWithLimitBase doesn't pass the status through, but rather
        # copies from it.
with NetBuilder() as nb:
# Current NetBuilder is optionally used inside the processor,
# then its children are retrieved inside of
# normalize_processor_output.
# Once readers and writers also use NetBuilder,
# this logic will be more natural.
result = normalize_processor_output(self.processor(rec))
read_nets += result.nets
if result.should_stop or nb._stop_blob:
stop_net = core.Net('stop_net')
if result.should_stop:
stop_net.Or([status, result.should_stop], [status])
if nb._stop_blob:
stop_net.Or([status, nb._stop_blob], [status])
read_nets.append(stop_net)
if hasattr(self.processor, 'setup'):
init_net.add_attribute(TaskGroup.LOCAL_SETUP, self.processor)
self._set_schema(result.record)
fields = result.record.field_blobs() if result.record else None
return read_nets, status, fields
class NetProcessor(object):
"""
Processor that clones a core.Net each time it's called, executing
the cloned net as the processor. It requires the Net to have input
and (optionally) output records set, with net.set_input_record() and
net.set_output_record().
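
    Example (a sketch; assumes `proc_net` already had set_input_record()
    and set_output_record() called on it):

        processor = NetProcessor(proc_net)
        out_queue = pipe(src_reader, processor=processor, num_threads=4)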
"""
def __init__(self, net, stop_signal=None, thread_init_nets=None, name=None):
assert isinstance(net, core.Net)
assert stop_signal is None or isinstance(
stop_signal, core.BlobReference)
self.name = name or str(net)
self.thread_init_nets = thread_init_nets or []
self.net = net
self._stop_signal = stop_signal
self._blob_maps = []
self._frozen = False
self._cloned_init_nets = []
def schema(self):
return self.net.output_record()
def setup(self, init_net):
self._frozen = True
cloned_init_nets = self._cloned_init_nets
self._cloned_init_nets = []
return cloned_init_nets
def __call__(self, rec):
assert not self._frozen
prefix = NetBuilder.current().name + '/'
blob_remap = {}
for net in self.thread_init_nets:
new_net, _ = core.clone_and_bind_net(
net, str(net) + prefix, prefix, blob_remap)
self._cloned_init_nets.append(new_net)
new_net, remappings = core.clone_and_bind_net(
self.net, str(self.net) + prefix, prefix, blob_remap, rec)
if self._stop_signal is None:
stop_signal = None
elif str(self._stop_signal) in remappings:
stop_signal = core.BlobReference(
remappings[str(self._stop_signal)],
net=new_net)
else:
stop_signal = self._stop_signal
self._blob_maps.append(remappings)
return Output([new_net], new_net.output_record(), stop_signal)
def blob_maps(self):
self._frozen = True
return self._blob_maps