File: loom_validator.py

package info (click to toggle)
python-loompy 3.0.7+dfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: bookworm
  • size: 3,272 kB
  • sloc: python: 3,152; sh: 63; makefile: 16
file content (288 lines) | stat: -rw-r--r-- 15,571 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
import h5py
from typing import *
import logging
import numpy as np
import loompy

from .utils import get_loom_spec_version


class LoomValidator:
	def __init__(self, version: Optional[str] = None) -> None:
		"""
		Args:
			version: 		The Loom file format version to validate against ("3.0.0", "2.0.1", "old"), or None to infer from file

		Remarks:
			"old" version will accept files that lack the "row_graphs" and "col_graphs" groups
		"""
		self.version = version  #: Version of the spec to validate against
		self.errors: List[str] = []  #: Errors found during validation
		self.warnings: List[str] = []  #: Warnings triggered during validation
		self.summary: List[str] = []  #: Summary of the file structure

	def _check(self, condition: bool, message: str) -> bool:
		# Record an error when the condition fails; return the condition so callers
		# can guard dependent checks (e.g. skip dtype checks when an attribute is missing).
		if not condition:
			self.errors.append(message)
		return condition

	def _warn(self, condition: bool, message: str) -> bool:
		# Like _check, but records a non-fatal warning instead of an error.
		if not condition:
			self.warnings.append(message)
		return condition

	def validate(self, path: str, strictness: str = "speconly") -> bool:
		"""
		Validate a file for conformance to the Loom specification

		Args:
			path: 			Full path to the file to be validated
			strictness:		"speconly" or "conventions"

		Returns:
			True if the file is valid, else False (details in self.errors / self.warnings)

		Remarks:
			In "speconly" mode, conformance is assessed relative to the file format specification
			at http://linnarssonlab.org/loompy/format/. In "conventions" mode, conformance is additionally
			assessed relative to attribute name and data type conventions given at http://linnarssonlab.org/loompy/conventions/.
		"""
		valid1 = True
		with h5py.File(path, mode="r") as f:
			if self.version is None:  # "is None" rather than "== None" (PEP 8)
				self.version = get_loom_spec_version(f)
			valid1 = self.validate_spec(f)
			if not valid1:
				self.errors.append("For help, see http://linnarssonlab.org/loompy/format/")

		valid2 = True
		if strictness == "conventions":
			with loompy.connect(path, mode="r") as ds:
				valid2 = self.validate_conventions(ds)
				if not valid2:
					self.errors.append("For help, see http://linnarssonlab.org/loompy/conventions/")

		return valid1 and valid2

	def validate_conventions(self, ds: "loompy.LoomConnection") -> bool:
		"""
		Validate the LoomConnection object against the attribute name/dtype conventions.

		Args:
			ds:			LoomConnection object

		Returns:
			True if the file conforms to the conventions, else False

		Remarks:
			Upon return, the instance attributes 'self.errors' and 'self.warnings' contain
			lists of errors and warnings.
		"""
		(n_genes, n_cells) = ds.shape

		self._warn("Description" in ds.attrs, "Optional global attribute 'Description' is missing")
		self._warn("Journal" in ds.attrs, "Optional global attribute 'Journal' is missing")
		self._warn("Authors" in ds.attrs, "Optional global attribute 'Authors' is missing")
		self._warn("Title" in ds.attrs, "Optional global attribute 'Title' is missing")
		self._warn("Year" in ds.attrs, "Optional global attribute 'Year' is missing")
		self._warn("CreationDate" in ds.attrs, "Optional global attribute 'CreationDate' is missing")

		if self._check("ClusterID" in ds.ca, "Column attribute 'ClusterID' is missing"):
			self._check(np.issubdtype(ds.ca.ClusterID.dtype, np.int_), "Column attribute 'ClusterID' must be integer dtype")
			# IDs 0..max inclusive yield max+1 distinct values (the original comparison
			# against np.max alone was off by one and rejected every valid file)
			self._check(len(np.unique(ds.ca.ClusterID)) == np.max(ds.ca.ClusterID) + 1 and np.min(ds.ca.ClusterID) == 0, "Column attribute 'ClusterID' must be integers 0, 1, 2, ... with no missing values")
			self._check(ds.ca.ClusterID.shape == (n_cells,), f"Column attribute 'ClusterID' must be 1-dimensional array of {n_cells} elements")

		if "ClusterName" in ds.ca:
			self._check(ds.ca.ClusterName.dtype == object and np.issubdtype(ds.ca.ClusterName[0].dtype, np.str_), "Column attribute 'ClusterName' must be an array of strings")
			self._check(ds.ca.ClusterName.shape == (n_cells,), f"Column attribute 'ClusterName' must be 1-dimensional array of {n_cells} elements")
			if "ClusterID" in ds.ca:  # the 1:1 mapping can only be tested when ClusterID exists
				one_to_one = True
				for cid in np.unique(ds.ca.ClusterID):
					if len(np.unique(ds.ca.ClusterName[ds.ca.ClusterID == cid])) != 1:
						one_to_one = False
						break
				if one_to_one:
					for cn in np.unique(ds.ca.ClusterName):
						if len(np.unique(ds.ca.ClusterID[ds.ca.ClusterName == cn])) != 1:
							one_to_one = False
							break
				if not one_to_one:
					self._check(False, "ClusterName must correspond 1:1 with ClusterID")
		else:
			self.warnings.append("Optional column attribute 'ClusterName' is missing")

		if self._check("CellID" in ds.ca, "Column attribute 'CellID' is missing"):
			self._check(ds.ca.CellID.dtype == object and np.issubdtype(ds.ca.CellID[0].dtype, np.str_), f"Column attribute 'CellID' must be an array of strings, not '{ds.ca.CellID[0].dtype}'")
			self._check(ds.ca.CellID.shape == (n_cells,), f"Column attribute 'CellID' must be 1-dimensional array of {n_cells} elements")
			self._check(len(np.unique(ds.ca.CellID)) == n_cells, "Column attribute 'CellID' cannot contain duplicate values")

		if "Valid" in ds.ca:
			self._check(np.issubdtype(ds.ca.Valid.dtype, np.int_), f"Column attribute 'Valid' must be integer dtype, not '{ds.ca.Valid.dtype}'")
			self._check(np.all(np.isin(ds.ca.Valid, [0, 1])), "Column attribute 'Valid' must be integers 0 or 1 only")
			self._check(ds.ca.Valid.shape == (n_cells,), f"Column attribute 'Valid' must be 1-dimensional array of {n_cells} elements")
		else:
			self.warnings.append("Optional column attribute 'Valid' is missing")

		if "Outliers" in ds.ca:
			self._check(np.issubdtype(ds.ca.Outliers.dtype, np.int_), f"Column attribute 'Outliers' must be integer dtype, not '{ds.ca.Outliers.dtype}'")
			self._check(np.all(np.isin(ds.ca.Outliers, [0, 1])), "Column attribute 'Outliers' must be integers 0 or 1 only")
			self._check(ds.ca.Outliers.shape == (n_cells,), f"Column attribute 'Outliers' must be 1-dimensional array of {n_cells} elements")
		else:
			self.warnings.append("Optional column attribute 'Outliers' is missing")

		if self._check("Accession" in ds.ra, "Row attribute 'Accession' is missing"):
			self._check(ds.ra.Accession.dtype == object and np.issubdtype(ds.ra.Accession[0].dtype, np.str_), f"Row attribute 'Accession' must be an array of strings, not '{ds.ra.Accession[0].dtype}'")
			self._check(ds.ra.Accession.shape == (n_genes,), f"Row attribute 'Accession' must be 1-dimensional array of {n_genes} elements")
			self._check(len(np.unique(ds.ra.Accession)) == n_genes, "Row attribute 'Accession' cannot contain duplicate values")

		if self._check("Gene" in ds.ra, "Row attribute 'Gene' is missing"):
			self._check(ds.ra.Gene.dtype == object and np.issubdtype(ds.ra.Gene[0].dtype, np.str_), f"Row attribute 'Gene' must be an array of strings, not '{ds.ra.Gene[0].dtype}'")
			self._check(ds.ra.Gene.shape == (n_genes,), f"Row attribute 'Gene' must be 1-dimensional array of {n_genes} elements")

		if "Valid" in ds.ra:
			self._check(np.issubdtype(ds.ra.Valid.dtype, np.int_), f"Row attribute 'Valid' must be integer dtype, not '{ds.ra.Valid.dtype}'")
			self._check(np.all(np.isin(ds.ra.Valid, [0, 1])), "Row attribute 'Valid' must be integers 0 or 1 only")
			# Row attributes match the gene dimension (the original compared against n_cells)
			self._check(ds.ra.Valid.shape == (n_genes,), f"Row attribute 'Valid' must be 1-dimensional array of {n_genes} elements")
		else:
			self.warnings.append("Optional row attribute 'Valid' is missing")

		if "Selected" in ds.ra:
			self._check(np.issubdtype(ds.ra.Selected.dtype, np.int_), f"Row attribute 'Selected' must be integer dtype, not '{ds.ra.Selected.dtype}'")
			self._check(np.all(np.isin(ds.ra.Selected, [0, 1])), "Row attribute 'Selected' must be integers 0 or 1 only")
			# Row attributes match the gene dimension (the original compared against n_cells)
			self._check(ds.ra.Selected.shape == (n_genes,), f"Row attribute 'Selected' must be 1-dimensional array of {n_genes} elements")
		else:
			self.warnings.append("Optional row attribute 'Selected' is missing")

		return len(self.errors) == 0

	def validate_spec(self, file: "h5py.File") -> bool:
		"""
		Validate the LoomConnection object against the format specification.

		Args:
			file:			h5py File object

		Returns:
			True if the file conforms to the specs, else False

		Remarks:
			Upon return, the instance attributes 'self.errors' and 'self.warnings' contain
			lists of errors and warnings, and the 'self.summary' attribute contains a summary
			of the file contents.
		"""
		matrix_types = ["float16", "float32", "float64", "int8", "int16", "int32", "int64", "uint8", "uint16", "uint32", "uint64"]
		vertex_types = ["int8", "int16", "int32", "int64", "uint8", "uint16", "uint32", "uint64"]
		weight_types = ["float16", "float32", "float64"]

		def delay_print(text: str) -> None:
			# Summary lines are collected rather than printed, so callers decide what to show
			self.summary.append(text)

		def dt(t: str) -> str:
			# Render fixed-length HDF5 string dtypes (e.g. "|S18") simply as "string"
			if str(t).startswith("|S"):
				return "string"
			return str(t)

		# Compute column widths for the aligned summary listing
		width_ra = 0
		width_ca = 0
		width_globals = 0
		if self._check("row_attrs" in file, "'row_attrs' group is missing"):
			width_ra = max([len(x) for x in (file["row_attrs"].keys())], default=0)
		if self._check("col_attrs" in file, "'col_attrs' group is missing"):
			width_ca = max([len(x) for x in (file["col_attrs"].keys())], default=0)
		if self.version == "3.0.0":
			if self._check("attrs" in file, "Global attributes missing"):
				width_globals = max([len(x) for x in (file["attrs"].keys())], default=0)
		elif len(file.attrs) > 0:
			width_globals = max([len(x) for x in file.attrs.keys()])
		width_layers = 0
		if "layers" in file and len(file["layers"]) > 0:
			width_layers = max([len(x) for x in file["layers"].keys()])
		width_layers = max(width_layers, len("Main matrix"))
		width = max(width_ca, width_ra, width_globals)

		delay_print("Global attributes:")
		if self.version == "3.0.0":
			# In the v3 spec, global attributes are HDF5 datasets inside the /attrs group.
			# Iterating the group yields key strings, so the values must be looked up
			# explicitly (the original inspected the key string itself and therefore
			# never printed the attribute's dtype or shape).
			if "attrs" in file:  # a missing group was already reported above
				for key in file["attrs"]:
					value = file["attrs"][key]
					if value.shape == ():
						delay_print(f"{key: >{width}} {dt(value.dtype)} (scalar)")
					else:
						delay_print(f"{key: >{width}} {dt(value.dtype)} {value.shape}")
		else:
			# In older specs, global attributes are HDF5 attributes on the root group
			for key, value in file.attrs.items():
				if type(value) is str:
					self.warnings.append(f"Global attribute '{key}' has dtype string, which will be deprecated in future Loom versions")
					delay_print(f"{key: >{width}} string")
				elif type(value) is bytes:
					self.warnings.append(f"Global attribute '{key}' has dtype bytes, which will be deprecated in future Loom versions")
					delay_print(f"{key: >{width}} bytes")
				else:
					delay_print(f"{key: >{width}} {dt(file.attrs[key].dtype)}")

		shape = None  # main matrix shape; stays None when the matrix is missing
		if self._check("matrix" in file, "Main matrix missing"):
			self._check(file["matrix"].dtype in matrix_types, f"Main matrix dtype={file['matrix'].dtype} is not allowed")
			shape = file["matrix"].shape
			delay_print(f"Layers shape={shape}:")
			delay_print(f"{'Main matrix': >{width}} {file['matrix'].dtype}")

		if "layers" in file:
			for layer in file["layers"]:
				if shape is not None:  # shape comparison is meaningless without a main matrix
					self._check(file["layers"][layer].shape == shape, f"Layer '{layer}' shape {file['layers'][layer].shape} does not match main matrix shape {shape}")
				self._check(file["layers"][layer].dtype in matrix_types, f"Layer '{layer}' dtype={file['layers'][layer].dtype} is not allowed")
				delay_print(f"{layer: >{width}} {file['layers'][layer].dtype}")

		if self.version == "3.0.0":
			expected_dtype = np.object_
		else:
			# np.bytes_ is the canonical name for the fixed-length bytes scalar type
			# (np.string_ was an alias for it, removed in NumPy 2.0)
			expected_dtype = np.bytes_
		delay_print("Row attributes:")
		if "row_attrs" in file:  # a missing group was already reported above; avoid a duplicate error
			for ra in file["row_attrs"]:
				if shape is not None:
					self._check(file["row_attrs"][ra].shape[0] == shape[0], f"Row attribute '{ra}' shape {file['row_attrs'][ra].shape[0]} first dimension does not match row dimension {shape}")
				self._check(file["row_attrs"][ra].dtype in matrix_types or np.issubdtype(file['row_attrs'][ra].dtype, expected_dtype), f"Row attribute '{ra}' dtype {file['row_attrs'][ra].dtype} is not allowed")
				ra_shape = file['row_attrs'][ra].shape
				delay_print(f"{ra: >{width}} {dt(file['row_attrs'][ra].dtype)} {ra_shape if len(ra_shape) > 1 else ''}")
			if len(file["row_attrs"]) == 0:
				delay_print("    (none)")

		delay_print("Column attributes:")
		if "col_attrs" in file:  # a missing group was already reported above; avoid a duplicate error
			for ca in file["col_attrs"]:
				if shape is not None:
					self._check(file["col_attrs"][ca].shape[0] == shape[1], f"Column attribute '{ca}' shape {file['col_attrs'][ca].shape[0]} first dimension does not match column dimension {shape}")
				self._check(file["col_attrs"][ca].dtype in matrix_types or np.issubdtype(file["col_attrs"][ca].dtype, expected_dtype), f"Column attribute '{ca}' dtype {file['col_attrs'][ca].dtype} is not allowed")
				ca_shape = file['col_attrs'][ca].shape
				delay_print(f"{ca: >{width}} {dt(file['col_attrs'][ca].dtype)} {ca_shape if len(ca_shape) > 1 else ''}")
			if len(file["col_attrs"]) == 0:
				delay_print("    (none)")

		delay_print("Row graphs:")
		if self.version == "2.0.1" or self.version == "3.0.0":
			# These spec versions require the group to exist (it may be empty). The
			# original nested this check inside `if "row_graphs" in file`, so a missing
			# group was never reported.
			self._check("row_graphs" in file, "'row_graphs' group is missing (try spec_version='old')")
		if "row_graphs" in file:
			for g in file["row_graphs"]:
				self._check("a" in file["row_graphs"][g], f"Row graph '{g}' is missing vector 'a', denoting start vertices")
				self._check(file["row_graphs"][g]['a'].dtype in vertex_types, f"/row_graphs/{g}/a.dtype {file['row_graphs'][g]['a'].dtype} must be integer")
				self._check("b" in file["row_graphs"][g], f"Row graph '{g}' is missing vector 'b', denoting end vertices")
				self._check(file["row_graphs"][g]['b'].dtype in vertex_types, f"/row_graphs/{g}/b.dtype {file['row_graphs'][g]['b'].dtype} must be integer")
				self._check("w" in file["row_graphs"][g], f"Row graph '{g}' is missing vector 'w', denoting vertex weights")
				self._check(file["row_graphs"][g]['w'].dtype in weight_types, f"/row_graphs/{g}/w.dtype {file['row_graphs'][g]['w'].dtype} must be float")
				self._check(file['row_graphs'][g]['a'].shape[0] == file['row_graphs'][g]['b'].shape[0] and file['row_graphs'][g]['a'].shape[0] == file['row_graphs'][g]['w'].shape[0], f"Row graph '{g}' sparse vectors a, b and w must have equal length")
				delay_print(f"    '{g}' with {file['row_graphs'][g]['a'].shape[0]} edges")
			if len(file["row_graphs"]) == 0:
				delay_print("    (none)")

		delay_print("Column graphs:")
		if self.version == "2.0.1" or self.version == "3.0.0":
			# Same required-group rule as for row_graphs above
			self._check("col_graphs" in file, "'col_graphs' group is missing (try spec_version='old')")
		if "col_graphs" in file:
			for g in file["col_graphs"]:
				self._check("a" in file["col_graphs"][g], f"Column graph '{g}' is missing vector 'a', denoting start vertices")
				self._check(file["col_graphs"][g]['a'].dtype in vertex_types, f"/col_graphs/{g}/a.dtype {file['col_graphs'][g]['a'].dtype} must be integer")
				self._check("b" in file["col_graphs"][g], f"Column graph '{g}' is missing vector 'b', denoting end vertices")
				self._check(file["col_graphs"][g]['b'].dtype in vertex_types, f"/col_graphs/{g}/b.dtype {file['col_graphs'][g]['b'].dtype} must be integer")
				self._check("w" in file["col_graphs"][g], f"Column graph '{g}' is missing vector 'w', denoting vertex weights")
				self._check(file["col_graphs"][g]['w'].dtype in weight_types, f"/col_graphs/{g}/w.dtype {file['col_graphs'][g]['w'].dtype} must be float")
				self._check(file['col_graphs'][g]['a'].shape[0] == file['col_graphs'][g]['b'].shape[0] and file['col_graphs'][g]['a'].shape[0] == file['col_graphs'][g]['w'].shape[0], f"Column graph '{g}' sparse vectors a, b and w must have equal length")
				delay_print(f"    '{g}' with {file['col_graphs'][g]['a'].shape[0]} edges")
			if len(file["col_graphs"]) == 0:
				delay_print("    (none)")

		return len(self.errors) == 0