1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156
|
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import re
import subunit
import testtools
# Number of seconds in one day (24 hours * 60 minutes * 60 seconds).
DAY_SECONDS = 24 * 60 * 60
def get_duration(start, end):
    """Return the elapsed time between two datetimes in seconds.

    :param start: datetime at which the interval begins, or a falsy value
        (e.g. None) when the start is unknown.
    :param end: datetime at which the interval ends, or a falsy value
        (e.g. None) when the end is unknown.
    :returns: the duration as a float number of seconds with microsecond
        resolution, or None when either endpoint is missing.
    """
    if not start or not end:
        # Without both endpoints there is no duration to report.
        return None
    # total_seconds() folds days, seconds and microseconds together and keeps
    # the sign right when end precedes start; formatting the normalised
    # timedelta fields separately does not.
    return (end - start).total_seconds()
class ReadSubunit(object):
    """Read a subunit stream and collect one result record per test.

    The stream is parsed with ``subunit.ByteStreamToStreamResult`` and the
    events are fanned out through ``testtools.CopyStreamResult``. Each
    completed test is delivered to :meth:`parse_outcome`, which stores a
    record for it in ``self.results`` keyed by the cleaned-up test name.
    """

    def __init__(self, stream_file, attachments=False, attr_regex=None,
                 targets=None, use_wall_time=False, non_subunit_name=None):
        """Set up the stream parser and the result targets.

        :param stream_file: stream object handed to
            ``subunit.ByteStreamToStreamResult``.
        :param attachments: when true, every result record carries the
            test's details under its ``'attachments'`` key.
        :param attr_regex: regular expression whose first group captures the
            attribute string embedded in a test id. A falsy value selects
            the default pattern, which captures text in square brackets.
        :param targets: optional sequence of additional stream results that
            should receive the events. It is copied, never mutated.
        :param use_wall_time: when true, :meth:`run_time` reports the span
            from the earliest start to the latest end instead of the sum of
            the individual test durations.
        :param non_subunit_name: forwarded unchanged to
            ``subunit.ByteStreamToStreamResult``.
        """
        # Work on a copy so the caller's sequence is not extended below.
        targets = [] if targets is None else list(targets)
        self.use_wall_time = use_wall_time
        self.stream_file = stream_file
        self.stream = subunit.ByteStreamToStreamResult(
            self.stream_file,
            non_subunit_name=non_subunit_name
        )
        starts = testtools.StreamResult()
        summary = testtools.StreamSummary()
        outcomes = testtools.StreamToDict(self.parse_outcome)
        targets.extend([starts, outcomes, summary])
        self.result = testtools.CopyStreamResult(targets)
        self.results = {}
        self.attachments = attachments
        if attr_regex:
            self.attr_regex = re.compile(attr_regex)
        # NOTE(mtreinish): Default to the previous implicit regex if None is
        # specified for backwards compat
        else:
            self.attr_regex = re.compile(r'\[(.*)\]')

    def get_results(self):
        """Run the stream through the targets and return the results.

        :returns: ``self.results``: one record per test name, plus a
            ``'run_time'`` entry holding the value of :meth:`run_time`.
        """
        self.result.startTestRun()
        try:
            self.stream.run(self.result)
        finally:
            # Always close the run so the targets can flush their state.
            self.result.stopTestRun()
        self.results['run_time'] = self.run_time()
        return self.results

    def get_attachments(self, test):
        """Return the details of ``test`` keyed by their short names.

        :param test: test dict with a ``'details'`` mapping.
        :returns: dict mapping the part of each detail name before the first
            ``':'`` to the detail object. When two names share that prefix
            the one iterated last wins.
        """
        return {
            name.split(':')[0]: detail
            for name, detail in test['details'].items()
        }

    def parse_outcome(self, test):
        """Store the result record for one finished test.

        Tests whose status is ``'exists'`` and the synthetic
        ``process-returncode`` entry are ignored.

        :param test: test dict providing the ``'status'``, ``'id'``,
            ``'tags'`` and ``'timestamps'`` keys, and ``'details'`` when
            attachments are enabled.
        """
        status = test['status']
        if status == 'exists':
            return
        name = self.cleanup_test_name(test['id'])
        # Return code is a fail don't process it
        if name == 'process-returncode':
            return

        metadata = {}
        attrs = self.get_attrs(test['id'])
        if attrs:
            metadata['attrs'] = attrs
        if test['tags']:
            # Sort so the stored string does not depend on set ordering.
            metadata['tags'] = ",".join(sorted(test['tags']))

        timestamps = test['timestamps']
        attachment_dict = {}
        if self.attachments:
            attachment_dict = self.get_attachments(test)
        self.results[name] = {
            'status': status,
            'start_time': timestamps[0],
            'end_time': timestamps[1],
            'metadata': metadata,
            'attachments': attachment_dict
        }

    def get_attrs(self, name):
        """Return the attribute string embedded in a test id.

        :param name: the test id to search.
        :returns: the first group of ``self.attr_regex`` when it matches
            ``name``, otherwise None.
        """
        matches = self.attr_regex.search(name)
        return matches.group(1) if matches else None

    def cleanup_test_name(self, name, strip_tags=True, strip_scenarios=False):
        """Clean up the test name for display.

        By default we strip out the tags in the test because they don't help
        us in identifying the test that is run to its result.

        Make it possible to strip out the testscenarios information (not to
        be confused with tempest scenarios) however that's often needed to
        identify generated negative tests.

        :param name: the raw test id.
        :param strip_tags: remove the text matched by ``self.attr_regex``.
        :param strip_scenarios: remove the first parenthesised section.
        :returns: the cleaned-up name.
        """
        if strip_tags:
            matches = self.attr_regex.search(name)
            if matches:
                tags_start = matches.start(0)
                tags_end = matches.end(0)
                # A match at position 0 is left alone so the name never
                # loses its leading part.
                if tags_start > 0 and tags_end > tags_start:
                    name = name[:tags_start] + name[tags_end:]
        if strip_scenarios:
            tags_start = name.find('(')
            tags_end = name.find(')')
            if tags_start > 0 and tags_end > tags_start:
                name = name[:tags_start] + name[tags_end + 1:]
        return name

    def run_time(self):
        """Return the run time of the collected tests in seconds.

        With ``use_wall_time`` set this is the span from the earliest start
        to the latest end; otherwise it is the sum of the individual test
        durations. Records that lack a timestamp are skipped, and so is the
        ``'run_time'`` entry that :meth:`get_results` adds to the results.

        :returns: the run time as a float, 0.0 when no usable timestamps
            were recorded.
        """
        # Only per-test records are dicts; the 'run_time' entry is a number.
        records = [data for data in self.results.values()
                   if isinstance(data, dict)]
        if self.use_wall_time:
            start_times = [data['start_time'] for data in records
                           if data['start_time'] is not None]
            end_times = [data['end_time'] for data in records
                         if data['end_time'] is not None]
            if not start_times or not end_times:
                return 0.0
            return get_duration(min(start_times), max(end_times))

        runtime = 0.0
        for data in records:
            start_time = data['start_time']
            end_time = data['end_time']
            if start_time is None or end_time is None:
                # A test without both timestamps has no measurable duration.
                continue
            runtime += get_duration(start_time, end_time)
        return runtime
|