import codecs
import os.path
import subprocess
from datetime import datetime
from collections import defaultdict
from typing import Tuple, Optional, Literal, Sequence
import numpy as np
import pandas as pd
try:
from pandas.tseries.api import guess_datetime_format
except ImportError: # pandas < 2.2.0
from pandas.core.tools.datetimes import guess_datetime_format
from chardet.universaldetector import UniversalDetector
from Orange.data import (
is_discrete_values, MISSING_VALUES, Variable,
DiscreteVariable, StringVariable, ContinuousVariable, TimeVariable, Table,
)
from Orange.misc.collections import natural_sorted
from Orange.util import ftry, frompyfunc
__all__ = [
"Compression",
"open_compressed",
"detect_encoding",
"isnastr",
"guess_data_type",
"sanitize_variable",
"update_origin",
"isnatstr",
"array_strptime",
"parse_datetime",
"to_datetime",
]
class Compression:
    """File-name extensions of the supported compression formats."""
    GZIP = '.gz'
    BZIP2 = '.bz2'
    XZ = '.xz'
    all = (GZIP, BZIP2, XZ)


def open_compressed(filename, *args, _open=open, **kwargs):
    """Return seamlessly decompressed open file handle for `filename`"""
    if not isinstance(filename, str):
        # Not a path: assume an already-open file object and pass it through.
        return filename
    # Choose the opener by file-name extension; plain files fall back to the
    # caller-supplied `_open` (builtin `open` by default).
    if filename.endswith(Compression.GZIP):
        from gzip import open as _open
    elif filename.endswith(Compression.BZIP2):
        from bz2 import open as _open
    elif filename.endswith(Compression.XZ):
        from lzma import open as _open
    return _open(filename, *args, **kwargs)
def _is_utf8_sig(filename: str) -> bool:
    """Does filename start with an UTF-8 BOM."""
    try:
        with open(filename, "rb") as stream:
            # The UTF-8 BOM is exactly three bytes (EF BB BF).
            return stream.read(3) == codecs.BOM_UTF8
    except OSError:  # pragma: no cover
        # Unreadable/missing file: report "no BOM" rather than raising.
        return False
def detect_encoding(filename):
    """
    Detect encoding of `filename`, which can be a ``str`` filename, a
    ``file``-like object, or ``bytes``.

    Returns the encoding name as a ``str`` (e.g. ``"utf-8"``), or whatever
    chardet reports (possibly ``None``) when nothing better is known.
    """
    # Try with Unix file utility first because it's faster (~10ms vs 100ms)
    if isinstance(filename, str) and not filename.endswith(Compression.all):
        try:
            proc = subprocess.run(
                ('file', '--brief', '--mime-encoding', filename),
                stdout=subprocess.PIPE, check=False)
            if proc.returncode == 0:
                encoding = proc.stdout.strip()
                # file does not detect/report UTF-8 BOM
                if encoding == b'utf-8':
                    return "utf-8-sig" if _is_utf8_sig(filename) else "utf-8"
                # file only supports these encodings; for others it says
                # unknown-8bit or binary, so we give chardet a chance to do
                # better. (b'utf-8' was already handled by the early return
                # above, so it need not appear here.)
                if encoding in (b'us-ascii', b'iso-8859-1',
                                b'utf-7', b'utf-16le', b'utf-16be',
                                b'ebcdic'):
                    return encoding.decode('us-ascii')
        except OSError:
            pass  # 'file' utility unavailable (e.g. on Windows)
    # file not available or unable to guess the encoding, have chardet do it
    detector = UniversalDetector()
    # We examine only first N 4kB blocks of file because chardet is really slow
    MAX_BYTES = 4 * 1024 * 12

    def _from_file(f):
        # Feed a bounded prefix; fall back to utf-8 on low confidence.
        detector.feed(f.read(MAX_BYTES))
        detector.close()
        return (detector.result.get('encoding')
                if detector.result.get('confidence', 0) >= .85 else
                'utf-8')

    if isinstance(filename, str):
        with open_compressed(filename, 'rb') as f:
            return _from_file(f)
    elif isinstance(filename, bytes):
        detector.feed(filename[:MAX_BYTES])
        detector.close()
        return detector.result.get('encoding')
    elif hasattr(filename, 'encoding'):
        # Already-open text file: trust its declared encoding.
        return filename.encoding
    else:  # assume file-like object that you can iter through
        return _from_file(filename)
