1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
|
from typing import Any, BinaryIO, List, Optional, Tuple, Union, cast
import numpy as np
from .._pointappender import IPointAppender
from .._pointreader import IPointReader
from .._pointwriter import IPointWriter
from ..header import LasHeader
from ..point.format import PointFormat
from ..point.record import PackedPointRecord
from ..vlrs.known import LasZipVlr
from .lazbackend import ILazBackend
from .selection import DecompressionSelection
try:
import lazrs
except ModuleNotFoundError:
lazrs = None
class LazrsBackend(ILazBackend):
def __init__(
self,
parallel: bool = True,
):
self._parallel = parallel
def is_available(self) -> bool:
return lazrs is not None
@property
def supports_append(self) -> bool:
return True
def create_appender(self, dest: BinaryIO, header: LasHeader) -> IPointAppender:
return LazrsAppender(dest, header, parallel=self._parallel)
def create_reader(
self,
source: Any,
header: LasHeader,
decompression_selection: Optional[DecompressionSelection] = None,
) -> IPointReader:
if decompression_selection is None:
decompression_selection = DecompressionSelection.all()
laszip_vlr: LasZipVlr = header.vlrs[header.vlrs.index("LasZipVlr")]
return LazrsPointReader(
source,
laszip_vlr,
parallel=self._parallel,
decompression_selection=decompression_selection,
)
def create_writer(
self,
dest: Any,
header: "LasHeader",
) -> IPointWriter:
return LazrsPointWriter(dest, header.point_format, parallel=self._parallel)
class LazrsPointReader(IPointReader):
"""Implementation for the laz-rs backend, supports single-threaded decompression
as well as multi-threaded decompression
"""
def __init__(
self,
source,
laszip_vlr: LasZipVlr,
parallel: bool,
decompression_selection: DecompressionSelection,
) -> None:
self._source = source
self.vlr = lazrs.LazVlr(laszip_vlr.record_data)
selection = decompression_selection.to_lazrs()
if parallel:
self.decompressor = lazrs.ParLasZipDecompressor(
source, laszip_vlr.record_data, selection
)
else:
self.decompressor = lazrs.LasZipDecompressor(
source, laszip_vlr.record_data, selection
)
@property
def source(self):
return self._source
def read_n_points(self, n: int) -> bytearray:
point_bytes = bytearray(n * self.vlr.item_size())
self.decompressor.decompress_many(point_bytes)
return point_bytes
def seek(self, point_index: int) -> None:
self.decompressor.seek(point_index)
def close(self) -> None:
self.source.close()
def read_chunk_table_only(self) -> List[Tuple[int, int]]:
"""
This function requires the source to be at the start of the chunk table
"""
assert isinstance(self.decompressor, lazrs.LasZipDecompressor)
return self.decompressor.read_chunk_table_only()
def read_raw_bytes(self, n: int) -> bytes:
"""
reads and returns exactly `n` bytes from the source used by
this point reader.
"""
b = bytearray(n)
self.decompressor.read_raw_bytes_into(b)
return bytes(b)
class LazrsPointWriter(IPointWriter):
"""
Compressed point writer using lazrs backend
"""
def __init__(
self, dest: BinaryIO, point_format: PointFormat, parallel: bool
) -> None:
self.dest = dest
self.vlr = lazrs.LazVlr.new_for_compression(
point_format.id, point_format.num_extra_bytes
)
self.parallel = parallel
self.compressor: Optional[
Union[lazrs.ParLasZipCompressor, lazrs.LasZipCompressor]
] = None
def write_initial_header_and_vlrs(
self, header: LasHeader, encoding_errors: str
) -> None:
laszip_vlr = LasZipVlr(self.vlr.record_data())
header.vlrs.append(laszip_vlr)
super().write_initial_header_and_vlrs(header, encoding_errors)
# We have to initialize our compressor here
# because on init, it writes the offset to chunk table
# so the header and vlrs have to be written
if self.parallel:
self.compressor = lazrs.ParLasZipCompressor(self.dest, self.vlr)
else:
self.compressor = lazrs.LasZipCompressor(self.dest, self.vlr)
@property
def destination(self) -> BinaryIO:
return self.dest
def write_points(self, points: PackedPointRecord) -> None:
assert (
self.compressor is not None
), "Trying to write points without having written header"
points_bytes = np.frombuffer(points.array, np.uint8)
self.compressor.compress_many(points_bytes)
def done(self) -> None:
if self.compressor is not None:
self.compressor.done()
class LazrsAppender(IPointAppender):
"""Appending in LAZ file
works by seeking to start of the last chunk
of compressed points, decompress it while keeping the points in
memory.
Then seek back to the start of the last chunk, and recompress
the points we just read, so that we have a compressor in the proper state
ready to compress new points.
"""
def __init__(self, dest: BinaryIO, header: LasHeader, parallel: bool) -> None:
self.offset_to_point_data = header.offset_to_point_data
laszip_vlr = cast(LasZipVlr, header.vlrs.get("LasZipVlr")[0])
if parallel:
self.appender = lazrs.ParLasZipAppender(dest, laszip_vlr.record_data)
else:
self.appender = lazrs.LasZipAppender(dest, laszip_vlr.record_data)
def append_points(self, points: PackedPointRecord) -> None:
points_bytes = np.frombuffer(points.array, np.uint8)
self.appender.compress_many(points_bytes)
def done(self) -> None:
self.appender.done()
|