Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
B
BIOINF588
Manage
Activity
Members
Plan
Wiki
Code
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Deploy
Releases
Container Registry
Model registry
Analyze
Contributor analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Etienne MORICE
BIOINF588
Commits
b2053a14
Verified
Commit
b2053a14
authored
6 years ago
by
Etienne MORICE
Browse files
Options
Downloads
Patches
Plain Diff
Clean-up
parent
1c4855ba
No related branches found
Branches containing commit
No related tags found
No related merge requests found
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
alignementseq_tests.py
+118
-96
118 additions, 96 deletions
alignementseq_tests.py
with
118 additions
and
96 deletions
alignementseq_tests.py
+
118
−
96
View file @
b2053a14
#!/usr/bin/env python3
"""
Main test file.
Contains all top-level tests, and test utils.
"""
# Utils
import
unittest
import
urllib.request
import
os
import
sys
import
re
import
zipfile
import
warnings
import
time
import
timeit
import
importlib
import
io
# Scientific
import
multiprocessing
import
numpy
as
np
import
pandas
as
pd
...
...
@@ -16,53 +25,46 @@ import matplotlib
matplotlib
.
use
(
'
Agg
'
)
import
matplotlib.pyplot
# BioPython
import
Bio.SeqIO
import
Bio.pairwise2
import
Bio.Entrez
import
Bio.PDB
from
Bio.SeqRecord
import
SeqRecord
from
Bio.SubsMat.MatrixInfo
import
blosum62
#==============================================================================
import
importlib
import
os
import
sys
import
io
bali_spec
,
bali_m
=
None
,
None
def
bali_score
(
ref_fasta_path
,
test_fasta_path
):
"""
Portable wrapper for the bali_score.py script.
Calculate BALI scores from a test and a reference multiple sequence alignment in FASTA format.
Reproduces the results of the bali_score C program.
"""
path
=
os
.
path
.
join
(
"
balibase
"
,
"
bali_score.py
"
)
global
bali_spec
,
bali_m
if
bali_spec
is
None
:
bali_spec
=
importlib
.
util
.
spec_from_file_location
(
"
baliscore
"
,
path
)
bali_m
=
importlib
.
util
.
module_from_spec
(
bali_spec
)
tmp_argv
=
sys
.
argv
tmp_stdout
=
sys
.
stdout
buf
=
sys
.
stdout
=
io
.
StringIO
()
sys
.
argv
=
[
path
,
ref_fasta_path
,
test_fasta_path
]
try
:
bali_spec
.
loader
.
exec_module
(
bali_m
)
finally
:
sys
.
argv
=
tmp_argv
sys
.
stdout
=
tmp_stdout
buf
.
seek
(
0
)
score
=
{}
for
line
in
buf
:
name
,
var
=
line
.
partition
(
"
=
"
)[::
2
]
score
[
name
.
strip
()]
=
float
(
var
)
return
score
#==============================================================================
class
BalibaseTestCase
(
unittest
.
TestCase
):
"""
Base class to inherit by all test cases that need access to the balibase.
"""
bali_spec
,
bali_m
=
None
,
None
def
bali_score
(
self
,
ref_fasta_path
,
test_fasta_path
):
"""
Portable wrapper for the bali_score.py script.
Calculate BALI scores from a test and a reference multiple sequence alignment in FASTA format.
Reproduces the results of the bali_score C program.
"""
path
=
os
.
path
.
join
(
"
balibase
"
,
"
bali_score.py
"
)
if
self
.
bali_spec
is
None
:
self
.
bali_spec
=
importlib
.
util
.
spec_from_file_location
(
"
baliscore
"
,
path
)
self
.
bali_m
=
importlib
.
util
.
module_from_spec
(
self
.
bali_spec
)
tmp_argv
=
sys
.
argv
tmp_stdout
=
sys
.
stdout
buf
=
sys
.
stdout
=
io
.
StringIO
()
sys
.
argv
=
[
path
,
ref_fasta_path
,
test_fasta_path
]
try
:
self
.
bali_spec
.
loader
.
exec_module
(
self
.
bali_m
)
finally
:
sys
.
argv
=
tmp_argv
sys
.
stdout
=
tmp_stdout
buf
.
seek
(
0
)
score
=
{}
for
line
in
buf
:
name
,
var
=
line
.
partition
(
"
=
"
)[::
2
]
score
[
name
.
strip
()]
=
float
(
var
)
return
score
def
setUp
(
self
):
balibase_zippath
=
"
balibase.zip
"
self
.
balibase_path
=
"
balibase
"
...
...
@@ -125,16 +127,16 @@ class BalibaseTestCase(unittest.TestCase):
mostly)
"""
parser
=
Bio
.
PDB
.
MMCIFParser
()
for
s1
,
s2
,
os1
,
os2
,
name
in
self
.
get_dataset_heads
(
full
=
True
):
for
s
eq
1
,
s
eq
2
,
os
eq
1
,
os
eq
2
,
name
in
self
.
get_dataset_heads
(
full
=
True
):
if
name
in
self
.
exclude_fastas
:
warnings
.
warn
(
"
Exluding file
"
+
name
+
"
according to exclusion list
"
)
continue
skip
=
False
item
=
[
s1
,
s2
,
os1
,
os2
]
item
=
[
s
eq
1
,
s
eq
2
,
os
eq
1
,
os
eq
2
]
for
s
in
s1
,
s2
:
code
=
s
.
id
[
0
:
4
]
for
s
eq
in
s
eq
1
,
s
eq
2
:
code
=
s
eq
.
id
[
0
:
4
]
fname
=
os
.
path
.
join
(
'
data
'
,
code
+
'
.cif
'
)
# Attempt download
...
...
@@ -174,8 +176,11 @@ class BalibaseTestCase(unittest.TestCase):
yield
records
,
filename
class
AlignmentSeqTestCase
(
BalibaseTestCase
):
"""
TestCase for alignment correctness and quality
"""
def
assert
S
ame
R
esidues
(
self
,
str1
,
str2
):
def
assert
_s
ame
_r
esidues
(
self
,
str1
,
str2
):
"""
Strip strings of their
'
-
'
before comparing them
"""
return
self
.
assertEqual
(
...
...
@@ -192,25 +197,26 @@ class AlignmentSeqTestCase(BalibaseTestCase):
"""
from
alignementseq
import
align
score_fn
=
lambda
a
,
b
:
-
2
if
a
==
''
or
b
==
''
else
-
1
if
a
!=
b
else
1
aligner
=
Bio
.
pairwise2
.
align
for
s1
,
s2
,
filename
in
self
.
get_dataset_heads
():
score
,
r1
,
r2
=
align
(
s1
,
s2
,
score_fn
)
(
ex_r1
,
ex_r2
,
exp_score
,
*
_
)
=
Bio
.
pairwise2
.
align
.
globalms
(
s1
.
seq
,
s2
.
seq
,
1
,
-
1
,
-
2
,
-
2
,
one_alignment_only
=
True
)[
0
]
for
s
eq
1
,
s
eq
2
,
filename
in
self
.
get_dataset_heads
():
score
,
r
et
1
,
r
et
2
=
align
(
s
eq
1
,
s
eq
2
,
score_fn
)
(
ex_r1
,
ex_r2
,
exp_score
,
*
_
)
=
align
er
.
globalms
(
s
eq
1
.
seq
,
s
eq
2
.
seq
,
1
,
-
1
,
-
2
,
-
2
,
one_alignment_only
=
True
)[
0
]
try
:
self
.
assert
S
ame
R
esidues
(
r1
.
seq
,
s1
.
seq
)
self
.
assert
S
ame
R
esidues
(
r2
.
seq
,
s2
.
seq
)
self
.
assert
_s
ame
_r
esidues
(
r
et
1
.
seq
,
s
eq
1
.
seq
)
self
.
assert
_s
ame
_r
esidues
(
r
et
2
.
seq
,
s
eq
2
.
seq
)
self
.
assertEqual
(
score
,
sum
(
-
2
if
a
==
'
-
'
or
b
==
'
-
'
else
-
1
if
a
!=
b
else
-
1
if
a
!=
b
else
1
for
a
,
b
in
zip
(
r1
.
seq
,
r2
.
seq
)))
for
a
,
b
in
zip
(
r
et
1
.
seq
,
r
et
2
.
seq
)))
self
.
assertEqual
(
score
,
exp_score
)
except
AssertionError
:
print
(
'
from
'
,
filename
,
'
:
'
)
print
(
r1
.
seq
)
print
(
r2
.
seq
)
print
(
r
et
1
.
seq
)
print
(
r
et
2
.
seq
)
print
(
''
)
print
(
ex_r1
)
print
(
ex_r2
)
...
...
@@ -221,25 +227,26 @@ class AlignmentSeqTestCase(BalibaseTestCase):
"""
from
alignementseq
import
align
,
vec_align
aligner
=
Bio
.
pairwise2
.
align
for
s1
,
s2
,
filename
in
self
.
get_dataset_heads
():
(
ex_r1
,
ex_r2
,
exp_score
,
*
_
)
=
Bio
.
pairwise2
.
align
.
globalds
(
s1
.
seq
,
s2
.
seq
,
blosum62
,
-
8
,
-
8
,
one_alignment_only
=
True
)[
0
]
for
s
eq
1
,
s
eq
2
,
filename
in
self
.
get_dataset_heads
():
(
ex_r1
,
ex_r2
,
exp_score
,
*
_
)
=
align
er
.
globalds
(
s
eq
1
.
seq
,
s
eq
2
.
seq
,
blosum62
,
-
8
,
-
8
,
one_alignment_only
=
True
)[
0
]
for
method
in
align
,
vec_align
:
score
,
r1
,
r2
=
method
(
s1
,
s2
)
score
,
r
et
1
,
r
et
2
=
method
(
s
eq
1
,
s
eq
2
)
try
:
self
.
assert
S
ame
R
esidues
(
r1
.
seq
,
s1
.
seq
)
self
.
assert
S
ame
R
esidues
(
r2
.
seq
,
s2
.
seq
)
self
.
assert
_s
ame
_r
esidues
(
r
et
1
.
seq
,
s
eq
1
.
seq
)
self
.
assert
_s
ame
_r
esidues
(
r
et
2
.
seq
,
s
eq
2
.
seq
)
self
.
assertEqual
(
score
,
sum
(
-
8
if
a
==
'
-
'
or
b
==
'
-
'
else
blosum62
[
a
,
b
]
if
(
a
,
b
)
in
blosum62
else
blosum62
[
b
,
a
]
for
a
,
b
in
zip
(
r1
.
seq
,
r2
.
seq
)))
for
a
,
b
in
zip
(
r
et
1
.
seq
,
r
et
2
.
seq
)))
self
.
assertEqual
(
score
,
exp_score
)
except
AssertionError
:
print
(
'
from
'
,
filename
,
"
with
"
,
method
,
"
:
"
)
print
(
r1
.
seq
)
print
(
r2
.
seq
)
print
(
r
et
1
.
seq
)
print
(
r
et
2
.
seq
)
print
(
''
)
print
(
ex_r1
)
print
(
ex_r2
)
...
...
@@ -249,17 +256,18 @@ class AlignmentSeqTestCase(BalibaseTestCase):
"""
Tests alignments with blosum and gap extension.
"""
from
alignementseq
import
align2steps
aligner
=
Bio
.
pairwise2
.
align
for
s1
,
s2
,
filename
in
self
.
get_dataset_heads
():
(
ex_r1
,
ex_r2
,
exp_score
,
*
_
)
=
Bio
.
pairwise2
.
align
.
globaldd
(
s1
.
seq
,
s2
.
seq
,
blosum62
,
-
8
,
-
4
,
-
8
,
-
4
,
one_alignment_only
=
True
)[
0
]
score
,
r1
,
r2
=
align2steps
(
s1
,
s2
)
for
s
eq
1
,
s
eq
2
,
filename
in
self
.
get_dataset_heads
():
(
ex_r
et
1
,
ex_r
et
2
,
exp_score
,
*
_
)
=
align
er
.
globaldd
(
s
eq
1
.
seq
,
s
eq
2
.
seq
,
blosum62
,
-
8
,
-
4
,
-
8
,
-
4
,
one_alignment_only
=
True
)[
0
]
score
,
r
et
1
,
r
et
2
=
align2steps
(
s
eq
1
,
s
eq
2
)
try
:
self
.
assert
S
ame
R
esidues
(
r1
.
seq
,
s1
.
seq
)
self
.
assert
S
ame
R
esidues
(
r2
.
seq
,
s2
.
seq
)
calc_score
=
0
# score calculated from r1 and r2
self
.
assert
_s
ame
_r
esidues
(
r
et
1
.
seq
,
s
eq
1
.
seq
)
self
.
assert
_s
ame
_r
esidues
(
r
et
2
.
seq
,
s
eq
2
.
seq
)
calc_score
=
0
# score calculated from r
et
1 and r
et
2
gap1
,
gap2
=
False
,
False
for
a
,
b
in
zip
(
r1
.
seq
,
r2
.
seq
):
for
a
,
b
in
zip
(
r
et
1
.
seq
,
r
et
2
.
seq
):
if
a
==
'
-
'
:
if
not
gap1
:
gap1
=
True
...
...
@@ -281,11 +289,11 @@ class AlignmentSeqTestCase(BalibaseTestCase):
self
.
assertEqual
(
score
,
exp_score
)
except
AssertionError
:
print
(
'
from
'
,
filename
,
"
with
"
,
align2steps
,
"
:
"
)
print
(
r1
.
seq
)
print
(
r2
.
seq
)
print
(
r
et
1
.
seq
)
print
(
r
et
2
.
seq
)
print
(
''
)
print
(
ex_r1
)
print
(
ex_r2
)
print
(
ex_r
et
1
)
print
(
ex_r
et
2
)
raise
def
create_clustalo_alignments
(
self
):
...
...
@@ -307,7 +315,7 @@ class AlignmentSeqTestCase(BalibaseTestCase):
outfilename
=
os
.
path
.
join
(
out_dir
,
filename
)
if
os
.
path
.
isfile
(
outfilename
):
continue
def
run
():
def
run
(
filename
=
filename
,
outfilename
=
outfilename
):
cl
.
runClustalO
(
"
jane.doe@uni.org
"
,
os
.
path
.
join
(
dataset_dir
,
filename
),
...
...
@@ -320,6 +328,18 @@ class AlignmentSeqTestCase(BalibaseTestCase):
for
j
in
jobs
:
j
.
join
()
def
create_alignments_with
(
self
,
method
,
method_name
=
None
):
"""
Create fasta files with aligned sequences using the given method, under
balibase_path/<method_name>
"""
dataset_dir
=
os
.
path
.
join
(
self
.
balibase_path
,
"
RV11.unaligned
"
)
out_dir
=
os
.
path
.
join
(
self
.
balibase_path
,
method_name
or
method
.
__name__
)
for
in_filename
in
os
.
listdir
(
dataset_dir
):
sequences
=
list
(
Bio
.
SeqIO
.
parse
(
in_filename
,
"
fasta
"
))
def
test_benchmark_multiple_align
(
self
):
"""
Tests the multiple_align function (using blosum and gap extension).
"""
...
...
@@ -335,7 +355,7 @@ class AlignmentSeqTestCase(BalibaseTestCase):
scores
=
[]
for
filename
in
os
.
listdir
(
ref_dataset_dir
):
scores
.
append
(
bali_score
(
scores
.
append
(
self
.
bali_score
(
os
.
path
.
join
(
ref_dataset_dir
,
filename
),
os
.
path
.
join
(
clustalo_dir
,
filename
)
))
...
...
@@ -349,9 +369,9 @@ class AlignmentSeqTestCase(BalibaseTestCase):
"""
from
alignementseq
import
vec_align
,
align_dihedrals
score_fn
=
lambda
a
,
b
:
0
if
a
==
''
or
b
==
''
else
0
if
a
!=
b
else
1
for
seq1
,
seq2
,
oseq1
,
oseq2
,
struct1
,
struct2
,
name
in
self
.
get_datasets_heads_with_struct
():
del
seq1
,
seq2
,
name
skip
=
False
angles1
=
[]
angles2
=
[]
...
...
@@ -366,7 +386,7 @@ class AlignmentSeqTestCase(BalibaseTestCase):
key
=
(
lambda
t
:
t
[
1
])
)
ident
=
match
/
len
(
seq
.
seq
)
if
(
ident
<
0.95
)
:
if
ident
<
0.95
:
warnings
.
warn
(
"
No chain found with more than 0.95 identity
"
,
"
(best was
"
,
ident
,
"
), skipping this file.
"
)
skip
=
True
...
...
@@ -394,15 +414,6 @@ class AlignmentSeqTestCase(BalibaseTestCase):
continue
align_dihedrals
(
oseq1
,
oseq2
,
angles1
,
angles2
)
def
save_alignments
(
self
,
NameFile
):
from
alignementseq
import
align2steps
for
s1
,
s2
,
filename
in
self
.
get_dataset_heads
():
score
,
r1
,
r2
=
align2steps
(
s1
,
s2
)
with
open
(
"
balibase/
"
+
str
(
NameFile
)
+
'
.fasta
'
,
"
w
"
)
as
fd
:
Bio
.
SeqIO
.
write
((
r1
,
r2
),
fd
,
"
fasta
"
)
class
PerformanceTestCase
(
BalibaseTestCase
):
"""
Performance tests, slow.
...
...
@@ -426,9 +437,12 @@ class PerformanceTestCase(BalibaseTestCase):
"
genbank
"
)
def
test_performance
(
self
):
"""
Create timing graph comparing speed of simple alignment methods
"""
from
alignementseq
import
align
,
vec_align
aligner
=
Bio
.
pairwise2
.
align
def
biopython_align
(
seq1
,
seq2
):
return
Bio
.
pairwise2
.
align
.
globalds
(
return
align
er
.
globalds
(
seq1
.
seq
,
seq2
.
seq
,
blosum62
,
-
8
,
-
8
,
one_alignment_only
=
True
)[
0
]
...
...
@@ -436,19 +450,21 @@ class PerformanceTestCase(BalibaseTestCase):
methods
=
(
vec_align
,
align
,
biopython_align
)
times
=
pd
.
DataFrame
(
columns
=
(
"
length1
"
,
"
length2
"
,
"
method
"
,
"
time
"
),
dtype
=
float
)
for
s1
,
s2
,
filename
in
self
.
get_dataset_heads
():
for
seq1
,
seq2
,
filename
in
self
.
get_dataset_heads
():
del
filename
for
method
in
methods
:
def
to_time
():
method
(
s1
,
s2
)
t
=
timeit
.
Timer
(
to_time
).
timeit
(
1
)
def
to_time
(
method
=
method
,
seq1
=
seq1
,
seq2
=
seq2
):
method
(
s
eq
1
,
s
eq
2
)
t
ime_elapsed
=
timeit
.
Timer
(
to_time
).
timeit
(
1
)
times
.
loc
[
len
(
times
)]
=
(
len
(
s1
.
seq
),
len
(
s2
.
seq
),
method
.
__name__
,
t
len
(
seq1
.
seq
),
len
(
seq2
.
seq
),
method
.
__name__
,
time_elapsed
)
times
[
'
length_product
'
]
=
times
.
length1
*
times
.
length2
print
(
times
)
ax
=
None
cmap
=
matplotlib
.
cm
.
inferno
cmap
=
matplotlib
.
cm
.
get_cmap
(
'
inferno
'
)
colors
=
[
cmap
(
i
/
len
(
methods
))
for
i
in
range
(
len
(
methods
))
]
for
im
,
method
in
enumerate
(
methods
):
ax
=
times
.
loc
[
times
.
method
==
method
.
__name__
].
plot
.
scatter
(
...
...
@@ -457,12 +473,16 @@ class PerformanceTestCase(BalibaseTestCase):
matplotlib
.
pyplot
.
savefig
(
"
timings.png
"
)
def
test_titin
(
self
):
"""
Test speed of alignment for largest human protein (titin).
Warning, this may be slow or eat up your memory.
"""
from
alignementseq
import
align
,
vec_align
for
method
in
vec_align
,
align
:
def
to_time
():
def
to_time
(
method
=
method
):
method
(
self
.
titin_human
,
self
.
titin_mouse
)
def
run
():
def
run
(
method
=
method
):
t
=
timeit
.
Timer
(
to_time
).
timeit
(
1
)
print
(
"
{}: {}
"
.
format
(
method
.
__name__
,
t
))
p
=
multiprocessing
.
Process
(
target
=
run
)
...
...
@@ -476,8 +496,10 @@ class PerformanceTestCase(BalibaseTestCase):
print
(
"
{}: timeout
"
.
format
(
method
.
__name__
))
def
load_tests
(
loader
,
standard_tests
,
pattern
):
"""
Tests to be executed when testing the module as a whole
"""
del
loader
,
standard_tests
,
pattern
return
unittest
.
defaultTestLoader
.
loadTestsFromTestCase
(
AlignmentSeqTestCase
)
if
__name__
==
'
__main__
'
:
unittest
.
main
(
verbosity
=
2
)
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment