forked from guillaume-chevalier/seq2seq-signal-prediction
-
Notifications
You must be signed in to change notification settings - Fork 0
/
datasets.py
256 lines (206 loc) · 7.81 KB
/
datasets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
import numpy as np
import requests
import random
import math
# Module metadata: author, license and version tag of this file.
__author__ = "Guillaume Chevalier"
__license__ = "MIT"
__version__ = "2017-03"
def generate_x_y_data_v1(isTrain, batch_size):
    """
    Build one batch of data for exercise 1.

    Every example holds two signals, a sine and a cosine, sampled on
    20 evenly spaced points covering a span of 3*pi. Both signals of an
    example share one random phase offset drawn from [0, 2*pi).
    The first 10 points are X and the last 10 points are Y, so Y is
    the continuation of X.

    isTrain: ignored here; train and test data come from the same generator.
    batch_size: number of examples to generate.

    returns: tuple (X, Y), both of shape
        (seq_length, batch_size, output_dim) == (10, batch_size, 2)
    """
    seq_length = 10
    n_points = seq_length * 2

    inputs, targets = [], []
    for _ in range(batch_size):
        phase = random.random() * 2 * math.pi
        grid = np.linspace(0.0 * math.pi + phase,
                           3.0 * math.pi + phase, n_points)
        # Rows are time steps, columns are (sine, cosine).
        waves = np.array([np.sin(grid), np.cos(grid)]).T
        inputs.append(waves[:seq_length])
        targets.append(waves[seq_length:])

    # Stacking gives (batch_size, seq_length, output_dim); the transpose
    # moves the time axis first: (seq_length, batch_size, output_dim).
    batch_x = np.array(inputs).transpose((1, 0, 2))
    batch_y = np.array(targets).transpose((1, 0, 2))
    return batch_x, batch_y
def generate_x_y_data_two_freqs(isTrain, batch_size, seq_length):
    """
    Build one batch of single-feature signals mixing two random waves.

    Every example is the sum of a sine and a cosine. Each of the two waves
    gets its own random phase offset, frequency factor and amplitude.
    The signal is sampled on 2 * seq_length points; the first half is X
    and the second half is Y.

    isTrain: ignored by this generator.
    batch_size: number of examples to generate.
    seq_length: number of time steps in X and in Y.

    returns: tuple (X, Y), both of shape (seq_length, batch_size, 1)
    """
    n_points = seq_length * 2

    def draw_grid():
        # Draws, in this order, a phase offset and then a frequency factor,
        # and returns the sampling grid the wave is evaluated on.
        phase = random.random() * 2 * math.pi
        freq = (random.random() - 0.5) / 1.5 * 15 + 0.5
        scale = seq_length / 15.0 * freq
        return np.linspace(
            scale * 0.0 * math.pi + phase,
            scale * 3.0 * math.pi + phase,
            n_points,
        )

    inputs, targets = [], []
    for _ in range(batch_size):
        # The amplitude is always drawn after the grid of the same wave.
        sine_grid = draw_grid()
        sine_part = (random.random() + 0.1) * np.sin(sine_grid)

        cosine_grid = draw_grid()
        cosine_part = (random.random() * 1.2) * np.cos(cosine_grid)

        signal = cosine_part + sine_part

        # Column vectors of shape (seq_length, 1): one feature per time step.
        inputs.append(signal[:seq_length, np.newaxis])
        targets.append(signal[seq_length:, np.newaxis])

    # Stacking gives (batch_size, seq_length, output_dim); the transpose
    # moves the time axis first: (seq_length, batch_size, output_dim).
    batch_x = np.array(inputs).transpose((1, 0, 2))
    batch_y = np.array(targets).transpose((1, 0, 2))
    return batch_x, batch_y
