1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183
|
import csv
import logging
import os
import numpy as np
from rsciio._docstrings import FILENAME_DOC, LAZY_UNSUPPORTED_DOC, RETURNS_DOC
_logger = logging.getLogger(__name__)
# At some point, if there is another reader, which also uses csv files, it
# will be necessary to mention the other reader in this message (and to add
# an argument in the load function to specify the correct reader).
invalid_file_error = (
    "The csv reader can't import the file, please"
    " make sure, that this is a valid Impulse log file."
)
# BUGFIX: this constant was previously defined with { } braces, which made it
# a one-element *set* rather than a string, so the raised IOError message was
# rendered as a set repr. Parentheses keep the implicit string concatenation.
invalid_filenaming_error = (
    "The filename does not match Impulse naming, please"
    " make sure that the filenames for the logfile and metadata file are unchanged."
)
def file_reader(filename, lazy=False):
    """
    Read a DENSsolutions Impulse logfile.

    Parameters
    ----------
    %s
    %s
    %s
    """
    # Lazy loading makes no sense for small tabular log data; reject any
    # value other than the default False explicitly.
    if lazy is not False:
        raise NotImplementedError("Lazy loading is not supported.")
    impulse_log = ImpulseCSV(filename)
    return _impulseCSV_log_reader(impulse_log)


