Skip to content
Snippets Groups Projects
alignementseq_tests.py 11.7 KiB
Newer Older
#!/usr/bin/env python3

import unittest
import urllib.request
import os
import zipfile
import warnings
import timeit
import multiprocessing
import pandas as pd
Etienne MORICE's avatar
Etienne MORICE committed
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot
import Bio.SeqIO
import Bio.pairwise2
import Bio.Entrez

from Bio.SeqRecord import SeqRecord
from Bio.SubsMat.MatrixInfo import blosum62
#==============================================================================
import importlib
import os
import sys
import io

spec, m = None, None

def bali_score(ref_fasta_path, test_fasta_path):
    """Portable wrapper for the bali_score.py script.
    
    Calculate BALI scores from a test and a reference multiple sequence alignment in FASTA format.
    Reproduces the results of the bali_score C program.
    """
    path = os.path.join("balibase", "bali_score.py")
    global spec, m
    if spec is None:
        spec = importlib.util.spec_from_file_location("baliscore", path)
        m = importlib.util.module_from_spec(spec)
    tmp_argv = sys.argv
    tmp_stdout = sys.stdout
    buf = sys.stdout = io.StringIO()
    sys.argv = [path, ref_fasta_path, test_fasta_path]
    try:
        spec.loader.exec_module(m)
    finally:
        sys.argv = tmp_argv
        sys.stdout = tmp_stdout
    buf.seek(0)
    score = {}
    for line in buf:
        name, var = line.partition("=")[::2]
        score[name.strip()] = float(var)
    return score
#==============================================================================

Etienne MORICE's avatar
Etienne MORICE committed
class BalibaseTestCase(unittest.TestCase):
    """Base class to inherit by all test cases that need access to the balibase.
    """
    def setUp(self):
        balibase_zippath = "balibase.zip"
        self.balibase_path = "balibase"
        testfile_path = os.path.join(
            "RV11.unaligned", "BBS11001.fasta")

        if not os.path.isdir(self.balibase_path):
            os.mkdir(self.balibase_path)
        if not os.path.isfile(testfile_path):
Etienne MORICE's avatar
Etienne MORICE committed
            # Archive is no longer publicly available, so this will probably
            # fail. Fall back to comitting the archive.
            if not os.path.isfile(balibase_zippath):
                print("Fetching balibase archive from moodle...")
                urllib.request.urlretrieve(
                    "https://moodle.polytechnique.fr/mod/resource/view.php?id=38570",
                    balibase_zippath)
            with zipfile.ZipFile(balibase_zippath) as balibase_zip:
                balibase_zip.extractall()

    def get_dataset_heads(self):
        """Generator function to iterate over the first two sequences of each
        unaligned fasta file.
        """
        dataset_dir = os.path.join(self.balibase_path, "RV11.unaligned")
        for filename in os.listdir(dataset_dir):
            records = Bio.SeqIO.parse(
                    os.path.join(dataset_dir, filename),
                    "fasta")
            seq1 = next(records)
            seq2 = next(records)
            yield seq1, seq2, filename
Etienne MORICE's avatar
Etienne MORICE committed

