データをLSTMで分析できるように形を整る
[行数], [変数数], [カラム数(ルックバック数)]の形式に変換
1 2 |
trainX = numpy.reshape(trainX, (trainX.shape[0], 1, trainX.shape[1])) testX = numpy.reshape(testX, (testX.shape[0], 1, testX.shape[1])) |
データをLSTMで分析できるように形を整る
[行数], [変数数], [カラム数(ルックバック数)]の形式に変換
1 2 |
trainX = numpy.reshape(trainX, (trainX.shape[0], 1, trainX.shape[1])) testX = numpy.reshape(testX, (testX.shape[0], 1, testX.shape[1])) |
LSTMの予想ではある時点からいくつか前のデータを用いて
次の時点のデータを予測し教師データを作成する
入力データがある地点からいくつか前の点のデータ
教師データは次の時点のデータ となるように
入力データ、教師データを作成
次の関数のlook_back=n
のn
が「いくつ前のデータを利用するか」を設定
1 2 3 4 5 6 7 8 9 10 11 |
def create_dataset(dataset, look_back): dataX, dataY = [], [] for i in range(len(dataset)-look_back-1): a = dataset[i:(i+look_back), 0] dataX.append(a) dataY.append(dataset[i + look_back, 0]) return numpy.array(dataX), numpy.array(dataY) look_back = 10 trainX, trainY = create_dataset(train, look_back) testX, testY = create_dataset(test, look_back) |
KerasでLSTMを構築する際はデータを0-1の値にスケーリングした方が結果が安定
但し、データをすべて用いてスケーリングすると、
訓練データにテストデータの情報が混入する事になる
データの最大値が1、最小値が0になるように加工する。
例えば訓練データに1に近い値がない場合には、
テストデータに最大値があるという事になる
訓練データを基準にデータ訓練データのスケーリングするには
1 2 3 4 5 6 7 8 |
# データのスケーリング scaler = MinMaxScaler(feature_range=(0, 1)) #trainに入っているデータを基準にスケーリングすることを定義 scaler_train = scaler.fit(train) #trainデータのスケーリング train = scaler_train.transform(train) #testデータのスケーリング test = scaler_train.transform(test) |
LSTM(長短期記憶ユニット)とはRNN(再帰型ネットワーク)のひとつ
時系列データの解析、言語の解析、音声の解析、売上予測等に使用。
RNNには長期間の時系列を保持することが難しいが、
inputgateやfogetgate、ouputgateで問題がクリアできる。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
import numpy import matplotlib.pyplot as plt from pandas import read_csv import math from keras.models import Sequential from keras.layers import Dense from keras.layers import LSTM from sklearn.preprocessing import MinMaxScaler from sklearn.metrics import mean_squared_error # データセットの作成 def create_dataset(dataset, look_back): dataX, dataY = [], [] for i in range(len(dataset)-look_back-1): a = dataset[i:(i+look_back), 0] dataX.append(a) dataY.append(dataset[i + look_back, 0]) return numpy.array(dataX), numpy.array(dataY) # 乱数設定 numpy.random.seed(7) # データセット読み込み dataframe = read_csv('monthly-champagne-sales-in-1000s.csv', usecols=[1], engine='python', skipfooter=3) dataset = dataframe.values dataset = dataset.astype('float32') # 訓練データ、テストデータ train_size = int(len(dataset) * 0.67) test_size = len(dataset) - train_size train, test = dataset[0:train_size,:], dataset[train_size:len(dataset),:] # データスケーリング scaler = MinMaxScaler(feature_range=(0, 1)) scaler_train = scaler.fit(train) train = scaler_train.transform(train) test = scaler_train.transform(test) # データ作成 look_back = 10 trainX, trainY = create_dataset(train, look_back) testX, testY = create_dataset(test, look_back) # データ整形 trainX = numpy.reshape(trainX, (trainX.shape[0], trainX.shape[1], 1)) testX = numpy.reshape(testX, (testX.shape[0], testX.shape[1], 1)) # LSTMモデルの作成と学習 model = Sequential() model.add(LSTM(64, return_sequences=True,input_shape=(look_back, 1))) model.add(LSTM(32)) model.add(Dense(1)) model.compile(loss='mean_squared_error', optimizer='adam') model.fit(trainX, trainY, epochs=10, batch_size=1, verbose=2) # 予測データの作成 trainPredict = model.predict(trainX) testPredict = model.predict(testX) # スケールしたデータを元に戻す trainPredict = scaler_train.inverse_transform(trainPredict) trainY = scaler_train.inverse_transform([trainY]) testPredict = scaler_train.inverse_transform(testPredict) testY = scaler_train.inverse_transform([testY]) # 予測精度の計算 trainScore = math.sqrt(mean_squared_error(trainY[0], trainPredict[:,0])) print('Train Score: %.2f RMSE' % (trainScore)) testScore = math.sqrt(mean_squared_error(testY[0], testPredict[:,0])) print('Test Score: %.2f RMSE' % (testScore)) # プロットのためのデータ整形 trainPredictPlot = numpy.empty_like(dataset) trainPredictPlot[:, :] = numpy.nan trainPredictPlot[look_back:len(trainPredict)+look_back, :] = trainPredict testPredictPlot = numpy.empty_like(dataset) testPredictPlot[:, :] = numpy.nan testPredictPlot[len(trainPredict)+(look_back*2)+1:len(dataset)-1, :] = testPredict # テストデータのプロット plt.plot(dataframe[round(len(dataset)*0.67):]) plt.plot(testPredictPlot) plt.show() |
データを訓練用と訓練用に分ける
時系列分析の場合は前回のデータを基にするのでランダムにしない
一般的な分別の場合はランダムに分ける
◆一般的な分析の場合
1 2 3 4 |
y = data.quality X = data.drop('quality', axis=1) X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2,random_state=123,stratify=y) |
◆時系列分析の場合
前半67%を訓練用、残りはテスト用
1 2 3 4 |
train_size = int(len(dataset) * 0.67) test_size = len(dataset) - train_size train, test = dataset[0:train_size,:], dataset[train_size:len(dataset),:] print(len(train), len(test)) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
print(__doc__) import itertools import numpy as np import matplotlib.pyplot as plt from sklearn import svm, datasets from sklearn.model_selection import train_test_split from sklearn.metrics import confusion_matrix # import some data to play with iris = datasets.load_iris() X = iris.data y = iris.target class_names = iris.target_names # Split the data into a training set and a test set X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0) # Run classifier, using a model that is too regularized (C too low) to see # the impact on the results classifier = svm.SVC(kernel='linear', C=0.01) y_pred = classifier.fit(X_train, y_train).predict(X_test) def plot_confusion_matrix(cm, classes, normalize=False, title='Confusion matrix', cmap=plt.cm.Blues): """ This function prints and plots the confusion matrix. Normalization can be applied by setting `normalize=True`. """ if normalize: cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis] print("Normalized confusion matrix") else: print('Confusion matrix, without normalization') print(cm) plt.imshow(cm, interpolation='nearest', cmap=cmap) plt.title(title) plt.colorbar() tick_marks = np.arange(len(classes)) plt.xticks(tick_marks, classes, rotation=45) plt.yticks(tick_marks, classes) fmt = '.2f' if normalize else 'd' thresh = cm.max() / 2. for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])): plt.text(j, i, format(cm[i, j], fmt), horizontalalignment="center", color="white" if cm[i, j] > thresh else "black") plt.tight_layout() plt.ylabel('True label') plt.xlabel('Predicted label') # Compute confusion matrix cnf_matrix = confusion_matrix(y_test, y_pred) np.set_printoptions(precision=2) # Plot non-normalized confusion matrix plt.figure() plot_confusion_matrix(cnf_matrix, classes=class_names, title='Confusion matrix, without normalization') # Plot normalized confusion matrix plt.figure() plot_confusion_matrix(cnf_matrix, classes=class_names, normalize=True, title='Normalized confusion matrix') plt.show() |