file_reader.__doc__ %= (FILENAME_DOC, LAZY_UNSUPPORTED_DOC, RETURNS_DOC)
def _impulseCSV_log_reader(csv_file):
csvs = []
for key in csv_file.logged_quantity_name_list:
csvs.append(csv_file.get_dictionary(key))
return csvs
class ImpulseCSV:
    """Parser for a DENSsolutions Impulse CSV logfile.

    On construction this reads the header line (column names), validates the
    file, loads the companion ``*_Metadata.log`` file, and parses the numeric
    data. One rsciio-style signal dictionary per logged quantity is then
    available through :meth:`get_dictionary`.
    """

    def __init__(self, filename):
        # Path to the Impulse CSV logfile ("..._Synchronized data.csv" or a
        # "raw" variant — see _read_metadatafile for the naming check).
        self.filename = filename
        self._parse_header()
        self._read_data()

    def _parse_header(self):
        """Read the column names, validate the file and load the metadata file.

        Raises
        ------
        IOError
            If the first line does not look like an Impulse CSV header.
        """
        with open(self.filename, "r") as f:
            s = f.readline()
            self.column_names = s.strip().split(",")
        if not self._is_impulse_csv_file():
            raise IOError(invalid_file_error)
        self._read_metadatafile()
        # The first two columns (timestamp and experiment time) are axes, not
        # logged quantities; everything after them is a signal.
        self.logged_quantity_name_list = self.column_names[2:]

    def _is_impulse_csv_file(self):
        # Minimal sanity check: a "TimeStamp" column must be present and there
        # must be at least one logged quantity beyond the two time columns.
        return "TimeStamp" in self.column_names and len(self.column_names) >= 3

    def get_dictionary(self, quantity):
        """Return the signal dictionary (data/axes/metadata) for *quantity*."""
        return {
            "data": self._data_dictionary[quantity],
            "axes": self._get_axes(),
            "metadata": self._get_metadata(quantity),
            "original_metadata": {"Impulse_header": self.original_metadata},
        }

    def _get_metadata(self, quantity):
        """Assemble the ``metadata`` tree for one logged quantity."""
        return {
            "General": {
                "original_filename": os.path.split(self.filename)[1],
                "title": "%s" % quantity,
                # Both keys are populated by _read_metadatafile from the
                # companion metadata log file.
                "date": self.original_metadata["Experiment_date"],
                "time": self.original_metadata["Experiment_time"],
            },
            "Signal": {
                "quantity": self._parse_quantity_units(quantity),
            },
        }

    def _parse_quantity_units(self, quantity):
        """Extract the unit from a column name of the form ``"Name (unit)"``.

        Returns the bare unit string, or ``""`` when the column name has no
        trailing parenthesized unit.
        """
        quantity_split = quantity.strip().split(" ")
        if (
            len(quantity_split) > 1
            and quantity_split[-1][0] == "("
            and quantity_split[-1][-1] == ")"
        ):
            return quantity_split[-1].replace("(", "").replace(")", "")
        else:
            return ""

    def _read_data(self):
        """Parse the numeric columns into ``self._data_dictionary``.

        Also sets ``self.time_axis`` from the "Experiment time" column.
        """
        # genfromtxt field names must be valid identifiers, so sanitize the
        # CSV column names (spaces, units, degree sign, '#', '/', '%').
        names = [
            name.replace(" ", "_")
            .replace("°C", "C")
            .replace("#", "No")
            .replace("(", "")
            .replace(")", "")
            .replace("/", "_")
            .replace("%", "Perc")
            for name in self.column_names
        ]
        # dtype=None lets genfromtxt infer a per-column dtype (strings for
        # columns such as MixValve, floats for numeric ones).
        data = np.genfromtxt(
            self.filename,
            delimiter=",",
            dtype=None,
            names=names,
            skip_header=1,
            encoding="latin1",
        )
        self._data_dictionary = dict()
        # Keys of _data_dictionary are the ORIGINAL column names; the
        # sanitized names are only used to index the structured array.
        for i, (name, name_dtype) in enumerate(zip(self.column_names, names)):
            if name == "Experiment time":
                self.time_axis = data[name_dtype]
            elif name == "MixValve":
                # MixValve entries are strings of three ';'-separated values.
                # Each value is offset by +2 and packed into one decimal digit
                # (hundreds/tens/units) to obtain a single integer per row.
                # NOTE(review): this writes the packed integers back into the
                # string-typed column of the structured array before casting
                # to int32 — presumably the +2 offset keeps every digit
                # non-negative; confirm against the Impulse file format.
                mixvalvedatachanged = data[name_dtype]
                for index, item in enumerate(data[name_dtype]):
                    mixvalvedatachanged[index] = (
                        int(int(item.split(";")[0]) + 2) * 100
                        + (int(item.split(";")[1]) + 2) * 10
                        + (int(item.split(";")[2]) + 2)
                    )
                mixvalvedatachangedint = np.array(mixvalvedatachanged, dtype=np.int32)
                self._data_dictionary[name] = mixvalvedatachangedint
            else:
                self._data_dictionary[name] = data[name_dtype]

    def _read_metadatafile(self):
        """Load the companion ``*_Metadata.log`` file into ``original_metadata``.

        Raises
        ------
        IOError
            If the logfile name does not follow the Impulse naming scheme.
        """
        # Locate the experiment metadata file
        self.original_metadata = {}
        notes = []
        notes_section = False
        if "_Synchronized data" in str(self.filename) or "raw" in str(
            self.filename
        ):  # Check if Impulse filename formatting is intact
            # The metadata file shares the logfile's prefix: everything before
            # the last '_' plus "_Metadata.log".
            metadata_file = (
                "_".join(str(self.filename).split("_")[:-1]) + "_Metadata.log"
            ).replace("\\", "/")
            if os.path.isfile(metadata_file):
                with open(metadata_file, newline="") as csvfile:
                    metadata_file_reader = csv.reader(csvfile, delimiter=",")
                    for row in metadata_file_reader:
                        if notes_section:
                            # Every row after "Live notes" is free-text notes.
                            notes.append(row[0])
                        elif row[0] == "Live notes":
                            notes_section = True
                            notes = [row[1].strip()]
                        else:
                            # Regular "key,value" metadata row; spaces in keys
                            # are replaced so they can be used as identifiers.
                            self.original_metadata[row[0].replace(" ", "_")] = row[
                                1
                            ].strip()
                self.original_metadata["Notes"] = notes
            else:
                # Best-effort: missing metadata is only a warning, but note
                # that _get_metadata will later fail on the missing
                # Experiment_date/Experiment_time keys.
                _logger.warning("No metadata file found in folder")
        else:
            raise IOError(invalid_filenaming_error)

    def _get_axes(self):
        """Return the single (time) axis description shared by all signals."""
        return [
            {
                "size": self.time_axis.shape[0],
                "index_in_array": 0,
                "name": "Time",
                # Mean spacing of the experiment-time values; the end points
                # are trimmed — presumably to avoid irregular first/last
                # intervals (TODO confirm against Impulse output).
                "scale": np.diff(self.time_axis[1:-1]).mean(),
                "offset": 0,
                "units": "Seconds",
                "navigate": False,
            }
        ]
|