Date: 2018-05-29
Author: Nick Wong
Let's try to build an LSTM neural network to predict the gold price with Quantapi. Although many people argue that asset prices are not predictable, we will still try to predict them with an LSTM, both to see whether it yields any good results and as a learning exercise. If you are not familiar with LSTMs, please read How Recurrent Neural Networks and Long Short-Term Memory Work.
The results are as follows:
Neurons | Epochs | RMSE | Correct Trend (%) |
---|---|---|---|
1 | 10 | 16.029 | 27.3 |
1 | 20 | 15.728 | 27.3 |
2 | 10 | 15.929 | 27.3 |
2 | 20 | 15.867 | 27.3 |
4 | 10 | 15.975 | 27.3 |
4 | 100 | 15.796 | 27.3 |
4 | 200 | 16.339 | 27.3 |
4 | 500 | 16.363 | 36.4 |
4 | 1000 | 16.006 | 45.5 |
10 | 10 | 16.091 | 27.3 |
10 | 100 | 16.156 | 27.3 |
10 | 200 | 16.219 | 27.3 |
10 | 500 | 15.915 | 36.4 |
10 | 1000 | 17.930 | 27.3 |
20 | 10 | 16.002 | 27.3 |
20 | 100 | 16.577 | 36.4 |
20 | 200 | 16.573 | 36.4 |
20 | 500 | 16.921 | 27.3 |
20 | 1000 | 19.766 | 27.3 |
50 | 10 | 16.090 | 27.3 |
50 | 100 | 16.699 | 45.5 |
50 | 200 | 17.015 | 27.3 |
50 | 500 | 17.481 | 36.4 |
50 | 1000 | 15.902 | 54.5* |
* Best configuration for correct trend predictions
The code below may take several hours to run. You can tune parameters such as the number of neurons or epochs, change the network structure, or try other assets.
Get Quantapi data

Below we import the required packages for the script. Remember to run pip install if you haven't installed them yet. Then we define a function get_quantapi(symbol, token, duration, format), which takes four parameters: symbol, token, duration, and format.
```python
#import required packages
from pandas import DataFrame
from pandas import concat
from matplotlib import pyplot
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler
from math import sqrt
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import LSTM
from keras.layers.core import Dropout
from keras import optimizers
import numpy
import pandas as pd
import time
import datetime as dt

start = time.time()

#get data from quantapi in json format
def get_quantapi(symbol, token, duration, format):
    query = 'https://quantapi.co/api/' + symbol + '/' + token + '/?d=' + duration + '&f=' + format
    df = pd.read_json(query)
    df = df[['date', 'open', 'high', 'low', 'close']]
    df = df.sort_values('date', ascending=True)
    print(df.tail())
    return df
```
This function takes a dataframe and a lag and converts the time series into a supervised learning problem: each row pairs the lagged values with the current value.
```python
#frame a sequence as a supervised learning problem
def timeseries_to_supervised(data, lag=1):
    df = DataFrame(data)
    columns = [df.shift(i) for i in range(1, lag+1)]
    columns.append(df)
    df = concat(columns, axis=1)
    df.fillna(0, inplace=True)
    return df
```
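To see what this framing looks like, here is a minimal, illustrative example (the toy series below is made up for demonstration):

```python
#illustrative only: frame a toy series with lag=1
import numpy
toy = numpy.array([1.0, 2.0, 3.0, 4.0])
print(timeseries_to_supervised(toy, lag=1))
#column 0 is the series shifted one step back (the NaN in the first
#row is filled with 0) and column 1 is the original series, so each
#row pairs the previous observation X(t-1) with the current value y(t)
```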
This function removes the trend by differencing consecutive observations.
```python
#create a differenced series
def difference(dataset, interval=1):
    diff = list()
    for i in range(interval, len(dataset)):
        value = dataset[i] - dataset[i - interval]
        diff.append(value)
    return numpy.array(diff)
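As a quick sanity check, a minimal example with a made-up price series:

```python
#illustrative only: first-order differencing on a toy price series
import numpy
prices = numpy.array([100.0, 102.0, 101.0, 105.0])
print(difference(prices, interval=1))  #prints [ 2. -1.  4.]
```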
This function inverts the differencing by adding a forecast back onto the last known observation in the history.
```python
#invert differenced value
def inverse_difference(history, yhat, interval=1):
    return yhat + history[-interval]
```
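For example, with the toy numbers from the differencing sketch above:

```python
#illustrative only: add a differenced forecast back onto the history
history = [100.0, 102.0, 101.0]
yhat_diff = 4.0  #a forecast on the differenced scale
print(inverse_difference(history, yhat_diff, interval=1))  #prints 105.0
```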
This function scales the train and test data to the range [-1, 1].
```python
#transform scale for train and test data to [-1,1]
def scale(train, test):
    scaler = MinMaxScaler(feature_range=(-1, 1))
    scaler = scaler.fit(train)
    #transform train
    train = train.reshape(train.shape[0], train.shape[1])
    train_scaled = scaler.transform(train)
    #transform test
    test = test.reshape(test.shape[0], test.shape[1])
    test_scaled = scaler.transform(test)
    return scaler, train_scaled, test_scaled
```
This function inverts the scaling for a forecasted value.
```python
#invert scaling for a forecasted value
def invert_scale(scaler, X, value):
    #rebuild a full row so the scaler can invert it
    new_row = [x for x in X] + [value]
    array = numpy.array(new_row)
    array = array.reshape(1, len(array))
    inverted = scaler.inverse_transform(array)
    return inverted[0, -1]
```
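A minimal round-trip sketch (the two-column toy data is invented for illustration): fit a scaler, scale a row, then recover the original target with invert_scale.

```python
#illustrative only: round-trip a target value through the scaler
import numpy
from sklearn.preprocessing import MinMaxScaler
data = numpy.array([[1.0, 10.0], [2.0, 20.0], [3.0, 30.0]])
sc = MinMaxScaler(feature_range=(-1, 1)).fit(data)
scaled = sc.transform(data)
X, value = scaled[1, 0:-1], scaled[1, -1]
print(invert_scale(sc, X, value))  #prints 20.0, the original target
```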
This function fits an LSTM network to the training data. It stacks two LSTM layers, each followed by a dropout layer:
LSTM Layer 1 -> Dropout -> LSTM Layer 2 -> Dropout
```python
#fit an LSTM network to training data
def fit_lstm(train, batch_size, nb_epoch, neurons):
    X, Y = train[:, 0:-1], train[:, -1]
    X = X.reshape(X.shape[0], 1, X.shape[1])
    model = Sequential()
    model.add(LSTM(neurons, batch_input_shape=(batch_size, X.shape[1], X.shape[2]),
                   stateful=True, return_sequences=True))
    model.add(Dropout(0.4))
    model.add(LSTM(neurons, batch_input_shape=(batch_size, X.shape[1], X.shape[2]),
                   stateful=True, return_sequences=False))
    model.add(Dropout(0.3))
    model.add(Dense(1))
    #set learning rate
    #adam = optimizers.Adam(lr=0.0001)
    adam = optimizers.Adam()
    model.compile(loss='mean_squared_error', optimizer=adam)
    for i in range(nb_epoch):
        model.fit(X, Y, epochs=1, batch_size=batch_size, verbose=0, shuffle=False)
        model.reset_states()
    return model
```
This function makes a one-step forecast.
```python
#make a one-step forecast
def forecast_lstm(model, batch_size, X):
    X = X.reshape(1, 1, len(X))
    yhat = model.predict(X, batch_size=batch_size)
    return yhat[0, 0]
```
This code calls the functions defined above and runs the experiment over the parameter grid.
```python
symbol = 'gold'
#input your token here
token = 'Your token here'
duration = '20y'
format = 'json'
series = get_quantapi(symbol, token, duration, format)
series = series.set_index('date')
#resample to weekly Friday level; the last row is excluded as it is not an actual Friday
series = series.resample('W-FRI').last()
#remove the last row to align the prediction with the close price
series = series.drop(series.tail(1).index)
#print(series.tail())

#transform data to be stationary
raw_values = series.values
close_values = raw_values[:, -1]
diff_values = difference(raw_values, 1)

#transform data to a supervised learning problem
supervised = timeseries_to_supervised(diff_values, 1)
supervised.columns = ['sd_open', 'sd_high', 'sd_low', 'sd_close', 'open', 'high', 'low', 'close']
supervised = supervised.drop(supervised.columns[[4, 5, 6]], axis=1)
supervised_values = supervised.values

#split data into train and test sets
train, test = supervised_values[0:-12], supervised_values[-12:]

#transform the scale of data
scaler, train_scaled, test_scaled = scale(train, test)
#print(train_scaled.shape, test_scaled.shape)

#define the numbers of neurons and epochs to try
nb_epochs = [10, 20, 100, 200, 500, 1000]
neurons = [1, 2, 4, 10, 20, 50]
nb_correct_predict = 0

for neuron in neurons:
    for epochs in nb_epochs:
        #fit the model
        lstm_model = fit_lstm(train_scaled, 1, epochs, neuron)
        nb_correct_predict = 0
        #forecast the entire training dataset to build up state for forecasting
        train_reshaped = train_scaled[:, 0:-1]
        train_reshaped = train_reshaped.reshape(train_reshaped.shape[0], 1, train_reshaped.shape[1])
        lstm_model.predict(train_reshaped, batch_size=1)

        #walk-forward validation on the test data
        predictions = list()
        for i in range(len(test_scaled)):
            #make a one-step forecast
            X, y = test_scaled[i, 0:-1], test_scaled[i, -1]
            X_close = test_scaled[i, -2:-1]
            yhat = forecast_lstm(lstm_model, 1, X)
            #store the prediction and invert the scale
            test_scaled[i, -1] = yhat
            yhat = scaler.inverse_transform(test_scaled)[i, -1]
            #invert differencing
            yhat = inverse_difference(close_values, yhat, len(test_scaled)+1-i)
            #store forecast
            predictions.append(yhat)
            expected = close_values[len(train) + i + 1]
            #count correct trend predictions
            if i != 0:
                if (expected > old_expected) and (yhat > old_yhat):
                    nb_correct_predict = nb_correct_predict + 1
                elif (expected < old_expected) and (yhat < old_yhat):
                    nb_correct_predict = nb_correct_predict + 1
                elif (expected == old_expected) and (yhat == old_yhat):
                    nb_correct_predict = nb_correct_predict + 1
            print('Date=%s, Predicted=%f, Expected=%f' % (series.index[-12+i], yhat, expected))
            old_yhat = yhat
            old_expected = expected

        #predict the next gold price
        last = test_scaled[-1, 0:-1]
        yhat = forecast_lstm(lstm_model, 1, last)
        #invert scaling
        test_scaled[-1, -1] = yhat
        yhat = scaler.inverse_transform(test_scaled)[-1, -1]
        #invert differencing
        yhat = inverse_difference(close_values, yhat, 1)
        #print the next prediction
        print('Next predicted gold price: %f' % yhat)

        #print the number of correct trend predictions
        p_correct_predict = nb_correct_predict / (len(test_scaled) - 1) * 100
        print('Number of correct trend predictions: %d, percentage: %.1f' % (nb_correct_predict, p_correct_predict))

        #report performance
        rmse = sqrt(mean_squared_error(close_values[-12:], predictions))
        print('Number of Epochs: %d, Number of Neurons: %d' % (epochs, neuron))
        print('Test RMSE: %.3f' % rmse)
        print('Data: Open, High, Low, Close Price')
        end = time.time()
        print('*********Time Used: %.5s seconds*********' % (end - start))

        #write results to file
        with open('result_gold.txt', 'a') as f:
            print('Number of Epochs: %d, Number of Neurons: %d' % (epochs, neuron), file=f)
            print('Number of correct trend predictions: %d, percentage: %.1f' % (nb_correct_predict, p_correct_predict), file=f)
            print('Test RMSE: %.3f' % rmse, file=f)
            print('Data: Open, High, Low, Close Price', file=f)
            print('*********Time Used: %.5s seconds*********' % (end - start), file=f)
        with open('rmse_gold.txt', 'a') as r:
            print('Neurons:%d,Epochs:%d,RMSE:%.3f,Trend Percentage:%.1f' % (neuron, epochs, rmse, p_correct_predict), file=r)

        today = dt.datetime.now().strftime("%Y%m%d")
        #line plot of observed vs predicted
        pyplot.plot(close_values[-12:], label='Expected Value')
        pyplot.plot(predictions, label='Predicted Value')
        pyplot.legend()
        pyplot.title('Predicted VS Actual Gold Price' + ', Neurons=' + str(neuron) + ', Epochs=' + str(epochs))
        pyplot.savefig('Gold' + str(today) + 'Neurons' + str(neuron) + 'Epochs' + str(epochs) + '.png', bbox_inches='tight')
        pyplot.show()
```
The full code can be copied below.
```python
#import required packages
from pandas import DataFrame
from pandas import concat
from matplotlib import pyplot
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler
from math import sqrt
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import LSTM
from keras.layers.core import Dropout
from keras import optimizers
import numpy
import pandas as pd
import time
import datetime as dt

start = time.time()

#get data from quantapi in json format
def get_quantapi(symbol, token, duration, format):
    query = 'https://quantapi.co/api/' + symbol + '/' + token + '/?d=' + duration + '&f=' + format
    df = pd.read_json(query)
    df = df[['date', 'open', 'high', 'low', 'close']]
    df = df.sort_values('date', ascending=True)
    print(df.tail())
    return df

#frame a sequence as a supervised learning problem
def timeseries_to_supervised(data, lag=1):
    df = DataFrame(data)
    columns = [df.shift(i) for i in range(1, lag+1)]
    columns.append(df)
    df = concat(columns, axis=1)
    df.fillna(0, inplace=True)
    return df

#create a differenced series
def difference(dataset, interval=1):
    diff = list()
    for i in range(interval, len(dataset)):
        value = dataset[i] - dataset[i - interval]
        diff.append(value)
    return numpy.array(diff)

#invert differenced value
def inverse_difference(history, yhat, interval=1):
    return yhat + history[-interval]

#transform scale for train and test data to [-1,1]
def scale(train, test):
    scaler = MinMaxScaler(feature_range=(-1, 1))
    scaler = scaler.fit(train)
    #transform train
    train = train.reshape(train.shape[0], train.shape[1])
    train_scaled = scaler.transform(train)
    #transform test
    test = test.reshape(test.shape[0], test.shape[1])
    test_scaled = scaler.transform(test)
    return scaler, train_scaled, test_scaled

#invert scaling for a forecasted value
def invert_scale(scaler, X, value):
    #rebuild a full row so the scaler can invert it
    new_row = [x for x in X] + [value]
    array = numpy.array(new_row)
    array = array.reshape(1, len(array))
    inverted = scaler.inverse_transform(array)
    return inverted[0, -1]

#fit an LSTM network to training data
def fit_lstm(train, batch_size, nb_epoch, neurons):
    X, Y = train[:, 0:-1], train[:, -1]
    X = X.reshape(X.shape[0], 1, X.shape[1])
    model = Sequential()
    model.add(LSTM(neurons, batch_input_shape=(batch_size, X.shape[1], X.shape[2]),
                   stateful=True, return_sequences=True))
    model.add(Dropout(0.4))
    model.add(LSTM(neurons, batch_input_shape=(batch_size, X.shape[1], X.shape[2]),
                   stateful=True, return_sequences=False))
    model.add(Dropout(0.3))
    model.add(Dense(1))
    #set learning rate
    #adam = optimizers.Adam(lr=0.0001)
    adam = optimizers.Adam()
    model.compile(loss='mean_squared_error', optimizer=adam)
    for i in range(nb_epoch):
        model.fit(X, Y, epochs=1, batch_size=batch_size, verbose=0, shuffle=False)
        model.reset_states()
    return model

#make a one-step forecast
def forecast_lstm(model, batch_size, X):
    X = X.reshape(1, 1, len(X))
    yhat = model.predict(X, batch_size=batch_size)
    return yhat[0, 0]

symbol = 'gold'
#input your token here
token = 'Your token here'
duration = '20y'
format = 'json'
series = get_quantapi(symbol, token, duration, format)
series = series.set_index('date')
#resample to weekly Friday level; the last row is excluded as it is not an actual Friday
series = series.resample('W-FRI').last()
#remove the last row to align the prediction with the close price
series = series.drop(series.tail(1).index)
#print(series.tail())

#transform data to be stationary
raw_values = series.values
close_values = raw_values[:, -1]
diff_values = difference(raw_values, 1)

#transform data to a supervised learning problem
supervised = timeseries_to_supervised(diff_values, 1)
supervised.columns = ['sd_open', 'sd_high', 'sd_low', 'sd_close', 'open', 'high', 'low', 'close']
supervised = supervised.drop(supervised.columns[[4, 5, 6]], axis=1)
supervised_values = supervised.values

#split data into train and test sets
train, test = supervised_values[0:-12], supervised_values[-12:]

#transform the scale of data
scaler, train_scaled, test_scaled = scale(train, test)
#print(train_scaled.shape, test_scaled.shape)

#define the numbers of neurons and epochs to try
nb_epochs = [10, 20, 100, 200, 500, 1000]
neurons = [1, 2, 4, 10, 20, 50]
nb_correct_predict = 0

for neuron in neurons:
    for epochs in nb_epochs:
        #fit the model
        lstm_model = fit_lstm(train_scaled, 1, epochs, neuron)
        nb_correct_predict = 0
        #forecast the entire training dataset to build up state for forecasting
        train_reshaped = train_scaled[:, 0:-1]
        train_reshaped = train_reshaped.reshape(train_reshaped.shape[0], 1, train_reshaped.shape[1])
        lstm_model.predict(train_reshaped, batch_size=1)

        #walk-forward validation on the test data
        predictions = list()
        for i in range(len(test_scaled)):
            #make a one-step forecast
            X, y = test_scaled[i, 0:-1], test_scaled[i, -1]
            X_close = test_scaled[i, -2:-1]
            yhat = forecast_lstm(lstm_model, 1, X)
            #store the prediction and invert the scale
            test_scaled[i, -1] = yhat
            yhat = scaler.inverse_transform(test_scaled)[i, -1]
            #invert differencing
            yhat = inverse_difference(close_values, yhat, len(test_scaled)+1-i)
            #store forecast
            predictions.append(yhat)
            expected = close_values[len(train) + i + 1]
            #count correct trend predictions
            if i != 0:
                if (expected > old_expected) and (yhat > old_yhat):
                    nb_correct_predict = nb_correct_predict + 1
                elif (expected < old_expected) and (yhat < old_yhat):
                    nb_correct_predict = nb_correct_predict + 1
                elif (expected == old_expected) and (yhat == old_yhat):
                    nb_correct_predict = nb_correct_predict + 1
            print('Date=%s, Predicted=%f, Expected=%f' % (series.index[-12+i], yhat, expected))
            old_yhat = yhat
            old_expected = expected

        #predict the next gold price
        last = test_scaled[-1, 0:-1]
        yhat = forecast_lstm(lstm_model, 1, last)
        #invert scaling
        test_scaled[-1, -1] = yhat
        yhat = scaler.inverse_transform(test_scaled)[-1, -1]
        #invert differencing
        yhat = inverse_difference(close_values, yhat, 1)
        #print the next prediction
        print('Next predicted gold price: %f' % yhat)

        #print the number of correct trend predictions
        p_correct_predict = nb_correct_predict / (len(test_scaled) - 1) * 100
        print('Number of correct trend predictions: %d, percentage: %.1f' % (nb_correct_predict, p_correct_predict))

        #report performance
        rmse = sqrt(mean_squared_error(close_values[-12:], predictions))
        print('Number of Epochs: %d, Number of Neurons: %d' % (epochs, neuron))
        print('Test RMSE: %.3f' % rmse)
        print('Data: Open, High, Low, Close Price')
        end = time.time()
        print('*********Time Used: %.5s seconds*********' % (end - start))

        #write results to file
        with open('result_gold.txt', 'a') as f:
            print('Number of Epochs: %d, Number of Neurons: %d' % (epochs, neuron), file=f)
            print('Number of correct trend predictions: %d, percentage: %.1f' % (nb_correct_predict, p_correct_predict), file=f)
            print('Test RMSE: %.3f' % rmse, file=f)
            print('Data: Open, High, Low, Close Price', file=f)
            print('*********Time Used: %.5s seconds*********' % (end - start), file=f)
        with open('rmse_gold.txt', 'a') as r:
            print('Neurons:%d,Epochs:%d,RMSE:%.3f,Trend Percentage:%.1f' % (neuron, epochs, rmse, p_correct_predict), file=r)

        today = dt.datetime.now().strftime("%Y%m%d")
        #line plot of observed vs predicted
        pyplot.plot(close_values[-12:], label='Expected Value')
        pyplot.plot(predictions, label='Predicted Value')
        pyplot.legend()
        pyplot.title('Predicted VS Actual Gold Price' + ', Neurons=' + str(neuron) + ', Epochs=' + str(epochs))
        pyplot.savefig('Gold' + str(today) + 'Neurons' + str(neuron) + 'Epochs' + str(epochs) + '.png', bbox_inches='tight')
        pyplot.show()
```
Ref: https://machinelearningmastery.com/time-series-forecasting-long-short-term-memory-network-python/