1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311
|
# Copyright (c) DataLab Platform Developers, BSD 3-Clause license, see LICENSE file.
"""
Sigima common tools for signal and image I/O support
"""
# pylint: disable=invalid-name # Allows short reference names like x, y, ...
from __future__ import annotations
import dataclasses
import enum
import os
import os.path as osp
import re
from typing import Generic, Literal, Sequence
from sigima.config import _
from sigima.objects.base import BaseObj, TypeObj
from sigima.worker import CallbackWorkerProtocol
@enum.unique
class IOAction(enum.Enum):
    """Direction of an I/O operation: loading from or saving to a file"""

    # Values are assigned automatically, in declaration order
    LOAD = enum.auto()  # reading objects from a file
    SAVE = enum.auto()  # writing an object to a file
@dataclasses.dataclass
class FormatInfo:
    """Descriptive metadata of a file format (name, extensions, capabilities)"""

    # Human-readable format name, e.g. "Foobar camera image files"
    name: str = None
    # Space-separated file patterns, e.g. "*.foobar *.fb"
    extensions: str = None
    # True if format can be read
    readable: bool = False
    # True if format can be written
    writeable: bool = False
    # Packages needed by the format, e.g. ["foobar"] if it requires foobar package
    requires: list[str] = None

    def __str__(self) -> str:
        """Return a multi-line, human-readable description of the format

        Returns:
            Format name followed by one line per remaining attribute
        """
        can_read = "Yes" if self.readable else "No"
        can_write = "Yes" if self.writeable else "No"
        packages = ", ".join(self.requires) if self.requires else "None"
        lines = [
            f"{self.name}:",
            f"Extensions: {self.extensions}",
            f"Readable: {can_read}",
            f"Writeable: {can_write}",
            f"Requires: {packages}",
        ]
        return "\n".join(lines)

    def to_rst_table_row(self) -> str:
        """Return the format info as a reStructuredText table row
        (table `.. list-table::` format, with 5 columns)

        Returns:
            Five lines: name, extensions, readable mark, writeable mark, requirements
        """
        read_mark = "•" if self.readable else ""
        write_mark = "•" if self.writeable else ""
        packages = ", ".join(self.requires) if self.requires else "-"
        cells = [self.extensions, read_mark, write_mark, packages]
        first_line = f" * - {self.name}"
        return "\n".join([first_line] + [f"- {cell}" for cell in cells])
class FormatBase(Generic[TypeObj]):
    """Base class of file format handlers (data file I/O)

    Subclasses are expected to set ``FORMAT_INFO``; the default ``read`` and
    ``write`` implementations only raise ``NotImplementedError``.
    """

    FORMAT_INFO: FormatInfo = None  # checked for completeness in __init__

    def __init__(self):
        """Validate ``FORMAT_INFO`` and compute the list of handled extensions

        Raises:
            ValueError: if the format info, its name or its extensions are not set,
             if the format is neither readable nor writeable, or if no file
             extension can be extracted from the extensions string
            ImportError: if a package listed in ``requires`` cannot be imported
        """
        owner = self.__class__.__name__
        info = self.info = self.FORMAT_INFO
        if info is None:
            raise ValueError(f"Format info not set for {owner}")
        if info.name is None:
            raise ValueError(f"Format name not set for {owner}")
        if info.extensions is None:
            raise ValueError(f"Format extensions not set for {owner}")
        if not (info.readable or info.writeable):
            raise ValueError(f"Format {info.name} is not readable nor writeable")
        self.extlist = get_file_extensions(info.extensions)
        if not self.extlist:
            raise ValueError(f"Invalid format extensions for {owner}")
        # Fail early when an optional dependency of this format is missing
        for package in info.requires or ():
            try:
                __import__(package)
            except ImportError as exc:
                message = f"Format {info.name} requires {package} package"
                raise ImportError(message) from exc

    def get_filter(self, action: IOAction) -> str | None:
        """Return the file dialog filter of this format for the given action

        Args:
            action: I/O action type

        Returns:
            Filter string ``"<name> (<extensions>)"``, or None if the format does
            not support the requested action
        """
        assert action in (IOAction.LOAD, IOAction.SAVE)
        cannot_load = action == IOAction.LOAD and not self.info.readable
        cannot_save = action == IOAction.SAVE and not self.info.writeable
        if cannot_load or cannot_save:
            return None
        return f"{self.info.name} ({self.info.extensions})"

    def read(
        self, filename: str, worker: CallbackWorkerProtocol | None = None
    ) -> Sequence[TypeObj]:
        """Read native objects (signal or image) from file.

        A file holding a single object yields a sequence with one element.
        This base implementation always raises.

        Args:
            filename: file name
            worker: Callback worker object

        Raises:
            NotImplementedError: if format is not supported

        Returns:
            List of native objects (signal or image)
        """
        message = f"Reading from {self.info.name} is not supported"
        raise NotImplementedError(message)

    def write(self, filename: str, obj: BaseObj) -> None:
        """Write a native object (signal or image) to file.

        This base implementation always raises.

        Args:
            filename: file name
            obj: native object (signal or image)

        Raises:
            NotImplementedError: if format is not supported
        """
        message = f"Writing to {self.info.name} is not supported"
        raise NotImplementedError(message)
