1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288
|
import h5py
from typing import *
import logging
import numpy as np
import loompy
from .utils import get_loom_spec_version
class LoomValidator:
	"""Validate .loom files against the Loom file format specification and attribute conventions."""

	def __init__(self, version: Optional[str] = None) -> None:
		"""
		Args:
			version: The Loom file format version to validate against ("3.0.0", "2.0.1", "old"), or None to infer from file

		Remarks:
			"old" version will accept files that lack the "row_graphs" and "col_graphs" groups
		"""
		self.version = version  # Version of the spec to validate against (possibly inferred later)
		self.errors: List[str] = []  # Errors found during validation
		self.warnings: List[str] = []  # Warnings triggered during validation
		self.summary: List[str] = []  # Summary of the file structure

	def _check(self, condition: bool, message: str) -> bool:
		"""Record `message` as an error if `condition` is False; return `condition` so callers can chain on it."""
		if not condition:
			self.errors.append(message)
		return condition

	def _warn(self, condition: bool, message: str) -> bool:
		"""Record `message` as a warning if `condition` is False; return `condition` so callers can chain on it."""
		if not condition:
			self.warnings.append(message)
		return condition

	def validate(self, path: str, strictness: str = "speconly") -> bool:
		"""
		Validate a file for conformance to the Loom specification

		Args:
			path: Full path to the file to be validated
			strictness: "speconly" or "conventions"

		Returns:
			True if the file passes all requested checks, else False

		Remarks:
			In "speconly" mode, conformance is assessed relative to the file format specification
			at http://linnarssonlab.org/loompy/format/. In "conventions" mode, conformance is additionally
			assessed relative to attribute name and data type conventions given at http://linnarssonlab.org/loompy/conventions/.
		"""
		valid1 = True
		with h5py.File(path, mode="r") as f:
			if self.version is None:
				# No explicit version requested; infer it from the file itself
				self.version = get_loom_spec_version(f)
			valid1 = self.validate_spec(f)
			if not valid1:
				self.errors.append("For help, see http://linnarssonlab.org/loompy/format/")
		valid2 = True
		if strictness == "conventions":
			with loompy.connect(path, mode="r") as ds:
				valid2 = self.validate_conventions(ds)
				if not valid2:
					self.errors.append("For help, see http://linnarssonlab.org/loompy/conventions/")
		return valid1 and valid2

	def validate_conventions(self, ds: "loompy.LoomConnection") -> bool:
		"""
		Validate the LoomConnection object against the attribute name/dtype conventions.

		Args:
			ds: LoomConnection object

		Returns:
			True if the file conforms to the conventions, else False

		Remarks:
			Upon return, the instance attributes 'self.errors' and 'self.warnings' contain
			lists of errors and warnings.
		"""
		(n_genes, n_cells) = ds.shape
		self._warn("Description" in ds.attrs, "Optional global attribute 'Description' is missing")
		self._warn("Journal" in ds.attrs, "Optional global attribute 'Journal' is missing")
		self._warn("Authors" in ds.attrs, "Optional global attribute 'Authors' is missing")
		self._warn("Title" in ds.attrs, "Optional global attribute 'Title' is missing")
		self._warn("Year" in ds.attrs, "Optional global attribute 'Year' is missing")
		self._warn("CreationDate" in ds.attrs, "Optional global attribute 'CreationDate' is missing")
		if self._check("ClusterID" in ds.ca, "Column attribute 'ClusterID' is missing"):
			self._check(np.issubdtype(ds.ca.ClusterID.dtype, np.int_), "Column attribute 'ClusterID' must be integer dtype")
			# IDs must be 0..k-1 with no gaps: k unique values, min == 0 and max == k - 1
			self._check(len(np.unique(ds.ca.ClusterID)) == np.max(ds.ca.ClusterID) + 1 and np.min(ds.ca.ClusterID) == 0, "Column attribute 'ClusterID' must be integers 0, 1, 2, ... with no missing values")
			self._check(ds.ca.ClusterID.shape == (n_cells,), f"Column attribute 'ClusterID' must be 1-dimensional array of {n_cells} elements")
		if "ClusterName" in ds.ca:
			self._check(ds.ca.ClusterName.dtype == object and np.issubdtype(ds.ca.ClusterName[0].dtype, np.str_), "Column attribute 'ClusterName' must be an array of strings")
			self._check(ds.ca.ClusterName.shape == (n_cells,), f"Column attribute 'ClusterName' must be 1-dimensional array of {n_cells} elements")
			# Verify the ClusterID <-> ClusterName mapping is one-to-one in both directions
			one_to_one = True
			for cid in np.unique(ds.ca.ClusterID):
				if len(np.unique(ds.ca.ClusterName[ds.ca.ClusterID == cid])) != 1:
					one_to_one = False
					break
			for cn in np.unique(ds.ca.ClusterName):
				if len(np.unique(ds.ca.ClusterID[ds.ca.ClusterName == cn])) != 1:
					one_to_one = False
					break
			if not one_to_one:
				self._check(False, "ClusterName must correspond 1:1 with ClusterID")
		else:
			self.warnings.append("Optional column attribute 'ClusterName' is missing")
		if self._check("CellID" in ds.ca, "Column attribute 'CellID' is missing"):
			self._check(ds.ca.CellID.dtype == object and np.issubdtype(ds.ca.CellID[0].dtype, np.str_), f"Column attribute 'CellID' must be an array of strings, not '{ds.ca.CellID[0].dtype}'")
			self._check(ds.ca.CellID.shape == (n_cells,), f"Column attribute 'CellID' must be 1-dimensional array of {n_cells} elements")
			self._check(len(np.unique(ds.ca.CellID)) == n_cells, "Column attribute 'CellID' cannot contain duplicate values")
		if "Valid" in ds.ca:
			self._check(np.issubdtype(ds.ca.Valid.dtype, np.int_), f"Column attribute 'Valid' must be integer dtype, not '{ds.ca.Valid.dtype}'")
			self._check(np.all(np.isin(ds.ca.Valid, [0, 1])), "Column attribute 'Valid' must be integers 0 or 1 only")
			self._check(ds.ca.Valid.shape == (n_cells,), f"Column attribute 'Valid' must be 1-dimensional array of {n_cells} elements")
		else:
			self.warnings.append("Optional column attribute 'Valid' is missing")
		if "Outliers" in ds.ca:
			self._check(np.issubdtype(ds.ca.Outliers.dtype, np.int_), f"Column attribute 'Outliers' must be integer dtype, not '{ds.ca.Outliers.dtype}'")
			self._check(np.all(np.isin(ds.ca.Outliers, [0, 1])), "Column attribute 'Outliers' must be integers 0 or 1 only")
			self._check(ds.ca.Outliers.shape == (n_cells,), f"Column attribute 'Outliers' must be 1-dimensional array of {n_cells} elements")
		else:
			self.warnings.append("Optional column attribute 'Outliers' is missing")
		if self._check("Accession" in ds.ra, "Row attribute 'Accession' is missing"):
			self._check(ds.ra.Accession.dtype == object and np.issubdtype(ds.ra.Accession[0].dtype, np.str_), f"Row attribute 'Accession' must be an array of strings, not '{ds.ra.Accession[0].dtype}'")
			self._check(ds.ra.Accession.shape == (n_genes,), f"Row attribute 'Accession' must be 1-dimensional array of {n_genes} elements")
			self._check(len(np.unique(ds.ra.Accession)) == n_genes, "Row attribute 'Accession' cannot contain duplicate values")
		if self._check("Gene" in ds.ra, "Row attribute 'Gene' is missing"):
			self._check(ds.ra.Gene.dtype == object and np.issubdtype(ds.ra.Gene[0].dtype, np.str_), f"Row attribute 'Gene' must be an array of strings, not '{ds.ra.Gene[0].dtype}'")
			self._check(ds.ra.Gene.shape == (n_genes,), f"Row attribute 'Gene' must be 1-dimensional array of {n_genes} elements")
		if "Valid" in ds.ra:
			self._check(np.issubdtype(ds.ra.Valid.dtype, np.int_), f"Row attribute 'Valid' must be integer dtype, not '{ds.ra.Valid.dtype}'")
			self._check(np.all(np.isin(ds.ra.Valid, [0, 1])), "Row attribute 'Valid' must be integers 0 or 1 only")
			# Row attributes span genes, not cells (fixed copy-paste from the column section)
			self._check(ds.ra.Valid.shape == (n_genes,), f"Row attribute 'Valid' must be 1-dimensional array of {n_genes} elements")
		else:
			self.warnings.append("Optional row attribute 'Valid' is missing")
		if "Selected" in ds.ra:
			self._check(np.issubdtype(ds.ra.Selected.dtype, np.int_), f"Row attribute 'Selected' must be integer dtype, not '{ds.ra.Selected.dtype}'")
			self._check(np.all(np.isin(ds.ra.Selected, [0, 1])), "Row attribute 'Selected' must be integers 0 or 1 only")
			# Row attributes span genes, not cells (fixed copy-paste from the column section)
			self._check(ds.ra.Selected.shape == (n_genes,), f"Row attribute 'Selected' must be 1-dimensional array of {n_genes} elements")
		else:
			self.warnings.append("Optional row attribute 'Selected' is missing")
		return len(self.errors) == 0

	def validate_spec(self, file: "h5py.File") -> bool:
		"""
		Validate the LoomConnection object against the format specification.

		Args:
			file: h5py File object

		Returns:
			True if the file conforms to the specs, else False

		Remarks:
			Upon return, the instance attributes 'self.errors' and 'self.warnings' contain
			lists of errors and warnings, and the 'self.summary' attribute contains a summary
			of the file contents.
		"""
		matrix_types = ["float16", "float32", "float64", "int8", "int16", "int32", "int64", "uint8", "uint16", "uint32", "uint64"]
		vertex_types = ["int8", "int16", "int32", "int64", "uint8", "uint16", "uint32", "uint64"]
		weight_types = ["float16", "float32", "float64"]

		def delay_print(text: str) -> None:
			# Accumulate summary lines instead of printing immediately
			self.summary.append(text)

		def dt(t: str) -> str:
			# Render fixed-length HDF5 string dtypes (e.g. "|S12") simply as "string"
			if str(t).startswith("|S"):
				return "string"
			return str(t)

		# Compute column widths for the aligned summary listing
		width_ra = 0
		width_ca = 0
		width_globals = 0
		if self._check("row_attrs" in file, "'row_attrs' group is missing"):
			width_ra = max([len(x) for x in (file["row_attrs"].keys())], default=0)
		if self._check("col_attrs" in file, "'col_attrs' group is missing"):
			width_ca = max([len(x) for x in (file["col_attrs"].keys())], default=0)
		if self.version == "3.0.0":
			if self._check("attrs" in file, "Global attributes missing"):
				width_globals = max([len(x) for x in (file["attrs"].keys())], default=0)
		elif len(file.attrs) > 0:
			width_globals = max([len(x) for x in file.attrs.keys()])
		width_layers = 0
		if "layers" in file and len(file["layers"]) > 0:
			width_layers = max([len(x) for x in file["layers"].keys()])
		width_layers = max(width_layers, len("Main matrix"))
		width = max(width_ca, width_ra, width_globals)

		delay_print("Global attributes:")
		if self.version == "3.0.0":
			# A missing /attrs group was already recorded as an error above; guard to avoid KeyError here
			if "attrs" in file:
				for attr in file["attrs"]:
					# In Loom 3.0.0 global attributes are HDF5 datasets under /attrs;
					# look up the dataset to report its dtype (iterating the group yields only key names)
					delay_print(f"{attr: >{width}} {dt(file['attrs'][attr].dtype)}")
		else:
			for key, value in file.attrs.items():
				if type(value) is str:
					self.warnings.append(f"Global attribute '{key}' has dtype string, which will be deprecated in future Loom versions")
					delay_print(f"{key: >{width}} string")
				elif type(value) is bytes:
					self.warnings.append(f"Global attribute '{key}' has dtype bytes, which will be deprecated in future Loom versions")
					delay_print(f"{key: >{width}} bytes")
				else:
					delay_print(f"{key: >{width}} {dt(file.attrs[key].dtype)}")

		shape = (-1, -1)  # Sentinel so later shape comparisons fail cleanly (no NameError) when the matrix is missing
		if self._check("matrix" in file, "Main matrix missing"):
			self._check(file["matrix"].dtype in matrix_types, f"Main matrix dtype={file['matrix'].dtype} is not allowed")
			shape = file["matrix"].shape
			delay_print(f"Layers shape={shape}:")
			delay_print(f"{'Main matrix': >{width}} {file['matrix'].dtype}")
			if "layers" in file:
				for layer in file["layers"]:
					self._check(file["layers"][layer].shape == shape, f"Layer '{layer}' shape {file['layers'][layer].shape} does not match main matrix shape {shape}")
					self._check(file["layers"][layer].dtype in matrix_types, f"Layer '{layer}' dtype={file['layers'][layer].dtype} is not allowed")
					delay_print(f"{layer: >{width}} {file['layers'][layer].dtype}")

		# Loom 3.0.0 stores string attributes as variable-length (object) dtype; older versions as fixed-length bytes
		if self.version == "3.0.0":
			expected_dtype = np.object_
		else:
			expected_dtype = np.bytes_

		delay_print("Row attributes:")
		if self._check("row_attrs" in file, "'row_attrs' group is missing"):
			for ra in file["row_attrs"]:
				self._check(file["row_attrs"][ra].shape[0] == shape[0], f"Row attribute '{ra}' shape {file['row_attrs'][ra].shape[0]} first dimension does not match row dimension {shape}")
				self._check(file["row_attrs"][ra].dtype in matrix_types or np.issubdtype(file['row_attrs'][ra].dtype, expected_dtype), f"Row attribute '{ra}' dtype {file['row_attrs'][ra].dtype} is not allowed")
				ra_shape = file['row_attrs'][ra].shape
				delay_print(f"{ra: >{width}} {dt(file['row_attrs'][ra].dtype)} {ra_shape if len(ra_shape) > 1 else ''}")
			if len(file["row_attrs"]) == 0:
				delay_print(" (none)")

		delay_print("Column attributes:")
		if self._check("col_attrs" in file, "'col_attrs' group is missing"):
			for ca in file["col_attrs"]:
				self._check(file["col_attrs"][ca].shape[0] == shape[1], f"Column attribute '{ca}' shape {file['col_attrs'][ca].shape[0]} first dimension does not match column dimension {shape}")
				self._check(file["col_attrs"][ca].dtype in matrix_types or np.issubdtype(file["col_attrs"][ca].dtype, expected_dtype), f"Column attribute '{ca}' dtype {file['col_attrs'][ca].dtype} is not allowed")
				ca_shape = file['col_attrs'][ca].shape
				delay_print(f"{ca: >{width}} {dt(file['col_attrs'][ca].dtype)} {ca_shape if len(ca_shape) > 1 else ''}")
			if len(file["col_attrs"]) == 0:
				delay_print(" (none)")

		delay_print("Row graphs:")
		if self.version == "2.0.1" or self.version == "3.0.0":
			# The group is mandatory in 2.0.1+; this check must run even when the group is absent
			self._check("row_graphs" in file, "'row_graphs' group is missing (try spec_version='old')")
		if "row_graphs" in file:
			for g in file["row_graphs"]:
				self._check("a" in file["row_graphs"][g], f"Row graph '{g}' is missing vector 'a', denoting start vertices")
				self._check(file["row_graphs"][g]['a'].dtype in vertex_types, f"/row_graphs/{g}/a.dtype {file['row_graphs'][g]['a'].dtype} must be integer")
				self._check("b" in file["row_graphs"][g], f"Row graph '{g}' is missing vector 'b', denoting end vertices")
				self._check(file["row_graphs"][g]['b'].dtype in vertex_types, f"/row_graphs/{g}/b.dtype {file['row_graphs'][g]['b'].dtype} must be integer")
				self._check("w" in file["row_graphs"][g], f"Row graph '{g}' is missing vector 'w', denoting vertex weights")
				self._check(file["row_graphs"][g]['w'].dtype in weight_types, f"/row_graphs/{g}/w.dtype {file['row_graphs'][g]['w'].dtype} must be float")
				self._check(file['row_graphs'][g]['a'].shape[0] == file['row_graphs'][g]['b'].shape[0] and file['row_graphs'][g]['a'].shape[0] == file['row_graphs'][g]['w'].shape[0], f"Row graph '{g}' sparse vectors a, b and w must have equal length")
				delay_print(f" '{g}' with {file['row_graphs'][g]['a'].shape[0]} edges")
			if len(file["row_graphs"]) == 0:
				delay_print(" (none)")

		delay_print("Column graphs:")
		if self.version == "2.0.1" or self.version == "3.0.0":
			# The group is mandatory in 2.0.1+; this check must run even when the group is absent
			self._check("col_graphs" in file, "'col_graphs' group is missing (try spec_version='old')")
		if "col_graphs" in file:
			for g in file["col_graphs"]:
				self._check("a" in file["col_graphs"][g], f"Column graph '{g}' is missing vector 'a', denoting start vertices")
				self._check(file["col_graphs"][g]['a'].dtype in vertex_types, f"/col_graphs/{g}/a.dtype {file['col_graphs'][g]['a'].dtype} must be integer")
				self._check("b" in file["col_graphs"][g], f"Column graph '{g}' is missing vector 'b', denoting end vertices")
				self._check(file["col_graphs"][g]['b'].dtype in vertex_types, f"/col_graphs/{g}/b.dtype {file['col_graphs'][g]['b'].dtype} must be integer")
				self._check("w" in file["col_graphs"][g], f"Column graph '{g}' is missing vector 'w', denoting vertex weights")
				self._check(file["col_graphs"][g]['w'].dtype in weight_types, f"/col_graphs/{g}/w.dtype {file['col_graphs'][g]['w'].dtype} must be float")
				self._check(file['col_graphs'][g]['a'].shape[0] == file['col_graphs'][g]['b'].shape[0] and file['col_graphs'][g]['a'].shape[0] == file['col_graphs'][g]['w'].shape[0], f"Column graph '{g}' sparse vectors a, b and w must have equal length")
				delay_print(f" '{g}' with {file['col_graphs'][g]['a'].shape[0]} edges")
			if len(file["col_graphs"]) == 0:
				delay_print(" (none)")
		return len(self.errors) == 0
|