File: behave.step_durations.py

package info (click to toggle)
behave 1.2.6-6
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,160 kB
  • sloc: python: 19,857; makefile: 137; sh: 18
file content (163 lines) | stat: -rwxr-xr-x 5,932 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Utility script to retrieve duration information from behave JSON output.

REQUIRES: Python >= 2.6 (json module is part of Python standard library)
LICENSE:  BSD
"""

from __future__ import absolute_import

__author__    = "Jens Engel"
__copyright__ = "(c) 2013 by Jens Engel"
__license__   = "BSD"
VERSION = "0.1.0"

# -- IMPORTS:
from behave import json_parser
from behave.model import ScenarioOutline
from optparse import OptionParser
from operator import attrgetter
import os.path
import sys


# ----------------------------------------------------------------------------
# FUNCTIONS:
# ----------------------------------------------------------------------------
class StepDurationData(object):
    def __init__(self, step=None):
        self.step_name = None
        self.min_duration = sys.maxint
        self.max_duration = 0
        self.durations = []
        self.step = step
        if step:
            self.process_step(step)

    @staticmethod
    def make_step_name(step):
        step_name = "%s %s" % (step.step_type.capitalize(), step.name)
        return step_name

    def process_step(self, step):
        step_name = self.make_step_name(step)
        if not self.step_name:
            self.step_name = step_name
        if self.min_duration > step.duration:
            self.min_duration = step.duration
        if self.max_duration < step.duration:
            self.max_duration = step.duration
        self.durations.append(step.duration)


class BehaveDurationData(object):
    def __init__(self):
        self.step_registry = {}
        self.all_steps = []
        self.all_scenarios = []


    def process_features(self, features):
        for feature in features:
            self.process_feature(feature)

    def process_feature(self, feature):
        if feature.background:
            self.process_background(feature.background)
        for scenario in feature.scenarios:
            if isinstance(scenario, ScenarioOutline):
                self.process_scenario_outline(scenario)
            else:
                self.process_scenario(scenario)

    def process_step(self, step):
        step_name = StepDurationData.make_step_name(step)
        known_step = self.step_registry.get(step_name, None)
        if known_step:
            known_step.process_step(step)
        else:
            step_data = StepDurationData(step)
            self.step_registry[step_name] = step_data
        self.all_steps.append(step)

    def process_background(self, scenario):
        for step in scenario:
            self.process_step(step)

    def process_scenario(self, scenario):
        for step in scenario:
            self.process_step(step)

    def process_scenario_outline(self, scenario_outline):
        for scenario in scenario_outline:
            self.process_scenario(scenario)

    def report_step_durations(self, limit=None, min_duration=None, ostream=sys.stdout):
        step_datas = list(self.step_registry.values())
        steps_size = len(step_datas)
        steps_by_longest_duration_first = sorted(step_datas,
                                                 key=attrgetter("max_duration"),
                                                 reverse=True)
        ostream.write("STEP DURATIONS (longest first, size=%d):\n" % steps_size)
        ostream.write("-" * 80)
        ostream.write("\n")
        for index, step in enumerate(steps_by_longest_duration_first):
            ostream.write("% 4d.  %9.6fs  %s" % \
                          (index+1, step.max_duration, step.step_name))
            calls = len(step.durations)
            if calls > 1:
                ostream.write(" (%d calls, min: %.6fs)\n" % (calls, step.min_duration))
            else:
                ostream.write("\n")
            if ((limit and index+1 >= limit) or
                (step.max_duration < min_duration)):
                remaining = steps_size - (index+1)
                ostream.write("...\nSkip remaining %d steps.\n" % remaining)
                break


# ----------------------------------------------------------------------------
# MAIN FUNCTION:
# ----------------------------------------------------------------------------
def main(args=None):
    if args is None:
        args = sys.argv[1:]

    usage_ = """%prog [OPTIONS] JsonFile
Read behave JSON data file and extract steps with longest duration."""
    parser = OptionParser(usage=usage_, version=VERSION)
    parser.add_option("-e", "--encoding", dest="encoding",
                     default="UTF-8",
                     help="Encoding to use (default: %default).")
    parser.add_option("-l", "--limit", dest="limit", type="int",
                     help="Max. number of steps (default: %default).")
    parser.add_option("-m", "--min", dest="min_duration", default="0",
                     help="Min. duration threshold (default: %default).")
    options, filenames = parser.parse_args(args)
    if not filenames:
        parser.error("OOPS, no filenames provided.")
    elif len(filenames) > 1:
        parser.error("OOPS: Can only process one JSON file.")
    min_duration = float(options.min_duration)
    if min_duration < 0:
        min_duration = None
    json_filename = filenames[0]
    if not os.path.exists(json_filename):
        parser.error("JSON file '%s' not found" % json_filename)

    # -- NORMAL PROCESSING: Read JSON, extract step durations and report them.
    features = json_parser.parse(json_filename)
    processor = BehaveDurationData()
    processor.process_features(features)
    processor.report_step_durations(options.limit, min_duration)
    sys.stdout.write("Detected %d features.\n" % len(features))
    return 0


# ----------------------------------------------------------------------------
# AUTO-MAIN:
# ----------------------------------------------------------------------------
if __name__ == "__main__":
    sys.exit(main())