# Element-wise predicate over the string-typed N/A markers, compiled into an
# object ufunc so it can be applied to whole arrays at once.
_NA_STRINGS = frozenset(v for v in MISSING_VALUES if isinstance(v, str))
__isnastr = np.frompyfunc(_NA_STRINGS.__contains__, 1, 1)


# wrapper for __isnastr with proper default out dtype
def isnastr(arr, out=None):
    """
    Given an (object) array of string values, return a boolean mask array
    that is True where the `arr` contains one of the string constants
    considered as N/A.

    Parameters
    ----------
    arr : np.ndarray
        Input array of strings.
    out : Optional[np.ndarray]
        Optional output array of the same shape as arr

    Returns
    -------
    mask : np.ndarray
    """
    arr = np.asarray(arr)
    # Preallocate a bool result for non-scalar input so the object ufunc
    # does not return an object-dtype array.
    if out is None and arr.ndim > 0:
        out = np.empty_like(arr, dtype=bool)
    return __isnastr(arr, out=out, casting="unsafe")
# Object ufunc applying str() element-wise; normalizes mixed input to strings.
_as_string_array = np.frompyfunc(str, 1, 1)


def guess_data_type(orig_values, namask=None):
    """
    Use heuristics to guess data type.

    Tries, in order: discrete (via `is_discrete_values`), continuous (float
    parsing), falling back to string; then, for any non-continuous result,
    checks whether every non-missing value parses as an exact ISO time, in
    which case the column becomes a TimeVariable instead.

    Parameters
    ----------
    orig_values : array-like
        Raw column values; converted to strings internally.
    namask : Optional[np.ndarray]
        Boolean mask of missing entries; computed with `isnastr` if omitted.

    Returns
    -------
    valuemap : Optional[list]
        Sorted category values for discrete columns, else None.
    values : np.ndarray or list
        Parsed values (floats for continuous/time, strings otherwise).
    coltype : type
        One of DiscreteVariable, ContinuousVariable, StringVariable,
        TimeVariable.
    """
    valuemap, values = None, orig_values
    is_discrete = is_discrete_values(orig_values)
    orig_values = _as_string_array(orig_values)
    if namask is None:
        namask = isnastr(orig_values)
    if is_discrete:
        # is_discrete_values returns the set of category values when discrete
        valuemap = natural_sorted(is_discrete)
        coltype = DiscreteVariable
    else:
        # try to parse as float
        values = np.empty_like(orig_values, dtype=float)
        values[namask] = np.nan
        try:
            # unsafe casting converts the string entries to float in C;
            # raises ValueError on the first unparsable entry
            np.copyto(values, orig_values, where=~namask, casting="unsafe")
        except ValueError:
            values = orig_values
            coltype = StringVariable
        else:
            coltype = ContinuousVariable
    if coltype is not ContinuousVariable:
        # when not continuous variable it can still be time variable even if it
        # was recognized as discrete before
        tvar = TimeVariable('_')
        # introducing a new variable prevents overwriting orig_values and values
        temp_values = np.empty_like(orig_values, dtype=float)
        try:
            temp_values[~namask] = [
                tvar.parse_exact_iso(i) for i in orig_values[~namask]]
        except ValueError:
            # not ISO date/time strings; keep the earlier classification
            pass
        else:
            valuemap = None
            coltype = TimeVariable
            values = temp_values
    return valuemap, values, coltype
