import io
import os
import struct
from tempfile import NamedTemporaryFile

import numpy
import pandas as pd
import pytest

from conftest import odc_modules
from pyodc import codec
from pyodc.constants import DataType
from pyodc.stream import LittleEndianStream


def _check_decode(cdc, encoded, check):
    st = LittleEndianStream(encoded)
    v = cdc.decode(st)
    assert v == check


def test_null_terminated_constant_string():
    r"""
    Tests the (somewhat dubious) 'missing' values in some older data encoded
    from ODB-1 using the migrator. This data uses the integer missing value,
    cast to a double, which happens to start with \x00 and therefore decodes
    as a null (empty) string. We need to support decoding this data.
    """
    constant_value = struct.unpack("<d", b"\x00\x00\xc0\xff\xff\xff\xdfA")[0]
    cdc = codec.ConstantString("column", constant_value, constant_value, DataType.STRING, has_missing=False)
    encoded = b""
    _check_decode(cdc, encoded, "")
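
# Note (editor's addition, hedged): the eight bytes above are the little-endian
# IEEE-754 encoding of 2147483647.0, i.e. the integer missing value described in
# the docstring converted to a double. Because its first byte is \x00, reading the
# packed constant as a NUL-terminated string yields the empty string checked above.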


def test_stripped_constant_string():
    constant_value = struct.unpack("<d", b"hello\x00\x00\x00")[0]
    cdc = codec.ConstantString("column", constant_value, constant_value, DataType.STRING, has_missing=False)
    encoded = b""
    _check_decode(cdc, encoded, "hello")


def test_normal_constant_string():
    constant_value = struct.unpack("<d", b"helloAAA")[0]
    cdc = codec.ConstantString("column", constant_value, constant_value, DataType.STRING, has_missing=False)
    encoded = b""
    _check_decode(cdc, encoded, "helloAAA")
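

# Editor's sketch (not part of the original suite; it relies only on struct, imported
# above): the three constants used in the tests above pack an entire string into the
# 8 bytes of a double. A leading \x00 terminates the string immediately, trailing
# \x00 bytes are padding that is stripped on decode, and an 8-character value uses
# every byte with no terminator at all.
def test_constant_string_byte_layout_sketch():
    null_terminated = b"\x00\x00\xc0\xff\xff\xff\xdfA"
    nul_padded = b"hello\x00\x00\x00"
    full_width = b"helloAAA"

    for raw in (null_terminated, nul_padded, full_width):
        # Unpacking to a double and repacking recovers the exact byte pattern,
        # so a short string can travel inside a "constant" double value.
        assert struct.pack("<d", struct.unpack("<d", raw)[0]) == raw

    # Interpreting the same bytes as NUL-terminated/NUL-padded strings gives the
    # values that the ConstantString tests above expect.
    assert null_terminated.split(b"\x00", 1)[0].decode() == ""
    assert nul_padded.split(b"\x00", 1)[0].decode() == "hello"
    assert full_width.split(b"\x00", 1)[0].decode() == "helloAAA"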


def check_codec_choice(testdata, expected_codec):
    # Check that the correct codec is being selected
    series = pd.Series(testdata)
    selected_codec = codec.select_codec("column", series, DataType.STRING, False)
    assert isinstance(selected_codec, expected_codec)

    # Create a temporary in-memory stream
    f = io.BytesIO()
    st = LittleEndianStream(f)

    # Encode the header and data for just this column
    selected_codec.encode_header(st)
    for val in testdata:
        selected_codec.encode(st, val)

    st.seek(0)  # reset the stream to the start

    # Check the header can be decoded correctly
    decoded_codec = codec.read_codec(st)
    assert decoded_codec.column_name == "column"
    assert decoded_codec.type == DataType.STRING
    assert decoded_codec.name == selected_codec.name

    # Check the encoded data matches
    for val in testdata:
        decoded_val = selected_codec.decode(st)
        assert val == decoded_val


def test_string_codec_selection():
    # Deliberately use strings of length 7, 8 and 9 to catch edge cases around
    # the 8-byte boundary of the constant-string codecs
    testcases = [
        [["constan", "constan"], codec.ConstantString],
        [["constant", "constant"], codec.ConstantString],
        [["longconst", "longconst"], codec.Int8String],
        [["longconstant", "longconstant"], codec.Int8String],
        [["not", "constant", "longnotconstant"], codec.Int8String],
        [["longconstant"] + [str(num) for num in range(256)], codec.Int16String],
    ]

    for testdata, expected_codec in testcases:
        check_codec_choice(testdata, expected_codec)

    # With the long-string codec enabled, a constant value longer than 8 characters
    # should select LongConstantString instead of Int8String
    os.environ["ODC_ENABLE_WRITING_LONG_STRING_CODEC"] = "true"
    for testdata, _ in testcases[2:3]:
        check_codec_choice(testdata, codec.LongConstantString)
    del os.environ["ODC_ENABLE_WRITING_LONG_STRING_CODEC"]
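
# Summary of the selection behaviour exercised above (inferred from these cases, not
# an exhaustive specification of the selection rules): a repeated value of at most
# 8 characters fits into a constant double and selects ConstantString; anything
# longer or non-constant falls back to an indexed string codec, with Int8String's
# one-byte index covering up to 256 distinct values and Int16String used beyond
# that; setting ODC_ENABLE_WRITING_LONG_STRING_CODEC allows a long constant value
# to be written with LongConstantString instead.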


@pytest.mark.parametrize("odyssey", odc_modules)
def test_decode_odb1_missing_strings(odyssey):
    r"""
    Tests that we can decode missing (NULL) strings from data encoded from
    ODB-1 using the migrator. This data uses the integer missing value, cast
    to a double, which happens to start with \x00.

    The test sample is pre-encoded, as this data cannot be produced through
    the Python API (which, correctly, encodes the missing value as a null
    string).
    """
    with open(os.path.join(os.path.dirname(__file__), "data/odb1_missing_string.odb"), "rb") as f:
        df = odyssey.read_odb(f, single=True)

    assert df.shape == (4, 1)

    series = df["col1"]
    assert series.dtype == "object"
    for v in series:
        assert isinstance(v, str)
        assert v == ""


# Each case is a single column of test data and the codec expected to encode it;
# these cases are shared by the round-trip test below
testcases = [
    [["abcd"] * 7, codec.ConstantString],
    [["abcdefghi"] * 7, codec.Int8String],
    [["aoeu", "aoeu", "aaaaaaaooooooo", "None", "boo", "squiggle", "a"], codec.Int8String],
    [["longconstant"] + [str(num) for num in range(256)], codec.Int16String],
]


@pytest.mark.parametrize("encoder", odc_modules)
@pytest.mark.parametrize("decoder", odc_modules)
@pytest.mark.parametrize("testcase", testcases)
@pytest.mark.parametrize("dtype", ["string", "object"])
def test_new_strings(encoder, decoder, testcase, dtype):
    """
    Tests that strings work both with the dedicated pandas "string" dtype and
    with the older approach of storing strings as objects
    """
    testdata, _expected_codec = testcase
    df = pd.DataFrame(testdata, dtype=dtype)

    with NamedTemporaryFile() as fencode:
        encoder.encode_odb(df, fencode.name)
        round_tripped_data = decoder.read_odb(fencode.name, single=True)

    # Check that the column data round-tripped (compare values only, since the
    # column labels and index need not survive the round trip)
    numpy.testing.assert_array_equal(df.iloc[:, 0].values, round_tripped_data.iloc[:, 0].values)