-
Notifications
You must be signed in to change notification settings - Fork 0
/
functions.py
132 lines (105 loc) · 3.62 KB
/
functions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
from datetime import datetime as dt
from functools import wraps
from os import listdir, remove
from os.path import isfile, join, getctime

import numpy as np
from Bio import SeqIO
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.decomposition import TruncatedSVD, KernelPCA
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.model_selection import cross_val_score
from sklearn.neural_network import MLPClassifier
from sklearn.pipeline import Pipeline
from sklearn.svm import SVC
######NLP functions######
def getKmers(sequence, size):
    """Slide a window of length *size* over *sequence* and return every
    k-mer as a lowercase string.  Sequences shorter than *size* yield []."""
    kmers = []
    for start in range(len(sequence) - size + 1):
        kmers.append(sequence[start:start + size].lower())
    return kmers
def make_sentence(mySeq, word_size):
    """Turn a raw sequence into a space-separated "sentence" of k-mers,
    suitable for bag-of-words vectorizers."""
    return ' '.join(getKmers(mySeq, size=word_size))
######Initial work######
#Process functions#
def process_function_taxonomy(record):
    """Extract a [k-mer sentence, taxonomy label] pair from a SeqRecord.

    The sentence uses 6-mers; the label is the taxonomy entry five levels
    from the end of ``record.annotations["taxonomy"]``.

    NOTE(review): reads the private ``record.seq._data`` attribute — this
    matches the rest of the file, but ``str(record.seq)`` is the public
    Biopython API; confirm the installed Biopython version before changing.
    """
    seq = record.seq._data  # renamed: the original shadowed the builtin `str`
    sentence = make_sentence(seq, 6)
    label = record.annotations["taxonomy"][-5]
    return [sentence, label]
def process_function_return_string(record):
    """Extract a [raw sequence string, taxonomy label] pair from a SeqRecord.

    The label is the taxonomy entry five levels from the end of
    ``record.annotations["taxonomy"]``.
    """
    seq = record.seq._data  # renamed: the original shadowed the builtin `str`
    label = record.annotations["taxonomy"][-5]
    return [seq, label]
def only_seqs(record):
    """Return just the raw sequence string of a SeqRecord."""
    # Direct return; the original bound the value to a local named `str`,
    # shadowing the builtin.
    return record.seq._data
#Main iterations#
def get_features(records, process_function):
    """Apply *process_function* to every record and collect the results
    in a list (one entry per record, in input order)."""
    return [process_function(record) for record in records]
def genebank_to_numpyarr(path, process_function):
    """Parse a sequence file and return per-record features as a unicode
    numpy array.

    The SeqIO format name (e.g. "gb", "fasta") is taken from the file
    extension of *path*.
    """
    # rsplit takes the text after the LAST dot, so a path such as
    # "./data/my.file.gb" resolves to "gb"; the original split(".")[1]
    # returned "/data/my" for that input.
    file_type = path.rsplit(".", 1)[-1]
    records = SeqIO.parse(path, file_type)
    features = get_features(records, process_function)
    return np.asarray(features, dtype='U')
def set_seq_features(infile, features):
    """Parse *infile* and attach one entry of *features* to each record as
    ``record.annotations["pred"]``; return the annotated records.

    Records and features are paired positionally; if the lengths differ,
    the extras on the longer side are silently ignored (zip semantics).
    """
    # rsplit on the last dot so multi-dot paths still yield the real
    # extension (the original split(".")[1] broke on e.g. "./a.b/x.gb").
    file_type = infile.rsplit(".", 1)[-1]
    records = list(SeqIO.parse(infile, file_type))
    for record, feature in zip(records, features):
        record.annotations["pred"] = feature
    return records
######Model######
def timer(func):
    """Decorator: return ``(elapsed_seconds, result)`` instead of just the
    wrapped function's result.

    Fixes over the original:
    - ``total_seconds()`` instead of ``.seconds`` (``.seconds`` is only the
      seconds *component* of the timedelta — it drops whole days and all
      sub-second precision).
    - ``functools.wraps`` preserves the wrapped function's metadata.
    - ``*args/**kwargs`` pass-through instead of a hard-coded ``(X, y, i)``
      signature, so the decorator works for any callable.
    """
    @wraps(func)
    def func_wrapper(*args, **kwargs):
        t1 = dt.now()
        result = func(*args, **kwargs)
        t2 = dt.now()
        delta = (t2 - t1).total_seconds()
        return delta, result
    return func_wrapper
@timer
def model(X, y, i):
    """Cross-validate a CountVectorizer -> TruncatedSVD(i) -> SVC pipeline
    on (X, y) and return the mean 5-fold CV score.

    The @timer decorator turns the return value into (elapsed, score).
    """
    steps = [
        ('CountVectorizer', CountVectorizer()),
        ("pca", TruncatedSVD(n_components=i)),
        ('svc', SVC(gamma='auto')),
    ]
    pipeline = Pipeline(steps)
    return cross_val_score(pipeline, X, y, cv=5).mean()
######Writing to file######
def write_to_file(out_path, array):
    """Write *array* (an iterable of rows) to *out_path* as comma-separated
    text, preceded by a fixed header line."""
    with open(out_path, 'w') as out:
        out.write("n_components, mean_score, delta_time \n")
        for row in array:
            out.write(",".join(str(cell) for cell in row) + "\n")
def check_and_remove_files(file_list, log_folder):
    """Prune the log folder: when it holds more than five files, delete the
    single oldest one (one deletion per call, as in the original).

    *file_list* is a list of ``[ctime, filename]`` pairs; it is sorted
    in place by creation time as a side effect.
    """
    file_list.sort(key=lambda entry: entry[0])
    # len() instead of the dunder call __len__(); join() instead of
    # manual "/" concatenation.
    if len(file_list) > 5:
        remove(join(log_folder, file_list[0][1]))
def get_path(log_folder, outfile):
    """Build ``<log_folder>/<outfile>_<timestamp>`` using the current time
    formatted as dd_mm_YYYY__HH_MM_SS."""
    stamp = dt.now().strftime("%d_%m_%Y__%H_%M_%S")
    return "{}/{}_{}".format(log_folder, outfile, stamp)
def get_file_list(log_folder):
    """Return ``[creation_time, filename]`` pairs for every regular file
    directly inside *log_folder* (subdirectories are skipped)."""
    entries = []
    for name in listdir(log_folder):
        full = join(log_folder, name)
        if isfile(full):
            entries.append([getctime(full), name])
    return entries
def smart_write(log_folder, outfile, array):
    """Write *array* to a fresh timestamped file in *log_folder*, first
    pruning the folder so old logs do not accumulate."""
    existing = get_file_list(log_folder)
    check_and_remove_files(existing, log_folder)
    write_to_file(get_path(log_folder, outfile), array)
######Classes######
class Transformer(BaseEstimator, TransformerMixin):
    """Sklearn-compatible transformer that converts raw sequence strings
    into space-separated k-mer sentences (via ``make_sentence``).

    Note: this is a transformer, not a classifier (the original docstring
    said "classifier").
    """
    def __init__(self, k_mer=6):
        # k_mer: window length used when splitting each sequence.
        self.k_mer = k_mer
    def fit(self, X, y=None):
        # Stateless transformer: nothing to learn.
        return self
    def transform(self, X):
        return [make_sentence(seq, self.k_mer) for seq in X]