1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186
|
import io
import os
import numpy as np
import pytest
from numpy.testing import assert_array_equal
import asdf
from asdf import Stream, generic_io
def test_stream():
    """
    Round-trip a stream whose data is appended after the ASDF file.

    A ``Stream`` of ``(6, 2)`` float64 rows is written, then 100 rows of raw
    bytes are appended to the same buffer.  On reopening, the file must hold
    exactly one block and expose the data with shape ``(100, 6, 2)``, where
    row ``i`` is filled with the value ``i``.
    """
    n_rows = 100
    values_per_row = 12  # 6 * 2 elements in every streamed row

    buff = io.BytesIO()
    asdf.AsdfFile({"stream": Stream([6, 2], np.float64)}).write_to(buff)

    # The streamed payload is simply written after the file content.
    for value in range(n_rows):
        buff.write(np.array([value] * values_per_row, np.float64).tobytes())

    buff.seek(0)
    with asdf.open(buff) as af:
        assert len(af._blocks.blocks) == 1
        assert af.tree["stream"].shape == (n_rows, 6, 2)
        for index, row in enumerate(af.tree["stream"]):
            assert np.all(row == index)
def test_stream_write_nothing():
    """
    Test that a stream with no data appended after the file reads back as
    an array with zero rows
    """
    buff = io.BytesIO()
    asdf.AsdfFile({"stream": Stream([6, 2], np.float64)}).write_to(buff)

    # Nothing is appended here: the buffer is reread exactly as written.
    buff.seek(0)
    with asdf.open(buff) as af:
        assert len(af._blocks.blocks) == 1
        assert af.tree["stream"].shape == (0, 6, 2)
def test_stream_twice():
    """
    Test that two Stream entries in one tree are both read from the same
    appended data

    1200 bytes are appended after the file.  ``stream`` has ``(6, 2)`` uint8
    rows (12 bytes each) and is expected to contain 100 rows; ``stream2`` has
    ``(12, 2)`` uint8 rows (24 bytes each) and is expected to contain 50.
    The file is expected to hold a single block.
    """
    buff = io.BytesIO()
    tree = {"stream": Stream([6, 2], np.uint8), "stream2": Stream([12, 2], np.uint8)}
    ff = asdf.AsdfFile(tree)
    ff.write_to(buff)
    for i in range(100):
        buff.write(np.array([i] * 12, np.uint8).tobytes())
    buff.seek(0)
    # Use a context manager so the opened file is closed even when an
    # assertion fails, as the other tests in this module do.
    with asdf.open(buff) as ff:
        assert len(ff._blocks.blocks) == 1
        assert ff.tree["stream"].shape == (100, 6, 2)
        assert ff.tree["stream2"].shape == (50, 12, 2)
def test_stream_with_nonstream():
    """
    Round-trip a tree holding both an ordinary array and a stream.

    The reopened file must contain two blocks: the ordinary array comes back
    unchanged and the stream has shape ``(100, 6, 2)`` with row ``i`` filled
    with the value ``i``.
    """
    expected = np.array([1, 2, 3, 4], np.int64)
    buff = io.BytesIO()
    ff = asdf.AsdfFile(
        {
            "nonstream": np.array([1, 2, 3, 4], np.int64),
            "stream": Stream([6, 2], np.float64),
        }
    )
    # The array is tiny, so it would otherwise be stored inline automatically;
    # request an internal block explicitly so that it occupies a block.
    ff.set_array_storage(ff["nonstream"], "internal")
    ff.write_to(buff)

    for value in range(100):
        buff.write(np.array([value] * 12, np.float64).tobytes())

    buff.seek(0)
    with asdf.open(buff) as af:
        assert len(af._blocks.blocks) == 2
        assert_array_equal(af.tree["nonstream"], expected)
        assert af.tree["stream"].shape == (100, 6, 2)
        for index, row in enumerate(af.tree["stream"]):
            assert np.all(row == index)
def test_stream_real_file(tmp_path):
    """
    Same scenario as the in-memory stream tests, but going through a real
    file on disk under ``tmp_path``.

    The file is written through a plain binary file object, 100 rows are
    appended to it, and it is then reopened by path.
    """
    path = os.path.join(str(tmp_path), "test.asdf")
    expected = np.array([1, 2, 3, 4], np.int64)
    tree = {
        "nonstream": np.array([1, 2, 3, 4], np.int64),
        "stream": Stream([6, 2], np.float64),
    }

    with open(path, "wb") as fd:
        ff = asdf.AsdfFile(tree)
        # The array is tiny, so it would otherwise be stored inline
        # automatically; request an internal block explicitly.
        ff.set_array_storage(ff["nonstream"], "internal")
        ff.write_to(fd)
        for value in range(100):
            fd.write(np.array([value] * 12, np.float64).tobytes())

    with asdf.open(path) as af:
        assert len(af._blocks.blocks) == 2
        assert_array_equal(af.tree["nonstream"], expected)
        assert af.tree["stream"].shape == (100, 6, 2)
        for index, row in enumerate(af.tree["stream"]):
            assert np.all(row == index)
