File: tests_operations.py

package info (click to toggle)
rows 0.5.0~dev0~1~1d5a326-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 2,340 kB
  • sloc: python: 12,672; sh: 117; makefile: 67
file content (99 lines) | stat: -rw-r--r-- 4,065 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# coding: utf-8

# Copyright 2014-2025 Álvaro Justen <https://github.com/turicas/rows/>
#    This program is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
#    Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
#    any later version.
#    This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
#    warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
#    more details.
#    You should have received a copy of the GNU Lesser General Public License along with this program.  If not, see
#    <http://www.gnu.org/licenses/>.

from __future__ import unicode_literals

import datetime
import unittest
from collections import OrderedDict

import rows
import rows.operations
import tests.utils as utils


class OperationsTestCase(utils.RowsTestMixIn, unittest.TestCase):
    def test_join_imports(self):
        assert rows.join is rows.operations.join

    def test_join_feature(self):
        tables = [
            rows.import_from_csv("tests/data/to-merge-1.csv"),
            rows.import_from_csv("tests/data/to-merge-2.csv"),
            rows.import_from_csv("tests/data/to-merge-3.csv"),
        ]
        merged = rows.join(keys=("id", "username"), tables=tables)
        expected = rows.import_from_csv("tests/data/merged.csv")
        self.assert_table_equal(merged, expected)

    def test_transform_imports(self):
        assert rows.transform is rows.operations.transform

    def test_transform_feature(self):
        def transformation_function(row, table):
            if row.percent_column is None or row.percent_column < 0.1269:
                return None  # discard this row

            new = row._asdict()
            new["meta"] = ", ".join(
                ["{} => {}".format(key, value) for key, value in table._meta.items()]
            )
            return new

        fields = utils.table.fields.copy()
        fields.update({"meta": rows.fields.TextField})
        tables = [utils.table] * 3
        result = rows.transform(fields, transformation_function, *tables)
        assert result.fields == fields
        not_discarded = [
            transformation_function(row, utils.table) for row in utils.table
        ] * 3
        not_discarded = [row for row in not_discarded if row is not None]
        assert len(result) == len(not_discarded)

        for expected_row, row in zip(not_discarded, result):
            assert expected_row == dict(row._asdict())

    def test_transpose_imports(self):
        assert rows.transpose is rows.operations.transpose

    def test_transpose_feature(self):
        new_fields = OrderedDict(
            [
                ("key", rows.fields.TextField),
                ("value_1", rows.fields.TextField),
                ("value_2", rows.fields.TextField),
            ]
        )
        table = rows.Table(fields=new_fields)
        table.append(
            {"key": "first_key", "value_1": "first_value_1", "value_2": "first_value_2"}
        )
        table.append({"key": "second_key", "value_1": 1, "value_2": 2})
        table.append({"key": "third_key", "value_1": 3.14, "value_2": 2.71})
        table.append(
            {"key": "fourth_key", "value_1": "2015-09-04", "value_2": "2015-08-29"}
        )

        new_table = rows.transpose(table, fields_column="key")

        assert len(new_table) == 2
        assert len(new_table.fields) == len(table)
        assert new_table.field_names == [row.key for row in table]
        assert new_table[0].first_key == "first_value_1"
        assert new_table[0].second_key == 1
        assert new_table[0].third_key == 3.14
        assert new_table[0].fourth_key == datetime.date(2015, 9, 4)
        assert new_table[1].first_key == "first_value_2"
        assert new_table[1].second_key == 2
        assert new_table[1].third_key == 2.71
        assert new_table[1].fourth_key == datetime.date(2015, 8, 29)