def sanitize_variable(valuemap, values, orig_values, coltype, coltype_kwargs,
                      name=None):
    """
    Build (or reuse via `coltype.make`) a variable of type `coltype` and
    convert `values`/`orig_values` into that variable's numeric/string
    representation.

    Parameters
    ----------
    valuemap : Optional[list]
        Category values for discrete columns (as produced by
        `guess_data_type`), else None.
    values : array-like
        Pre-parsed values; may be replaced below depending on `coltype`.
    orig_values : array-like
        Original (string) values, used for re-mapping/re-parsing.
    coltype : type
        A Variable subclass to construct.
    coltype_kwargs : dict
        Extra keyword arguments for `coltype.make`; mutated in place when
        discrete values are supplied.
    name : Optional[str]
        Variable name passed to `coltype.make`.

    Returns
    -------
    values, var : tuple
        The converted column values and the constructed Variable.
    """
    assert issubclass(coltype, Variable)

    def get_number_of_decimals(values):
        # Longest fraction part among the string values; `default=1` makes
        # the result 0 when no value contains a decimal point.
        len_ = len
        ndecimals = max((len_(value) - value.find(".")
                         for value in values if "." in value),
                        default=1)
        return ndecimals - 1

    if issubclass(coltype, DiscreteVariable) and valuemap is not None:
        coltype_kwargs.update(values=valuemap)

    var = coltype.make(name, **coltype_kwargs)

    if isinstance(var, DiscreteVariable):
        # Map discrete data to 'ints' (or at least what passes as int around
        # here)
        mapping = defaultdict(
            lambda: np.nan,  # unknown categories map to NaN
            {val: i for i, val in enumerate(var.values)},
        )
        mapping[""] = np.nan
        mapvalues_ = np.frompyfunc(mapping.__getitem__, 1, 1)

        def mapvalues(arr):
            # Route the object ufunc's output into a float array directly.
            arr = np.asarray(arr, dtype=object)
            return mapvalues_(arr, out=np.empty_like(arr, dtype=float), casting="unsafe")
        values = mapvalues(orig_values)

    if coltype is StringVariable:
        values = orig_values

    # ContinuousVariable.number_of_decimals is supposed to be handled by
    # ContinuousVariable.to_val. In the interest of speed, the reader bypasses
    # it, so we set the number of decimals here.
    # The number of decimals is increased if not set manually (in which case
    # var.adjust_decimals would be 0).
    if isinstance(var, ContinuousVariable) and var.adjust_decimals:
        ndecimals = get_number_of_decimals(orig_values)
        if var.adjust_decimals == 2 or ndecimals > var.number_of_decimals:
            var.number_of_decimals = ndecimals
            var.adjust_decimals = 1

    if isinstance(var, TimeVariable) or coltype is TimeVariable:
        # Re-parse the values because only now after coltype.make call
        # above, variable var is the correct one
        _var = var if isinstance(var, TimeVariable) else TimeVariable('_')
        values = [_var.parse(i) for i in orig_values]

    return values, var
def _extract_new_origin(attr: Variable, table: Table, lookup_dirs: Tuple[str]) -> Optional[str]:
    """Find a usable replacement for `attr`'s "origin" directory, or None."""
    origin = attr.attributes["origin"]
    # Stored origin still exists: nothing to fix.
    if os.path.exists(origin):
        return origin
    # The origin's last path component appears inside one of the lookup dirs.
    basename = os.path.basename(os.path.normpath(origin))
    for lookup_dir in lookup_dirs:
        candidate = os.path.join(lookup_dir, basename)
        if os.path.isdir(candidate):
            return candidate
    # Every non-missing column value resolves to an existing path when joined
    # with one of the lookup dirs (covers complex relative paths in cells).
    for lookup_dir in lookup_dirs:
        column_ok = all(
            os.path.exists(os.path.join(lookup_dir, attr.str_val(value)))
            for value in table.get_column(attr)
            if value and not pd.isna(value)
        )
        if column_ok:
            return lookup_dir
    return None
def update_origin(table: Table, file_path: str):
    """
    Update the "origin" attribute of path-holding columns in `table`.

    When a dataset with file paths in a column is moved to another computer,
    the stored absolute origin may no longer exist. This function repairs it
    for every string/discrete meta column carrying an "origin" attribute.

    It works in two steps: first it collects candidate directories to search,
    then it checks whether paths exist relative to them.

    Lookup directories:
    1. The directory containing `file_path`.
    2. Its parent directory (for the case where the user keeps the dataset
       file next to a directory of files, e.g. a workflow beside images).

    A new origin is accepted when either:
    1. The last directory of the old origin (basedir) exists inside one of
       the lookup directories, or
    2. The old origin is gone, but every path stored in the column can be
       found inside one of the lookup directories (typical for complex
       relative paths such as a/b/c/d/file.txt).

    Note: This function updates the existing table

    Parameters
    ----------
    table
        Orange Table to be updated if origin exists in any column
    file_path
        Path of the loaded dataset for reference. Only paths inside the
        dataset's directory or its parent directory are considered for the
        new origin.
    """
    file_dir = os.path.dirname(file_path)
    parent_dir = os.path.dirname(file_dir)
    # Deduplicate while preserving order; at the filesystem root the two
    # directories coincide.
    lookup_dirs = tuple(dict.fromkeys((file_dir, parent_dir)))
    for attr in table.domain.metas:
        if "origin" not in attr.attributes:
            continue
        if not (attr.is_string or attr.is_discrete):
            continue
        new_origin = _extract_new_origin(attr, table, lookup_dirs)
        if new_origin:
            attr.attributes["origin"] = new_origin
