1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172
|
#!/bin/env python
from os import getcwd, remove, rmdir, mkdir
from cogent.util.unit_test import TestCase, main
from cogent.util.misc import flatten
from cogent.app.raxml import Raxml,raxml_alignment, build_tree_from_alignment
from cogent.app.util import ApplicationError
from cogent.parse.phylip import get_align_for_phylip
from cogent.core.tree import PhyloNode
from cogent.core.moltype import RNA
from StringIO import StringIO
from cogent.util.misc import app_path
from subprocess import Popen, PIPE, STDOUT
# Module metadata: authorship, licensing and release information.
__author__ = "Micah Hamady"
__copyright__ = "Copyright 2007-2012, The Cogent Project"
__credits__ = ["Micah Hamady", "Rob Knight", "Daniel McDonald"]
__license__ = "GPL"
__version__ = "1.5.3"
__maintainer__ = "Micah Hamady"
# NOTE(review): this holds a name rather than an e-mail address -- confirm
# the intended value.
__email__ = "Micah Hamady"
__status__ = "Development"
class GenericRaxml(TestCase):
    """Shared fixture for the Raxml tests: version gate plus test data."""

    def setUp(self):
        """Check that a supported raxmlHPC is available, then build test data.

        The test fails (with an explanatory message) when raxmlHPC cannot be
        found, when the output of ``raxmlHPC -v`` cannot be parsed into a
        version number, or when the parsed version differs from
        ``acceptable_version``.
        """
        acceptable_version = (7,0,3)
        self.assertTrue(app_path('raxmlHPC'),
         "raxmlHPC not found. This may or may not be a problem depending on "+\
         "which components of QIIME you plan to use.")
        command = "raxmlHPC -v | grep version"
        proc = Popen(command,shell=True,universal_newlines=True,\
                         stdout=PIPE,stderr=STDOUT)
        # communicate() reads all output, closes the pipe and waits for the
        # child, so no process or file descriptor is left behind.
        stdout = proc.communicate()[0]
        try:
            # The version number is taken from the fifth space-separated
            # token of the matched line; an empty or differently shaped
            # line raises IndexError, a non-numeric token raises ValueError.
            version_string = stdout.strip().split(' ')[4].strip()
            version = tuple(map(int,version_string.split('.')))
            pass_test = version == acceptable_version
        except (IndexError, ValueError):
            # Report the raw output so the failure message shows what was
            # actually printed.
            pass_test = False
            version_string = stdout
        self.assertTrue(pass_test,\
         "Unsupported raxmlHPC version. %s is required, but running %s." \
         % ('.'.join(map(str,acceptable_version)), version_string))

        # Setup data for raxml tests
        self.seqs1 = ['ACUGCUAGCUAGUAGCGUACGUA','GCUACGUAGCUAC',
            'GCGGCUAUUAGAUCGUA']
        self.labels1 = ['>1','>2','>3']
        self.lines1 = flatten(zip(self.labels1,self.seqs1))

        self.test_model = "GTRCAT"

        self.align1 = get_align_for_phylip(StringIO(PHYLIP_FILE))

        # Input paths used by the tests: absolute, relative, and one
        # containing a space.
        self.test_fn1 = "/tmp/raxml_test1.txt"
        self.test_fn2 = "raxml_test1.txt"
        self.test_fn1_space = "/tmp/raxml test1.txt"

    def writeTmp(self, outname):
        """Write the PHYLIP_FILE fixture text to the file named outname."""
        t = open(outname, "w+")
        try:
            t.write(PHYLIP_FILE)
        finally:
            # Close the handle even if the write fails.
            t.close()
