PyLearn SIG 04では、正常データ、異常データの二つを用いて、正常・異常状態を判定するクラス分類問題を解くモデルを構築しました。ただし、このためには正常と異常データを集めなくてはなりません。一般に、正常データは大量に集められるが、異常データは少ししか集められないという状況が多いでしょう。したがって、機械学習の分野では、正常データのみで正常と異常を切り分けるモデルを構築しよう、という問題が扱われるようになりました。これを、教師なし異常値検知と言います。「教師なし」とは分析に使用するデータはすべて正常として扱うので、もはや人が正解値を教える必要がない、よって、教師なし、というわけです。
import numpy as np
import random
# 正常クラスの信号読み込み
Sig_N=[]
for i in range(101, 151):
Sig_N.append(np.loadtxt("dat/" + str(i) + ".csv",
delimiter=",", skiprows=1, usecols=range(2,8)))
# 異常1クラスの信号読み込み
Sig_A1=[]
for i in range(51, 101):
Sig_A1.append(np.loadtxt("dat/" + str(i) + ".csv",
delimiter=",", skiprows=1, usecols=range(2,8)))
# 異常2クラスの信号読み込み
Sig_A2=[]
for i in range(1, 51):
Sig_A2.append(np.loadtxt("dat/" + str(i) + ".csv",
delimiter=",", skiprows=1, usecols=range(2,8)))
これでSig_N 〜 Sig_A2のリストに対し、6軸のセンサデータが取得されました。
続いて、合成加速度を求めて7チャンネル慣性センサデータへと変換します。
# 合成加速度を算出する関数:
# Input: 6チャンネルの慣性センサデータ(3軸加速度+3軸角速度)
# Output: 最後の列に合成加速度が追加された7チャンネルの慣性センサデータ
def MakingMag(signal):
M = signal[:,0] * signal[:,0] + signal[:,1] * signal[:,1] + (signal[:,2]) * (signal[:,2])
M = np.sqrt(M)
M_new = np.reshape(M, (len(M), 1))
# 7チャンネル慣性センサデータの取得
sig7 = np.concatenate([signal, M_new], 1)
return sig7
この関数を使って、150回の観測データ(6チャンネル)を、合成加速度が追加された7チャンネルデータに変換します。
# 正常データ
Sig7_N = []
for i in range(0, len(Sig_N)):
Sig7_N.append(MakingMag(Sig_N[i]))
# 異常1データ
Sig7_A1 = []
for i in range(0, len(Sig_A1)):
Sig7_A1.append(MakingMag(Sig_A1[i]))
# 異常2データ
Sig7_A2 = []
for i in range(0, len(Sig_A2)):
Sig7_A2.append(MakingMag(Sig_A2[i]))
続いて、150回の観測信号を特徴量に変換してみます。これを格納するリストとして、
を定義します(専門用語としてこれを特徴量ベクトルと呼びます。英語でFeature vector: FV)。変数の構造は、
とします(FV_A1, A2も同じ)。
軸 j とは、それぞれ以下の意味を持ちます。
k 番目の特徴量は、それぞれ以下のように定義することとします。
すなわち、
となります。なお、これらの特徴量のうち、pythonで用意されていないものもたくさんあるので、まずはそれを求める関数を定義しておきます。
import numpy as np
import scipy.stats as st
from sklearn.linear_model import LinearRegression
from scipy.fftpack import fft
# *****************************************************************
# RMSを求める関数
# Input: 1チャンネルの信号
def Calc_RMS(signal):
a = signal * signal # 二乗
sum_a = np.sum(a) # 総和
sqrt_a = np.sqrt(sum_a) # 平方根
RMS = np.mean(sqrt_a) # 平均値
return RMS
# *****************************************************************
# 動的特徴量を求める関数
# Input: 1チャンネルの信号
def Calc_LinearCoef(signal):
# 入力変数(信号値に対応する連番の数字)
X=np.arange(len(signal))
X=np.reshape(X, (len(X), 1))
# 出力変数
Y=signal
# 線形回帰モデルのパラメータフィッティング
ModelLR = LinearRegression() # 線形回帰モデルのセット
ModelLR.fit(X, Y) # パラメータ獲得(Xは時間、Yは信号値)
# 動的特徴量を返り値に
return ModelLR.coef_[0]
# *****************************************************************
# パワーバンドを求める関数
# signal: 時系列信号(1軸分のみ)
# sf: サンプリング周波数
# min_f: パワーバンドを算出する周波数の最小値
# max_f: パワーバンドを算出する周波数の最大値
def Calc_PowerBand(signal, sf, min_f, max_f):
# パワースペクトル算出
L = len(signal) # 信号長
freq = np.linspace(0, sf, L) # 周波数
yf = np.abs(fft(signal)) # 振幅スペクトル
yf = yf * yf # パワースペクトル
# 指定された周波数の探索
n1, n2 = [], []
for i in range(1, int(L/2)): # <- 直流成分は探索範囲から除外
n1.append(np.abs(freq[i] - min_f))
n2.append(np.abs(freq[i] - max_f))
min_id = np.argmin(n1) # min_fと最も近い周波数が格納された配列番号取得
max_id = np.argmin(n2)# max_fと最も近い周波数が格納された配列番号取得
# 指定された周波数のパワーバンド算出
PB = np.sum(yf[min_id:max_id])
return PB
# *****************************************************************
# 周波数領域エントロピーを求める関数
# signal: 時系列信号(1軸分のみ)
# sf: サンプリング周波数
def Calc_FreqEnt(signal, sf):
# パワースペクトル算出
L = len(signal) # 信号長
freq = np.linspace(0, sf, L) # 周波数
yf = np.abs(fft(signal)) # 振幅スペクトル
yf = yf * yf # パワースペクトル
# 手続き1
a = 0
for i in range(1, int(L/2)): # <- 直流成分除去 & ナイキスト周波数まで
a = a + yf[i]
# 手続き2 & 3
E = 0
for i in range(1, int(L/2)): # <- 直流成分除去 & ナイキスト周波数まで
E = E + (yf[i]/a) * np.log2(yf[i]/a)
E = -E
return E
# *****************************************************************
# エネルギーを算出する関数
# signal: 時系列信号(1軸分のみ)
# sf: サンプリング周波数
def Calc_Energy(signal, sf):
# パワースペクトル算出
L = len(signal) # 信号長
freq = np.linspace(0, sf, L) # 周波数
yf = np.abs(fft(signal)) # 振幅スペクトル
yf = yf * yf # パワースペクトル
# エネルギー算出
En = np.mean(yf[1:int(L/2)]) # <- 直流成分除去 & ナイキスト周波数まで
return En
個別の特徴量を算出する関数ができました。続いて、7チャンネル慣性センサデータを全ての特徴量に変換する関数を定義します。先ほど定義した FV[ i ] [ j ] [ k ]構成の出力を行います。
def FeatureVector(signal, NumClass, NumChannel, NumFeature, sf):
FV = []
for i in range(0, NumClass):
TempList = []
for j in range(0, NumChannel):
TempAry = np.zeros([1, NumFeature])
# 観測信号 i の軸 j の...
sig = signal[i][:, j]
TempAry[0, 0] = np.mean(sig) # j=0 平均
TempAry[0, 1] = np.var(sig) # j=1 分散
TempAry[0, 2] = st.skew(sig) # j=2 歪度
TempAry[0, 3] = st.kurtosis(sig) # j=3 尖度
TempAry[0, 4] = np.min(sig) # j=4 最小値
TempAry[0, 5] = np.max(sig) # j=5 最大値
TempAry[0, 6] = np.median(sig) # j=6 中央値
TempAry[0, 7] = Calc_RMS(sig) # j=7 RMS
TempAry[0, 8] = Calc_LinearCoef(sig) # j=8 動的特徴量
TempAry[0, 9] = Calc_PowerBand(sig, sf, 0, 10) # j=9 PB1
TempAry[0, 10] = Calc_PowerBand(sig, sf, 10, 20) # j=10 PB2
TempAry[0, 11] = Calc_PowerBand(sig, sf, 20, 30) # j=11 PB3
TempAry[0, 12] = Calc_PowerBand(sig, sf, 30, 40) # j=12 PB4
TempAry[0, 13] = Calc_PowerBand(sig, sf, 40, 50) # j=13 PB5
TempAry[0, 14] = Calc_FreqEnt(sig, sf) # j=14 周波数領域エントロピー
TempAry[0, 15] = Calc_Energy(sig, sf) # j=15 エネルギー
TempList.append(TempAry[0])
FV.append(TempList)
return FV
特徴量変換の関数ができたところで、実際にクラスN, A1, A2について、それぞれ50観測分のデータを特徴量に変換します。
NumChannel = len(Sig7_N[0][0, :]) # 軸の最大数
NumFeature = 16 # 扱う特徴量の数
NumClassN = len(Sig7_N) # クラスNのデータ数
NumClassA1 = len(Sig7_A1) # クラスNのデータ数
NumClassA2 = len(Sig7_A2) # クラスNのデータ数
sf = 100 # サンプリング周波数
# クラスNの特徴量変換
FV_N = FeatureVector(Sig7_N, NumClassN, NumChannel, NumFeature, sf)
# クラスA1の特徴量変換
FV_A1 = FeatureVector(Sig7_A1, NumClassA1, NumChannel, NumFeature, sf)
# クラスA2の特徴量変換
FV_A2 = FeatureVector(Sig7_A2, NumClassA2, NumChannel, NumFeature, sf)
これで、7チャンネルのデータを16種の特徴量に変換しました。すなわち、$7 \times 16 = 112$個の特徴量に変換されました。これは、1回の観測を112次元のベクトルに変換したことと等価です。1回の観測を112個の特徴で表現しました、ということです。
以上一連の流れは、クラス分類問題(PyLearnSIG04)とまったく同じです。異常値検知問題の場合は、ここからが少し異なりますので、注意してください。
ここから、特徴量同士のスケールの違いから、Z-score normalizationによる規格化を実施しました。これは、各特徴量の平均値と標準偏差を利用した規格化です。このとき、クラス分類問題の場合は、クラスN, A1, A2すべてのデータを使い、平均値と標準偏差を算出していたことを思い出してください。
異常値検出問題とは、正常データしか集められない場合において、異常状態を検出することを目標とします。すなわち、今は異常状態であるクラスA1, A2のデータがありますが、本来はないものです。これから作成する異常値検出モデルの精度評価を行うために用意しているだけです。したがって、規格化を含めたモデル構築のあらゆる処理において、異常状態であるクラスA1, A2を使用してはいけません。使っていいのは、モデルの検出精度を測る時だけです。
クラス分類問題の場合は、クラスN, A1, A2をすべて使い規格化に必要となる平均値と標準偏差を計算していましたが、異常値検知問題の場合は正常データしか集められないことを前提としているので、クラスNのみで平均値と標準偏差を算出することになります。
これを念頭に、Z-score normalizationを行います。表現は以下の通り。
# 観測データ数の獲得
NumDatN = len(FV_N)
NumDatA1 = len(FV_A1)
NumDatA2 = len(FV_A2)
Zscore = []
for j in range(0, NumChannel):
temp2 = []
for k in range(0, NumFeature):
temp1 = []
temp_zscore = np.zeros([1, 2])
# 軸 j , 特徴量 k を1列に並べる
for i in range(0, NumDatN):
temp1.append(FV_N[i][j][k])
### --- ここはクラス分類問題での規格化 --- ###
# for i in range(0, NumDatA1):
# temp1.append(FV_A1[i][j][k])
#for i in range(0, NumDatA2):
# temp1.append(FV_A1[i][j][k])
# 軸 j , 特徴量 k の 平均・標準偏差を取得
Val_mean = np.mean(temp1)
Val_std = np.std(temp1)
temp_zscore[0, 0] = Val_mean
temp_zscore[0, 1] = Val_std
temp2.append(temp_zscore[0])
Zscore.append(temp2)
正常データ(クラスN)のみを用いて、規格化を行うためのパラメータ(平均と標準偏差)を求めました。それでは、ここで得られた値を用いて規格化を行います。
# クラスNの特徴量の規格化
for i in range(0, NumClassN):
for j in range(0, NumChannel):
for k in range(0, NumFeature):
FV_N[i][j][k] = (FV_N[i][j][k] - Zscore[j][k][0]) / Zscore[j][k][1]
# クラスA1の特徴量の規格化
for i in range(0, NumClassA1):
for j in range(0, NumChannel):
for k in range(0, NumFeature):
FV_A1[i][j][k] = (FV_A1[i][j][k] - Zscore[j][k][0]) / Zscore[j][k][1]
# クラスA2の特徴量の規格化
for i in range(0, NumClassA2):
for j in range(0, NumChannel):
for k in range(0, NumFeature):
FV_A2[i][j][k] = (FV_A2[i][j][k] - Zscore[j][k][0]) / Zscore[j][k][1]
正常データで算出した規格化の係数を用いて、クラスN, A1, A2を規格化していくわけです。これで規格化が行われました。どのような値になったか、X軸加速度の分散値を3例ずつみてみます。
for i in range(0, 3):
print("クラスN / Xacc / 分散: ", round(FV_N[i][0][1], 2))
print("クラスA1 / Xacc / 分散: ", round(FV_A1[i][0][1], 2))
print("クラスA2 / Xacc / 分散: ", round(FV_A2[i][0][1], 2))
クラスNはおとなしい値ですが、クラスA1, A2はかなり大きな値になっています。異常値検出において良い特徴量とは、正常データを用いて求めた規格化係数で規格化を行ったとき、異常データは吹っ飛ぶことと等価と言えるでしょう。
for i in range(0, 3):
print("クラスN / Xacc / エネルギー: ", round(FV_N[i][0][15], 2))
print("クラスA1 / Xacc / エネルギー: ", round(FV_A1[i][0][15], 2))
print("クラスA2 / Xacc / エネルギー: ", round(FV_A2[i][0][15], 2))
エネルギーも同様ですね。それが良い特徴量であれば、正常ならば0付近の値を示し、異常値の場合、吹っ飛びます。
続いて、特徴量ベクトルに加工します。解説はPyLearnSIG04と同じ。
FV_N_ML = np.zeros([NumClassN, 112]) # <- 50回の観測、112次元の特徴量(7チャンネル * 16種の特徴量)
FV_A1_ML = np.zeros([NumClassA1, 112]) # <- 50回の観測、112次元の特徴量(7チャンネル * 16種の特徴量)
FV_A2_ML = np.zeros([NumClassA2, 112]) # <- 50回の観測、112次元の特徴量(7チャンネル * 16種の特徴量)
# クラスN
for i in range(0, NumClassN):
FV_N_ML[i, :] = np.concatenate([
FV_N[i][0], # X軸加速度の特徴量 16種
FV_N[i][1], # Y軸加速度の特徴量 16種
FV_N[i][2], # Z軸加速度の特徴量 16種
FV_N[i][3], # X軸角速度の特徴量 16種
FV_N[i][4], # Y軸角速度の特徴量 16種
FV_N[i][5], # Z軸角速度の特徴量 16種
FV_N[i][6], # 合成加速度の特徴量 16種
], 0)
# クラスA1
for i in range(0, NumClassA1):
FV_A1_ML[i, :] = np.concatenate([
FV_A1[i][0], # X軸加速度の特徴量 16種
FV_A1[i][1], # Y軸加速度の特徴量 16種
FV_A1[i][2], # Z軸加速度の特徴量 16種
FV_A1[i][3], # X軸角速度の特徴量 16種
FV_A1[i][4], # Y軸角速度の特徴量 16種
FV_A1[i][5], # Z軸角速度の特徴量 16種
FV_A1[i][6], # 合成加速度の特徴量 16種
], 0)
# クラスA2
for i in range(0, NumClassA2):
FV_A2_ML[i, :] = np.concatenate([
FV_A2[i][0], # X軸加速度の特徴量 16種
FV_A2[i][1], # Y軸加速度の特徴量 16種
FV_A2[i][2], # Z軸加速度の特徴量 16種
FV_A2[i][3], # X軸角速度の特徴量 16種
FV_A2[i][4], # Y軸角速度の特徴量 16種
FV_A2[i][5], # Z軸角速度の特徴量 16種
FV_A2[i][6], # 合成加速度の特徴量 16種
], 0)
# 特徴量IDと種別を表示
FeatureName = ["平均", "分散", "歪度", "尖度", "最小", "最大", "中央値", "RMS", "動的",
"PB1", "PB2", "PB3", "PB4", "PB5", "エントロピー", "エネルギー"]
AxisName = ["Xacc", "Yacc", "Zacc", "Xang", "Yang", "Zang", "Mag"]
i=0
for j in range(0, NumChannel):
for k in range(0, NumFeature):
print("特徴量ID ", i, ": ", AxisName[j], FeatureName[k])
i = i + 1
これで特徴量ベクトルに加工できました。FV_N_MLには、50行112列のの数字が集まっています。行が50回分の個別の観測結果、列が112種の特徴量の値となります(A1, A2も同じ)。どんな値が得られているのか、imshow関数と呼ばれるヒートマップを作成する関数で可視化してみます。
c=(-10, 10) # 色の最小・最大範囲
from matplotlib import pyplot as plt
plt.figure(figsize=(50, 30)) # 描画領域の横幅, 縦幅
# クラスN
plt.subplot(3,1,1)
plt.imshow(FV_N_ML, clim=c)
plt.colorbar()
plt.title('Feature vector: Class N', size=30)
plt.xlabel('Feature ID', size=30)
plt.ylabel('Observation', size=30)
plt.xticks(np.arange(0, 112, 5), size = 15) # メモリ間隔
plt.yticks(np.arange(0, 50, 5), size = 15)
# クラスA1
plt.subplot(3,1,2)
plt.imshow(FV_A1_ML, clim=c)
plt.colorbar()
plt.title('Feature vector: Class A1', size=30)
plt.xlabel('Feature ID', size=30)
plt.ylabel('Observation', size=30)
plt.xticks(np.arange(0, 112, 5), size = 15) # メモリ間隔
plt.yticks(np.arange(0, 50, 5), size = 15)
# クラスA2
plt.subplot(3,1,3)
plt.imshow(FV_A2_ML, clim=c)
plt.colorbar()
plt.title('Feature vector: Class A2', size=30)
plt.xlabel('Feature ID', size=30)
plt.ylabel('Observation', size=30)
plt.xticks(np.arange(0, 112, 5), size = 15) # メモリ間隔
plt.yticks(np.arange(0, 50, 5), size = 15)
plt.show()
上から順に、クラスN, A1, A2です。縦軸が50回分の観測、横軸が112個の特徴量となります。これを見るだけで、正常なモータ、グリスを塗らない場合、ネジをゆるめた場合で、特徴量にどのような違いが出るか、かなりたくさんの考察ができそうです。
異常データの多くの特徴量が、吹き飛ぶことを確認できます。 特徴量10と91で散布図を書いてみましょう。
# N, A1, A2の順に散布図を作成
plt.figure(figsize=(14, 4)) # 描画領域の横幅, 縦幅
plt.subplot(1,2,1)
ax1 = 10
ax2 = 91
plt.scatter(FV_N_ML[:,ax1], FV_N_ML[:,ax2], s=50, c='blue', alpha=0.8)
plt.scatter(FV_A1_ML[:,ax1], FV_A1_ML[:,ax2], s=50, c='red', alpha=0.8)
plt.scatter(FV_A2_ML[:,ax1], FV_A2_ML[:,ax2], s=50, c='green', alpha=0.8)
plt.title("Feature space of F" + str(ax1) + " and F" + str(ax2))
plt.xlabel("Feature " + str(ax1))
plt.ylabel("Feature " + str(ax2))
plt.grid(True) #グリッド線(True:引く、False:引かない)
plt.legend(["N", "A1", "A2"]) # 凡例
正常データ(クラスN)だけ落ち着いており、異常データは吹き飛んでいることがわかります。 とは言え、クラスNとクラスA2が混ざってそうにも見えるので、左下を拡大してみます。
# N, A1, A2の順に散布図を作成
plt.figure(figsize=(14, 4)) # 描画領域の横幅, 縦幅
plt.subplot(1,2,1)
ax1 = 10
ax2 = 91
plt.scatter(FV_N_ML[:,ax1], FV_N_ML[:,ax2], s=50, c='blue', alpha=0.8)
plt.scatter(FV_A1_ML[:,ax1], FV_A1_ML[:,ax2], s=50, c='red', alpha=0.8)
plt.scatter(FV_A2_ML[:,ax1], FV_A2_ML[:,ax2], s=50, c='green', alpha=0.8)
plt.title("Feature space of F" + str(ax1) + " and F" + str(ax2))
plt.xlabel("Feature " + str(ax1))
plt.ylabel("Feature " + str(ax2))
plt.grid(True) #グリッド線(True:引く、False:引かない)
plt.xlim([-10, 10]) # 範囲指定
plt.ylim([-10, 10]) # 範囲指定
plt.legend(["N", "A1", "A2"]) # 凡例
クラスNはいい感じに分離できていることを確認できました。
標準偏差距離による異常値検知とは、平均から標準偏差いくつ分よりも大きくデータが離れていれば、異常値だと解釈する、最もオーソドックスな異常値検知手法となります。
これを特徴量10と特徴量91の2次元空間で判定するコードを書いてみます。今回は可視化したいので2つにしていますが、3つでも4つでも、端的に言えばいくつ使っても構いません。ただし、使用する特徴量を増やしすぎると次元呪いに触れる可能性があるので注意してください。
# 正常データについて、特徴量10と特徴量91の平均値を算出。
ax1, ax2 = 10, 91
M1 = np.mean(FV_N_ML[:,ax1])
M2 = np.mean(FV_N_ML[:,ax2])
# 標準偏差も算出。
S1 = np.std(FV_N_ML[:,ax1])
S2 = np.std(FV_N_ML[:,ax2])
# nは標準偏差の何倍を基準とするか。datは特徴量が格納された2次元のリスト
# 正常ならば1、異常ならば-1を出力
def AnomalyDetection_StdDist(M1, M2, S1, S2, n, dat):
F1 = dat[0]
F2 = dat[1]
res = 0
if F1 >= M1 - n * S1 and F1 <= M1 + n * S1: # 特徴量1は条件を満たすか
if F2 >= M2 - n * S2 and F2 <= M2 + n * S2: # 特徴量2は条件を満たすか
res = 1
else:
res = -1
else:
res = -1
return res
関数ができたところで、4標準偏差以内を正常としていくつか判定してみます。まずは、クラスNの0番目。
F1 = FV_N_ML[0, ax1]
F2 = FV_N_ML[0, ax2]
n=4 # 異常値検出の条件
res = AnomalyDetection_StdDist(M1, M2, S1, S2, n, [F1, F2])
print(res)
1なので、正常と判定してくれました。続いてクラスNの5番目です。
F1 = FV_N_ML[5, ax1]
F2 = FV_N_ML[5, ax2]
n=4 # 異常値検出の条件
res = AnomalyDetection_StdDist(M1, M2, S1, S2, n, [F1, F2])
print(res)
これも正常と判定してくれました。続いて異常値を入れてみます。
F1 = FV_A1_ML[0, ax1]
F2 = FV_A1_ML[0, ax2]
n=4 # 異常値検出の条件
res = AnomalyDetection_StdDist(M1, M2, S1, S2, n, [F1, F2])
print(res)
F1 = FV_A2_ML[0, ax1]
F2 = FV_A2_ML[0, ax2]
n=4 # 異常値検出の条件
res = AnomalyDetection_StdDist(M1, M2, S1, S2, n, [F1, F2])
print(res)
クラスA1, A2を入れたところ、どちらも異常と判定されました。いい感じです。 続いて、標準偏差距離による異常値検知は、如何なる境界を有しているのか可視化してみます。
Xmin, Ymin, Xmax, Ymax = -10, -10, 10, 10 # 空間の最小最大値
resolution = 0.2 # 細かさ
x_mesh, y_mesh = np.meshgrid(np.arange(Xmin, Xmax, resolution),
np.arange(Ymin, Ymax, resolution))
MeshDat = np.array([x_mesh.ravel(), y_mesh.ravel()]).T
ViewDat = np.zeros([len(MeshDat[:,0])])
for i in range(0, len(MeshDat[:,0])):
F1 = MeshDat[i, 0]
F2 = MeshDat[i, 1]
ViewDat[i]=AnomalyDetection_StdDist(M1, M2, S1, S2, n, [F1, F2])
z = np.reshape(ViewDat, (len(x_mesh), len(y_mesh))) # 空間のためにデータ整形
# 正常・異常領域のメッシュプロット
plt.scatter(MeshDat[ViewDat==1,0], MeshDat[ViewDat==1,1], c="blue", s=10, alpha=0.1)
plt.scatter(MeshDat[ViewDat==-1,0], MeshDat[ViewDat==-1,1], c="red", s=10, alpha=0.1)
# 観測した正常・異常データのプロット
plt.scatter(FV_N_ML[:,ax1], FV_N_ML[:,ax2], s=50, c='blue', alpha=0.8)
plt.scatter(FV_A1_ML[:,ax1], FV_A1_ML[:,ax2], s=50, c='red', alpha=0.8)
plt.scatter(FV_A2_ML[:,ax1], FV_A2_ML[:,ax2], s=50, c='green', alpha=0.8)
plt.title("Feature space of F" + str(ax1) + " and F" + str(ax2))
plt.xlabel("Feature " + str(ax1))
plt.ylabel("Feature " + str(ax2))
plt.grid(True) #グリッド線(True:引く、False:引かない)
plt.xlim([Xmin, Ymax]) # 範囲指定
plt.ylim([Ymin, Ymax]) # 範囲指定
plt.legend(["Space: N", "Space: A", "N", "A1", "A2"]) # 凡例
薄い青が正常だと判定される部分空間、薄い赤が異常だと判定される部分空間です。標準偏差距離のnを小さくすると正常領域が狭まり、大きくすると広まることを意味します。すなわちnによって、判定の厳しさを制御することが可能になります。
今回は距離を「標準偏差何倍分だけ離れているか?」という基準で異常・正常の判定を行いましたが、ユークリッド距離なども使えます。そうすると円形の識別境界を生成することも可能になります。色々試してみるといいでしょう。
さて、5章にて2つの特徴量を用いてその空間上で、標準偏差に基づく距離を使用して異常値検出を行いました。ただしあの方法ですと、長方形や円、楕円のように、きれいな図形でしか境界を生成することができません(円や楕円を生成する方法は、高校数学をちょっと復習するとわかると思います)。世の中の問題を解いているときに、そんなにきれいな境界で検出を行えない場合もあります。すなわち、きれいな境界ではなく、ぐにゃぐにゃした識別境界を生成したい、という状況もあり得ます。
これを実現する一つの方法に、One class SVM (OC-SVM)と呼ばれる方法があります。本章では、この実現方法と識別境界について説明します。
まずはじめに使用する特徴量を決定します。今回は識別境界を可視化できるようにするため、2つとしてみます。5章で述べたように、可視化したいので2つにしているだけです。実運用の場合はもっとたくさん使ってください。
# 使用する特徴量の決定
ax1 = 10
ax2 = 91
X = np.concatenate([FV_N_ML[:,ax1:ax1+1], FV_N_ML[:,ax2:ax2+1]], 1)
これで、特徴量10と91のみが取り出せました。復習ですが、異常データは手元にないというシーンを想定しているので、正常データのみでokです。
続いてOC-SVMの実装です。pythonの場合、OneClassSVM関数を使用します。引数の意味は以下の通り。
今回は勉強として、以下4つのモデルを作ってみます。
from sklearn.svm import OneClassSVM
# モデル構築
OCSVM_LL = OneClassSVM(kernel='rbf', gamma=0.01, nu = 0.01)
OCSVM_LH = OneClassSVM(kernel='rbf', gamma=0.01, nu = 0.3)
OCSVM_HL = OneClassSVM(kernel='rbf', gamma=0.5, nu = 0.01)
OCSVM_HH = OneClassSVM(kernel='rbf', gamma=0.5, nu = 0.3)
# 学習
# 異常値検知(教師なし学習)なので、ラベルは不要
OCSVM_LL.fit(X)
OCSVM_LH.fit(X)
OCSVM_HL.fit(X)
OCSVM_HH.fit(X)
これで学習ができました。特徴量10と91がそれぞれ0のとき、正常・異常どう判定されるかみてみます。このために、predict関数に2つの数字を入れてばokです。返り値は以下の通りです。
# 判定(1なら正常、-1なら異常)
F1, F2 = 0, 0
res_ocsvm = OCSVM_LL.predict([[F1, F2]])
print(res_ocsvm)
OCSVM_LLを使うと、1が返ってきましたので、特徴量10と91が0ならば、正常と判定されるようです。とはいえ、どのような境界が生成されたかわからないので、可視化してみます。
Xmin, Ymin, Xmax, Ymax = -10, -10, 10, 10 # 空間の最小最大値
resolution = 0.2 # 細かさ
x_mesh, y_mesh = np.meshgrid(np.arange(Xmin, Xmax, resolution),
np.arange(Ymin, Ymax, resolution))
MeshDat = np.array([x_mesh.ravel(), y_mesh.ravel()]).T
plt.figure(figsize=(10, 10)) # 描画領域の横幅, 縦幅
plt.subplots_adjust(wspace=0.4, hspace=0.3)
# --- LL ---
plt.subplot(2,2,1)
ViewDat_LL = np.zeros([len(MeshDat[:,0])])
for i in range(0, len(MeshDat[:,0])):
F1 = MeshDat[i, 0]
F2 = MeshDat[i, 1]
ViewDat_LL[i]=OCSVM_LL.predict([[F1, F2]])
# 正常・異常領域のメッシュプロット
plt.scatter(MeshDat[ViewDat_LL==1,0], MeshDat[ViewDat_LL==1,1], c="blue", s=10, alpha=0.1)
plt.scatter(MeshDat[ViewDat_LL==-1,0], MeshDat[ViewDat_LL==-1,1], c="red", s=10, alpha=0.1)
# 観測した正常・異常データのプロット
plt.scatter(FV_N_ML[:,ax1], FV_N_ML[:,ax2], s=20, c='blue', alpha=0.8)
plt.scatter(FV_A1_ML[:,ax1], FV_A1_ML[:,ax2], s=20, c='red', alpha=0.8)
plt.scatter(FV_A2_ML[:,ax1], FV_A2_ML[:,ax2], s=20, c='green', alpha=0.8)
plt.title("OC-SVM LL: F" + str(ax1) + " and F" + str(ax2))
plt.xlabel("Feature " + str(ax1))
plt.ylabel("Feature " + str(ax2))
plt.grid(True) #グリッド線(True:引く、False:引かない)
plt.xlim([Xmin, Ymax]) # 範囲指定
plt.ylim([Ymin, Ymax]) # 範囲指定
plt.legend(["Space: N", "Space: A", "N", "A1", "A2"]) # 凡例
# --- LH ---
plt.subplot(2,2,2)
ViewDat_LH = np.zeros([len(MeshDat[:,0])])
for i in range(0, len(MeshDat[:,0])):
F1 = MeshDat[i, 0]
F2 = MeshDat[i, 1]
ViewDat_LH[i]=OCSVM_LH.predict([[F1, F2]])
# 正常・異常領域のメッシュプロット
plt.scatter(MeshDat[ViewDat_LH==1,0], MeshDat[ViewDat_LH==1,1], c="blue", s=10, alpha=0.1)
plt.scatter(MeshDat[ViewDat_LH==-1,0], MeshDat[ViewDat_LH==-1,1], c="red", s=10, alpha=0.1)
# 観測した正常・異常データのプロット
plt.scatter(FV_N_ML[:,ax1], FV_N_ML[:,ax2], s=20, c='blue', alpha=0.8)
plt.scatter(FV_A1_ML[:,ax1], FV_A1_ML[:,ax2], s=20, c='red', alpha=0.8)
plt.scatter(FV_A2_ML[:,ax1], FV_A2_ML[:,ax2], s=20, c='green', alpha=0.8)
plt.title("OC-SVM LH: F" + str(ax1) + " and F" + str(ax2))
plt.xlabel("Feature " + str(ax1))
plt.ylabel("Feature " + str(ax2))
plt.grid(True) #グリッド線(True:引く、False:引かない)
plt.xlim([Xmin, Ymax]) # 範囲指定
plt.ylim([Ymin, Ymax]) # 範囲指定
plt.legend(["Space: N", "Space: A", "N", "A1", "A2"]) # 凡例
# --- HL ---
plt.subplot(2,2,3)
ViewDat_HL = np.zeros([len(MeshDat[:,0])])
for i in range(0, len(MeshDat[:,0])):
F1 = MeshDat[i, 0]
F2 = MeshDat[i, 1]
ViewDat_HL[i]=OCSVM_HL.predict([[F1, F2]])
# 正常・異常領域のメッシュプロット
plt.scatter(MeshDat[ViewDat_HL==1,0], MeshDat[ViewDat_HL==1,1], c="blue", s=10, alpha=0.1)
plt.scatter(MeshDat[ViewDat_HL==-1,0], MeshDat[ViewDat_HL==-1,1], c="red", s=10, alpha=0.1)
# 観測した正常・異常データのプロット
plt.scatter(FV_N_ML[:,ax1], FV_N_ML[:,ax2], s=20, c='blue', alpha=0.8)
plt.scatter(FV_A1_ML[:,ax1], FV_A1_ML[:,ax2], s=20, c='red', alpha=0.8)
plt.scatter(FV_A2_ML[:,ax1], FV_A2_ML[:,ax2], s=20, c='green', alpha=0.8)
plt.title("OC-SVM HL: F" + str(ax1) + " and F" + str(ax2))
plt.xlabel("Feature " + str(ax1))
plt.ylabel("Feature " + str(ax2))
plt.grid(True) #グリッド線(True:引く、False:引かない)
plt.xlim([Xmin, Ymax]) # 範囲指定
plt.ylim([Ymin, Ymax]) # 範囲指定
plt.legend(["Space: N", "Space: A", "N", "A1", "A2"]) # 凡例
# --- HH ---
plt.subplot(2,2,4)
ViewDat_HH = np.zeros([len(MeshDat[:,0])])
for i in range(0, len(MeshDat[:,0])):
F1 = MeshDat[i, 0]
F2 = MeshDat[i, 1]
ViewDat_HH[i]=OCSVM_HH.predict([[F1, F2]])
# 正常・異常領域のメッシュプロット
plt.scatter(MeshDat[ViewDat_HH==1,0], MeshDat[ViewDat_HH==1,1], c="blue", s=10, alpha=0.1)
plt.scatter(MeshDat[ViewDat_HH==-1,0], MeshDat[ViewDat_HH==-1,1], c="red", s=10, alpha=0.1)
# 観測した正常・異常データのプロット
plt.scatter(FV_N_ML[:,ax1], FV_N_ML[:,ax2], s=20, c='blue', alpha=0.8)
plt.scatter(FV_A1_ML[:,ax1], FV_A1_ML[:,ax2], s=20, c='red', alpha=0.8)
plt.scatter(FV_A2_ML[:,ax1], FV_A2_ML[:,ax2], s=20, c='green', alpha=0.8)
plt.title("OC-SVM HH: F" + str(ax1) + " and F" + str(ax2))
plt.xlabel("Feature " + str(ax1))
plt.ylabel("Feature " + str(ax2))
plt.grid(True) #グリッド線(True:引く、False:引かない)
plt.xlim([Xmin, Ymax]) # 範囲指定
plt.ylim([Ymin, Ymax]) # 範囲指定
plt.legend(["Space: N", "Space: A", "N", "A1", "A2"]) # 凡例
標準偏差距離の場合とは異なり、境界がぐにゃぐにゃと曲げられていますことが確認できます。gamma, nu、どのくらいが良いモデルでしょうか。人によって違いそうですね。
ところで、もうちょっと正常領域を広く撮りたいななどと考えはしないでしょうか。少し工夫をすると、この処理を実現できます。OC-SVMは、正常であれば1を、異常であれば-1を出力する性質がありますが、計算の過程で、一度実数値を求めてそれが0以上であれば1(正常)を、0未満であれば-1(異常)を出力するという構造をしています。
したがって、この実数値を0以上0未満で区切るのではなく、-0.5を基準とするなどのように、基準を下げてあげることで正常領域を広くすることができます。
これを、OCSVM_LLに実装してみたいと思います。1/-1の判定結果と判定前の実数値を取り出すためのメソッドはそれぞれ以下の通りです。
すなわち、decision_functionで得られた値に対し、基準としたい値でif文を書き、1/-1を返してあげれば良いということになります。これを利用して新たな判定関数を作ってみます。
# 特徴量1, 特徴量2, 判定基準, OC-SVMのモデルを入れると、1/-1を返す関数
def new_predict_OCSVM(F1, F2, th, model):
val = model.decision_function([[F1, F2]])
if val >= th:
return 1
else:
return -1
# 使用する場合
F1, F2 = 0, 0
th = -0.1
new_predict_OCSVM(0, 0, 0, OCSVM_LL)
-0.1以上を正常、それ未満を異常にするという、通常よりもゆるい判定を行うOC-SVMを構築しました。これできちんと正常領域が広がっているのか、確認してみます。
ViewDat_LL = np.zeros([len(MeshDat[:,0])])
for i in range(0, len(MeshDat[:,0])):
F1 = MeshDat[i, 0]
F2 = MeshDat[i, 1]
# ViewDat_LL[i]=OCSVM_LL.predict([[F1, F2]]) # 普通のSVM
ViewDat_LL[i] = new_predict_OCSVM(F1, F2, -0.1, OCSVM_LL) # ゆるいSVM
# 正常・異常領域のメッシュプロット
plt.scatter(MeshDat[ViewDat_LL==1,0], MeshDat[ViewDat_LL==1,1], c="blue", s=10, alpha=0.1)
plt.scatter(MeshDat[ViewDat_LL==-1,0], MeshDat[ViewDat_LL==-1,1], c="red", s=10, alpha=0.1)
# 観測した正常・異常データのプロット
plt.scatter(FV_N_ML[:,ax1], FV_N_ML[:,ax2], s=20, c='blue', alpha=0.8)
plt.scatter(FV_A1_ML[:,ax1], FV_A1_ML[:,ax2], s=20, c='red', alpha=0.8)
plt.scatter(FV_A2_ML[:,ax1], FV_A2_ML[:,ax2], s=20, c='green', alpha=0.8)
plt.title("OC-SVM LL: F" + str(ax1) + " and F" + str(ax2))
plt.xlabel("Feature " + str(ax1))
plt.ylabel("Feature " + str(ax2))
plt.grid(True) #グリッド線(True:引く、False:引かない)
plt.xlim([Xmin, Ymax]) # 範囲指定
plt.ylim([Ymin, Ymax]) # 範囲指定
plt.legend(["Space: N", "Space: A", "N", "A1", "A2"]) # 凡例
正常領域が、通常のOCSVM_LLよりも広がりました。性能を高めるため、こういった工夫も必要であることを念頭におくといいですね。正常領域が狭すぎると、正常を異常だとみなしてしまう場合が起こり得ます。そのため状況によっては正常領域を広げたくなります。ただし、正常領域を広めるということは、異常を正常と答える可能性が高まってしまうことに注意してください。トレードオフの関係ですので、試行錯誤的に正常領域の面積を操作することも必要でしょう。
さて、これまで任意に取り出した特徴量を用いて、正常・異常を判定するモデルを構築してきました。距離を用いたモデルは、きれいな図形状の識別境界を生成できて、OC-SVMであればぐにゃぐにゃと曲げた識別境界を生成できることがわかりました。
ところで、一見して良い特徴量を発見できない場合はどうすれば良いでしょうか。この場合、すべての特徴量を用い、それを次元圧縮することで、正常と異常が分離された特徴量を取得するというアプローチがあります。
これを実現するには、入力されたデータを再現するディープラーニング、オートエンコーダ (Auto Encorder; AE) を活用します。距離やOC-SVMによる異常値検知は、すでに獲得されている空間に対して識別境界を生成するモデルであることに対し、オートエンコーダは空間を獲得する方法であるので、混同しないように注意してください。
オートエンコーダを理解するには、まずはじめに回帰型ニューラルネットワークについて理解せねばいけません。こちらのページの一番下にこの解説があるので、まずはこれを理解してください。
理解すると、回帰型ニューラルネットワークが実数にて定義づけられた値を推定するモデルであることがわかると思います。オートエンコーダとは、回帰型ニューラルネットワークの出力に対し、入力の推定値を割り当てたモデルとなります。すなわち、ある特徴量ベクトルを入力すると、その特徴量ベクトルとまったく同じ値を出力することを目標としています。
と聞いても、既に明らかになっている入力を推定しても何も嬉しくありません。ここで、3層階層型ニューラルネットワークの構造について考えてみます。
ここで、入力ベクトルがn1次元だとすると、一度n2次元に変換されてから、n1次元に戻っていることがわかります。もしもここでn2次元をn1次元よりも小さい値に設定したならば、次元圧縮が行われていることになります。
オートエンコーダの出力は、入力を推定するわけですから、次元圧縮された中間層のベクトルは、入力の情報をできる限り保持したまま、次元だけが下がっていることになります。すなわち、オートエンコーダとは、入力の数字を再現できるような次元圧縮がなされていることになります。
これはとても便利です。例えば、データの転送速度が遅い状況を考えてみます。100個の数字を送りたい場合、100このまま送っていると、時間がたくさんかかってしまいます。しかし、転送側にオートエンコーダの入力層から中間層への変換、受信側にオートエンコーダの中間層から出力層への変換を実装すると、どうでしょうか。この場合、100個の入力をデータを転送する前に10個に圧縮するといったことができますので、転送速度が高速になります。100個を10個にしているので当然情報損失の懸念がありますが、受信側に入力を再現するような出力を行える変換があるわけですから、データ受信側で10個の数字を100個に戻せるわけです。
以上がオートエンコーダの利点ですが、実はこれを異常値検知やクラス分類問題など、様々なモデルに応用できることが明らかになっています。以前説明した通り、機械学習で良いモデルを作るには、入力次元をできるだけ下げた方が良いという考えがあります。次元の呪いに触れることから莫大なデータ数が必要になったり、高次元だと中身がブラックボックスになりがちになるためです。
特に教師なし学習による異常値検知問題の場合、正常データのみしか使えないわけですから、正常データを入力すると、正常データを出力するようなオートエンコーダが出来上がります。ここで、次元圧縮が行われた中間層に注目してみます。圧縮された空間上では、オートエンコーダ自身がよく知っている入力を与えると、ある一部分にデータが集まり、よく知っているデータから外れたデータ(つまり、異常データ)が入力されると、圧縮された空間において、正常データが集まる場所とは異なる部分にデータが集まりやすい、という特性が表出します。
つまり、オートエンコーダの中間層を取り出して得られる空間上では、非常に異常値検知が行いやすいことを意味します。 オートエンコーダにより次元圧縮された空間にて、OC-SVMなどで識別境界を生成すると、精度の良い異常値検出モデルができるのではないか?ということに着想するわけです。
具体的な構築の流れとして、以下を例にとってプログラムを組んでみます。
異常値検知モデルの構築:
この空間上で、正常データのみを使って、OC-SVMを構築する。
112次元の特徴量ベクトルをオートエンコーダに入力することで、次元圧縮を行う。
異常値検知モデルの使用:
まずはじめに、回帰型ニューラルネットワーク(オートエンコーダ)を定義します。 活性化関数として、恒等写像を利用します。ReLu関数やシグモイド関数でもokですが、ReLu関数は最小値0、シグモイド関数は最小0最大1というように、値の範囲が定まってしまうので、その点も考慮してください。
from sklearn.neural_network import MLPRegressor
Hidden_neuron = 3 # 中間層の次元(何次元に圧縮したいか?)
AE = MLPRegressor(activation="identity", max_iter=1000, tol = pow(10, -20),
hidden_layer_sizes=(Hidden_neuron,),
learning_rate_init=0.01, random_state=1, verbose=False)
続いて、このAEを学習してみます。入力とまったく同じ出力を目指すわけですから、第一引数と第二引数が同じになります。また、正常データしか使ってはいけないので、正常データのみを指定します。
AE.fit(FV_N_ML, FV_N_ML) # 正常データのみを用いて、自分自身を再現するような学習
AEが入力を再現するような出力をできているのか、チェックしてみます。 0回目の正常データの観測における、112次元の特徴量ベクトルと、AEの出力を見てみます。
# ちゃんと再現できているかチェック
resnn = AE.predict(FV_N_ML)
plt.plot(resnn[0,0:112], label = "Output of AE")
plt.plot(FV_N_ML[0,0:112], label = "Input")
plt.legend()
plt.xlabel("Elements of feature vector")
完全に一致しているわけではありませんが、ある程度似た値を出力していますね。AEの出力は、112次元を3次元に落とし、112次元に復元しているのに、元々の112次元とある程度一致するというのは、なかなかすごいことだったりします。普通はここまで急激な次元圧縮は行いません。今回は可視化したいので3次元まで落としているだけです。もうちょっと再現したいと思う場合は、圧縮先を10次元くらいにするなどしてください。(Hidden_neuron=10とする)
さて、AEが出来上がったところで、次元圧縮されたベクトル、つまり中間層の値を取り出す関数を作ります。GetHiddenOutputLayer関数は、入力ベクトル, 隠れ層のウェイト, 隠れ層のバイアス, 出力層のウェイト, 出力層のバイアスを入力すると、中間層のベクトルと出力層のベクトルを返してくれます。
学習ずみのオートエンコーダのパラメータを得るには、.coefs、.interceptsメソッドを利用します。この意味は、ニューラルネットワークを手計算できるくらいに理解できなければわからないことですので、ニューラルネットの数式を読みたくない人は読み飛ばしてください。
# --- AEに入力ベクトルを入れると、中間層・隠れ層のベクトルを取り出す関数 ---
# 入力:
# 入力ベクトル, 隠れ層のウェイト, 隠れ層のバイアス, 出力層のウェイト, 出力層のバイアス
# 出力:
# 隠れ層のベクトル, 出力層のベクトル
def GetHiddenOutputLayer(InputVal, AEcoef1, AEInter1, AEcoef2, AEInter2):
# 隠れ層取得
Hidden = np.dot(InputVal, AEcoef1) + AEInter1
# relu関数の場合は以下が必要
#for i in range(0, len(Hidden)):
# if Hidden[i] < 0:
# Hidden[i]=0
# 出力層取得
Out = np.dot(Hidden, AEcoef2) + AEInter2
return [Hidden, Out]
# 学習済みAEのパラメータ取得
AEcoef1 = AE.coefs_[0]
AEInter1 = AE.intercepts_[0]
AEcoef2 = AE.coefs_[1]
AEInter2 = AE.intercepts_[1]
# 関数の使用
InputVal = FV_N_ML[0,:]
[Hidden, Out] = GetHiddenOutputLayer(InputVal, AEcoef1, AEInter1, AEcoef2, AEInter2)
# 検証
# python関数の出力
# resnn = AE.predict([FV_N_ML[0,:]])
# plt.scatter(Out, resnn)
AEにより次元圧縮された3次元ベクトルを得る関数を作成できた後は、正常・異常データをそれぞれプロットしてみます。3次元に圧縮しているわけですから、可視化できます。
# 正常データ
Hidden_N = []
Hidden_N_np = np.zeros([len(FV_N_ML[:,0]), Hidden_neuron])
for i in range(0, len(FV_N_ML[:,0])):
InputVal = FV_N_ML[i,:]
temp = GetHiddenOutputLayer(InputVal, AEcoef1, AEInter1, AEcoef2, AEInter2)[0] #only hidden
Hidden_N.append(temp)
for j in range(0, Hidden_neuron):
Hidden_N_np[i,j] = Hidden_N[i][j]
# 異常1データ
Hidden_A1 = []
Hidden_A1_np = np.zeros([len(FV_A1_ML[:,0]), Hidden_neuron])
for i in range(0, len(FV_A1_ML[:,0])):
InputVal = FV_A1_ML[i,:]
temp = GetHiddenOutputLayer(InputVal, AEcoef1, AEInter1, AEcoef2, AEInter2)[0] #only hidden
Hidden_A1.append(temp)
for j in range(0, Hidden_neuron):
Hidden_A1_np[i,j] = Hidden_A1[i][j]
# 異常2データ
Hidden_A2 = []
Hidden_A2_np = np.zeros([len(FV_A2_ML[:,0]), Hidden_neuron])
for i in range(0, len(FV_A2_ML[:,0])):
InputVal = FV_A2_ML[i,:]
temp = GetHiddenOutputLayer(InputVal, AEcoef1, AEInter1, AEcoef2, AEInter2)[0] #only hidden
Hidden_A2.append(temp)
for j in range(0, Hidden_neuron):
Hidden_A2_np[i,j] = Hidden_A2[i][j]
続いて、可視化します。
from mpl_toolkits.mplot3d import Axes3D
fig = plt.figure(figsize=(14, 7))
plt.subplots_adjust(wspace=0.0, hspace=0.1)
# 全部見る
ax = fig.add_subplot(1,2,1, projection='3d')
ax.scatter(Hidden_N_np[:,0], Hidden_N_np[:,1], Hidden_N_np[:,2], label="Class N") # 正常
ax.scatter(Hidden_A1_np[:,0], Hidden_A1_np[:,1], Hidden_A1_np[:,2], label="Class A1") # 異常1
ax.scatter(Hidden_A2_np[:,0], Hidden_A2_np[:,1], Hidden_A2_np[:,2], label="Class A2") # 異常2
ax.set_xlabel("AE Feature 1")
ax.set_ylabel("AE Feature 2")
ax.set_zlabel("AE Feature 3")
ax.set_title("Feature space of AE: all view")
plt.legend()
# 正常箇所の拡大図
ax = fig.add_subplot(1,2,2, projection='3d')
ax.scatter(Hidden_N_np[:,0], Hidden_N_np[:,1], Hidden_N_np[:,2], label="Class N") # 正常
ax.scatter(Hidden_A1_np[:,0], Hidden_A1_np[:,1], Hidden_A1_np[:,2], label="Class A1") # 異常1
ax.scatter(Hidden_A2_np[:,0], Hidden_A2_np[:,1], Hidden_A2_np[:,2], label="Class A2") # 異常2
ax.set_xlabel("AE Feature 1")
ax.set_ylabel("AE Feature 2")
ax.set_zlabel("AE Feature 3")
ax.set_title("Feature space of AE: Zooming")
ax.set_xlim([0, 500])
ax.set_ylim([-50, 50])
ax.set_zlim([-20, 20])
plt.legend()
plt.show()
左が3クラスのプロット、右が正常箇所を拡大したプロットです。AEにより次元圧縮された空間は、当初の説明通り、正常データは一箇所に集まるが、正常とは離れたデータだと、離れた箇所に現れることがわかりました。
この空間ならば、容易に異常値検出を行えそうです。それでは、最後の仕上げにAEにより圧縮された空間でOC-SVMによる判定器を作ってみます。
OCSVM_AE = OneClassSVM(kernel='rbf', gamma=0.001, nu = 0.001)
OCSVM_AE.fit(Hidden_N_np) # 異常値検知(教師なし学習)なので、ラベルは不要
モデルができたところで、手元にあるデータを判定させてみます。
res_N = OCSVM_AE.predict(Hidden_N_np)
res_A1 = OCSVM_AE.predict(Hidden_A1_np)
res_A2 = OCSVM_AE.predict(Hidden_A2_np)
print("正常データの判定結果: ")
print(res_N)
print("")
print("異常1データの判定結果: ")
print(res_A1)
print("")
print("異常2データの判定結果: ")
print(res_A2)
1が正常で、-1が異常でしたね。正常データだけを使って、正常と異常を切り分けるモデルを作ることができました。しかも、概ね良い判定結果が得られています。オートエンコーダを利用すると少しややこしくなりますが、目で見て自分で良さそうな特徴量を選ばなくても、自発的に良い特徴量を獲得することができる(場合が多い)です。ここら辺が、ディープラーニング系のメリットですね。