1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231
|
import logging
import sys
from copy import deepcopy
from typing import BinaryIO, Iterable, Optional, Union
from ._pointwriter import IPointWriter
from .compression import LazBackend
from .errors import LaspyException
from .header import LasHeader
from .point import dims
from .point.record import PackedPointRecord
from .vlrs.vlrlist import VLRList
logger = logging.getLogger(__name__)
class LasWriter:
"""
Allows to write a complete LAS/LAZ file to the destination.
"""
def __init__(
self,
dest: BinaryIO,
header: LasHeader,
do_compress: Optional[bool] = None,
laz_backend: Optional[Union[LazBackend, Iterable[LazBackend]]] = None,
closefd: bool = True,
encoding_errors: str = "strict",
) -> None:
"""
Parameters
----------
dest: file_object
file object where the LAS/LAZ will be written
header: LasHeader
The header of the file to be written
do_compress: bool, optional
Whether the file data should be written as LAS (uncompressed)
or LAZ (compressed).
If None, the file won't be compressed, unless a laz_backend is provided
laz_backend: LazBackend or list of LazBackend, optional
The LazBackend to use (or if it is a sequence the LazBackend to try)
for the compression
closefd: bool, default True
should the `dest` be closed when the writer is closed
encoding_errors: str, default 'strict'
How encoding errors should be treated.
Possible values and their explanation can be seen here:
https://docs.python.org/3/library/codecs.html#error-handlers.
"""
self.closefd = closefd
self.encoding_errors = encoding_errors
self.header = deepcopy(header)
# The point writer will take take of creating and writing
# the correct laszip vlr, however we have to make sure
# no prior laszip vlr exists
try:
self.header.vlrs.pop(header.vlrs.index("LasZipVlr"))
except ValueError:
pass
self.header.partial_reset()
self.dest = dest
self.done = False
dims.raise_if_version_not_compatible_with_fmt(
header.point_format.id, str(self.header.version)
)
if laz_backend is not None:
if do_compress is None:
do_compress = True
self.laz_backend = laz_backend
else:
if do_compress is None:
do_compress = False
self.laz_backend = LazBackend.detect_available()
self.header.are_points_compressed = do_compress
if do_compress:
self.point_writer: IPointWriter = self._create_laz_backend(self.laz_backend)
else:
self.point_writer: IPointWriter = UncompressedPointWriter(self.dest)
self.point_writer.write_initial_header_and_vlrs(
self.header, self.encoding_errors
)
def write_points(self, points: PackedPointRecord) -> None:
"""
.. note ::
If you are writing points coming from multiple different input files
into one output file, you have to make sure the point record
you write all use the same scales and offset of the writer.
You can use :meth:`.LasData.change_scaling` or :meth:`.ScaleAwarePointRecord.change_scaling`
to do that.
Parameters
----------
points: PackedPointRecord or ScaleAwarePointRecord
The points to be written
Raises
------
LaspyException
If the point format of the points does not match
the point format of the writer.
"""
if not points:
return
if self.done:
raise LaspyException("Cannot write points anymore")
if points.point_format != self.header.point_format:
raise LaspyException("Incompatible point formats")
if self.header.max_point_count() - self.header.point_count < len(points):
raise LaspyException(
"Cannot write {} points as it would exceed the maximum number of points the file"
"can store. Current point count: {}, max point count: {}".format(
len(points), self.header.point_count, self.header.max_point_count()
)
)
self.header.grow(points)
self.point_writer.write_points(points)
def write_evlrs(self, evlrs: VLRList) -> None:
"""Writes the EVLRs to the file
Parameters
----------
evlrs: VLRList
The EVLRs to be written
Raises
------
LaspyException
If the file's version is not >= 1.4
"""
if self.header.version.minor < 4:
raise LaspyException(
"EVLRs are not supported on files with version less than 1.4"
)
if len(evlrs) > 0:
self.point_writer.done()
self.done = True
self.header.number_of_evlrs = len(evlrs)
self.header.start_of_first_evlr = self.dest.tell()
evlrs.write_to(self.dest, as_extended=True)
def close(self) -> None:
"""Closes the writer.
flushes the points, updates the header, making it impossible
to write points afterwards.
"""
if self.point_writer is not None:
if not self.done:
self.point_writer.done()
if self.header.point_count == 0:
self.header.maxs = [0.0, 0.0, 0.0]
self.header.mins = [0.0, 0.0, 0.0]
self.point_writer.write_updated_header(self.header, self.encoding_errors)
if self.closefd:
self.dest.close()
self.done = True
def _create_laz_backend(
self, laz_backends: Union[LazBackend, Iterable[LazBackend]]
) -> "IPointWriter":
try:
laz_backends = iter(laz_backends)
except TypeError:
laz_backends = (laz_backends,)
last_error: Optional[Exception] = None
for backend in laz_backends:
try:
if not backend.is_available():
raise LaspyException(f"The '{backend}' is not available")
return backend.create_writer(self.dest, self.header)
except Exception as e:
logger.error(e)
last_error = e
if last_error is not None:
raise LaspyException(f"No LazBackend could be initialized: {last_error}")
else:
raise LaspyException("No LazBackend selected, cannot compress")
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
class UncompressedPointWriter(IPointWriter):
"""
Writing points in the simple uncompressed case.
"""
def __init__(self, dest: BinaryIO) -> None:
self.dest = dest
@property
def destination(self) -> BinaryIO:
return self.dest
def write_points(self, points: PackedPointRecord) -> None:
if sys.byteorder == "little":
self.dest.write(points.memoryview())
else:
self.dest.write(memoryview(points.array.byteswap()))
def done(self) -> None:
pass
|