class RaxmlTests(GenericRaxml):
    """Tests for the Raxml application controller"""

    def _run_with_input(self, app, input_path):
        """Write the PHYLIP fixture to input_path, run app, then clean up.

        The input file is removed even when running app or cleaning up its
        result raises, so a failing test does not leave the file behind.
        test_raxml_params asserts an ApplicationError before this file has
        been written, so a leftover copy could presumably make that
        assertion fail.
        """
        self.writeTmp(input_path)
        try:
            result = app()
            result.cleanUp()
        finally:
            remove(input_path)

    def test_raxml(self):
        """raxml BaseCommand should return the correct BaseCommand"""
        r = Raxml()
        # Every expected command starts with a cd into the current directory.
        cd_prefix = ''.join(['cd \"',getcwd(),'/\"; '])
        self.assertEqual(r.BaseCommand,
            cd_prefix + 'raxmlHPC -e 0.1 -f d -c 50')
        r.Parameters['-s'].on('seq.nexus')
        self.assertEqual(r.BaseCommand,
            cd_prefix + 'raxmlHPC -e 0.1 -f d -c 50 -s seq.nexus')

    def test_raxml_params(self):
        """raxml should raise exception if missing required params"""
        r = Raxml(WorkingDir="/tmp")

        r.SuppressStdout = True
        r.SuppressStderr = True
        # raise error by default
        self.assertRaises(ValueError, r)

        # specify output name
        r.Parameters['-n'].on("test_name")
        self.assertRaises(ApplicationError, r)

        # specify model
        r.Parameters['-m'].on("GTRCAT")
        self.assertRaises(ApplicationError, r)

        # specify input file; it has not been written yet at this point
        r.Parameters['-s'].on(self.test_fn1)
        self.assertRaises(ApplicationError, r)

        # with the input file written, the call is expected not to raise
        self._run_with_input(r, self.test_fn1)

    def test_raxml_from_file(self):
        """raxml should run correctly using filename"""
        r = Raxml(WorkingDir="/tmp")

        r.Parameters['-s'].on(self.test_fn1)
        r.Parameters['-m'].on("GTRCAT")
        r.Parameters['-n'].on("test_me")

        # test with abs filename
        self._run_with_input(r, self.test_fn1)

        # test with rel + working dir
        r.Parameters['-s'].on(self.test_fn2)
        r.Parameters['-n'].on("test_me2")
        r.Parameters['-w'].on("/tmp/")
        self._run_with_input(r, self.test_fn1)

        # test with a quoted filename that contains a space
        r.Parameters['-s'].on("\"%s\"" % self.test_fn1_space)
        r.Parameters['-n'].on("test_me3")
        r.Parameters['-w'].on("/tmp/")
        self._run_with_input(r, self.test_fn1_space)

    def test_raxml_alignment(self):
        """raxml_alignment should work as expected"""
        # No values are asserted; the unpacking itself requires that exactly
        # four items are returned.
        phy_node, parsimony_phy_node, log_likelihood, total_exec \
            = raxml_alignment(self.align1)

    def test_build_tree_from_alignment(self):
        """Builds a tree from an alignment"""
        tree = build_tree_from_alignment(self.align1, RNA, False)

        self.assertTrue(isinstance(tree, PhyloNode))
        # one tip per sequence in the seven-sequence fixture
        self.assertEqual(len(tree.tips()), 7)
        # passing True as the third argument is expected to be unsupported
        self.assertRaises(NotImplementedError, build_tree_from_alignment, \
                         self.align1, RNA, True)
# PHYLIP-format alignment fixture shared by the tests. The header line
# declares 7 sequences of 50 characters; each following line is a sequence
# label followed by five 10-character groups. GenericRaxml.setUp parses this
# text into an alignment and GenericRaxml.writeTmp writes it to disk verbatim.
PHYLIP_FILE= """ 7 50
Species001 UGCAUGUCAG UAUAGCUUUA GUGAAACUGC GAAUGGCUCA UUAAAUCAGU
Species002 UGCAUGUCAG UAUAGCUUUA GUGAAACUGC GAAUGGCUNN UUAAAUCAGU
Species003 UGCAUGUCAG UAUAGCAUUA GUGAAACUGC GAAUGGCUCA UUAAAUCAGU
Species004 UCCAUGUCAG UAUAACUUUG GUGAAACUGC GAAUGGCUCA UUAAAUCAGG
Species005 NNNNNNNNNN UAUAUCUUAU GUGAAACUUC GAAUGCCUCA UUAAAUCAGU
Species006 UGCAUGUCAG UAUAGCUUUG GUGAAACUGC GAAUGGCUCA UUAAAUCAGU
Species007 UGCAUGUCAG UAUAACUUUG GUGAAACUGC GAAUGGCUCA UUAAAUCAGU
"""
# Run the test suite when this file is executed directly as a script.
if __name__ == '__main__':
    main()
|