1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
|
# coding: utf-8
# Copyright 2014-2025 Álvaro Justen <https://github.com/turicas/rows/>
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General
# Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option)
# any later version.
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
# more details.
# You should have received a copy of the GNU Lesser General Public License along with this program. If not, see
# <http://www.gnu.org/licenses/>.
from __future__ import unicode_literals
import datetime
import unittest
from collections import OrderedDict
import rows
import rows.operations
import tests.utils as utils
class OperationsTestCase(utils.RowsTestMixIn, unittest.TestCase):
def test_join_imports(self):
assert rows.join is rows.operations.join
def test_join_feature(self):
tables = [
rows.import_from_csv("tests/data/to-merge-1.csv"),
rows.import_from_csv("tests/data/to-merge-2.csv"),
rows.import_from_csv("tests/data/to-merge-3.csv"),
]
merged = rows.join(keys=("id", "username"), tables=tables)
expected = rows.import_from_csv("tests/data/merged.csv")
self.assert_table_equal(merged, expected)
def test_transform_imports(self):
assert rows.transform is rows.operations.transform
def test_transform_feature(self):
def transformation_function(row, table):
if row.percent_column is None or row.percent_column < 0.1269:
return None # discard this row
new = row._asdict()
new["meta"] = ", ".join(
["{} => {}".format(key, value) for key, value in table._meta.items()]
)
return new
fields = utils.table.fields.copy()
fields.update({"meta": rows.fields.TextField})
tables = [utils.table] * 3
result = rows.transform(fields, transformation_function, *tables)
assert result.fields == fields
not_discarded = [
transformation_function(row, utils.table) for row in utils.table
] * 3
not_discarded = [row for row in not_discarded if row is not None]
assert len(result) == len(not_discarded)
for expected_row, row in zip(not_discarded, result):
assert expected_row == dict(row._asdict())
def test_transpose_imports(self):
assert rows.transpose is rows.operations.transpose
def test_transpose_feature(self):
new_fields = OrderedDict(
[
("key", rows.fields.TextField),
("value_1", rows.fields.TextField),
("value_2", rows.fields.TextField),
]
)
table = rows.Table(fields=new_fields)
table.append(
{"key": "first_key", "value_1": "first_value_1", "value_2": "first_value_2"}
)
table.append({"key": "second_key", "value_1": 1, "value_2": 2})
table.append({"key": "third_key", "value_1": 3.14, "value_2": 2.71})
table.append(
{"key": "fourth_key", "value_1": "2015-09-04", "value_2": "2015-08-29"}
)
new_table = rows.transpose(table, fields_column="key")
assert len(new_table) == 2
assert len(new_table.fields) == len(table)
assert new_table.field_names == [row.key for row in table]
assert new_table[0].first_key == "first_value_1"
assert new_table[0].second_key == 1
assert new_table[0].third_key == 3.14
assert new_table[0].fourth_key == datetime.date(2015, 9, 4)
assert new_table[1].first_key == "first_value_2"
assert new_table[1].second_key == 2
assert new_table[1].third_key == 2.71
assert new_table[1].fourth_key == datetime.date(2015, 8, 29)
|