1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
|
# -*- coding: utf-8 -*-
#
# License: BSD (3-clause)
from datetime import datetime
from glob import glob
from os.path import basename, join, splitext
from xml.etree.ElementTree import parse
import numpy as np
from ...utils import logger
def _read_events(input_fname, info):
"""Read events for the record.
Parameters
----------
input_fname : str
The file path.
info : dict
Header info array.
"""
mff_events, event_codes = _read_mff_events(input_fname, info['sfreq'],
info['n_samples'])
info['n_events'] = len(event_codes)
info['event_codes'] = np.asarray(event_codes).astype('<U4')
events = np.zeros([info['n_events'],
info['n_segments'] * info['n_samples']])
for n, event in enumerate(event_codes):
for i in mff_events[event]:
events[n][i] = n + 1
return events, info
def _read_mff_events(filename, sfreq, nsamples):
"""Extract the events.
Parameters
----------
filename : str
File path.
sfreq : float
The sampling frequency
"""
orig = {}
for xml_file in glob(join(filename, '*.xml')):
xml_type = splitext(basename(xml_file))[0]
orig[xml_type] = _parse_xml(xml_file)
xml_files = orig.keys()
xml_events = [x for x in xml_files if x[:7] == 'Events_']
for item in orig['info']:
if 'recordTime' in item:
start_time = _ns2py_time(item['recordTime'])
break
markers = []
code = []
for xml in xml_events:
for event in orig[xml][2:]:
event_start = _ns2py_time(event['beginTime'])
start = (event_start - start_time).total_seconds()
if event['code'] not in code:
code.append(event['code'])
marker = {'name': event['code'],
'start': start,
'start_sample': int(np.fix(start * sfreq)),
'end': start + float(event['duration']) / 1e9,
'chan': None,
}
markers.append(marker)
events_tims = dict()
for ev in code:
trig_samp = list(c['start_sample'] for n,
c in enumerate(markers) if (c['name'] == ev and
c['start_sample'] < nsamples))
events_tims.update({ev: trig_samp})
return events_tims, code
def _parse_xml(xml_file):
"""Parse XML file."""
xml = parse(xml_file)
root = xml.getroot()
return _xml2list(root)
def _xml2list(root):
"""Parse XML item."""
output = []
for element in root:
if len(element) > 0:
if element[0].tag != element[-1].tag:
output.append(_xml2dict(element))
else:
output.append(_xml2list(element))
elif element.text:
text = element.text.strip()
if text:
tag = _ns(element.tag)
output.append({tag: text})
return output
def _ns(s):
"""Remove namespace, but only if there is a namespace to begin with."""
if '}' in s:
return '}'.join(s.split('}')[1:])
else:
return s
def _xml2dict(root):
"""Use functions instead of Class.
remove namespace based on
http://stackoverflow.com/questions/2148119
"""
output = {}
if root.items():
output.update(dict(root.items()))
for element in root:
if len(element) > 0:
if len(element) == 1 or element[0].tag != element[1].tag:
one_dict = _xml2dict(element)
else:
one_dict = {_ns(element[0].tag): _xml2list(element)}
if element.items():
one_dict.update(dict(element.items()))
output.update({_ns(element.tag): one_dict})
elif element.items():
output.update({_ns(element.tag): dict(element.items())})
else:
output.update({_ns(element.tag): element.text})
return output
def _ns2py_time(nstime):
"""Parse times."""
nsdate = nstime[0:10]
nstime0 = nstime[11:26]
nstime00 = nsdate + " " + nstime0
pytime = datetime.strptime(nstime00, '%Y-%m-%d %H:%M:%S.%f')
return pytime
def _combine_triggers(data, remapping=None):
"""Combine binary triggers."""
new_trigger = np.zeros(data.shape[1])
if data.astype(bool).sum(axis=0).max() > 1: # ensure no overlaps
logger.info(' Found multiple events at the same time '
'sample. Cannot create trigger channel.')
return
if remapping is None:
remapping = np.arange(data) + 1
for d, event_id in zip(data, remapping):
idx = d.nonzero()
if np.any(idx):
new_trigger[idx] += event_id
return new_trigger
|