File: batch.py

package info (click to toggle)
python-streamz 0.6.4-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,040 kB
  • sloc: python: 6,722; sh: 18; makefile: 16
file content (85 lines) | stat: -rw-r--r-- 2,384 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
from .collection import Streaming, _stream_types
import toolz
import toolz.curried


class Batch(Streaming):
    """ A Stream of tuples or lists

    This streaming collection manages batches of Python objects such as lists
    of text or dictionaries.  By batching many elements together we reduce
    overhead from Python.

    This library is typically used at the early stages of data ingestion before
    handing off to streaming dataframes

    Examples
    --------
    >>> text = Streaming.from_file(myfile)  # doctest: +SKIP
    >>> b = text.partition(100).map(json.loads)  # doctest: +SKIP
    """
    def __init__(self, stream=None, example=None):
        if example is None:
            example = []
        super(Batch, self).__init__(stream=stream, example=example)

    def sum(self):
        """ Sum elements """
        return self.accumulate_partitions(_accumulate_sum, start=0)

    def filter(self, predicate):
        """ Filter elements by a predicate """
        return self.map_partitions(_filter, self, predicate)

    def pluck(self, ind):
        """ Pick a field out of all elements

        Example
        -------
        >>> s.pluck('name').sink(print)  # doctest: +SKIP
        >>> s.emit({'name': 'Alice', 'x': 123})  # doctest: +SKIP
        'Alice'
        """
        return self.map_partitions(_pluck, self, ind)

    def map(self, func, **kwargs):
        """ Map a function across all elements """
        return self.map_partitions(_map_map, self, func, **kwargs)

    def to_dataframe(self):
        """
        Convert to a streaming dataframe

        This calls ``pd.DataFrame`` on all list-elements of this stream
        """
        import pandas as pd
        import streamz.dataframe  # noqa: F401
        return self.map_partitions(pd.DataFrame, self)

    def to_stream(self):
        """ Concatenate batches and return base Stream

        Returned stream will be composed of single elements
        """
        return self.stream.flatten()


def _filter(seq, predicate):
    return list(filter(predicate, seq))


def _pluck(seq, ind):
    return list(toolz.pluck(ind, seq))


def _map_map(seq, func, **kwargs):
    return list(map(func, seq, **kwargs))


def _accumulate_sum(accumulator, new):
    return accumulator + sum(new)


map_type = type(map(lambda x: x, []))

_stream_types['streaming'].append(((list, tuple, set), Batch))