1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280
|
import sys
import os
import glob
import difflib
import gzip
import contextlib
import inspect
import tempfile
import pysam
# Directory that contains this module; every path constant below is
# resolved relative to it and stored as an absolute path.
_TESTS_ROOT = os.path.dirname(__file__)


def _abs_under_tests(*parts):
    """Return the absolute path of *parts* joined onto this module's directory."""
    return os.path.abspath(os.path.join(_TESTS_ROOT, *parts))


WORKDIR = _abs_under_tests("pysam_test_work")
BAM_DATADIR = _abs_under_tests("pysam_data")
TABIX_DATADIR = _abs_under_tests("tabix_data")
CBCF_DATADIR = _abs_under_tests("cbcf_data")
# lives one level above this module's directory
LINKDIR = _abs_under_tests("..", "linker_tests")
# directory in which get_temp_filename/get_temp_context create their files
TESTS_TEMPDIR = _abs_under_tests("tmp")
# True when running under Python 3 or later.
IS_PYTHON3 = sys.version_info[0] >= 3

# Bind ``zip_longest`` and ``urlopen`` under the same names on both
# major Python versions.
if IS_PYTHON3:
    from itertools import zip_longest
    from urllib.request import urlopen
else:
    # Python 2 calls it izip_longest. Plain izip stops at the shorter
    # input instead of padding with None, so it is not a substitute.
    from itertools import izip_longest as zip_longest
    from urllib2 import urlopen
if not IS_PYTHON3:
    # Under Python 2 both helpers hand their argument back untouched.
    def force_str(s):
        '''return *s* unchanged.'''
        return s

    def force_bytes(s):
        '''return *s* unchanged.'''
        return s
else:
    def force_str(s):
        '''return *s* decoded as ASCII.

        Objects without a ``decode`` method are returned unchanged.
        '''
        try:
            text = s.decode('ascii')
        except AttributeError:
            # nothing to decode, pass the object through
            text = s
        return text

    def force_bytes(s):
        '''return *s* encoded as ASCII.

        Objects without an ``encode`` method are returned unchanged.
        '''
        try:
            raw = s.encode('ascii')
        except AttributeError:
            # nothing to encode, pass the object through
            raw = s
        return raw
def openfile(fn):
    '''open *fn* for reading and return the file object.

    Names ending in ``.gz`` are opened through gzip, everything else
    with the builtin ``open``.
    '''
    if not fn.endswith(".gz"):
        return open(fn)

    try:
        handle = gzip.open(fn, "rt", encoding="utf-8")
    except TypeError:
        # gzip.open rejected the arguments above; retry with the
        # plain two-argument form.
        handle = gzip.open(fn, "r")
    return handle
def checkBinaryEqual(filename1, filename2):
    '''return true if the two files are binary equal.

    Files of different size are reported as unequal without reading
    them. Otherwise both files are read in fixed-size chunks and
    compared chunk by chunk. Both files are closed on return, also
    when reading raises.
    '''
    if os.path.getsize(filename1) != os.path.getsize(filename2):
        return False

    chunk_size = 64 * 1024
    with open(filename1, "rb") as infile1, open(filename2, "rb") as infile2:
        while True:
            chunk1 = infile1.read(chunk_size)
            chunk2 = infile2.read(chunk_size)
            if chunk1 != chunk2:
                return False
            if not chunk1:
                # both files ended at the same point with no difference
                return True
def checkGZBinaryEqual(filename1, filename2):
    '''return true if both gzip files decompress to the same bytes.

    Each file is read completely into memory before comparing.
    '''
    payloads = []
    for path in (filename1, filename2):
        with gzip.open(path, "rb") as handle:
            payloads.append(handle.read())

    first, second = payloads
    return first == second
def check_samtools_view_equal(
        filename1, filename2,
        without_header=False):
    '''return true if the two files are equal in their
    content through samtools view.

    Both files are passed through ``pysam.samtools.view`` with the MD
    and NM tags removed. The header is included (``-h``) unless
    *without_header* is set.

    The outputs are equal if they have the same number of lines and
    each pair of lines contains the same tab-separated fields,
    regardless of field order. The first mismatching pair is printed
    before False is returned.
    '''
    def _as_lines(output):
        # NOTE(review): whether the view wrapper returns one text blob
        # or a list of lines appears to depend on the pysam version -
        # confirm. A blob is split so the comparison below always works
        # on whole lines rather than on single characters.
        if isinstance(output, str):
            return output.splitlines(True)
        return list(output)

    # strip MD and NM tags, as not preserved in CRAM files
    args = ["-x", "MD", "-x", "NM"]
    if not without_header:
        args.append("-h")

    lines1 = _as_lines(pysam.samtools.view(*(args + [filename1])))
    lines2 = _as_lines(pysam.samtools.view(*(args + [filename2])))

    if len(lines1) != len(lines2):
        return False

    if lines1 == lines2:
        return True

    # line by line comparison
    # sort each line, as tags get rearranged between
    # BAM/CRAM
    for n, (l1, l2) in enumerate(zip(lines1, lines2)):
        fields1 = sorted(l1.rstrip("\n").split("\t"))
        fields2 = sorted(l2.rstrip("\n").split("\t"))
        if fields1 != fields2:
            print("mismatch in line %i" % n)
            print(fields1)
            print(fields2)
            return False

    # every line matched once field order was ignored
    return True
