File: from_csv.py

package info (click to toggle)
python-agate 1.9.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,996 kB
  • sloc: python: 8,512; makefile: 126
file content (86 lines) | stat: -rw-r--r-- 2,829 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import itertools
from io import StringIO


@classmethod
def from_csv(cls, path, column_names=None, column_types=None, row_names=None, skip_lines=0, header=True, sniff_limit=0,
             encoding='utf-8', row_limit=None, **kwargs):
    """
    Create a new table from a CSV.

    This method uses agate's builtin CSV reader, which supplies encoding
    support for both Python 2 and Python 3.

    :code:`kwargs` will be passed through to the CSV reader.

    :param path:
        Filepath or file-like object from which to read CSV data. If a file-like
        object is specified, it must be seekable. If using Python 2, the file
        should be opened in binary mode (`rb`).
    :param column_names:
        See :meth:`.Table.__init__`.
    :param column_types:
        See :meth:`.Table.__init__`.
    :param row_names:
        See :meth:`.Table.__init__`.
    :param skip_lines:
        The number of lines to skip from the top of the file.
    :param header:
        If :code:`True`, the first row of the CSV is assumed to contain column
        names. If :code:`header` and :code:`column_names` are both specified
        then a row will be skipped, but :code:`column_names` will be used.
    :param sniff_limit:
        Limit CSV dialect sniffing to the specified number of bytes. Set to
        None to sniff the entire file. Defaults to 0 (no sniffing).
    :param encoding:
        Character encoding of the CSV file. Note: if passing in a file
        handle it is assumed you have already opened it with the correct
        encoding specified.
    :param row_limit:
        Limit how many rows of data will be read.
    """
    from agate import csv
    from agate.table import Table

    close = False

    try:
        if hasattr(path, 'read'):
            f = path
        else:
            f = open(path, encoding=encoding)

            close = True

        if isinstance(skip_lines, int):
            while skip_lines > 0:
                f.readline()
                skip_lines -= 1
        else:
            raise ValueError('skip_lines argument must be an int')

        contents = StringIO(f.read())

        if sniff_limit is None:
            kwargs['dialect'] = csv.Sniffer().sniff(contents.getvalue())
        elif sniff_limit > 0:
            kwargs['dialect'] = csv.Sniffer().sniff(contents.getvalue()[:sniff_limit])

        reader = csv.reader(contents, header=header, **kwargs)

        if header:
            if column_names is None:
                column_names = next(reader)
            else:
                next(reader)

        if row_limit is None:
            rows = tuple(reader)
        else:
            rows = tuple(itertools.islice(reader, row_limit))

    finally:
        if close:
            f.close()

    return Table(rows, column_names, column_types, row_names=row_names)