1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
|
from __future__ import annotations
import re
from collections.abc import Iterable
from datetime import datetime
import numpy as np
import pytest
import awkward as ak
from awkward.types.numpytype import _primitive_to_dtype_dict, primitive_to_dtype
def test_issue() -> None:
    """Check that the reproducer from GitHub issue #2377 no longer fails."""
    unknown_empty = ak.Array([])  # <Array [] type='0 * unknown'>

    # Indexing the empty array with itself and flattening must not raise.
    flattened = ak.flatten(unknown_empty[unknown_empty])

    # The outcome must have the same layout as a freshly built empty array.
    reference = ak.Array([])
    assert flattened.layout.is_equal_to(reference.layout, all_parameters=True)
def _add_necessary_unit(dtype_name: str) -> str:
    """Return ``dtype_name`` with a sample unit appended when it lacks one.

    A name made of ``datetime`` or ``timedelta`` optionally followed by
    digits (e.g. ``"datetime64"``) gets ``[15us]`` appended. Any other name,
    including one that already carries a unit, is returned unchanged.
    """
    sample_unit = "15us"
    unitless_pattern = re.compile(r"^(?:datetime|timedelta)\d*$")
    lacks_unit = unitless_pattern.fullmatch(dtype_name) is not None
    return f"{dtype_name}[{sample_unit}]" if lacks_unit else dtype_name
# Every supported dtype, e.g. (dtype('bool'), dtype('int8'), dtype('<M8[15us]'), ...)
DTYPES = tuple(
    primitive_to_dtype(_add_necessary_unit(name)) for name in _primitive_to_dtype_dict
)
# The bool, signed-integer and unsigned-integer members of DTYPES,
# e.g. (dtype('bool'), dtype('int8'), ...)
INDEXABLE_DTYPES = tuple(
    dtype for dtype in DTYPES if dtype.kind in ("b", "i", "u")
)
AWKWARD_ARRAYS = (
    # An empty array whose item type is unknown
    ak.Array([]),
    # One empty array for each entry of DTYPES
    *(ak.from_numpy(np.array([], dtype=dtype)) for dtype in DTYPES),
    # Non-empty arrays, each built from one kind of value
    ak.Array([1, 2, 3]),
    ak.Array([[1, 2], [], [3]]),
    ak.Array([[[1.1, 2.2], []], [[3.3]], []]),
    ak.Array([1 + 1j, 2 + 2j, 3 + 3j]),
    ak.Array([datetime(year, 1, 1) for year in (2020, 2021, 2022)]),
    # Non-empty arrays whose sub-lists mix kinds of values
    ak.Array([[1, 2], [1.1, 2.2], [1 + 1j, 2 + 2j]]),
    ak.Array(
        [
            [1, 2],
            [1.1, 2.2],
            [1 + 1j, 2 + 2j],
            [datetime(year, 1, 1) for year in (2020, 2021)],
        ]
    ),
)
EMPTY_INDEXES = (
    # NOTE: An empty tuple, `()`, is left out because it doesn't pass the test.
    [],
    ak.Array([]),
    # Empty awkward arrays, then empty NumPy arrays, for each indexable dtype
    *(ak.from_numpy(np.array([], dtype=dtype)) for dtype in INDEXABLE_DTYPES),
    *(np.array([], dtype=dtype) for dtype in INDEXABLE_DTYPES),
)
@pytest.mark.parametrize("a", AWKWARD_ARRAYS)
@pytest.mark.parametrize("idx", EMPTY_INDEXES)
def test_empty_index(a: ak.Array, idx: Iterable) -> None:
    """Assert that selecting with an empty index keeps the array's item type."""
    selected = a[idx]

    # Nothing was selected, so the result holds no elements.
    assert selected.to_list() == []

    # The item type is kept while the length becomes zero,
    # e.g., "0 * complex128", "0 * var * union[complex128, datetime64[us]]"
    zero_length_type = ak.types.ArrayType(a.type.content, 0)
    assert selected.typestr == str(zero_length_type)
def test_flatten_empty_array() -> None:
    """Assert an empty array can be flattened unless its type forbids it."""
    # With an unknown item type the array may be flattened, because it could
    # be, e.g., an empty array of nested arrays.
    unknown_empty = ak.Array([])  # type='0 * unknown'
    assert ak.flatten(unknown_empty).to_list() == []

    # Here the empty sub-array is an empty array of arrays of integers.
    nested = ak.Array([[[1]], []])  # type='2 * var * var * int64'
    assert ak.flatten(nested, axis=2).to_list() == [[1], []]

    # A flat integer type leaves nothing to flatten, so an error is expected.
    flat_ints = ak.from_numpy(np.array([], dtype=int))  # type='0 * int64'
    with pytest.raises(ak.errors.AxisError):
        ak.flatten(flat_ints)
|