# pylint: disable=bad-mcs-classmethod-argument
class BaseIORegistry(Generic[TypeObj], type):
    """Metaclass for registering I/O handler classes

    Each class created with this metaclass is instantiated once and the instance
    is appended to ``_io_format_instances``, except when the class name ends with
    ``"FormatBase"`` or when instantiation raises ``ImportError``.
    """

    REGISTRY_INFO: str = ""  # Registry info, override in subclasses

    # Defined on the metaclass: shared by every registry that does not rebind it
    _io_format_instances: list[FormatBase] = []

    def __init__(cls, name, bases, attrs):
        """Register an instance of the newly created class

        Args:
            name: name of the class being created
            bases: base classes of the class being created
            attrs: namespace of the class being created
        """
        super().__init__(name, bases, attrs)
        if not name.endswith("FormatBase"):
            try:
                # pylint: disable=no-value-for-parameter
                cls._io_format_instances.append(cls())
            except ImportError:
                # This format is not supported (deliberate best-effort: the
                # handler is simply left out of the registry)
                pass

    @classmethod
    def get_formats(cls) -> list[FormatBase]:
        """Return I/O format handlers

        Returns:
            List of I/O format handlers
        """
        return cls._io_format_instances

    @classmethod
    def get_format_info(cls, mode: Literal["rst", "text"] = "rst") -> str:
        """Return I/O format info

        Args:
            mode: Output format, either 'rst' (reStructuredText) or 'text'.
             Any value other than 'rst' selects the text output.

        Returns:
            Text description for all I/O formats
        """
        formats = cls.get_formats()
        if mode == "rst":
            txt = f"{cls.REGISTRY_INFO}:\n\n.. list-table::\n :header-rows: 1\n\n"
            txt += " * - Name\n - Extensions\n "
            txt += "- Readable\n - Writeable\n - Requires\n"
            txt += "\n".join(fmt.info.to_rst_table_row() for fmt in formats)
        else:
            # Use "\n" throughout: the header previously ended with os.linesep
            # while the body used "\n", giving mixed line endings on Windows
            txt = f"{cls.REGISTRY_INFO}:\n"
            indent = " " * 4
            finfo = "\n".join(str(fmt.info) for fmt in formats)
            txt += "\n".join(indent + line for line in finfo.splitlines())
        return txt

    @classmethod
    def __get_all_supported_filter(cls, action: IOAction) -> str:
        """Return all supported file filter for Qt file dialog

        Args:
            action: I/O action type

        Returns:
            File filter
        """
        extlist = []  # file extension list
        for fmt in cls.get_formats():
            if not fmt.info.readable and action == IOAction.LOAD:
                continue
            if not fmt.info.writeable and action == IOAction.SAVE:
                continue
            extlist.extend(fmt.extlist)
        # Several handlers may share an extension: keep the first occurrence only
        unique_exts = list(dict.fromkeys(extlist))
        allsupported = _("All supported files")
        return f"{allsupported} ({'*.' + ' *.'.join(unique_exts)})"

    @classmethod
    def get_filters(cls, action: IOAction) -> str:
        """Return file filters for Qt file dialog

        The "all supported files" filter comes first, followed by one filter per
        format handler supporting the action, one filter per line.

        Args:
            action: I/O action type

        Returns:
            File filters
        """
        flist = [cls.__get_all_supported_filter(action)]  # file filter list
        for fmt in cls.get_formats():
            flt = fmt.get_filter(action)
            if flt is not None:
                flist.append(flt)
        return "\n".join(flist)

    @classmethod
    def get_read_filters(cls) -> str:
        """Return file filters for Qt open file dialog

        Returns:
            File filters
        """
        return cls.get_filters(IOAction.LOAD)

    @classmethod
    def get_write_filters(cls) -> str:
        """Return file filters for Qt save file dialog

        Returns:
            File filters
        """
        return cls.get_filters(IOAction.SAVE)

    @classmethod
    def get_format(cls, filename: str, action: IOAction) -> FormatBase:
        """Return format handler for filename

        The first registered handler that matches the (case-insensitive) file
        extension and supports the action is returned.

        Args:
            filename: file name
            action: I/O action type

        Raises:
            NotImplementedError: if file data type is not supported

        Returns:
            Format handler
        """
        # The extension does not depend on the handler: compute it once
        ext = osp.splitext(filename)[1][1:].lower()
        for fmt in cls.get_formats():
            if ext not in fmt.extlist:
                continue
            if not fmt.info.readable and action == IOAction.LOAD:
                continue
            if not fmt.info.writeable and action == IOAction.SAVE:
                continue
            return fmt
        raise NotImplementedError(
            f"{filename} is not supported for {action.name.lower()}"
        )

    @classmethod
    def read(
        cls, filename: str, worker: CallbackWorkerProtocol | None = None
    ) -> Sequence[TypeObj]:
        """Read data from file, return native object (signal or image) list.
        For single object, return a list with one object.

        Args:
            filename: file name
            worker: Callback worker object

        Raises:
            NotImplementedError: if file data type is not supported

        Returns:
            List of native objects (signal or image)
        """
        fmt = cls.get_format(filename, IOAction.LOAD)
        return fmt.read(filename, worker)

    @classmethod
    def write(cls, filename: str, obj: BaseObj) -> None:
        """Write data to file from native object (signal or image).

        Args:
            filename: file name
            obj: native object (signal or image)

        Raises:
            NotImplementedError: if file data type is not supported
        """
        fmt = cls.get_format(filename, IOAction.SAVE)
        fmt.write(filename, obj)
def get_file_extensions(string: str) -> list[str]:
    """Extract the file extensions found in `string`, lowercased, unique and sorted.

    Args:
        string: String containing file extensions (e.g. "*.foobar *.fb").

    Returns:
        Sorted list of unique lowercase file extensions, without the leading dot.
    """
    extensions = set()
    for found in re.finditer(r"\S+\.[\w-]+", string):
        # Only the text after the last dot of each match is the extension
        extension = found.group(0).rsplit(".", 1)[-1]
        extensions.add(extension.lower())
    return sorted(extensions)
|