Newer
Older
#!/usr/bin/env python3
import unittest
import urllib.request
import os
import zipfile
import warnings
import timeit
import multiprocessing
import pandas as pd
import Bio.SeqIO
import Bio.pairwise2
from Bio.SeqRecord import SeqRecord
from Bio.SubsMat.MatrixInfo import blosum62
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#==============================================================================
import importlib
import os
import sys
import io
spec, m = None, None
def bali_score(ref_fasta_path, test_fasta_path):
"""Portable wrapper for the bali_score.py script.
Calculate BALI scores from a test and a reference multiple sequence alignment in FASTA format.
Reproduces the results of the bali_score C program.
"""
path = os.path.join("balibase", "bali_score.py")
global spec, m
if spec is None:
spec = importlib.util.spec_from_file_location("baliscore", path)
m = importlib.util.module_from_spec(spec)
tmp_argv = sys.argv
tmp_stdout = sys.stdout
buf = sys.stdout = io.StringIO()
sys.argv = [path, ref_fasta_path, test_fasta_path]
try:
spec.loader.exec_module(m)
finally:
sys.argv = tmp_argv
sys.stdout = tmp_stdout
buf.seek(0)
score = {}
for line in buf:
name, var = line.partition("=")[::2]
score[name.strip()] = float(var)
return score
#==============================================================================
class BalibaseTestCase(unittest.TestCase):
"""Base class to inherit by all test cases that need access to the balibase.
"""
def setUp(self):
balibase_zippath = "balibase.zip"
self.balibase_path = "balibase"
testfile_path = os.path.join(
"RV11.unaligned", "BBS11001.fasta")
if not os.path.isdir(self.balibase_path):
os.mkdir(self.balibase_path)
if not os.path.isfile(testfile_path):
# Archive is no longer publicly available, so this will probably
# fail. Fall back to comitting the archive.
if not os.path.isfile(balibase_zippath):
print("Fetching balibase archive from moodle...")
urllib.request.urlretrieve(
"https://moodle.polytechnique.fr/mod/resource/view.php?id=38570",
balibase_zippath)
with zipfile.ZipFile(balibase_zippath) as balibase_zip:
balibase_zip.extractall()
def get_dataset_heads(self):
"""Generator function to iterate over the first two sequences of each
unaligned fasta file.
"""
dataset_dir = os.path.join(self.balibase_path, "RV11.unaligned")
for filename in os.listdir(dataset_dir):
records = Bio.SeqIO.parse(
os.path.join(dataset_dir, filename),
"fasta")
seq1 = next(records)
seq2 = next(records)
yield seq1, seq2, filename
def get_dataset_records(self):
"""Generator function to iterate over the record generator of each
unaligned fasta file.
"""
dataset_dir = os.path.join(self.balibase_path, "RV11.unaligned")
for filename in os.listdir(dataset_dir):
records = Bio.SeqIO.parse(
os.path.join(dataset_dir, filename),
"fasta")
yield records, filename
def assertSameResidues(self, str1, str2):
"""Strip strings of their '-' before comparing them
"""
return self.assertEqual(
str(str1).translate({ord('-'):None}),
str(str2).translate({ord('-'):None})
)
def test_simple_align(self):
"""Test alignments with the simplest metric.
As there can be a huge number of
solutions, we check only that we got the right score, and one valid
alignment.
"""
from alignementseq import align
score_fn = lambda a,b : -2 if a == '' or b == '' else -1 if a != b else 1
for s1, s2, filename in self.get_dataset_heads():
score, r1, r2 = align(s1, s2, score_fn)
(ex_r1, ex_r2, exp_score, *_) = Bio.pairwise2.align.globalms(
s1.seq, s2.seq, 1, -1, -2, -2, one_alignment_only=True)[0]
try:
self.assertSameResidues(r1.seq, s1.seq)
self.assertSameResidues(r2.seq, s2.seq)
self.assertEqual(score, sum(
-2 if a == '-' or b == '-' else
-1 if a != b else
1
for a,b in zip(r1.seq, r2.seq)))
self.assertEqual(score, exp_score)
except AssertionError:
print('from', filename, ':')
print(r1.seq)
print(r2.seq)
print('')
print(ex_r1)
print(ex_r2)
raise
def test_blosum_align(self):
"""Tests alignments with blosum but no gap extension.
"""
from alignementseq import align, vec_align
for s1, s2, filename in self.get_dataset_heads():
(ex_r1, ex_r2, exp_score, *_) = Bio.pairwise2.align.globalds(
s1.seq, s2.seq, blosum62, -8, -8, one_alignment_only=True)[0]
for method in align, vec_align:
score, r1, r2 = method(s1, s2)
try:
self.assertSameResidues(r1.seq, s1.seq)
self.assertSameResidues(r2.seq, s2.seq)
self.assertEqual(score, sum(
-8 if a == '-' or b == '-' else
blosum62[a,b] if (a,b) in blosum62 else
blosum62[b,a]
for a,b in zip(r1.seq, r2.seq)))
self.assertEqual(score, exp_score)
except AssertionError:
print('from', filename, "with", method, ":")
print(r1.seq)
print(r2.seq)
print('')
print(ex_r1)
print(ex_r2)
raise
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
def test_blosum_align2steps(self):
"""Tests alignments with blosum and gap extension.
"""
from alignementseq import align2steps
for s1, s2, filename in self.get_dataset_heads():
(ex_r1, ex_r2, exp_score, *_) = Bio.pairwise2.align.globaldd(
s1.seq, s2.seq, blosum62, -8, -4, -8, -4, one_alignment_only=True)[0]
score, r1, r2 = align2steps(s1, s2)
try:
self.assertSameResidues(r1.seq, s1.seq)
self.assertSameResidues(r2.seq, s2.seq)
calc_score = 0 # score calculated from r1 and r2
gap1, gap2 = False, False
for a,b in zip(r1.seq, r2.seq):
if a == '-':
if not gap1:
gap1 = True
gap2 = False # suppose 2 gaps are never aligned
calc_score -= 8
else:
calc_score -= 4
elif b == '-':
if not gap2:
gap2 = True
gap1 = False
calc_score -= 8
else:
calc_score -= 4
else:
gap1, gap2 = False, False
calc_score += blosum62[a,b] if (a,b) in blosum62 else blosum62[b,a]
self.assertEqual(score, calc_score)
self.assertEqual(score, exp_score)
except AssertionError:
print('from', filename, "with", align2steps, ":")
print(r1.seq)
print(r2.seq)
print('')
print(ex_r1)
print(ex_r2)
raise
def test_benchmark_multiple_align(self):
"""Tests the multiple_align function (using blosum and gap extension)."""
#from alignementseq import multiple_align
from Bio.Align.Applications import ClustalwCommandline
from Bio import AlignIO
import pip
print("Test-benchmark multiple_align")
import clustalo as cl
for records, filename in list(self.get_dataset_records())[:3]:
cl.runClustalO("ariane-la-fusee@laposte.net", "sp:wap_rat,sp:wap_mouse,sp:wap_pig", fmt='fasta')
# TODO: use real seq instead of rat, pig...
print("Fine", filename)
def save_alignments(self, NameFile):
from alignementseq import align2steps
for s1, s2, filename in self.get_dataset_heads():
score, r1, r2 = align2steps(s1, s2)
with open("balibase/" + str(NameFile)+'.fasta', "w") as fd:
Bio.SeqIO.write((r1, r2), fd, "fasta")
"""Performance tests, slow.
Excluded from default test suite, run it with
``python -m unittest alignmentseq_tests.PerformanceTestCase``
"""
def setUp(self):
"""Loads titin sequences.
"""
super().setUp()
self.unit_timeout = int(os.environ.get("UNIT_TIMEOUT") or 60)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
self.titin_human, self.titin_mouse = Bio.SeqIO.parse(
Bio.Entrez.efetch(db="protein", id=["CAA62188","EDL27217"],
rettype="gp", retmode="text"),
"genbank")
def test_performance(self):
from alignementseq import align, vec_align
def biopython_align(seq1, seq2):
return Bio.pairwise2.align.globalds(
seq1.seq, seq2.seq, blosum62, -8, -8,
one_alignment_only=True)[0]
methods = (vec_align, align, biopython_align)
times = pd.DataFrame(columns=("length1", "length2", "method", "time"),
dtype=float)
for s1, s2, filename in self.get_dataset_heads():
for method in methods:
def to_time():
method(s1, s2)
t = timeit.Timer(to_time).timeit(1)
times.loc[len(times)] = (
len(s1.seq), len(s2.seq), method.__name__, t
)
times['length_product'] = times.length1 * times.length2
ax = None
cmap = matplotlib.cm.inferno
colors = [ cmap(i/len(methods)) for i in range(len(methods)) ]
for im, method in enumerate(methods):
ax = times.loc[times.method == method.__name__].plot.scatter(
'length_product', 'time', c=colors[im],
label=method.__name__, ax=ax)
matplotlib.pyplot.savefig("timings.png")
def test_titin(self):
from alignementseq import align, vec_align
for method in vec_align, align:
def to_time():
method(self.titin_human, self.titin_mouse)
def run():
t = timeit.Timer(to_time).timeit(1)
print("{}: {}".format(method.__name__, t))
p = multiprocessing.Process(target=run)
p.start()
p.join(self.unit_timeout)
# Note: if the task eats up your memory, it can take a while for
# it to terminate if it times out.
if p.is_alive():
p.terminate()
p.join()
print("{}: timeout".format(method.__name__))
def load_tests(loader, standard_tests, pattern):
return unittest.defaultTestLoader.loadTestsFromTestCase(AlignmentSeqTestCase)