#!/usr/bin/env python3 import unittest import urllib.request import os import zipfile import warnings import timeit import multiprocessing import pandas as pd import matplotlib matplotlib.use('Agg') import matplotlib.pyplot import Bio.SeqIO import Bio.pairwise2 import Bio.Entrez from Bio.SeqRecord import SeqRecord from Bio.SubsMat.MatrixInfo import blosum62 #============================================================================== import importlib import os import sys import io spec, m = None, None def bali_score(ref_fasta_path, test_fasta_path): """Portable wrapper for the bali_score.py script. Calculate BALI scores from a test and a reference multiple sequence alignment in FASTA format. Reproduces the results of the bali_score C program. """ path = os.path.join("balibase", "bali_score.py") global spec, m if spec is None: spec = importlib.util.spec_from_file_location("baliscore", path) m = importlib.util.module_from_spec(spec) tmp_argv = sys.argv tmp_stdout = sys.stdout buf = sys.stdout = io.StringIO() sys.argv = [path, ref_fasta_path, test_fasta_path] try: spec.loader.exec_module(m) finally: sys.argv = tmp_argv sys.stdout = tmp_stdout buf.seek(0) score = {} for line in buf: name, var = line.partition("=")[::2] score[name.strip()] = float(var) return score #============================================================================== class BalibaseTestCase(unittest.TestCase): """Base class to inherit by all test cases that need access to the balibase. """ def setUp(self): balibase_zippath = "balibase.zip" self.balibase_path = "balibase" testfile_path = os.path.join( self.balibase_path, "RV11.unaligned", "BBS11001.fasta") if not os.path.isdir(self.balibase_path): os.mkdir(self.balibase_path) if not os.path.isfile(testfile_path): # Archive is no longer publicly available, so this will probably # fail. Fall back to comitting the archive. if not os.path.isfile(balibase_zippath): print("Fetching balibase archive from moodle...") urllib.request.urlretrieve( "https://moodle.polytechnique.fr/mod/resource/view.php?id=38570", balibase_zippath) with zipfile.ZipFile(balibase_zippath) as balibase_zip: balibase_zip.extractall() def get_dataset_heads(self): """Generator function to iterate over the first two sequences of each unaligned fasta file. """ dataset_dir = os.path.join(self.balibase_path, "RV11.unaligned") for filename in os.listdir(dataset_dir): records = Bio.SeqIO.parse( os.path.join(dataset_dir, filename), "fasta") seq1 = next(records) seq2 = next(records) yield seq1, seq2, filename class AlignmentSeqTestCase(BalibaseTestCase): def get_dataset_records(self): """Generator function to iterate over the record generator of each unaligned fasta file. """ dataset_dir = os.path.join(self.balibase_path, "RV11.unaligned") for filename in os.listdir(dataset_dir): records = Bio.SeqIO.parse( os.path.join(dataset_dir, filename), "fasta") yield records, filename def assertSameResidues(self, str1, str2): """Strip strings of their '-' before comparing them """ return self.assertEqual( str(str1).translate({ord('-'):None}), str(str2).translate({ord('-'):None}) ) def test_simple_align(self): """Test alignments with the simplest metric. As there can be a huge number of solutions, we check only that we got the right score, and one valid alignment. """ from alignementseq import align score_fn = lambda a,b : -2 if a == '' or b == '' else -1 if a != b else 1 for s1, s2, filename in self.get_dataset_heads(): score, r1, r2 = align(s1, s2, score_fn) (ex_r1, ex_r2, exp_score, *_) = Bio.pairwise2.align.globalms( s1.seq, s2.seq, 1, -1, -2, -2, one_alignment_only=True)[0] try: self.assertSameResidues(r1.seq, s1.seq) self.assertSameResidues(r2.seq, s2.seq) self.assertEqual(score, sum( -2 if a == '-' or b == '-' else -1 if a != b else 1 for a,b in zip(r1.seq, r2.seq))) self.assertEqual(score, exp_score) except AssertionError: print('from', filename, ':') print(r1.seq) print(r2.seq) print('') print(ex_r1) print(ex_r2) raise def test_blosum_align(self): """Tests alignments with blosum but no gap extension. """ from alignementseq import align, vec_align for s1, s2, filename in self.get_dataset_heads(): (ex_r1, ex_r2, exp_score, *_) = Bio.pairwise2.align.globalds( s1.seq, s2.seq, blosum62, -8, -8, one_alignment_only=True)[0] for method in align, vec_align: score, r1, r2 = method(s1, s2) try: self.assertSameResidues(r1.seq, s1.seq) self.assertSameResidues(r2.seq, s2.seq) self.assertEqual(score, sum( -8 if a == '-' or b == '-' else blosum62[a,b] if (a,b) in blosum62 else blosum62[b,a] for a,b in zip(r1.seq, r2.seq))) self.assertEqual(score, exp_score) except AssertionError: print('from', filename, "with", method, ":") print(r1.seq) print(r2.seq) print('') print(ex_r1) print(ex_r2) raise def test_blosum_align2steps(self): """Tests alignments with blosum and gap extension. """ from alignementseq import align2steps for s1, s2, filename in self.get_dataset_heads(): (ex_r1, ex_r2, exp_score, *_) = Bio.pairwise2.align.globaldd( s1.seq, s2.seq, blosum62, -8, -4, -8, -4, one_alignment_only=True)[0] score, r1, r2 = align2steps(s1, s2) try: self.assertSameResidues(r1.seq, s1.seq) self.assertSameResidues(r2.seq, s2.seq) calc_score = 0 # score calculated from r1 and r2 gap1, gap2 = False, False for a,b in zip(r1.seq, r2.seq): if a == '-': if not gap1: gap1 = True gap2 = False # suppose 2 gaps are never aligned calc_score -= 8 else: calc_score -= 4 elif b == '-': if not gap2: gap2 = True gap1 = False calc_score -= 8 else: calc_score -= 4 else: gap1, gap2 = False, False calc_score += blosum62[a,b] if (a,b) in blosum62 else blosum62[b,a] self.assertEqual(score, calc_score) self.assertEqual(score, exp_score) except AssertionError: print('from', filename, "with", align2steps, ":") print(r1.seq) print(r2.seq) print('') print(ex_r1) print(ex_r2) raise def test_benchmark_multiple_align(self): """Tests the multiple_align function (using blosum and gap extension).""" #from alignementseq import multiple_align from Bio.Align.Applications import ClustalwCommandline from Bio import AlignIO import pip print("Test-benchmark multiple_align") import clustalo as cl for records, filename in list(self.get_dataset_records())[:3]: cl.runClustalO("ariane-la-fusee@laposte.net", "sp:wap_rat,sp:wap_mouse,sp:wap_pig", fmt='fasta') # TODO: use real seq instead of rat, pig... print("Fine", filename) def save_alignments(self, NameFile): from alignementseq import align2steps for s1, s2, filename in self.get_dataset_heads(): score, r1, r2 = align2steps(s1, s2) with open("balibase/" + str(NameFile)+'.fasta', "w") as fd: Bio.SeqIO.write((r1, r2), fd, "fasta") class PerformanceTestCase(BalibaseTestCase): """Performance tests, slow. Excluded from default test suite, run it with ``python -m unittest alignmentseq_tests.PerformanceTestCase`` """ def setUp(self): """Loads titin sequences. """ super().setUp() self.unit_timeout = int(os.environ.get("UNIT_TIMEOUT") or 60) with warnings.catch_warnings(): warnings.simplefilter("ignore") self.titin_human, self.titin_mouse = Bio.SeqIO.parse( Bio.Entrez.efetch(db="protein", id=["CAA62188","EDL27217"], rettype="gp", retmode="text"), "genbank") def test_performance(self): from alignementseq import align, vec_align def biopython_align(seq1, seq2): return Bio.pairwise2.align.globalds( seq1.seq, seq2.seq, blosum62, -8, -8, one_alignment_only=True)[0] methods = (vec_align, align, biopython_align) times = pd.DataFrame(columns=("length1", "length2", "method", "time"), dtype=float) for s1, s2, filename in self.get_dataset_heads(): for method in methods: def to_time(): method(s1, s2) t = timeit.Timer(to_time).timeit(1) times.loc[len(times)] = ( len(s1.seq), len(s2.seq), method.__name__, t ) times['length_product'] = times.length1 * times.length2 print(times) ax = None cmap = matplotlib.cm.inferno colors = [ cmap(i/len(methods)) for i in range(len(methods)) ] for im, method in enumerate(methods): ax = times.loc[times.method == method.__name__].plot.scatter( 'length_product', 'time', c=colors[im], label=method.__name__, ax=ax) matplotlib.pyplot.savefig("timings.png") def test_titin(self): from alignementseq import align, vec_align for method in vec_align, align: def to_time(): method(self.titin_human, self.titin_mouse) def run(): t = timeit.Timer(to_time).timeit(1) print("{}: {}".format(method.__name__, t)) p = multiprocessing.Process(target=run) p.start() p.join(self.unit_timeout) # Note: if the task eats up your memory, it can take a while for # it to terminate if it times out. if p.is_alive(): p.terminate() p.join() print("{}: timeout".format(method.__name__)) def load_tests(loader, standard_tests, pattern): return unittest.defaultTestLoader.loadTestsFromTestCase(AlignmentSeqTestCase) if __name__ == '__main__': unittest.main(verbosity=2)