###################################################################################
# LAVA QA tool
# Copyright (C) 2015, 2016 Collabora Ltd.
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 US
###################################################################################
from __future__ import print_function
import base64
import os.path
from lqa_api.job import Job, JobError
from lqa_api.exit_codes import APPLICATION_ERROR
from lqa_tool.commands import Command
from lqa_tool.settings import lqa_logger
class TestCmd(Command):
    """Command that reports the tests of a job.

    For every selected test it prints the test itself and, depending on the
    command line arguments, its metadata (--info), its installed packages,
    its results (-r/--results) and saves its attachments (--attachments).
    """

    def __init__(self, args):
        Command.__init__(self, args)

    def run(self):
        """Build the job for ``args.job_id`` and report its tests.

        A JobError is logged and terminates the process with the
        APPLICATION_ERROR exit code.
        """
        try:
            self._run(Job(self.args.job_id, self.server))
        except JobError as err:
            lqa_logger.error("test command: job {}: {} {}"
                             .format(self.args.job_id, err.code, err.msg))
            # Raise SystemExit directly: the exit() builtin is installed by
            # the site module and is not guaranteed to be available.
            raise SystemExit(APPLICATION_ERROR)

    def _run(self, job):
        """Report every selected test of `job`."""
        for test in job.tests:
            if not self._is_selected(test):
                continue

            print(test)

            # Show further job metadata with --info
            if self.args.info:
                self._print_info(test)

            # Show installed packages (package environment) for this test
            if self.args.show_packages and test.packages:
                for pkg in test.packages:
                    print("P: {} {}".format(pkg['name'], pkg['version']))

            # Show test results if -r/--results passed and/or save the
            # attachments if --attachments option passed.
            saved_path = self._process_results_and_attachments(test)
            if saved_path is not None:
                lqa_logger.info("Attachments saved to: {}".format(saved_path))

    def _is_selected(self, test):
        """Return True if `test` has to be reported."""
        # Exclude any test passed with -e/--exclude
        if self.args.exclude and test.name in self.args.exclude:
            return False
        # Select all tests if no test specified in the command line
        return (not self.args.test_name
                or test.name in self.args.test_name
                or test.uuid in self.args.test_name)

    def _print_info(self, test):
        """Print the metadata fields of `test`."""
        print(" description: {}".format(test.description))
        print(" version: {}".format(test.version))
        print(" format: {}".format(test.format))
        print(" params: {}".format(test.params))
        print(" project: {}".format(test.project))
        # Strip image field (if it exists) since it can contain newline
        image = test.image.strip() if test.image else test.image
        print(" image: {}".format(image))
        print(" location: {}".format(test.location))
        print(" branch_url: {}".format(test.branch_url))
        print(" branch_rev: {}".format(test.branch_rev))
        print(" branch_vcs: {}".format(test.branch_vcs))

    def _print_result(self, result):
        """Print one test case result with its measurement, if any."""
        ms = "{} {}".format(result.get('measurement', ''),
                            result.get('units', '')).strip()
        print("- {}: {} {}".format(result['test_case_id'],
                                   result['result'],
                                   ('/ ' + ms) if ms else ''))

    def _process_results_and_attachments(self, test):
        """Print the results and save the attachments of `test`.

        Returns the test run attachments directory when at least one
        attachment was saved for this test, None otherwise.
        """
        # The test run directory is tracked per test and starts unset, so a
        # test without test run attachments never reads an undefined path
        # nor reuses the directory of a previously processed test.
        tr_attachments_path = None
        tr_dir_name = "{}-{}".format(test.name, test.uuid)

        # Save test run attachments if --attachments is true
        # and there are attachments available.
        if self.args.attachments and test.attachments:
            tr_attachments_path = _make_attachments_dir(
                self.args.attachments, tr_dir_name)
            for attachment in test.attachments:
                _save_attachment(attachment, tr_attachments_path)

        if not (self.args.results or self.args.attachments):
            return tr_attachments_path

        for result in test.results:
            # Print results if --results is true
            if self.args.results:
                self._print_result(result)

            # Save test case attachments if --attachments is true
            # and there are attachments available.
            tc_attachments = \
                self.args.attachments and result.get('attachments', [])
            if not tc_attachments:
                continue

            if tr_attachments_path is None:
                # No test run attachments were saved for this test, so
                # its directory has not been created yet.
                tr_attachments_path = _make_attachments_dir(
                    self.args.attachments, tr_dir_name)

            # Test case attachments path is the
            # test run attachments path + test_case_id.
            tc_attachments_path = _make_attachments_dir(
                tr_attachments_path, result['test_case_id'])
            for attachment in tc_attachments:
                _save_attachment(attachment, tc_attachments_path)

        return tr_attachments_path
def _make_attachments_dir(attachments_dir, test_dir):
    """Return the path `attachments_dir`/`test_dir`, creating it if needed.

    Missing intermediate directories are created too. If the directory
    cannot be created (or the path exists but is not a directory) the error
    is logged and the process terminates with APPLICATION_ERROR.
    """
    dir_pathname = os.path.join(attachments_dir, test_dir)
    try:
        os.makedirs(dir_pathname)
    except EnvironmentError as e:
        # Attempt the creation first instead of checking for existence, so
        # there is no window between the check and the creation. A failure
        # is only harmless when the directory is already there.
        if not os.path.isdir(dir_pathname):
            lqa_logger.error(e)
            # Raise SystemExit directly: the exit() builtin is installed by
            # the site module and is not guaranteed to be available.
            raise SystemExit(APPLICATION_ERROR)
    return dir_pathname
def _save_attachment(attachment, directory):
    """Decode `attachment` and write it as a file inside `directory`.

    `attachment` is a mapping with a 'pathname' entry, used to derive the
    file name, and a 'content' entry holding the base64 encoded data.
    An I/O failure is logged and terminates the process with
    APPLICATION_ERROR.
    """
    # Replace any '/' in the attachment name by '_'
    filename = attachment['pathname'].replace('/', '_')
    attachment_pathname = os.path.join(directory, filename)
    # Decode before opening the file so an invalid payload does not leave
    # an empty file behind.
    content = base64.b64decode(attachment['content'])
    try:
        # b64decode returns bytes, so the file must be opened in binary
        # mode: text mode rejects bytes on Python 3 and may translate
        # newlines on some platforms.
        with open(attachment_pathname, 'wb') as f:
            f.write(content)
    except EnvironmentError as e:
        lqa_logger.error(e)
        # Raise SystemExit directly: the exit() builtin is installed by
        # the site module and is not guaranteed to be available.
        raise SystemExit(APPLICATION_ERROR)
|