class AlignmentSeqTestCase(BalibaseTestCase):
    
    def get_dataset_records(self):
        """Generator function to iterate over the record generator  of each
        unaligned fasta file.
        """
        dataset_dir = os.path.join(self.balibase_path, "RV11.unaligned")
        for filename in os.listdir(dataset_dir):
            records = Bio.SeqIO.parse(
                    os.path.join(dataset_dir, filename),
                    "fasta")
            yield records, filename

    def assertSameResidues(self, str1, str2):
        """Strip strings of their '-' before comparing them
        """
        return self.assertEqual(
            str(str1).translate({ord('-'):None}),
            str(str2).translate({ord('-'):None})
            )

    def test_simple_align(self):
        """Test alignments with the simplest metric.
        
        As there can be a huge number of
        solutions, we check only that we got the right score, and one valid
        alignment.
        """
        from alignementseq import align
        score_fn = lambda a,b : -2 if a == '' or b == '' else -1 if a != b else 1

        for s1, s2, filename in self.get_dataset_heads():
            score, r1, r2 = align(s1, s2, score_fn)
            (ex_r1, ex_r2, exp_score, *_) = Bio.pairwise2.align.globalms(
                s1.seq, s2.seq, 1, -1, -2, -2, one_alignment_only=True)[0]

            try:
                self.assertSameResidues(r1.seq, s1.seq)
                self.assertSameResidues(r2.seq, s2.seq)
                self.assertEqual(score, sum(
                        -2 if a == '-' or b == '-' else
                        -1 if a != b else 
                        1
                    for a,b in zip(r1.seq, r2.seq)))
                self.assertEqual(score, exp_score)
            except AssertionError:
                print('from', filename, ':')
                print(r1.seq)
                print(r2.seq)
                print('')
                print(ex_r1)
                print(ex_r2)
                raise

    def test_blosum_align(self):
        """Tests alignments with blosum but no gap extension.
        """

        from alignementseq import align, vec_align
        
        for s1, s2, filename in self.get_dataset_heads():
            (ex_r1, ex_r2, exp_score, *_) = Bio.pairwise2.align.globalds(
                s1.seq, s2.seq, blosum62, -8, -8, one_alignment_only=True)[0]
            for method in align, vec_align:
                score, r1, r2 = method(s1, s2)
                try:
                    self.assertSameResidues(r1.seq, s1.seq)
                    self.assertSameResidues(r2.seq, s2.seq)
                    self.assertEqual(score, sum(
                            -8 if a == '-' or b == '-' else
                            blosum62[a,b] if (a,b) in blosum62 else
                            blosum62[b,a]
                        for a,b in zip(r1.seq, r2.seq)))
                    self.assertEqual(score, exp_score)
                except AssertionError:
                    print('from', filename, "with", method, ":")
                    print(r1.seq)
                    print(r2.seq)
                    print('')
                    print(ex_r1)
                    print(ex_r2)
                    raise
    
    def test_blosum_align2steps(self):
        """Tests alignments with blosum and gap extension.
        """
        from alignementseq import align2steps
        
        for s1, s2, filename in self.get_dataset_heads():
            (ex_r1, ex_r2, exp_score, *_) = Bio.pairwise2.align.globaldd(
                s1.seq, s2.seq, blosum62, -8, -4, -8, -4, one_alignment_only=True)[0]
            score, r1, r2 = align2steps(s1, s2)
            try:
                self.assertSameResidues(r1.seq, s1.seq)
                self.assertSameResidues(r2.seq, s2.seq)
                calc_score = 0   # score calculated from r1 and r2
                gap1, gap2 = False, False
                for a,b in zip(r1.seq, r2.seq):
                    if a == '-':
                        if not gap1:
                            gap1 = True
                            gap2 = False   # suppose 2 gaps are never aligned
                            calc_score -= 8
                        else:
                            calc_score -= 4
                    elif b == '-':
                        if not gap2:
                            gap2 = True
                            gap1 = False
                            calc_score -= 8
                        else:
                            calc_score -= 4
                    else:
                        gap1, gap2 = False, False
                        calc_score += blosum62[a,b] if (a,b) in blosum62 else blosum62[b,a]
                self.assertEqual(score, calc_score)
                self.assertEqual(score, exp_score)
            except AssertionError:
                print('from', filename, "with", align2steps, ":")
                print(r1.seq)
                print(r2.seq)
                print('')
                print(ex_r1)
                print(ex_r2)
                raise
 
    def test_benchmark_multiple_align(self):
        """Tests the multiple_align function (using blosum and gap extension)."""
        #from alignementseq import multiple_align
        from Bio.Align.Applications import ClustalwCommandline
        from Bio import AlignIO
        import pip
        
        print("Test-benchmark multiple_align")
        import clustalo as cl
     
        for records, filename in list(self.get_dataset_records())[:3]:
            cl.runClustalO("ariane-la-fusee@laposte.net", "sp:wap_rat,sp:wap_mouse,sp:wap_pig", fmt='fasta')
            # TODO: use real seq instead of rat, pig...
            print("Fine", filename)
 
   
    
    def save_alignments(self, NameFile):
        from alignementseq import align2steps
        for s1, s2, filename in self.get_dataset_heads():
            score, r1, r2 = align2steps(s1, s2)
        with open("balibase/" + str(NameFile)+'.fasta', "w") as fd:
            Bio.SeqIO.write((r1, r2), fd, "fasta")
    
Etienne MORICE's avatar
Etienne MORICE committed
class PerformanceTestCase(BalibaseTestCase):
    """Performance tests, slow.

    Excluded from default test suite, run it with
    ``python -m unittest alignmentseq_tests.PerformanceTestCase``
    """

    def setUp(self):
        """Loads titin sequences.
        """
        super().setUp()

Etienne MORICE's avatar
Etienne MORICE committed
        self.unit_timeout = int(os.environ.get("UNIT_TIMEOUT") or 60)

        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            self.titin_human, self.titin_mouse = Bio.SeqIO.parse(
                    Bio.Entrez.efetch(db="protein", id=["CAA62188","EDL27217"],
                        rettype="gp", retmode="text"),
            "genbank")

    def test_performance(self):
        from alignementseq import align, vec_align
        def biopython_align(seq1, seq2):
            return Bio.pairwise2.align.globalds(
                seq1.seq, seq2.seq, blosum62, -8, -8,
                one_alignment_only=True)[0]

        methods = (vec_align, align, biopython_align)
        times = pd.DataFrame(columns=("length1", "length2", "method", "time"),
                dtype=float)
        for s1, s2, filename in self.get_dataset_heads():
            for method in methods:
                def to_time():
                    method(s1, s2)
                t = timeit.Timer(to_time).timeit(1)
                times.loc[len(times)] = (
                        len(s1.seq), len(s2.seq), method.__name__, t
                        )
        times['length_product'] = times.length1 * times.length2
        print(times)
        
        ax = None
        cmap = matplotlib.cm.inferno
        colors = [ cmap(i/len(methods)) for i in range(len(methods)) ]
        for im, method in enumerate(methods):
            ax = times.loc[times.method == method.__name__].plot.scatter(
                    'length_product', 'time', c=colors[im],
                    label=method.__name__, ax=ax)
        matplotlib.pyplot.savefig("timings.png")

    def test_titin(self):
        from alignementseq import align, vec_align

        for method in vec_align, align:
            def to_time():
                method(self.titin_human, self.titin_mouse)
            def run():
                t = timeit.Timer(to_time).timeit(1)
                print("{}: {}".format(method.__name__, t))
            p = multiprocessing.Process(target=run)
            p.start()
            p.join(self.unit_timeout)
            # Note: if the task eats up your memory, it can take a while for
            # it to terminate if it times out.
            if p.is_alive():
                p.terminate()
                p.join()
                print("{}: timeout".format(method.__name__))

def load_tests(loader, standard_tests, pattern):
    return unittest.defaultTestLoader.loadTestsFromTestCase(AlignmentSeqTestCase)

if __name__ == '__main__':
    unittest.main(verbosity=2)