"""Tests for appending points to existing LAS/LAZ files (``laspy.open(..., mode="a")``)."""
import io
import os
from pathlib import Path
from typing import Iterator, List, Tuple

import laspy
import pytest
from laspy import LasData, LasHeader, ScaleAwarePointRecord

from tests.conftest import NonSeekableStream
from tests.test_common import simple_laz
def test_append(file_path):
    """
    Append a file's own points to itself and verify the doubled cloud.

    ``file_path`` is a fixture (defined in conftest) yielding a LAS/LAZ
    test-file path. Appending to LAZ is only supported by the lazrs
    backend, so LAZ inputs are skipped when it is not installed.
    """
    if file_path.suffix == ".laz" and not laspy.LazBackend.Lazrs.is_available():
        # Fixed typo in the skip reason: "backed" -> "backend".
        pytest.skip("Only Lazrs backend supports appending")
    append_self_and_check(file_path)
def test_raises_for_laszip_backend():
    """Opening a LAZ file in append mode with the laszip backend must raise."""
    with pytest.raises(laspy.LaspyException), laspy.open(
        simple_laz, mode="a", laz_backend=laspy.LazBackend.Laszip
    ):
        pass
def test_append_las_with_evlrs():
    """Appending to a 1.4 LAS file keeps its EVLR intact and unchanged."""
    las = append_self_and_check(os.path.dirname(__file__) + "/data/1_4_w_evlr.las")

    assert len(las.evlrs) == 1
    actual = las.evlrs[0]
    # Compare the identifying fields in one shot against the known fixture values.
    assert (actual.user_id, actual.record_id, actual.description) == (
        "pylastest",
        42,
        "just a test evlr",
    )
    assert actual.record_data == b"Test 1 2 ... 1 2"
@pytest.mark.skipif(
    not laspy.LazBackend.Lazrs.is_available(), reason="Lazrs is not installed"
)
def test_append_laz_with_evlrs():
    """Appending to a 1.4 LAZ file keeps its EVLR intact and unchanged."""
    las = append_self_and_check(os.path.dirname(__file__) + "/data/1_4_w_evlr.laz")

    assert len(las.evlrs) == 1
    actual = las.evlrs[0]
    # Every field must match the EVLR baked into the test fixture.
    assert actual.user_id == "pylastest"
    assert actual.record_id == 42
    assert actual.description == "just a test evlr"
    assert actual.record_data == b"Test 1 2 ... 1 2"
def append_self_and_check(las_path_fixture):
    """
    Append a file's own points to an in-memory copy of itself, re-read the
    result, and check that the point cloud was exactly doubled.

    Returns the re-read LasData so callers can run additional checks
    (e.g. on EVLRs).
    """
    # Work on an in-memory copy so the on-disk fixture is never modified.
    in_memory = io.BytesIO(Path(las_path_fixture).read_bytes())
    original = laspy.read(las_path_fixture)

    with laspy.open(in_memory, mode="a", closefd=False) as appender:
        appender.append_points(original.points)

    in_memory.seek(0)
    reread = laspy.read(in_memory)

    half = reread.header.point_count // 2
    assert reread.header.point_count == 2 * original.header.point_count
    assert reread.points[:half] == original.points
    assert reread.points[half:] == original.points
    return reread
def test_trying_to_append_in_non_seekable_raises():
    """Append mode requires a seekable stream; a non-seekable one must raise."""
    with pytest.raises(TypeError):
        with open(simple_laz, mode="rb") as raw_file:
            wrapped = NonSeekableStream(raw_file)
            with laspy.open(wrapped, mode="a"):
                pass
@pytest.mark.parametrize(
    "points_read_counts",
    [
        [-1],
        [1, -1],
        [49_999, -1],
        [50_000, -1],
        [50_001, -1],
    ],
)
@pytest.mark.skipif(
    not laspy.LazBackend.Lazrs.is_available(), reason="Lazrs is not installed"
)
def test_write_then_append_produces_valid_cloud(points_read_counts: List[int]) -> None:
    """
    In this test we read a LAZ file into chunks dictated by `points_read_counts`,
    then we reconstruct it by first writing the first chunk into a new file using
    write mode, then we append the remaining chunks to that newly created file.

    At the end the input file and the one we patchworked must be the same.
    """

    def iter_points(
        points_list: List[ScaleAwarePointRecord],
        # Fixed return annotation: this is a generator, so it returns an
        # Iterator of tuples, not a single Tuple.
    ) -> Iterator[Tuple[ScaleAwarePointRecord, slice]]:
        """
        Yields tuples that associate each point record in the input list
        with the slice to be used to recover the same chunk from
        the input file.
        """
        offset: int = 0
        for points in points_list:
            yield points, slice(offset, offset + len(points))
            offset += len(points)

    def check_write_append_read(
        points_list: List[ScaleAwarePointRecord], header: LasHeader
    ) -> None:
        """
        Implements the logic of first using write mode for the first
        chunk, then appending the remaining chunks.
        Checks the result at the end.
        """
        with io.BytesIO() as tmp_output:
            # First chunk: plain write mode creates the compressed file.
            with laspy.open(
                tmp_output, "w", header=header, closefd=False, do_compress=True
            ) as las_writer:
                las_writer.write_points(points_list[0])

            # Remaining chunks: rewind and re-open the in-memory file in
            # append mode for each chunk.
            for points in points_list[1:]:
                tmp_output.seek(0)
                if points:  # nothing to do for an empty chunk
                    with laspy.open(tmp_output, "a", closefd=False) as las_appender:
                        las_appender.append_points(points)

            tmp_output.seek(0)
            final_cloud: LasData = laspy.read(tmp_output)

            # Each original chunk must be recoverable from the matching
            # slice of the reconstructed cloud.
            for points, selector in iter_points(points_list):
                assert final_cloud.points[selector] == points

    input_cloud_path = Path(__file__).parent / "data" / "autzen_trim.laz"
    with laspy.open(input_cloud_path) as las_reader:
        points_list: List[ScaleAwarePointRecord] = [
            las_reader.read_points(points_read_count)
            for points_read_count in points_read_counts
        ]
        check_write_append_read(points_list, las_reader.header)