File: widetree2.py

package info (click to toggle)
pytables 3.10.2-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 15,228 kB
  • sloc: ansic: 82,212; python: 65,296; cpp: 753; sh: 394; makefile: 100
file content (122 lines) | stat: -rw-r--r-- 3,684 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import unittest

import tables as tb

verbose = 0


class Test(tb.IsDescription):
    ngroup = tb.Int32Col(pos=1)
    ntable = tb.Int32Col(pos=2)
    nrow = tb.Int32Col(pos=3)
    # string = StringCol(itemsize=500, pos=4)


class WideTreeTestCase(unittest.TestCase):

    def test00_leafs(self):

        # Open a new empty HDF5 file
        filename = "test_widetree.h5"
        ngroups = 10
        ntables = 300
        nrows = 10
        complevel = 0
        complib = "lzo"

        print("Writing...")
        # Open a file in "w"rite mode
        fileh = tb.open_file(filename, mode="w", title="PyTables Stress Test")

        for k in range(ngroups):
            # Create the group
            group = fileh.create_group("/", "group%04d" % k, "Group %d" % k)

        fileh.close()

        # Now, create the tables
        rowswritten = 0
        for k in range(ngroups):
            print("Filling tables in group:", k)
            fileh = tb.open_file(filename, mode="a", root_uep="group%04d" % k)
            # Get the group
            group = fileh.root
            for j in range(ntables):
                # Create a table
                table = fileh.create_table(
                    group,
                    "table%04d" % j,
                    Test,
                    "Table%04d" % j,
                    tb.Filters(complevel, complib),
                    nrows,
                )
                # Get the row object associated with the new table
                row = table.row
                # Fill the table
                for i in range(nrows):
                    row["ngroup"] = k
                    row["ntable"] = j
                    row["nrow"] = i
                    row.append()

                rowswritten += nrows
                table.flush()

            # Close the file
            fileh.close()

        # read the file
        print("Reading...")
        rowsread = 0
        for ngroup in range(ngroups):
            fileh = tb.open_file(
                filename, mode="r", root_uep="group%04d" % ngroup
            )
            # Get the group
            group = fileh.root
            ntable = 0
            if verbose:
                print("Group ==>", group)
            for table in fileh.list_nodes(group, "Table"):
                if verbose > 1:
                    print("Table ==>", table)
                    print("Max rows in buf:", table.nrowsinbuf)
                    print("Rows in", table._v_pathname, ":", table.nrows)
                    print("Buffersize:", table.rowsize * table.nrowsinbuf)
                    print("MaxTuples:", table.nrowsinbuf)

                nrow = 0
                for row in table:
                    try:
                        assert row["ngroup"] == ngroup
                        assert row["ntable"] == ntable
                        assert row["nrow"] == nrow
                    except Exception:
                        print(
                            "Error in group: %d, table: %d, row: %d"
                            % (ngroup, ntable, nrow)
                        )
                        print("Record ==>", row)
                    nrow += 1

                assert nrow == table.nrows
                rowsread += table.nrows
                ntable += 1

            # Close the file (eventually destroy the extended type)
            fileh.close()


# ----------------------------------------------------------------------
def suite():
    suite_ = unittest.TestSuite()
    from tables.tests.common import make_suite

    suite_.addTest(make_suite(WideTreeTestCase))

    return suite_


if __name__ == "__main__":
    unittest.main(defaultTest="suite")