def generate_x_y_data_v2(isTrain, batch_size):
    """
    Build one batch of data for exercise 2.

    Similar to the "v1" function, but each example is a single signal
    made of two waves whose frequencies are chosen randomly. Plus, the
    length of the examples is 15 rather than 10, so there are
    30 total values for past and future.

    returns: tuple (X, Y) as produced by generate_x_y_data_two_freqs
        with seq_length=15.
    """
    seq_length = 15
    return generate_x_y_data_two_freqs(isTrain, batch_size, seq_length)
def generate_x_y_data_v3(isTrain, batch_size):
    """
    Build one batch of data for exercise 3.

    Similar to the "v2" function, but gaussian noise is added to the
    X values only, and the examples are 30 steps long, so there are
    60 total values for past and future. One noise amplitude, drawn from
    [0.10, 0.25), is used for the whole batch.

    X and Y are then both shifted and scaled using the average and the
    standard deviation of the noisy X over the whole batch.

    returns: tuple (X, Y), X having shape (30, batch_size, 1)
    """
    seq_length = 30
    x, y = generate_x_y_data_two_freqs(
        isTrain, batch_size, seq_length=seq_length)

    noise_scale = random.random() * 0.15 + 0.10
    noisy_x = x + noise_scale * np.random.randn(seq_length, batch_size, 1)

    # Statistics come from the noisy inputs only; the small constant keeps
    # the division safe when the standard deviation is zero.
    center = np.average(noisy_x)
    spread = np.std(noisy_x) + 0.0001

    def rescale(values):
        # Same affine transform for inputs and targets.
        return (values - center) / spread / 2.5

    return rescale(noisy_x), rescale(y)
def loadCurrency(curr, window_size):
    """
    Return the historical data for the USD or EUR bitcoin value,
    cut into sliding windows. Is done with a web API call.

    curr: currency code sent to the API, "USD" | "EUR".
    window_size: number of consecutive values in each X window and in
        each Y window.

    returns: tuple (X, Y) of arrays of shape (n_windows, window_size, 1).
        For every window, Y holds the window_size values that directly
        follow the values in X.

    raises: requests.HTTPError when the API answers with an error status,
        ValueError when the kept history is too short for one window pair.
    """
    # For more info on the URL call, it is inspired by :
    # https://github.com/Levino/coindesk-api-node
    response = requests.get(
        "http://api.coindesk.com/v1/bpi/historical/close.json",
        # Passing the query as params lets requests build and escape it,
        # so the currency parameter is always spelled and encoded correctly.
        params={
            "start": "2010-07-17",
            "end": "2017-03-03",
            "currency": curr,
        },
        # Without a timeout an unresponsive server would block forever.
        timeout=30,
    )
    # Fail here with a clear HTTP error instead of later on a bad payload.
    response.raise_for_status()
    data = response.json()

    # Sort by key so the values are ordered in time
    # (presumably the keys are ISO "YYYY-MM-DD" dates - verify against API).
    values = [price for _date, price in sorted(data["bpi"].items())]
    # The first 1000 values are discarded.
    kept_values = values[1000:]

    n_windows = len(kept_values) - window_size * 2
    if n_windows <= 0:
        raise ValueError(
            "Not enough data for window_size={}: {} values kept, "
            "more than {} needed.".format(
                window_size, len(kept_values), window_size * 2
            )
        )

    X = [kept_values[i:i + window_size] for i in range(n_windows)]
    Y = [kept_values[i + window_size:i + window_size * 2]
         for i in range(n_windows)]

    # To be able to concat on inner dimension later on:
    X = np.expand_dims(X, axis=2)
    Y = np.expand_dims(Y, axis=2)
    return X, Y
