File: read_subunit.py

package info (click to toggle)
python-subunit2sql 1.10.0-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 2,788 kB
  • sloc: python: 5,799; sh: 75; makefile: 28; sql: 12
file content (156 lines) | stat: -rw-r--r-- 5,409 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import functools
import re

import subunit
import testtools

DAY_SECONDS = 60 * 60 * 24


def get_duration(start, end):
    if not start or not end:
        duration = None
    else:
        delta = end - start
        duration = '%d.%06d' % (
            delta.days * DAY_SECONDS + delta.seconds, delta.microseconds)
        return float(duration)


class ReadSubunit(object):

    def __init__(self, stream_file, attachments=False, attr_regex=None,
                 targets=None, use_wall_time=False, non_subunit_name=None):
        if targets is None:
            targets = []
        else:
            targets = targets[:]
        self.use_wall_time = use_wall_time
        self.stream_file = stream_file
        self.stream = subunit.ByteStreamToStreamResult(
            self.stream_file,
            non_subunit_name=non_subunit_name
        )
        starts = testtools.StreamResult()
        summary = testtools.StreamSummary()
        outcomes = testtools.StreamToDict(functools.partial(
            self.parse_outcome))
        targets.extend([starts, outcomes, summary])
        self.result = testtools.CopyStreamResult(targets)
        self.results = {}
        self.attachments = attachments
        if attr_regex:
            self.attr_regex = re.compile(attr_regex)
        # NOTE(mtreinish): Default to the previous implicit regex if None is
        # specified for backwards compat
        else:
            self.attr_regex = re.compile('\[(.*)\]')

    def get_results(self):
        self.result.startTestRun()
        try:
            self.stream.run(self.result)
        finally:
            self.result.stopTestRun()
        self.results['run_time'] = self.run_time()
        return self.results

    def get_attachments(self, test):
        attach_dict = {}
        for name, detail in test['details'].items():
            name = name.split(':')[0]
            attach_dict[name] = detail
        return attach_dict

    def parse_outcome(self, test):
        metadata = {}
        status = test['status']
        if status == 'exists':
            return
        name = self.cleanup_test_name(test['id'])
        attrs = self.get_attrs(test['id'])
        if attrs:
            metadata['attrs'] = attrs
        if test['tags']:
            metadata['tags'] = ",".join(test['tags'])
        # Return code is a fail don't process it
        if name == 'process-returncode':
            return
        timestamps = test['timestamps']
        attachment_dict = {}
        if self.attachments:
            attachment_dict = self.get_attachments(test)
        self.results[name] = {
            'status': status,
            'start_time': timestamps[0],
            'end_time': timestamps[1],
            'metadata': metadata,
            'attachments': attachment_dict
        }

    def get_attrs(self, name):
        matches = self.attr_regex.search(name)
        if matches:
            attrs = matches.group(1)
        else:
            attrs = None
        return attrs

    def cleanup_test_name(self, name, strip_tags=True, strip_scenarios=False):
        """Clean up the test name for display.

        By default we strip out the tags in the test because they don't help us
        in identifying the test that is run to it's result.

        Make it possible to strip out the testscenarios information (not to
        be confused with tempest scenarios) however that's often needed to
        identify generated negative tests.
        """
        if strip_tags:
            matches = self.attr_regex.search(name)
            if matches:
                tags_start = matches.start(0)
                tags_end = matches.end(0)
                if tags_start > 0 and tags_end > tags_start:
                    newname = name[:tags_start]
                    newname += name[tags_end:]
                    name = newname

        if strip_scenarios:
            tags_start = name.find('(')
            tags_end = name.find(')')
            if tags_start > 0 and tags_end > tags_start:
                newname = name[:tags_start]
                newname += name[tags_end + 1:]
                name = newname
        return name

    def run_time(self):
        runtime = 0.0
        if self.use_wall_time:
            start_time = None
            stop_time = None
            for name, data in self.results.items():
                if not start_time or data['start_time'] < start_time:
                    start_time = data['start_time']
                if not stop_time or data['end_time'] > stop_time:
                    stop_time = data['end_time']
            runtime = get_duration(start_time, stop_time)
        else:
            for name, data in self.results.items():
                runtime += get_duration(data['start_time'], data['end_time'])
        return runtime