File: laswriter.py

package info (click to toggle)
python-laspy 2.5.4-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 3,928 kB
  • sloc: python: 9,065; makefile: 20
file content (231 lines) | stat: -rw-r--r-- 7,426 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
import logging
import sys
from copy import deepcopy
from typing import BinaryIO, Iterable, Optional, Union

from ._pointwriter import IPointWriter
from .compression import LazBackend
from .errors import LaspyException
from .header import LasHeader
from .point import dims
from .point.record import PackedPointRecord
from .vlrs.vlrlist import VLRList

logger = logging.getLogger(__name__)


class LasWriter:
    """
    Allows to write a complete LAS/LAZ file to the destination.
    """

    def __init__(
        self,
        dest: BinaryIO,
        header: LasHeader,
        do_compress: Optional[bool] = None,
        laz_backend: Optional[Union[LazBackend, Iterable[LazBackend]]] = None,
        closefd: bool = True,
        encoding_errors: str = "strict",
    ) -> None:
        """
        Parameters
        ----------
        dest: file_object
            file object where the LAS/LAZ will be written

        header: LasHeader
            The header of the file to be written

        do_compress: bool, optional
            Whether the file data should be written as LAS (uncompressed)
            or LAZ (compressed).
            If None, the file won't be compressed, unless a laz_backend is provided

        laz_backend: LazBackend or list of LazBackend, optional
            The LazBackend to use (or if it is a sequence the LazBackend to try)
            for the compression

        closefd: bool, default True
            should the `dest` be closed when the writer is closed

        encoding_errors: str, default 'strict'
            How encoding errors should be treated.
            Possible values and their explanation can be seen here:
            https://docs.python.org/3/library/codecs.html#error-handlers.
        """
        self.closefd = closefd
        self.encoding_errors = encoding_errors
        self.header = deepcopy(header)
        # The point writer will take take of creating and writing
        # the correct laszip vlr, however we have to make sure
        # no prior laszip vlr exists
        try:
            self.header.vlrs.pop(header.vlrs.index("LasZipVlr"))
        except ValueError:
            pass
        self.header.partial_reset()

        self.dest = dest
        self.done = False

        dims.raise_if_version_not_compatible_with_fmt(
            header.point_format.id, str(self.header.version)
        )

        if laz_backend is not None:
            if do_compress is None:
                do_compress = True
            self.laz_backend = laz_backend
        else:
            if do_compress is None:
                do_compress = False
            self.laz_backend = LazBackend.detect_available()
        self.header.are_points_compressed = do_compress

        if do_compress:
            self.point_writer: IPointWriter = self._create_laz_backend(self.laz_backend)
        else:
            self.point_writer: IPointWriter = UncompressedPointWriter(self.dest)

        self.point_writer.write_initial_header_and_vlrs(
            self.header, self.encoding_errors
        )

    def write_points(self, points: PackedPointRecord) -> None:
        """
        .. note ::

            If you are writing points coming from multiple different input files
            into one output file, you have to make sure the point record
            you write all use the same scales and offset of the writer.

            You can use :meth:`.LasData.change_scaling` or :meth:`.ScaleAwarePointRecord.change_scaling`
            to do that.

        Parameters
        ----------
        points: PackedPointRecord or ScaleAwarePointRecord
                The points to be written

        Raises
        ------
        LaspyException
            If the point format of the points does not match
            the point format of the writer.
        """
        if not points:
            return

        if self.done:
            raise LaspyException("Cannot write points anymore")

        if points.point_format != self.header.point_format:
            raise LaspyException("Incompatible point formats")

        if self.header.max_point_count() - self.header.point_count < len(points):
            raise LaspyException(
                "Cannot write {} points as it would exceed the maximum number of points the file"
                "can store. Current point count: {}, max point count: {}".format(
                    len(points), self.header.point_count, self.header.max_point_count()
                )
            )

        self.header.grow(points)
        self.point_writer.write_points(points)

    def write_evlrs(self, evlrs: VLRList) -> None:
        """Writes the EVLRs to the file

        Parameters
        ----------
        evlrs: VLRList
               The EVLRs to be written

        Raises
        ------
        LaspyException
            If the file's version is not >= 1.4
        """
        if self.header.version.minor < 4:
            raise LaspyException(
                "EVLRs are not supported on files with version less than 1.4"
            )

        if len(evlrs) > 0:
            self.point_writer.done()
            self.done = True
            self.header.number_of_evlrs = len(evlrs)
            self.header.start_of_first_evlr = self.dest.tell()
            evlrs.write_to(self.dest, as_extended=True)

    def close(self) -> None:
        """Closes the writer.

        flushes the points, updates the header, making it impossible
        to write points afterwards.
        """
        if self.point_writer is not None:
            if not self.done:
                self.point_writer.done()

            if self.header.point_count == 0:
                self.header.maxs = [0.0, 0.0, 0.0]
                self.header.mins = [0.0, 0.0, 0.0]

            self.point_writer.write_updated_header(self.header, self.encoding_errors)
        if self.closefd:
            self.dest.close()
        self.done = True

    def _create_laz_backend(
        self, laz_backends: Union[LazBackend, Iterable[LazBackend]]
    ) -> "IPointWriter":
        try:
            laz_backends = iter(laz_backends)
        except TypeError:
            laz_backends = (laz_backends,)

        last_error: Optional[Exception] = None
        for backend in laz_backends:
            try:
                if not backend.is_available():
                    raise LaspyException(f"The '{backend}' is not available")

                return backend.create_writer(self.dest, self.header)
            except Exception as e:
                logger.error(e)
                last_error = e

        if last_error is not None:
            raise LaspyException(f"No LazBackend could be initialized: {last_error}")
        else:
            raise LaspyException("No LazBackend selected, cannot compress")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()


class UncompressedPointWriter(IPointWriter):
    """
    Writing points in the simple uncompressed case.
    """

    def __init__(self, dest: BinaryIO) -> None:
        self.dest = dest

    @property
    def destination(self) -> BinaryIO:
        return self.dest

    def write_points(self, points: PackedPointRecord) -> None:
        if sys.byteorder == "little":
            self.dest.write(points.memoryview())
        else:
            self.dest.write(memoryview(points.array.byteswap()))

    def done(self) -> None:
        pass