File: parquet_io.py

import pandas as pd


def to_parquet(
    pieces,
    outpath,
    row_group_size=None,
    compression="snappy",
    use_dictionary=True,
    version=2.0,
    **kwargs
):
    """
    Save an iterable of dataframe chunks to a single Apache Parquet file. For
    more info about Parquet, see https://arrow.apache.org/docs/python/parquet.html.

    Parameters
    ----------
    pieces : DataFrame or iterable of DataFrame
        Chunks to write
    outpath : str
        Path to output file
    row_group_size : int, optional
        Number of rows per row group.
    compression : {'snappy', 'gzip', 'brotli', 'none'}, optional
        Compression algorithm. Can be set on a per-column basis with a
        dictionary of column names to compression lib.
    use_dictionary : bool, optional
        Use dictionary encoding. Can be set on a per-column basis with a list
        of column names.
    version : float or str, optional
        Parquet format version, passed through to
        ``pyarrow.parquet.ParquetWriter``.
    **kwargs
        Remaining keyword arguments are passed to
        ``pyarrow.parquet.ParquetWriter``.

    See also
    --------
    pyarrow.parquet.write_table
    pyarrow.parquet.ParquetFile
    fastparquet
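
    Examples
    --------
    A minimal usage sketch; the output path and column values below are
    illustrative, and writing requires the optional ``pyarrow`` dependency.

    >>> import pandas as pd
    >>> chunks = (
    ...     pd.DataFrame({"chrom": ["chr1"], "start": [i], "end": [i + 10]})
    ...     for i in range(3)
    ... )
    >>> to_parquet(chunks, "intervals.parquet")  # doctest: +SKIP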

    """
    try:
        import pyarrow.parquet
        import pyarrow as pa
    except ImportError:
        raise ImportError("Saving to parquet requires the `pyarrow` package")

    if isinstance(pieces, pd.DataFrame):
        pieces = (pieces,)

    writer = None
    try:
        for i, piece in enumerate(pieces):
            table = pa.Table.from_pandas(piece, preserve_index=False)
            if i == 0:
                # Create the writer lazily, using the schema of the first chunk.
                writer = pa.parquet.ParquetWriter(
                    outpath,
                    table.schema,
                    compression=compression,
                    use_dictionary=use_dictionary,
                    version=version,
                    **kwargs
                )
            writer.write_table(table, row_group_size=row_group_size)
    finally:
        # Close only if at least one chunk was written; otherwise there is no writer.
        if writer is not None:
            writer.close()


def read_parquet(filepath, columns=None, iterator=False, **kwargs):
    """
    Load DataFrames from Parquet files, optionally in pieces.

    Parameters
    ----------
    filepath : str, pathlib.Path, pyarrow.NativeFile, or file-like object
        Readable source. To pass bytes or a buffer-like object containing a
        Parquet file, use ``pyarrow.BufferReader``.
    columns : list, optional
        If not None, only these columns will be read from the row groups. A
        column name may be a prefix of a nested field, e.g. 'a' will select
        'a.b', 'a.c', and 'a.d.e'.
    iterator : bool, default False
        Return an iterator object that yields row group DataFrames and
        provides the ParquetFile interface.
    use_threads : bool, default True
        Perform multi-threaded column reads.
    memory_map : bool, optional
        If the source is a file path, use a memory map to read the file, which
        can improve performance in some environments.

    Returns
    -------
    DataFrame or ParquetFileIterator
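
    Examples
    --------
    A minimal usage sketch; the path and column names below are illustrative,
    and the iterator mode requires the optional ``pyarrow`` dependency.

    >>> df = read_parquet("intervals.parquet", columns=["chrom"])  # doctest: +SKIP
    >>> it = read_parquet("intervals.parquet", iterator=True)  # doctest: +SKIP
    >>> for chunk in it:  # doctest: +SKIP
    ...     print(len(chunk))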

    """
    use_threads = kwargs.pop("use_threads", True)

    if not iterator:
        return pd.read_parquet(
            filepath, columns=columns, use_threads=use_threads, **kwargs
        )
    else:
        try:
            from pyarrow.parquet import ParquetFile
        except ImportError:
            raise ImportError(
                "Iterating over Parquet data requires the `pyarrow` package."
            )

        class ParquetFileIterator(ParquetFile):
            def __iter__(self):
                return self

            def __next__(self):
                # Lazily initialize the row-group cursor on the first iteration.
                if not hasattr(self, "_rgid"):
                    self._rgid = 0
                if self._rgid < self.num_row_groups:
                    rg = self.read_row_group(
                        self._rgid,
                        columns=columns,
                        use_threads=use_threads,
                        use_pandas_metadata=True,
                    )
                    self._rgid += 1
                else:
                    raise StopIteration
                return rg.to_pandas()

        return ParquetFileIterator(filepath, **kwargs)