Newer
Older
#!/usr/bin/env python3
import unittest
import urllib.request
import os
import zipfile
import warnings
import timeit
import multiprocessing
import pandas as pd
import Bio.SeqIO
import Bio.pairwise2
from Bio.SeqRecord import SeqRecord
from Bio.SubsMat.MatrixInfo import blosum62
class AlignmentSeqTestCase(unittest.TestCase):
def setUp(self):
self.unit_timeout = int(os.environ.get("UNIT_TIMEOUT") or 60)
balibase_zippath = "balibase.zip"
self.balibase_path = "balibase"
testfile_path = os.path.join(
"RV11.unaligned", "BBS11001.fasta")
if not os.path.isdir(self.balibase_path):
os.mkdir(self.balibase_path)
if not os.path.isfile(testfile_path):
if not os.path.isfile(balibase_zippath):
print("Fetching balibase archive from moodle...")
urllib.request.urlretrieve(
"https://moodle.polytechnique.fr/mod/resource/view.php?id=38570",
balibase_zippath)
with zipfile.ZipFile(balibase_zippath) as balibase_zip:
balibase_zip.extractall()
def get_dataset_heads(self):
"""Generator function to iterate over the first two sequences of each
unaligned fasta file.
"""
dataset_dir = os.path.join(self.balibase_path, "RV11.unaligned")
for filename in os.listdir(dataset_dir):
records = Bio.SeqIO.parse(
os.path.join(dataset_dir, filename),
"fasta")
seq1 = next(records)
seq2 = next(records)
yield seq1, seq2, filename
def assertSameResidues(self, str1, str2):
"""Strip strings of their '-' before comparing them
"""
return self.assertEqual(
str(str1).translate({ord('-'):None}),
str(str2).translate({ord('-'):None})
)
"""Test alignments with the simplest metric.
As there can be a huge number of
solutions, we check only that we got the right score, and one valid
alignment.
"""
from alignementseq import align
score_fn = lambda a,b : -2 if a == '' or b == '' else -1 if a != b else 1
for s1, s2, filename in self.get_dataset_heads():
score, r1, r2 = align(s1, s2, score_fn)
(ex_r1, ex_r2, exp_score, *_) = Bio.pairwise2.align.globalms(
s1.seq, s2.seq, 1, -1, -2, -2, one_alignment_only=True)[0]
try:
self.assertSameResidues(r1.seq, s1.seq)
self.assertSameResidues(r2.seq, s2.seq)
self.assertEqual(score, sum(
-2 if a == '-' or b == '-' else
-1 if a != b else
1
for a,b in zip(r1.seq, r2.seq)))
self.assertEqual(score, exp_score)
except AssertionError:
print('from', filename, ':')
print(r1.seq)
print(r2.seq)
print('')
print(ex_r1)
print(ex_r2)
raise
def test_blosum_align(self):
"""Tests alignments with blosum but no gap extension.
"""
from alignementseq import align, vec_align
for s1, s2, filename in self.get_dataset_heads():
(ex_r1, ex_r2, exp_score, *_) = Bio.pairwise2.align.globalds(
s1.seq, s2.seq, blosum62, -8, -8, one_alignment_only=True)[0]
for method in align, vec_align:
score, r1, r2 = method(s1, s2)
try:
self.assertSameResidues(r1.seq, s1.seq)
self.assertSameResidues(r2.seq, s2.seq)
self.assertEqual(score, sum(
-8 if a == '-' or b == '-' else
blosum62[a,b] if (a,b) in blosum62 else
blosum62[b,a]
for a,b in zip(r1.seq, r2.seq)))
self.assertEqual(score, exp_score)
except AssertionError:
print('from', filename, "with", method, ":")
print(r1.seq)
print(r2.seq)
print('')
print(ex_r1)
print(ex_r2)
raise
class PerformanceTestCase(AlignmentSeqTestCase):
"""Performance tests, slow.
Excluded from default test suite, run it with
``python -m unittest alignmentseq_tests.PerformanceTestCase``
"""
def setUp(self):
"""Loads titin sequences.
"""
super().setUp()
with warnings.catch_warnings():
warnings.simplefilter("ignore")
self.titin_human, self.titin_mouse = Bio.SeqIO.parse(
Bio.Entrez.efetch(db="protein", id=["CAA62188","EDL27217"],
rettype="gp", retmode="text"),
"genbank")
def test_performance(self):
from alignementseq import align, vec_align
methods = (vec_align, align)
times = pd.DataFrame(columns=("length1", "length2", "method", "time"),
dtype=float)
for s1, s2, filename in self.get_dataset_heads():
for method in methods:
def to_time():
method(s1, s2)
t = timeit.Timer(to_time).timeit(1)
times.loc[len(times)] = (
len(s1.seq), len(s2.seq), method.__name__, t
)
times['length_product'] = times.length1 * times.length2
ax = None
cmap = matplotlib.cm.inferno
colors = [ cmap(i/2) for i in range(2) ]
for im, method in enumerate(methods):
ax = times.loc[times.method == method.__name__].plot.scatter(
'length_product', 'time', c=colors[im],
label=method.__name__, ax=ax)
matplotlib.pyplot.savefig("timings.png")
def test_titin(self):
from alignementseq import align, vec_align
for method in vec_align, align:
def to_time():
method(self.titin_human, self.titin_mouse)
def run():
t = timeit.Timer(to_time).timeit(1)
print("{}: {}".format(method.__name__, t))
p = multiprocessing.Process(target=run)
p.start()
p.join(self.unit_timeout)
# Note: if the task eats up your memory, it can take a while for
# it to terminate if it times out.
if p.is_alive():
p.terminate()
p.join()
print("{}: timeout".format(method.__name__))
def load_tests(loader, standard_tests, pattern):
return unittest.defaultTestLoader.loadTestsFromTestCase(AlignmentSeqTestCase)