# Element-wise predicate: True for strings that denote a missing value or a
# "not-a-time" spelling. Returns a bool array (dtype enforced by the
# Orange.util.frompyfunc wrapper, unlike np.frompyfunc's object output).
isnatstr = frompyfunc(
    (MISSING_VALUES | {"nat", "Nat", "NaT", "NAT"}).__contains__,
    1, 1, dtype=bool,
)
def array_strptime(
        values: Sequence[str],
        format: str,
        errors: Literal["raise", "coerce"] = "raise",
        dtype=np.dtype("M8[us]"),
) -> 'np.ndarray[np.datetime64]':
    """
    Parse an array `values` of date/time strings.

    Parameters
    ----------
    values: Sequence[str]
    format: str
        A `time.strptime` date/time format string.
    dtype: np.dtype
        The return dtype
    errors: Literal["raise", "coerce"]
        How to treat parse errors.

    Returns
    -------
    parsed : np.ndarray
        Array of `dtype`; NaT where the input was a NaT/missing string
        (and, with errors="coerce", where parsing failed).
    """
    values = np.asarray(values, dtype=object)
    # Start from all-NaT; entries skipped by `where=` below stay NaT.
    out = np.full(values.shape, np.datetime64("NaT"), dtype=dtype)
    if errors == "raise":
        f = np.frompyfunc(datetime.strptime, 2, 1)
    elif errors == "coerce":
        # ftry wraps strptime so a ValueError yields NaT instead of raising.
        f = np.frompyfunc(ftry(datetime.strptime, ValueError, np.datetime64("NaT")), 2, 1)
    else:  # pragma: no cover
        raise TypeError(f"Invalid 'errors' argument {errors}")
    na_mask = isnatstr(values)
    # `where=~na_mask` skips NA entries; `casting="unsafe"` lets the object
    # results (datetime instances) be stored into the datetime64 `out`.
    return f(values, format, where=~na_mask, out=out, casting="unsafe")
def first_non_natstr(c: Sequence[str]) -> int | None:
    """Return the index of the first element of `c` that is not a NaT
    string, or None when every element is one."""
    non_na = np.flatnonzero(~isnatstr(c))
    return int(non_na[0]) if non_na.size else None
def parse_datetime(
        values: Sequence[str],
        format: str | None = None,
        errors: Literal["raise", "coerce"] = "raise",
        dtype=np.dtype("M8[us]"),
) -> 'np.ndarray[np.datetime64]':
    """Parse date/time strings with `array_strptime`, guessing the format
    from the first non-NaT element when `format` is not supplied.

    Raises ValueError when no format is given and none can be guessed.
    """
    values = np.asarray(values, dtype=object)
    if format is None:
        # Guess from the first value that is not a missing/NaT marker.
        sample_idx = first_non_natstr(values)
        if sample_idx is not None:
            format = guess_datetime_format(values[sample_idx])
        if format is None:  # pragma: no cover
            raise ValueError("Cannot guess date/time format")
    return array_strptime(values, format, errors=errors, dtype=dtype)
def to_datetime(
values: Sequence[str],
format: str | None = None,
errors: Literal["raise", "coerce"] = "raise",
) -> "np.ndarray[np.datetime64]":
"""
Similar to `pandas.to_datetime` but support parsing years before 1677
and after 2262.
https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#timeseries-timestamp-limits
"""
try:
# try with errors="raise" to catch OutOfBoundsDatetime, even if errors was
# "coerce"
return (pd.to_datetime(values, format=format, errors="raise", utc=True)
.values.astype("M8[us]"))
except pd.errors.OutOfBoundsDatetime:
# slower path
return parse_datetime(values, format, errors=errors)
except Exception: # pylint: disable=broad-except
if errors != "raise":
return (pd.to_datetime(values, format=format, errors=errors, utc=True)
.values.astype("M8[us]"))
else:
raise