def check_url(url):
    '''return True if URL is available.

    A URL might not be available if it is the wrong URL
    or there is no connection to the URL.

    The URL is opened with a timeout of one second. Any ordinary
    exception raised while opening it is reported as "not available".
    KeyboardInterrupt and SystemExit are not swallowed.
    '''
    try:
        response = urlopen(url, timeout=1)
    except Exception:
        return False
    # only reachability matters, release the connection right away
    response.close()
    return True
def checkFieldEqual(cls, read1, read2, exclude=()):
    '''check if two reads are equal by comparing each field.

    cls:
        object providing ``assertEqual(first, second, msg)``; it is
        called once per compared attribute.
    read1, read2:
        objects exposing the attributes listed below.
    exclude:
        container of attribute names (without leading dot) to skip.
    '''
    # add the . for refactoring purposes.
    for x in (".query_name",
              ".query_sequence",
              ".flag",
              ".reference_id",
              ".reference_start",
              ".mapping_quality",
              ".cigartuples",
              ".next_reference_id",
              ".next_reference_start",
              ".template_length",
              ".query_length",
              ".query_qualities",
              ".bin",
              ".is_paired", ".is_proper_pair",
              ".is_unmapped", ".mate_is_unmapped",
              ".is_reverse", ".mate_is_reverse",
              ".is_read1", ".is_read2",
              ".is_secondary", ".is_qcfail",
              ".is_duplicate"):
        n = x[1:]
        if n in exclude:
            continue
        # fetch each attribute once and reuse it for the message
        value1 = getattr(read1, n)
        value2 = getattr(read2, n)
        cls.assertEqual(value1, value2,
                        "attribute mismatch for %s: %s != %s" %
                        (n, value1, value2))
def check_lines_equal(cls, a, b, sort=False, filter_f=None, msg=None):
    """assert that the files a and b hold the same lines.

    Both files are read through ``openfile`` and the comparison is
    made with ``cls.assertEqual``.

    sort: bool
        compare the sorted lines of both files instead of the
        lines in file order.
    filter_f:
        callable applied to each line; lines for which it returns
        a true value are removed from both a and b before comparing.
    msg:
        passed on to ``cls.assertEqual``.
    """
    contents = []
    for path in (a, b):
        with openfile(path) as handle:
            contents.append(handle.readlines())

    if filter_f is not None:
        contents = [[line for line in lines if not filter_f(line)]
                    for lines in contents]

    if sort:
        contents = [sorted(lines) for lines in contents]

    left, right = contents
    cls.assertEqual(left, right, msg)
def get_temp_filename(suffix=""):
    '''create an empty file in TESTS_TEMPDIR and return its name.

    The file name starts with ``pysamtests_tmp_<caller>_`` where
    <caller> is the name of the calling function, and ends with
    *suffix*. The file is not deleted automatically.
    '''
    outer_frames = inspect.getouterframes(inspect.currentframe(), 2)
    # record 0 is this function, record 1 its caller; field 3 of a
    # frame record is the function name
    caller_name = outer_frames[1][3]

    try:
        os.makedirs(TESTS_TEMPDIR)
    except OSError:
        # an OSError here (e.g. directory already exists) is ignored
        pass

    handle = tempfile.NamedTemporaryFile(
        dir=TESTS_TEMPDIR,
        prefix="pysamtests_tmp_{}_".format(caller_name),
        suffix=suffix,
        delete=False)
    path = handle.name
    handle.close()
    return path
@contextlib.contextmanager
def get_temp_context(suffix="", keep=False):
    '''context manager yielding the name of a fresh temporary file.

    An empty file is created in TESTS_TEMPDIR and its name is yielded.
    On exit the file and every other path starting with the same name
    (for example index files written next to it) are removed. This
    also happens when the body of the ``with`` statement raises.
    Set *keep* to leave the files in place.
    '''
    # NOTE(review): this runs inside a generator driven by
    # contextlib, so frame 1 is presumably contextlib's ``__enter__``
    # rather than the test function - confirm before relying on the
    # caller name embedded in the prefix.
    caller_name = inspect.getouterframes(inspect.currentframe(), 3)[1][3]
    try:
        os.makedirs(TESTS_TEMPDIR)
    except OSError:
        # an OSError here (e.g. directory already exists) is ignored
        pass

    f = tempfile.NamedTemporaryFile(
        prefix="pysamtests_tmp_{}_".format(caller_name),
        suffix=suffix,
        delete=False,
        dir=TESTS_TEMPDIR)
    f.close()
    filename = f.name

    try:
        yield filename
    finally:
        # runs on normal exit and on exceptions, so files do not pile
        # up when a test fails
        if not keep:
            # clear up any indices as well
            for path in glob.glob(filename + "*"):
                os.unlink(path)
def load_and_convert(filename, encode=True):
    '''read a tab-separated file into a list of lists of strings.

    Filename can be either plain or compressed (ending in .gz).
    Lines starting with ``#`` are skipped. Every other line is
    stripped of surrounding whitespace and split on tabs.
    Lines of compressed files are decoded as ASCII.

    The *encode* argument is accepted but not used.
    '''
    def _records(lines):
        return [line.strip().split("\t")
                for line in lines
                if not line.startswith("#")]

    if filename.endswith(".gz"):
        with gzip.open(filename) as inf:
            # the list is built while the file is still open
            return _records(raw.decode("ascii") for raw in inf)

    with open(filename) as inf:
        return _records(inf)
def flatten_nested_list(l):
    '''return a new list with the items of every element of *l*,
    in order. Only one level of nesting is removed.
    '''
    flat = []
    for inner in l:
        flat.extend(inner)
    return flat
|