File: test_chunk_read_write.py

"""
Tests related to the 'chunked' reading and writing
"""

import io
import math

import numpy as np
import pytest

import laspy


def check_chunked_reading_gives_expected_points(
    groundtruth_las: laspy.LasData, reader: laspy.LasReader, iter_size: int
):
    """Checks that the points read by the reader are the same as groundtruth points."""
    assert groundtruth_las.point_format == reader.header.point_format
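    # Each chunk produced by the iterator must match the corresponding slice
    # [i * iter_size : (i + 1) * iter_size] of the groundtruth points.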
    for i, points in enumerate(reader.chunk_iterator(iter_size)):
        expected_points = groundtruth_las.points[i * iter_size : (i + 1) * iter_size]
        for dim_name in points.array.dtype.names:
            all_close = np.allclose(expected_points[dim_name], points[dim_name])
            if not all_close and dim_name == "gps_time":
                pytest.xfail("known weirdness of gps_time dimension")
            assert all_close, f"{dim_name} not equal"


def test_chunked_las_reading_gives_expected_points(las_file_path):
    """
    Test chunked LAS reading
    """
    with laspy.open(las_file_path) as las_reader:
        with laspy.open(las_file_path) as reader:
            las = las_reader.read()
            check_chunked_reading_gives_expected_points(las, reader, iter_size=50)


def test_chunked_laz_reading_gives_expected_points(laz_file_path, laz_backend):
    """
    Test LAZ reading in chunked mode with different backends
    """
    with laspy.open(laz_file_path) as las_reader:
        with laspy.open(laz_file_path, laz_backend=laz_backend) as laz_reader:
            expected_las = las_reader.read()
            check_chunked_reading_gives_expected_points(
                expected_las, laz_reader, iter_size=50
            )


@pytest.mark.parametrize("backend", laspy.LazBackend.detect_available() + (None,))
def test_chunked_writing_gives_expected_points(file_path, backend):
    """
    Write in chunked mode then test that the points are correct
    """
    original_las = laspy.read(file_path)
    iter_size = 51

    do_compress = backend is not None
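
    # Re-write the points chunk by chunk into an in-memory buffer,
    # compressing to LAZ when a backend is available.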

    with io.BytesIO() as tmp_output:
        with laspy.open(
            tmp_output,
            mode="w",
            closefd=False,  # keep tmp_output open so it can be read back below
            header=original_las.header,
            do_compress=do_compress,
            laz_backend=backend,
        ) as las:
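            # ceil division gives the number of chunks needed to cover all points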
            for i in range(math.ceil(len(original_las.points) / iter_size)):
                original_points = original_las.points[
                    i * iter_size : (i + 1) * iter_size
                ]
                las.write_points(original_points)

        tmp_output.seek(0)
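        # Read the freshly written file back and compare against the original.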
        with laspy.open(tmp_output, closefd=False) as reader:
            check_chunked_reading_gives_expected_points(original_las, reader, iter_size)


@pytest.mark.parametrize("backend", laspy.LazBackend.detect_available() + (None,))
def test_chunked_dimension_modification(file_path, backend):
    """
    Test that when using chunked mode, if we modify the values in the chunks
    before writing them the values are actually modified.
    """

    mem_dest = io.BytesIO()

    # Read the file in chunked mode, modifying some values before writing
    # them to a new file
    with laspy.open(file_path, laz_backend=backend) as reader:
        with laspy.open(
            mem_dest, mode="w", laz_backend=backend, header=reader.header, closefd=False
        ) as dest:
            for points in reader.chunk_iterator(50):
                # try a sub field
                points.classification += 1
                # try a normal field
                points.user_data += 2
                # try a scaled field
                points.x += 10.0

                dest.write_points(points)

    mem_dest.seek(0)

    # Prepare our ground truth
    ground_truth_las = laspy.read(file_path)
    ground_truth_las.classification += 1
    ground_truth_las.user_data += 2
    ground_truth_las.x += 10.0

    with laspy.open(mem_dest, laz_backend=backend) as reader:
        check_chunked_reading_gives_expected_points(ground_truth_las, reader, 42)