1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
|
# vim: set fileencoding=utf-8 :
from .. import context # noqa: F401
import os
import shutil
import subprocess
import tarfile
import tempfile
import unittest
import zipfile
import gbp.log
import gbp.errors
from gbp.deb.changelog import ChangeLog
from . gbplogtester import GbpLogTester
from . debiangittestrepo import DebianGitTestRepo
from . capture import capture_stdout, capture_stderr
from . popen import patch_popen
# Names exported by ``from <this module> import *``: the helpers imported
# from the sibling modules above plus the classes and functions defined below.
__all__ = ['GbpLogTester', 'DebianGitTestRepo', 'OsReleaseFile',
           'MockedChangeLog', 'get_dch_default_urgency',
           'capture_stderr', 'capture_stdout',
           'ls_dir', 'ls_tar', 'ls_zip',
           'patch_popen', 'have_cmd', 'skip_without_cmd']
class OsReleaseFile(object):
    """Dictionary-like, read-only view of a file made of KEY=value lines

    Every line is split at its first '='; the text before it becomes the
    key (unchanged) and the text after it, stripped of surrounding
    whitespace, becomes the value. Lines without a '=' are ignored.
    If the file can't be read a message is logged and the object stays empty.
    """

    def __init__(self, filename="/etc/os-release"):
        """Parse filename into the internal key/value mapping"""
        self._values = {}
        try:
            with open(filename, 'r') as release_file:
                for entry in release_file:
                    name, separator, raw_value = entry.partition('=')
                    if not separator:
                        # No '=' on this line, nothing to record
                        continue
                    self._values[name] = raw_value.strip()
        except IOError as err:
            gbp.log.info('Failed to read OS release file %s: %s' %
                         (filename, err))

    def __getitem__(self, key):
        """Return the value stored for key, or None if key is unknown"""
        return self._values.get(key)

    def __contains__(self, key):
        """Tell whether key was found in the file"""
        return key in self._values

    def __str__(self):
        """Return the string form of the parsed mapping"""
        return str(self._values)

    def __repr__(self):
        """Return the repr of the parsed mapping"""
        return repr(self._values)
class MockedChangeLog(ChangeLog):
    """ChangeLog created from a canned, single entry changelog text

    The entry is for package 'foo' in distribution 'experimental' with
    urgency 'low'; version and change text come from the constructor.
    """
    # The template follows the deb-changelog(5) entry layout: a blank line
    # after the header, change text indented by two spaces, a blank line,
    # then a trailer starting with " -- " that has exactly two spaces
    # between the maintainer address and the date.
    contents = """foo (%s) experimental; urgency=low

  %s

 -- Debian Maintainer <maint@debian.org>  Sat, 01 Jan 2012 00:00:00 +0100"""

    def __init__(self, version, changes="a important change"):
        """Fill version and changes into the template and initialize
        the ChangeLog base class with the resulting text

        @param version: version to put into the entry's header line
        @param changes: text to use as the body of the entry
        """
        ChangeLog.__init__(self,
                           contents=self.contents % (version, changes))
def get_dch_default_urgency():
    """Return the urgency debchange writes into a newly created changelog

    debchange is asked to create an empty changelog in a temporary
    directory and the urgency is taken from the last field of that
    file's first line. If debchange can't be started or exits with a
    non-zero status 'medium' is returned.
    """
    workdir = tempfile.mkdtemp()
    changelog = os.path.join(workdir, 'changelog')
    command = ['debchange', '--create', '--empty', '--changelog', changelog,
               '--package=foo', '--newversion=1',
               '--distribution=UNRELEASED']
    result = 'medium'
    try:
        status = subprocess.Popen(command).wait()
    except OSError:
        # debchange could not be executed, stick to the fallback
        pass
    else:
        if status == 0:
            with open(changelog, encoding='utf-8') as fobj:
                first_line = fobj.readline().strip()
            last_field = first_line.split()[-1]
            result = last_field.replace('urgency=', '')
    finally:
        # Always remove the scratch directory again
        if os.path.isdir(workdir):
            shutil.rmtree(workdir)
    return result
def ls_dir(directory, directories=True):
    """Recursively collect the entries found below directory

    Returns a set of names relative to directory, using '/' between a
    subdirectory and its entries. Directory names are only included
    when directories is true; file names are always included.
    """
    entries = set()
    for current, subdirs, filenames in os.walk(directory):
        if current == directory:
            # Entries of the top level directory get no prefix
            lead = ''
        else:
            lead = os.path.relpath(current, directory) + '/'
        names = list(filenames)
        if directories:
            names.extend(subdirs)
        for name in names:
            entries.add('%s%s' % (lead, name))
    return entries
def ls_tar(tarball, directories=True):
    """List the contents of tar archive

    The archive is unpacked into a temporary directory, the result is
    listed with ls_dir() and the temporary directory is removed again.

    @param tarball: path of the tar archive to inspect
    @param directories: passed on to ls_dir()
    @return: whatever ls_dir() returns for the unpacked tree
    """
    tmpdir = tempfile.mkdtemp()
    try:
        # The context manager closes the archive's file handle even when
        # extraction fails
        with tarfile.open(tarball, 'r') as tarobj:
            tarobj.extractall(tmpdir)
        return ls_dir(tmpdir, directories)
    finally:
        shutil.rmtree(tmpdir)
def ls_zip(archive, directories=True):
    """List the contents of zip file

    The archive is unpacked into a temporary directory, the result is
    listed with ls_dir() and the temporary directory is removed again.

    @param archive: path of the zip file to inspect
    @param directories: passed on to ls_dir()
    @return: whatever ls_dir() returns for the unpacked tree
    """
    tmpdir = tempfile.mkdtemp()
    try:
        # The context manager closes the archive's file handle even when
        # extraction fails
        with zipfile.ZipFile(archive, 'r') as zipobj:
            zipobj.extractall(tmpdir)
        return ls_dir(tmpdir, directories)
    finally:
        shutil.rmtree(tmpdir)
def have_cmd(cmd):
    """Tell whether shutil.which() is able to locate cmd"""
    located = shutil.which(cmd)
    return bool(located)
def skip_without_cmd(cmd):
    """Decorator that skips the decorated test if cmd is not available

    When have_cmd() finds cmd the decorated object is returned unchanged.
    """
    if not have_cmd(cmd):
        return unittest.skip("Command '%s' not found" % cmd)
    return lambda func: func
|