from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import codecs
from tensorflow.python.platform import gfile
# Special vocabulary symbols - we always put them at the start.
_PAD = "_PAD"
_GO = "_GO"
_EOS = "_EOS"
_UNK = "_UNK"
_START_VOCAB = [_PAD, _GO, _EOS, _UNK]
PAD_ID = 0
GO_ID = 1
EOS_ID = 2
UNK_ID = 3

def create_vocabulary(data):
  """Create vocabulary from input data.

  Args:
    data: list of symbol sequences (e.g. grapheme or phoneme lists) that
      will be used to create the vocabulary.

  Returns:
    vocab: vocabulary dictionary. In this dictionary keys are symbols
      and values are their indexes.
  """
  # Collect every distinct symbol, then prepend the special symbols so
  # they keep their reserved ids (PAD_ID, GO_ID, EOS_ID, UNK_ID).
  symbols = set()
  for line in data:
    symbols.update(line)
  vocab_list = _START_VOCAB + sorted(symbols)
  return dict((x, y) for (y, x) in enumerate(vocab_list))
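
# A minimal sketch of the mapping produced above (assuming the input is a
# list of symbol sequences): create_vocabulary([["h", "i"], ["h"]]) gives
# {"_PAD": 0, "_GO": 1, "_EOS": 2, "_UNK": 3, "h": 4, "i": 5}.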

def save_vocabulary(vocab, vocabulary_path):
  """Save vocabulary file in vocabulary_path.

  We write the vocabulary to vocabulary_path in a one-token-per-line format,
  so that the token in the first line gets id=0, the token in the second
  line gets id=1, and so on.

  Args:
    vocab: vocabulary dictionary.
    vocabulary_path: path where the vocabulary will be created.
  """
  print("Creating vocabulary %s" % vocabulary_path)
  with codecs.open(vocabulary_path, "w", "utf-8") as vocab_file:
    # Sort symbols by their ids so that the line number matches the id.
    for symbol in sorted(vocab, key=vocab.get):
      vocab_file.write(symbol + '\n')
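
# For the example vocabulary sketched above, the saved file would contain
# one symbol per line, in id order:
#   _PAD
#   _GO
#   _EOS
#   _UNK
#   h
#   i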

def load_vocabulary(vocabulary_path, reverse=False):
  """Load vocabulary from file.

  We assume the vocabulary is stored one-item-per-line, so a file:
    d
    c
  will result in a vocabulary {"d": 0, "c": 1}, and this function may
  also return the reversed vocabulary ["d", "c"].

  Args:
    vocabulary_path: path to the file containing the vocabulary.
    reverse: if True, return the reversed vocabulary (a list) instead of
      the dictionary.

  Returns:
    the vocabulary (a dictionary mapping string to integers), or,
    if reverse is set to True, the reversed vocabulary (a list, which
    reverses the vocabulary mapping).

  Raises:
    ValueError: if the provided vocabulary_path does not exist.
  """
  if not gfile.Exists(vocabulary_path):
    raise ValueError("Vocabulary file %s not found." % vocabulary_path)
  rev_vocab = []
  with codecs.open(vocabulary_path, "r", "utf-8") as vocab_file:
    rev_vocab.extend(vocab_file.readlines())
  rev_vocab = [line.strip() for line in rev_vocab]
  if reverse:
    return rev_vocab
  return dict((x, y) for (y, x) in enumerate(rev_vocab))
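
# Sketch: with the two-line file from the docstring above, load_vocabulary(path)
# returns {"d": 0, "c": 1}, while load_vocabulary(path, reverse=True) returns
# ["d", "c"], so rev_vocab[id] recovers the symbol for an id.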

def save_params(num_layers, size, model_dir):
  """Save model parameters in model_dir directory.

  Args:
    num_layers: Number of layers in the model;
    size: Size of each model layer;
    model_dir: path to the directory where 'model.params' will be written.
  """
  # Save model's architecture
  with open(os.path.join(model_dir, "model.params"), 'w') as param_file:
    param_file.write("num_layers:" + str(num_layers) + "\n")
    param_file.write("size:" + str(size))

def load_params(model_path):
  """Load parameters from 'model.params' file.

  Args:
    model_path: path to the directory containing the 'model.params' file.

  Returns:
    num_layers: Number of layers in the model;
    size: Size of each model layer.
  """
  # Check the saved model's architecture for decode processes.
  params_path = os.path.join(model_path, "model.params")
  if gfile.Exists(params_path):
    with open(params_path) as param_file:
      for line in param_file:
        split_line = line.strip().split(":")
        if split_line[0] == "num_layers":
          num_layers = int(split_line[1])
        if split_line[0] == "size":
          size = int(split_line[1])
    return num_layers, size
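
# The 'model.params' file written by save_params is a tiny key:value text
# file; for save_params(2, 512, "model_dir") it would contain:
#   num_layers:2
#   size:512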

def symbols_to_ids(symbols, vocab):
  """Turn a symbols sequence into an ids sequence using the given vocabulary.

  Args:
    symbols: input symbols sequence;
    vocab: vocabulary (a dictionary mapping string to integers).

  Returns:
    ids: output sequence of ids. Symbols missing from the vocabulary are
      mapped to UNK_ID.
  """
  return [vocab.get(s, UNK_ID) for s in symbols]
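
# Sketch, using the vocabulary from the create_vocabulary example above:
# symbols_to_ids(["h", "i", "!"], vocab) == [4, 5, UNK_ID], since "!" is
# out of vocabulary.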

def split_to_grapheme_phoneme(inp_dictionary):
  """Split input dictionary into two separate lists with graphemes and phonemes.

  Args:
    inp_dictionary: input dictionary, one "word pronunciation" entry per line.

  Returns:
    graphemes: list of grapheme sequences (each word split into characters);
    phonemes: list of phoneme sequences.
  """
  graphemes, phonemes = [], []
  for line in inp_dictionary:
    split_line = line.strip().split()
    if len(split_line) > 1:
      graphemes.append(list(split_line[0]))
      phonemes.append(split_line[1:])
  return graphemes, phonemes
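
# Sketch with one CMUdict-style line (assumed format "WORD PH1 PH2 ..."):
# split_to_grapheme_phoneme(["HELLO HH AH L OW"]) returns
# ([["H", "E", "L", "L", "O"]], [["HH", "AH", "L", "OW"]]).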

def collect_pronunciations(dic_lines):
  """Create a dictionary mapping each word to its different pronunciations.

  Args:
    dic_lines: input dictionary lines, one "word pronunciation" entry per line.

  Returns:
    dic: dictionary mapping a word to the list of its pronunciations.
  """
  dic = {}
  for line in dic_lines:
    lst = line.strip().split()
    if len(lst) > 1:
      if lst[0] not in dic:
        dic[lst[0]] = [" ".join(lst[1:])]
      else:
        dic[lst[0]].append(" ".join(lst[1:]))
    elif len(lst) == 1:
      print("WARNING: No phonemes for word '%s', line ignored" % lst[0])
  return dic
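
# Sketch: repeated entries for the same word are merged into one mapping,
# e.g. collect_pronunciations(["READ R IY D", "READ R EH D"]) returns
# {"READ": ["R IY D", "R EH D"]}.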

def split_dictionary(train_path, valid_path=None, test_path=None):
  """Split the source dictionary into train, validation and test sets.

  Args:
    train_path: path to the source dictionary;
    valid_path: optional path to a separate validation dictionary;
    test_path: optional path to a separate test dictionary.

  Returns:
    train_dic, valid_dic, test_dic: lists of dictionary lines.
  """
  with codecs.open(train_path, "r", "utf-8") as f:
    source_dic = f.readlines()
  train_dic, valid_dic, test_dic = [], [], []
  if valid_path:
    with codecs.open(valid_path, "r", "utf-8") as f:
      valid_dic = f.readlines()
  if test_path:
    with codecs.open(test_path, "r", "utf-8") as f:
      test_dic = f.readlines()
  dic = collect_pronunciations(source_dic)
  # Split dictionary into train, validation and test (if not assigned).
  for i, word in enumerate(dic):
    for pronunciation in dic[word]:
      if i % 20 == 0 and not valid_path:
        valid_dic.append(word + ' ' + pronunciation)
      elif (i % 20 == 1 or i % 20 == 2) and not test_path:
        test_dic.append(word + ' ' + pronunciation)
      else:
        train_dic.append(word + ' ' + pronunciation)
  return train_dic, valid_dic, test_dic
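
# The modulo-20 schedule above assigns roughly 1/20 (5%) of the words to
# validation and 2/20 (10%) to test when no separate files are given; all
# pronunciations of a given word stay in the same split.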

def prepare_g2p_data(model_dir, train_path, valid_path, test_path):
  """Create vocabularies in model_dir and create ids data lists.

  Args:
    model_dir: directory in which the data sets will be stored;
    train_path: path to training dictionary;
    valid_path: path to validation dictionary;
    test_path: path to test dictionary.

  Returns:
    A tuple of 7 elements:
      (1) Sequence of ids for Grapheme training data-set,
      (2) Sequence of ids for Phoneme training data-set,
      (3) Sequence of ids for Grapheme development data-set,
      (4) Sequence of ids for Phoneme development data-set,
      (5) Grapheme vocabulary,
      (6) Phoneme vocabulary,
      (7) Test dictionary lines.
  """
  # Create train, validation and test sets.
  train_dic, valid_dic, test_dic = split_dictionary(train_path, valid_path,
                                                    test_path)
  # Split dictionaries into two separate lists with graphemes and phonemes.
  train_gr, train_ph = split_to_grapheme_phoneme(train_dic)
  valid_gr, valid_ph = split_to_grapheme_phoneme(valid_dic)
  # Load/Create vocabularies.
  if (model_dir
      and os.path.exists(os.path.join(model_dir, "vocab.grapheme"))
      and os.path.exists(os.path.join(model_dir, "vocab.phoneme"))):
    print("Loading vocabularies from %s" % model_dir)
    ph_vocab = load_vocabulary(os.path.join(model_dir, "vocab.phoneme"))
    gr_vocab = load_vocabulary(os.path.join(model_dir, "vocab.grapheme"))
  else:
    ph_vocab = create_vocabulary(train_ph)
    gr_vocab = create_vocabulary(train_gr)
    if model_dir:
      if not os.path.exists(model_dir):
        os.makedirs(model_dir)
      save_vocabulary(ph_vocab, os.path.join(model_dir, "vocab.phoneme"))
      save_vocabulary(gr_vocab, os.path.join(model_dir, "vocab.grapheme"))
  # Create ids for the training and validation data.
  train_ph_ids = [symbols_to_ids(line, ph_vocab) for line in train_ph]
  train_gr_ids = [symbols_to_ids(line, gr_vocab) for line in train_gr]
  valid_ph_ids = [symbols_to_ids(line, ph_vocab) for line in valid_ph]
  valid_gr_ids = [symbols_to_ids(line, gr_vocab) for line in valid_gr]
  return (train_gr_ids, train_ph_ids,
          valid_gr_ids, valid_ph_ids,
          gr_vocab, ph_vocab,
          test_dic)
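
# A minimal usage sketch (hypothetical paths; "cmudict.train" is an assumed
# file name, in the one-entry-per-line format parsed above):
#
#   (train_gr_ids, train_ph_ids,
#    valid_gr_ids, valid_ph_ids,
#    gr_vocab, ph_vocab, test_dic) = prepare_g2p_data(
#        "g2p_model", "cmudict.train", None, None)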