今回は、畳込みニューラルネットワーク(CNN)を活用して、画像ではなく、加速度信号、音声信号などの時系列データからなんらかの状態判定を行うモデルを構築します。その例として、慣性信号から状態を判定してみます。
サンプルのデータとして、大前のサーバにおいてある慣性信号をダウンロードします。ターミナルでwgetコマンドを叩くと入手できますが、せっかくなので、pythonでターミナルコマンドを叩くコードを書いてみます。
import os
# 慣性センサデータのダウンロード
os.system("wget http://int-info.com/PyLearn/dat.zip")
os.system("unzip dat.zip")
これは、あるモータの隣に慣性センサを接続し、モータを回転させたときに得られたデータとなります。
事前準備: ターミナルで以下を入力し、jupyterを再起動してください。
バックエンドがplaidml kerasに変わっていればok
import plaidml.keras
import keras
print(keras.backend.backend())
慣性信号の基本については、PyLearn SIG01: 7チャンネル慣性センサデータに記載していますので、必要に応じてご覧ください。
なお、今回は1.csv〜150.csv、合計150個のCSVファイルがあります。これらはそれぞれ、以下の3状態となります。
これらは1回の観測でだいたい5秒間分のデータを計測したものです。したがいまして、5秒間の計測データが150個あることになります。
import numpy as np
signal=[]
for i in range(1, 151): # 1〜150まで
signal.append(np.loadtxt("dat/" + str(i) + ".csv", delimiter=",", skiprows=1, usecols=range(2,8)))
ロードができたところで、実際の計測時間を見てみます。
samp_freq = 100 # サンプリング周波数
# 信号長と計測時間の取得
for i in range(5): # 多くなるので5個だけprint
sig_len = len(signal[i][:, 0]) # 信号長(i番目のCSVファイル、すべての行、0列目)
sig_time = sig_len/100 #
print(i, ".csv / 信号長: ", sig_len, " / 計測時間: ", sig_time, " [秒]")
こんな感じで、各信号、長さがちょっとずつ違います。詳細は後述しますが、深層学習(CNN)で信号から状態を判定したい場合、入力する数字のサイズは毎回同じでなければいけません。今回は4.52秒のデータで判定して、次は5.38秒のデータで判定する、などという柔軟なモデルは作れません。ですので、信号長は固定値を採用せねばならないわけです。
今回は、開始1秒分(つまり、信号長100)のデータで状態を判定するという課題に置き換えてみます。すなわち、長さがバラバラの信号を、1秒分にカットしていくという作業を行います。
tips:
なお、1秒分というのは、機械学習のハイパーパラメータ(事前に分析者が試行錯誤的に決めなければならない値)となります。0.5秒や2秒などの方が精度が高くなるかもしれません。1秒というのはてきとうです。精度が悪い場合、見直す必要があることを覚えておいてください。
sig_100 = []
for i in range(len(signal)):
# i.csvに格納されていた信号の、「0〜99行目、全ての列」を切り出し
sig_100.append(signal[i][0:100, :])
for i in range(5): # 多くなるので5個だけprint
sig_len = len(sig_100[i][:, 0])
sig_time = sig_len/samp_freq
print(i, ".csv / 信号長: ", sig_len, " / 計測時間: ", sig_time, " [秒]")
これで、すべての観測信号がぴったり1秒分になりました。続いて、規格化処理に進みます。CNNは入力データの最小〜最大値レンジが0〜1範囲であることが要求されていました。そのため、規格化していきます。これは、以下の手続きで完了します。
まずは、X軸加速度の最大値・最小値を取り出します。
# 150信号の、X軸加速度の最大値を個別に取得(最小値も)
Xmax_each = []
Xmin_each = []
for i in range(len(sig_100)):
max_temp = np.max(sig_100[i][:, 0])
min_temp = np.min(sig_100[i][:, 0])
Xmax_each.append(max_temp)
Xmin_each.append(min_temp)
# 150個の最大値から、最も大きな値を取り出す。(最小値も)
Xmax = np.max(Xmax_each)
Xmin = np.min(Xmin_each)
print("最大値: ", Xmax)
print("最大値: ", Xmin)
X軸加速度の最大値・最小値を取得できたところで、1.csvのX軸加速度を規格化してみます。
j = 0 # 対象となるCSVファイルのid(+1したものがcsvファイル名)
temp_signal = np.zeros([len(sig_100[j][:, 0])]) # 一時保存用のnp配列
for i in range(len(sig_100[j][:, 0])):
temp_signal[i] = (sig_100[j][i, 0] - Xmin) / (Xmax-Xmin)
1.csvのX軸加速度が規格化できたところで、plotしてみます。
import matplotlib.pyplot as plt
fig = plt.figure(figsize=(20, 8)) # 描画領域の横、縦のサイズ
# 規格化前
plt.subplot(2, 1, 1)
plt.plot(sig_100[j][:, 0], label="Xacc")
plt.xlabel("time: 0.01[sec]", size=20)
plt.ylabel("Acceleration: 0.1[mG]", size=20)
plt.legend(fontsize=20) # 凡例表示
# 規格化後
plt.subplot(2, 1, 2)
plt.plot(temp_signal, label="Xacc")
plt.xlabel("time: 0.01[sec]", size=20)
plt.ylabel("Acceleration", size=20)
plt.legend(fontsize=20) # 凡例表示
plt.show()
規格化前はレンジが広かったですが、規格化後はレンジが0〜1の間に収まることが確認できました。当然、信号の形状はまったく一緒です。
上記コードは、あくまで「1.CSV」の「X軸加速度」のみに行った処理になります。150回の観測すべて、3軸加速度・3軸角速度すべてにこの処理を行う必要があります。これを一気に行うコードが以下となります。
まずは、6軸の最大値と最小値を取得します。
Max_list_6axis = []
Min_list_6axis = []
for j in range(6): # 6軸分
max_each = []
min_each = []
for i in range(len(sig_100)):
max_temp = np.max(sig_100[i][:, j])
min_temp = np.min(sig_100[i][:, j])
max_each.append(max_temp)
min_each.append(min_temp)
# j軸の150個の最大値から、最大・最小値抽出
max_all = np.max(max_each)
min_all = np.min(min_each)
# 最大・最小をappend
Max_list_6axis.append(max_all)
Min_list_6axis.append(min_all)
head = ["Xacc", "Yacc", "Zacc", "Xgyr", "Ygyr", "Zgyr"]
for j in range(len(head)): # 6軸分
print("j=", j, " / Max(", head[j], ") = ", Max_list_6axis[j])
print("j=", j, " / Min(", head[j], ") = ", Min_list_6axis[j], "\n")
6軸の最大・最小が、
に格納されました。続いてこの情報をもとに、150個の観測信号をすべて規格化します。
sig_100_stand = [] #150個の100x6サイズの行列を格納するリスト
for j in range(150):
# 100x6サイズの一時保存用のnp配列を用意
npsize = np.shape(sig_100[j]) # わからない場合はnpsizeをprint
temp_signal = np.zeros(npsize)
for ax in range(npsize[1]): # 6回まわす
# j.csv、軸axの規格化処理
temp_signal[:, ax] = (sig_100[j][:, ax] - Min_list_6axis[ax])
temp_signal[:, ax] = temp_signal[:, ax] / (Max_list_6axis[ax]-Min_list_6axis[ax])
sig_100_stand.append(temp_signal)
これで、
に、「j+1.csv」の「ax」軸の規格化された信号が格納されました。実際に規格化されているのか、みてみます。
fig = plt.figure(figsize=(20, 12)) # 描画領域の横、縦のサイズ
plt.subplots_adjust(wspace=0.4, hspace=0.4)
# 以下、自由にかえてよい
j = 100 # 見たいCSVファイルのid(j.csv)(0〜150を指定)
ax = 2 # 確認したい軸(0〜5を指定)
# 以下、描画
# 規格化前
plt.subplot(2, 1, 1)
plt.plot(sig_100[j][:, ax], label=head[ax])
plt.xlabel("time: 0.01[sec]", size=20)
plt.ylabel("Acceleration: 0.1[mG]", size=20)
plt.title("Non-standardization", size=20)
plt.legend(fontsize=20) # 凡例表示
#規格化後
plt.subplot(2, 1, 2)
plt.plot(sig_100_stand[j][:, ax], label=head[ax])
plt.xlabel("time: 0.01[sec]", size=20)
plt.ylabel("Acceleration", size=20)
plt.title("Standardization", size=20)
plt.legend(fontsize=20) # 凡例表示
plt.show()
きちんと規格化されていることがわかりました。そろそろCNNの学習に行きたいところですが、まだやらなければならないことがあります。
まず、PyLearn Keras 03: モノクロ画像に対するCNNの学習をみて、CNN に要求される入力データの形状を思い出します。このページを見ると、
こんな感じの記載があります。これはMNISTのデータで、
という意味がありました。今回も、この構造を守らなければいけません。今回扱う慣性信号であれば、
つまり、
というshapeをしたnumpyのデータに変換していく必要があります。現在で生きているデータ、
は、1回の観測信号100x6のサイズのデータがnumpyで管理され、150回分の結果がリストで管理されています。これを、前述したようにリストを使用せず、(150, 100, 6, 1)サイズのnumpyに変換せねば、CNNはデータを受け取ってくれないのです。
そういうわけで、この作業をやっていきます。
sig_cnn = np.zeros([150, 100, 6, 1]) # わかりやすさのため直打ち
for j in range(len(sig_100_stand)):
sig_cnn[j, :, :, 0] = sig_100_stand[j]
これで前処理が終了です。CNNは画像を入力することが想定されたモデルなので、画像っぽくデータを変形させる必要がありました。ですので、もちろん画像として出力することも可能です。以下に、画像化された「1.csv」を見てみます。
j = 0 # j+1.csvを表示、j=0
fig = plt.figure(figsize=(10, 5)) # 描画領域の横、縦のサイズ
plt.imshow(sig_cnn[j, :, :, 0].T, cmap='gray') #.Tは転置
plt.show()
こんな感じで、100x6サイズの細長い画像になりました(転置しているので、行列が逆になっています)。これで、慣性信号側の前処理が終了です。
先ほど、信号側の前処理が終わりました。ただ、あれですと、150回の測定のうち、どれが正常なモータで、どれが異常なモータなのか、わかりません。人工知能を開発するためには、この信号はこの状態という、問題と解答のペアとなる教師データが必要です。先ほど規格化された信号が問題部分に相当しますが、これに対応する解答がないわけです。ここでは、解答を作成します。現在、問題に相当する部分は、
こんな感じになっています。そのため、1.csv〜150.csvに対する正解ラベルを、その順番で格納されたデータを作る必要があります。
今回の150回の観測は、それぞれ、以下の条件でモータを回したことを意味します。
これに対応するクラスラベルの作り方は、One-hot vectorにする必要があるので、
サイズのnumpy配列にする必要があります。もしそのクラスが、
としてみます。すなわち、列成分の1つが「1」で、それ以外が「0」となります。この条件でデータを作ってみましょう。
ClassLabel = np.zeros([150, 3]) # わかりやすさのため直打ち
for i in range(150):
if i <= 49: # 1.csv 〜 50.csv(リスト0〜49)であれば
ClassLabel[i, 0] = 0
ClassLabel[i, 1] = 0
ClassLabel[i, 2] = 1
elif i <= 99: # 51.csv 〜 100.csv(リスト50〜99)であれば
ClassLabel[i, 0] = 0
ClassLabel[i, 1] = 1
ClassLabel[i, 2] = 0
else: # 101.csv 〜 150.csv(リスト100〜149)であれば
ClassLabel[i, 0] = 1
ClassLabel[i, 1] = 0
ClassLabel[i, 2] = 0
print("A2 ->", np.sum(ClassLabel[:, 2]), "個")
print("A1 ->", np.sum(ClassLabel[:, 1]), "個")
print("N ->", np.sum(ClassLabel[:, 0]), "個")
データ整形が済んだところで、最後の処理です。集めたデータを、2つのデータセットに分割します。
今回は、1クラス50データを、教師データ30、テストデータ20に分割してみます。3クラスありますので、
とします。
sig_cnn_train = np.zeros([90, 100, 6, 1]) # 教師データ(信号)
sig_cnn_test = np.zeros([60, 100, 6, 1]) # テストデータ(信号)
ClassLabel_train = np.zeros([90, 3]) # 教師データ(正答ラベル)
ClassLabel_test = np.zeros([60, 3]) # テストデータ(正答ラベル)
# データの移動
c_train = 0
c_test = 0
for i in range(0, 150):
# クラスA2の振り分け処理
if i < 30: # 0〜29を、教師データへ
sig_cnn_train[c_train, :, :, :] = sig_cnn[i, :, :, :]
ClassLabel_train[c_train, :] = ClassLabel[i, :]
c_train=c_train+1
elif i < 50: # 30〜49を、テストデータへ
sig_cnn_test[c_test, :, :, :] = sig_cnn[i, :, :, :]
ClassLabel_test[c_test, :] = ClassLabel[i, :]
c_test=c_test+1
# クラスA1の振り分け処理
elif i < 80: # 50〜79を、教師データへ
sig_cnn_train[c_train, :, :, :] = sig_cnn[i, :, :, :]
ClassLabel_train[c_train, :] = ClassLabel[i, :]
c_train=c_train+1
elif i < 100: # 80〜99を、テストデータへ
sig_cnn_test[c_test, :, :, :] = sig_cnn[i, :, :, :]
ClassLabel_test[c_test, :] = ClassLabel[i, :]
c_test=c_test+1
# クラスNの振り分け処理
elif i < 130: # 100〜129を、教師データへ
sig_cnn_train[c_train, :, :, :] = sig_cnn[i, :, :, :]
ClassLabel_train[c_train, :] = ClassLabel[i, :]
c_train=c_train+1
elif i < 150: # 130〜149を、テストデータへ
sig_cnn_test[c_test, :, :, :] = sig_cnn[i, :, :, :]
ClassLabel_test[c_test, :] = ClassLabel[i, :]
c_test=c_test+1
これで、教師データとテストデータの作成が終わりました。
となっています。
ようやく前処理が終わったので、CNNの作成に進みます。CNNの説明自体はPyLearn Keras 03: モノクロ画像に対するCNNの学習でしていますので、詳細は省略します。でも、注意すべき点は記載します。
CNNのモデルの構造自体は非常に自由度が高いものの、
この部分だけは、固定値になります。これを守らないと、エラーが出るので注意です。後もう一つ注意すべきところは、カーネルサイズです。
こんな感じにして、必ず、列成分で畳み込まないように注意しましょう。画像の場合は、行と列を畳むことに意味がありますが、信号の場合、行成分が時間で、列成分がXYZ軸加速度・角速度なので、列成分を畳み込み情報を圧縮することに、物理的な意味を見出せません。なので、カーネルサイズの列成分の畳み込みは、必ず1にします(仮にたたみ込んでも、エラーが出るわけではありませんし、そのほうがうまくいくこともあります。ここで述べていることは、精度ではなく物理的な意味を見いだせるか否かという論点での話になります)。
from keras.models import Sequential
from keras.optimizers import RMSprop
from keras.layers import Dense, Dropout, Flatten, Conv2D
# 自分で整えたデータサイズを入力(あくまで今回の場合)
InputSize1 = 100 # 1秒分のデータで判定させるため
InputSize2 = 6 # 6軸の信号のため
NumClass = 3 # 出力層のニューロン数(3クラス)
# 構築するCNNのモデル構造を定義
model = Sequential()
model.add(Conv2D(filters=1, kernel_size=(20, 1), activation='relu', input_shape=(InputSize1, InputSize2, 1)))
model.add(Conv2D(filters=1, kernel_size=(20, 1), activation='relu'))
model.add(Conv2D(filters=1, kernel_size=(20, 1), activation='relu'))
model.add(Conv2D(filters=1, kernel_size=(20, 1), activation='relu'))
model.add(Conv2D(filters=1, kernel_size=(20, 1), activation='relu'))
model.add(Dropout(0.25))
model.add(Flatten())
model.add(Dense(30, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(NumClass, activation='softmax'))
# コンパイル(学習方法の定義)
model.compile(loss='categorical_crossentropy', optimizer=RMSprop(), metrics=['accuracy'])
model.summary() # 構造出力
### 学習! ###
batch_size = 30 # うまくいかない場合は、2^n+1にする
epochs = 200 # 学習回数
learning_process = model.fit(sig_cnn_train, ClassLabel_train, # 信号とラベルデータ
batch_size=batch_size, # バッチサイズ
epochs=epochs, # エポック数
verbose=0) # 学習過程を可視化させたい場合は1
print("finish!")
学習が終わったところで、学習過程を見てみます。
import numpy as np
plt.figure()
plt.plot(learning_process.epoch, np.array(learning_process.history['acc']),label='Train acc')
plt.xlabel('Epoch')
plt.ylabel('Acc')
plt.grid()
plt.legend()
教師データの数が少ない場合は、かなり乱数の振る舞いから影響を受けます。なので、うまくいった場合は、とりあえず保存する癖をつけておくといいでしょう。
うまくいった場合は、学習済みモデルを利用して、精度を見てみます。
score_train = model.evaluate(sig_cnn_train, ClassLabel_train, verbose=1, batch_size=1)
print('正答率(教師データ):', np.round(score_train[1], 4))
score_test = model.evaluate(sig_cnn_test, ClassLabel_test, verbose=1, batch_size=1)
print('正答率(テストデータ):', np.round(score_test[1], 4))
教師データの精度は高く、テストデータもそこそこ当たるようです。ところで、学習結果は以下の3状態をとります。
「良い学習」を目指すのがベストですが、教師データの精度が高く、テストデータの精度はそこそこ良い、みたいなことは割と普通です。多少は過学習よりでも、テストデータの精度がまあまああるのであれば、いいと言えばいいと思います。
補足:
判定結果を得るには、以下のように書けばokです。for文を使わなくても書けますが、実際に運用するときには、for文の中に一発判定を何度も組み込むことになるので、それを意識して書いています。
print("### テストデータの判定結果 ###")
for i in range(len(ClassLabel_test)):
res = model.predict(sig_cnn_test[i:i+1, :, :, :])
res = np.argmax(res)
if res == 0:
res_str = "クラスN"
if res == 1:
res_str = "クラスA1"
if res == 2:
res_str = "クラスA2"
print("テストデータ: i=", i, " / ", res_str)
テストデータは、
でしたから、まあまあって感じでしょうね。これを見ると、どういうクラスをどう間違えやすいのか、考察することができます。