def test_stream_to_stream():
    """
    Write through a ``generic_io.OutputStream`` and read back through a
    ``generic_io.InputStream``, both wrapping the same in-memory buffer.

    The reopened file must contain two blocks: the ordinary array comes back
    unchanged and the stream has shape ``(100, 6, 2)`` with row ``i`` filled
    with the value ``i``.
    """
    expected = np.array([1, 2, 3, 4], np.int64)
    tree = {
        "nonstream": np.array([1, 2, 3, 4], np.int64),
        "stream": Stream([6, 2], np.float64),
    }

    buff = io.BytesIO()
    out = generic_io.OutputStream(buff)
    asdf.AsdfFile(tree).write_to(out)

    # Rows are appended through the same output wrapper used for the file.
    for value in range(100):
        out.write(np.array([value] * 12, np.float64).tobytes())

    buff.seek(0)
    with asdf.open(generic_io.InputStream(buff, "r")) as af:
        assert len(af._blocks.blocks) == 2
        assert_array_equal(af.tree["nonstream"], expected)
        assert af.tree["stream"].shape == (100, 6, 2)
        for index, row in enumerate(af.tree["stream"]):
            assert np.all(row == index)
def test_array_to_stream(tmp_path):
    """
    Test marking an ordinary array as "streamed" and extending it with data
    appended after the file

    The scenario is run twice: once through an in-memory buffer read back via
    ``generic_io.InputStream``, and once through a real file under
    ``tmp_path``.  In both cases the array ``[1, 2, 3, 4]`` is written with
    "streamed" storage, ``[5, 6, 7, 8]`` is appended as raw bytes, and the
    reopened array must equal ``[1, ..., 8]``.  A new file built from the
    reopened one is then written to the buffer, which must contain
    ``shape: ['*']``.
    """
    tree = {
        "stream": np.array([1, 2, 3, 4], np.int64),
    }

    # --- In-memory buffer, read back through an InputStream ---
    buff = io.BytesIO()
    ff = asdf.AsdfFile(tree)
    ff.set_array_storage(tree["stream"], "streamed")
    ff.write_to(buff)
    buff.write(np.array([5, 6, 7, 8], np.int64).tobytes())

    buff.seek(0)
    # Use a context manager so the opened file is closed even when an
    # assertion fails; everything that uses it stays inside the block.
    with asdf.open(generic_io.InputStream(buff)) as ff:
        assert_array_equal(ff.tree["stream"], [1, 2, 3, 4, 5, 6, 7, 8])
        buff.seek(0)
        ff2 = asdf.AsdfFile(ff)
        ff2.set_array_storage(ff2["stream"], "streamed")
        ff2.write_to(buff)
        assert b"shape: ['*']" in buff.getvalue()

    # --- Real file on disk ---
    path = os.path.join(str(tmp_path), "test.asdf")
    with open(path, "wb") as fd:
        ff = asdf.AsdfFile(tree)
        ff.set_array_storage(tree["stream"], "streamed")
        ff.write_to(fd)
        fd.write(np.array([5, 6, 7, 8], np.int64).tobytes())

    with asdf.open(path) as ff:
        assert_array_equal(ff.tree["stream"], [1, 2, 3, 4, 5, 6, 7, 8])
        ff2 = asdf.AsdfFile(ff)
        ff2.write_to(buff)
        assert b"shape: ['*']" in buff.getvalue()
def test_too_many_streams():
    """
    Test that requesting "streamed" storage for a second array in the same
    file raises ``ValueError``
    """
    first = np.array([1, 2, 3, 4], np.int64)
    second = np.array([1, 2, 3, 4], np.int64)
    ff = asdf.AsdfFile({"stream1": first, "stream2": second})

    # The first streamed array is accepted ...
    ff.set_array_storage(first, "streamed")

    # ... and the second one must be rejected.
    with pytest.raises(ValueError, match=r"Can not add second streaming block"):
        ff.set_array_storage(second, "streamed")
def test_stream_repr_and_str():
    """
    Smoke test: ``repr()`` and ``str()`` of a Stream held in a tree can be
    called without raising
    """
    ff = asdf.AsdfFile({"stream": Stream([16], np.int64)})
    # Only successful completion is checked; the returned text is discarded.
    for render in (repr, str):
        render(ff.tree["stream"])
|