def normalize(X, Y=None):
    """
    Normalise X, and Y when given, with statistics taken from X only.

    The mean and the standard deviation are computed along axis 1 of X,
    separately for every example and feature. Values are centered on the
    mean and divided by 2.5 times the standard deviation. A small constant
    is added to both statistics, which keeps the division safe for
    constant windows.

    X: array to normalise.
    Y: optional array of the same shape as X, normalised with the
        statistics of X.

    returns: the normalised X alone when Y is None, else the tuple (X, Y).
    """
    # Note: normalising by the last value of each window rather than by the
    # mean would be a possible alternative.
    eps = 0.00001
    center = np.expand_dims(np.average(X, axis=1) + eps, axis=1)
    spread = np.expand_dims(np.std(X, axis=1) + eps, axis=1)
    scale = 2.5 * spread

    X = (X - center) / scale
    if Y is None:
        return X

    assert Y.shape == X.shape, (Y.shape, X.shape)
    Y = (Y - center) / scale
    return X, Y
def fetch_batch_size_random(X, Y, batch_size):
    """
    Returns randomly an aligned batch_size of X and Y among all examples.

    The external dimension of X and Y must be the batch size
    (eg: 1 row = 1 example). X and Y can be N-dimensional, with N >= 2.
    Examples are drawn with replacement, and the same indices are used
    for X and Y so that they stay aligned.

    X, Y: arrays of identical shape (n_examples, seq_length, ...).
    batch_size: number of examples to draw.

    returns: tuple (X_out, Y_out) with the two first axes swapped, that is
        of shape (seq_length, batch_size, ...).

    raises: ValueError when X has fewer than 2 dimensions.
    """
    assert X.shape == Y.shape, (X.shape, Y.shape)
    n_dims = len(X.shape)
    if n_dims < 2:
        raise ValueError(
            "X and Y need at least 2 dimensions, got shape {}".format(X.shape)
        )

    idxes = np.random.randint(X.shape[0], size=batch_size)

    # Swap only the two leading axes and keep all trailing axes in place,
    # so inputs of any rank >= 2 work. For 3-D input this is (1, 0, 2).
    axes = (1, 0) + tuple(range(2, n_dims))
    X_out = np.array(X[idxes]).transpose(axes)
    Y_out = np.array(Y[idxes]).transpose(axes)
    return X_out, Y_out
# Module-level cache of the train/test splits. The lists stay empty until
# generate_x_y_data_v4 loads the data on its first call.
X_train, Y_train = [], []
X_test, Y_test = [], []
def generate_x_y_data_v4(isTrain, batch_size):
    """
    Return one random batch of financial data for the bitcoin.

    Features are the USD and EUR values, in the internal dimension.
    X and Y are normalized according to X only, so that nothing from the
    values to predict leaks into the inputs. For every window of
    seq_length = 40 points, Y is the window that follows X.

    The examples are split 80/20 in their original order: the first
    80 percent are the train data and the last 20 percent are the test
    data. The data is downloaded on the first call only and then kept
    in the module-level X_train, Y_train, X_test and Y_test.

    isTrain: draw the batch from the train split when true, else from
        the test split.
    batch_size: number of examples in the batch.

    returns: tuple (X, Y) of arrays of the same shape,
        (seq_length, batch_size, 2).
    """
    global X_train, Y_train, X_test, Y_test

    # 40 past values for the encoder, 40 following ones for the decoder's
    # predictions.
    seq_length = 40

    # First load, with memoization:
    if len(Y_test) == 0:
        # API calls, USD first and then EUR:
        per_currency = [
            loadCurrency(code, window_size=seq_length)
            for code in ("USD", "EUR")
        ]
        all_x, all_y = zip(*per_currency)

        # All data, aligned, one feature per currency:
        X = np.concatenate(all_x, axis=2)
        Y = np.concatenate(all_y, axis=2)
        X, Y = normalize(X, Y)

        # Split 80-20:
        cut = int(len(X) * 0.8)
        X_train, X_test = X[:cut], X[cut:]
        Y_train, Y_test = Y[:cut], Y[cut:]

    if isTrain:
        pool_x, pool_y = X_train, Y_train
    else:
        pool_x, pool_y = X_test, Y_test
    return fetch_batch_size_random(pool_x, pool_y, batch_size)