-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathstock_prediction.py
167 lines (143 loc) · 7.17 KB
/
stock_prediction.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Bidirectional
from sklearn import preprocessing
from sklearn.model_selection import train_test_split
from yahoo_fin import stock_info as si
from collections import deque
import numpy as np
import pandas as pd
import random
# set seed, so we can get the same results after rerunning several times
np.random.seed(314)
tf.random.set_seed(314)
random.seed(314)
def shuffle_in_unison(a, b):
# shuffle two arrays in the same way
state = np.random.get_state()
np.random.shuffle(a)
np.random.set_state(state)
np.random.shuffle(b)
def load_data(ticker, n_steps=50, scale=True, shuffle=True, lookup_step=1, split_by_date=True,
test_size=0.2, feature_columns=['adjclose', 'volume', 'open', 'high', 'low']):
"""
Loads data from Yahoo Finance source, as well as scaling, shuffling, normalizing and splitting.
Params:
ticker (str/pd.DataFrame): the ticker you want to load, examples include AAPL, TESL, etc.
n_steps (int): the historical sequence length (i.e window size) used to predict, default is 50
scale (bool): whether to scale prices from 0 to 1, default is True
shuffle (bool): whether to shuffle the dataset (both training & testing), default is True
lookup_step (int): the future lookup step to predict, default is 1 (e.g next day)
split_by_date (bool): whether we split the dataset into training/testing by date, setting it
to False will split datasets in a random way
test_size (float): ratio for test data, default is 0.2 (20% testing data)
feature_columns (list): the list of features to use to feed into the model, default is everything grabbed from yahoo_fin
"""
# see if ticker is already a loaded stock from yahoo finance
if isinstance(ticker, str):
# load it from yahoo_fin library
df = si.get_data(ticker)
elif isinstance(ticker, pd.DataFrame):
# already loaded, use it directly
df = ticker
else:
raise TypeError("ticker can be either a str or a `pd.DataFrame` instances")
# this will contain all the elements we want to return from this function
result = {}
# we will also return the original dataframe itself
result['df'] = df.copy()
# make sure that the passed feature_columns exist in the dataframe
for col in feature_columns:
assert col in df.columns, f"'{col}' does not exist in the dataframe."
# add date as a column
if "date" not in df.columns:
df["date"] = df.index
if scale:
column_scaler = {}
# scale the data (prices) from 0 to 1
for column in feature_columns:
scaler = preprocessing.MinMaxScaler()
df[column] = scaler.fit_transform(np.expand_dims(df[column].values, axis=1))
column_scaler[column] = scaler
# add the MinMaxScaler instances to the result returned
result["column_scaler"] = column_scaler
# add the target column (label) by shifting by `lookup_step`
df['future'] = df['adjclose'].shift(-lookup_step)
# last `lookup_step` columns contains NaN in future column
# get them before droping NaNs
last_sequence = np.array(df[feature_columns].tail(lookup_step))
# drop NaNs
df.dropna(inplace=True)
sequence_data = []
sequences = deque(maxlen=n_steps)
for entry, target in zip(df[feature_columns + ["date"]].values, df['future'].values):
sequences.append(entry)
if len(sequences) == n_steps:
sequence_data.append([np.array(sequences), target])
# get the last sequence by appending the last `n_step` sequence with `lookup_step` sequence
# for instance, if n_steps=50 and lookup_step=10, last_sequence should be of 60 (that is 50+10) length
# this last_sequence will be used to predict future stock prices that are not available in the dataset
last_sequence = list([s[:len(feature_columns)] for s in sequences]) + list(last_sequence)
last_sequence = np.array(last_sequence).astype(np.float32)
# add to result
result['last_sequence'] = last_sequence
# construct the X's and y's
X, y = [], []
for seq, target in sequence_data:
X.append(seq)
y.append(target)
# convert to numpy arrays
X = np.array(X)
y = np.array(y)
if split_by_date:
# split the dataset into training & testing sets by date (not randomly splitting)
train_samples = int((1 - test_size) * len(X))
result["X_train"] = X[:train_samples]
result["y_train"] = y[:train_samples]
result["X_test"] = X[train_samples:]
result["y_test"] = y[train_samples:]
if shuffle:
# shuffle the datasets for training (if shuffle parameter is set)
shuffle_in_unison(result["X_train"], result["y_train"])
shuffle_in_unison(result["X_test"], result["y_test"])
else:
# split the dataset randomly
result["X_train"], result["X_test"], result["y_train"], result["y_test"] = train_test_split(X, y,
test_size=test_size, shuffle=shuffle)
# get the list of test set dates
dates = result["X_test"][:, -1, -1]
# retrieve test features from the original dataframe
result["test_df"] = result["df"].loc[dates]
# remove duplicated dates in the testing dataframe
result["test_df"] = result["test_df"][~result["test_df"].index.duplicated(keep='first')]
# remove dates from the training/testing sets & convert to float32
result["X_train"] = result["X_train"][:, :, :len(feature_columns)].astype(np.float32)
result["X_test"] = result["X_test"][:, :, :len(feature_columns)].astype(np.float32)
return result
def create_model(sequence_length, n_features, units=256, cell=LSTM, n_layers=2, dropout=0.3,
loss="mean_absolute_error", optimizer="rmsprop", bidirectional=False):
model = Sequential()
for i in range(n_layers):
if i == 0:
# first layer
if bidirectional:
model.add(Bidirectional(cell(units, return_sequences=True), batch_input_shape=(None, sequence_length, n_features)))
else:
model.add(cell(units, return_sequences=True, batch_input_shape=(None, sequence_length, n_features)))
elif i == n_layers - 1:
# last layer
if bidirectional:
model.add(Bidirectional(cell(units, return_sequences=False)))
else:
model.add(cell(units, return_sequences=False))
else:
# hidden layers
if bidirectional:
model.add(Bidirectional(cell(units, return_sequences=True)))
else:
model.add(cell(units, return_sequences=True))
# add dropout after each layer
model.add(Dropout(dropout))
model.add(Dense(1, activation="linear"))
model.compile(loss=loss, metrics=["mean_absolute_error"], optimizer=optimizer)
return model