PyLearn SIG 05: 機械学習による異常値検知モデル(教師なし学習)

PyLearn SIG 04では、正常データ、異常データの二つを用いて、正常・異常状態を判定するクラス分類問題を解くモデルを構築しました。ただし、このためには正常と異常データを集めなくてはなりません。一般に、正常データは大量に集められるが、異常データは少ししか集められないという状況が多いでしょう。したがって、機械学習の分野では、正常データのみで正常と異常を切り分けるモデルを構築しよう、という問題が扱われるようになりました。これを、教師なし異常値検知と言います。「教師なし」とは分析に使用するデータはすべて正常として扱うので、もはや人が正解値を教える必要がない、よって、教師なし、というわけです。

0. 信号のロード

In [1]:
import numpy as np
import random

# 正常クラスの信号読み込み
Sig_N=[]
for i in range(101, 151):
    Sig_N.append(np.loadtxt("dat/" + str(i) + ".csv",
                                          delimiter=",", skiprows=1, usecols=range(2,8)))

# 異常1クラスの信号読み込み
Sig_A1=[]
for i in range(51, 101):
    Sig_A1.append(np.loadtxt("dat/" + str(i) + ".csv",
                                           delimiter=",", skiprows=1, usecols=range(2,8)))

# 異常2クラスの信号読み込み
Sig_A2=[]
for i in range(1, 51):
    Sig_A2.append(np.loadtxt("dat/" + str(i) + ".csv",
                                           delimiter=",", skiprows=1, usecols=range(2,8)))

これでSig_N 〜 Sig_A2のリストに対し、6軸のセンサデータが取得されました。

1. 合成加速度の算出

続いて、合成加速度を求めて7チャンネル慣性センサデータへと変換します。

In [2]:
# 合成加速度を算出する関数:
#  Input: 6チャンネルの慣性センサデータ(3軸加速度+3軸角速度)
#  Output: 最後の列に合成加速度が追加された7チャンネルの慣性センサデータ
def MakingMag(signal):
    M = signal[:,0] * signal[:,0] + signal[:,1] * signal[:,1] + (signal[:,2]) * (signal[:,2])
    M = np.sqrt(M)
    M_new = np.reshape(M, (len(M), 1))

    # 7チャンネル慣性センサデータの取得
    sig7 = np.concatenate([signal, M_new], 1)
    
    return sig7

この関数を使って、150回の観測データ(6チャンネル)を、合成加速度が追加された7チャンネルデータに変換します。

In [3]:
# 正常データ
Sig7_N = []
for i in range(0, len(Sig_N)):
    Sig7_N.append(MakingMag(Sig_N[i]))
    
# 異常1データ
Sig7_A1 = []
for i in range(0, len(Sig_A1)):
    Sig7_A1.append(MakingMag(Sig_A1[i]))
    
# 異常2データ
Sig7_A2 = []
for i in range(0, len(Sig_A2)):
    Sig7_A2.append(MakingMag(Sig_A2[i]))

2. 特徴量変換

続いて、150回の観測信号を特徴量に変換してみます。これを格納するリストとして、

  • FV_N: クラスNの特徴量を保存するリスト
  • FV_A1: クラスA1の特徴量を保存するリスト
  • FV_A2: クラスA2の特徴量を保存するリスト

を定義します(専門用語としてこれを特徴量ベクトルと呼びます。英語でFeature vector: FV)。変数の構造は、

  • FV_N[ i ]: i 回目の観測信号
  • FV_N[ i ][ j ]: i 回目の観測信号の軸 j
  • FV_N[ i ][ j ][ k ]: i 回目の観測信号の軸 j の特徴量 k

とします(FV_A1, A2も同じ)。

軸 j とは、それぞれ以下の意味を持ちます。

  • j = 0: X軸加速度
  • j = 1: Y軸加速度
  • j = 2: Z軸加速度
  • j = 3: X軸角速度
  • j = 4: Y軸角速度
  • j = 5: Z軸角速度
  • j = 6: 合成加速度

k 番目の特徴量は、それぞれ以下のように定義することとします。

  • k = 0: 平均、
  • k = 1: 分散、
  • k = 2: 歪度、
  • k = 3: 尖度、
  • k = 4: 最小値、
  • k = 5: 最大値、
  • k = 6: 中央値、
  • k = 7: RMS、
  • k = 8: 動的特徴量、
  • k = 9: パワーバンド(0 〜 10 Hz)
  • k = 10: パワーバンド(10 〜 20 Hz)
  • k = 11: パワーバンド(20 〜 30 Hz)
  • k = 12: パワーバンド(30 〜 40 Hz)
  • k = 13: パワーバンド(40 〜 50 Hz)# ここでナイキスト周波数
  • k = 14: 周波数領域エントロピー
  • k = 15: エネルギー

すなわち、

  • FV_N[5][3][2]: クラスNの5回目の観測信号のX軸角速度の歪度
  • FV_A1[0][2][4]: クラスA1の0回目の観測信号のZ軸角速度の最小値

となります。なお、これらの特徴量のうち、pythonで用意されていないものもたくさんあるので、まずはそれを求める関数を定義しておきます。

In [4]:
import numpy as np
import scipy.stats as st
from sklearn.linear_model import LinearRegression
from scipy.fftpack import fft

# *****************************************************************
# RMSを求める関数
#  Input: 1チャンネルの信号
def Calc_RMS(signal):
    a = signal * signal          # 二乗
    sum_a = np.sum(a)        # 総和
    sqrt_a = np.sqrt(sum_a) # 平方根
    RMS = np.mean(sqrt_a) # 平均値
    return RMS


# *****************************************************************
# 動的特徴量を求める関数
#  Input: 1チャンネルの信号
def Calc_LinearCoef(signal):
    # 入力変数(信号値に対応する連番の数字)
    X=np.arange(len(signal))
    X=np.reshape(X, (len(X), 1))

    # 出力変数
    Y=signal

    # 線形回帰モデルのパラメータフィッティング
    ModelLR = LinearRegression() # 線形回帰モデルのセット
    ModelLR.fit(X, Y) # パラメータ獲得(Xは時間、Yは信号値)
    
    # 動的特徴量を返り値に
    return ModelLR.coef_[0]


# *****************************************************************
# パワーバンドを求める関数
   # signal: 時系列信号(1軸分のみ)
   # sf: サンプリング周波数
   # min_f: パワーバンドを算出する周波数の最小値
   # max_f: パワーバンドを算出する周波数の最大値

def Calc_PowerBand(signal, sf, min_f, max_f):
    
    # パワースペクトル算出
    L = len(signal) # 信号長
    freq = np.linspace(0, sf, L) # 周波数
    yf = np.abs(fft(signal)) # 振幅スペクトル
    yf = yf * yf # パワースペクトル
    
    # 指定された周波数の探索
    n1, n2 = [], []
    for i in range(1, int(L/2)): # <- 直流成分は探索範囲から除外
        n1.append(np.abs(freq[i] - min_f))
        n2.append(np.abs(freq[i] - max_f))
    min_id = np.argmin(n1) # min_fと最も近い周波数が格納された配列番号取得
    max_id = np.argmin(n2)# max_fと最も近い周波数が格納された配列番号取得
    
    # 指定された周波数のパワーバンド算出
    PB = np.sum(yf[min_id:max_id])
                
    return PB


# *****************************************************************
# 周波数領域エントロピーを求める関数
   # signal: 時系列信号(1軸分のみ)
   # sf: サンプリング周波数

def Calc_FreqEnt(signal, sf):
    
    # パワースペクトル算出
    L = len(signal) # 信号長
    freq = np.linspace(0, sf, L) # 周波数
    yf = np.abs(fft(signal)) # 振幅スペクトル
    yf = yf * yf # パワースペクトル
     
    # 手続き1
    a = 0
    for i in range(1, int(L/2)): # <- 直流成分除去 & ナイキスト周波数まで
        a = a + yf[i]
    
    # 手続き2 & 3
    E = 0
    for i in range(1, int(L/2)): # <- 直流成分除去 & ナイキスト周波数まで
        E = E + (yf[i]/a) * np.log2(yf[i]/a)
    E = -E
    return E


# *****************************************************************
# エネルギーを算出する関数
   # signal: 時系列信号(1軸分のみ)
   # sf: サンプリング周波数
def Calc_Energy(signal, sf):
    
    # パワースペクトル算出
    L = len(signal) # 信号長
    freq = np.linspace(0, sf, L) # 周波数
    yf = np.abs(fft(signal)) # 振幅スペクトル
    yf = yf * yf # パワースペクトル
    
    # エネルギー算出
    En = np.mean(yf[1:int(L/2)]) # <- 直流成分除去 & ナイキスト周波数まで
    
    return En

個別の特徴量を算出する関数ができました。続いて、7チャンネル慣性センサデータを全ての特徴量に変換する関数を定義します。先ほど定義した FV[ i ] [ j ] [ k ]構成の出力を行います。

In [5]:
def FeatureVector(signal, NumClass, NumChannel, NumFeature, sf):
    FV = []
    for i in range(0, NumClass):
        TempList = []
        for j in range(0, NumChannel):
            TempAry = np.zeros([1, NumFeature])
        
            # 観測信号 i の軸 j の...
            sig = signal[i][:, j]
            TempAry[0, 0]   = np.mean(sig)                              # j=0 平均
            TempAry[0, 1]   = np.var(sig)                                  # j=1 分散
            TempAry[0, 2]   = st.skew(sig)                               # j=2 歪度
            TempAry[0, 3]   = st.kurtosis(sig)                           # j=3 尖度
            TempAry[0, 4]   = np.min(sig)                                 # j=4 最小値
            TempAry[0, 5]   = np.max(sig)                                # j=5 最大値
            TempAry[0, 6]   = np.median(sig)                           # j=6 中央値
            TempAry[0, 7]   = Calc_RMS(sig)                            # j=7 RMS
            TempAry[0, 8]   = Calc_LinearCoef(sig)                  # j=8 動的特徴量
            TempAry[0, 9]   = Calc_PowerBand(sig, sf,   0, 10) # j=9 PB1
            TempAry[0, 10] = Calc_PowerBand(sig, sf, 10, 20) # j=10 PB2
            TempAry[0, 11] = Calc_PowerBand(sig, sf, 20, 30) # j=11 PB3
            TempAry[0, 12] = Calc_PowerBand(sig, sf, 30, 40) # j=12 PB4
            TempAry[0, 13] = Calc_PowerBand(sig, sf, 40, 50) # j=13 PB5
            TempAry[0, 14] = Calc_FreqEnt(sig, sf)                  # j=14 周波数領域エントロピー
            TempAry[0, 15] = Calc_Energy(sig, sf)                   # j=15 エネルギー
        
            TempList.append(TempAry[0])
        
        FV.append(TempList)
    
    return FV

特徴量変換の関数ができたところで、実際にクラスN, A1, A2について、それぞれ50観測分のデータを特徴量に変換します。

In [6]:
NumChannel = len(Sig7_N[0][0, :]) # 軸の最大数
NumFeature = 16                          # 扱う特徴量の数
NumClassN = len(Sig7_N)              # クラスNのデータ数
NumClassA1 = len(Sig7_A1)          # クラスNのデータ数
NumClassA2 = len(Sig7_A2)          # クラスNのデータ数
sf = 100                                        # サンプリング周波数

# クラスNの特徴量変換
FV_N = FeatureVector(Sig7_N, NumClassN, NumChannel, NumFeature, sf)

# クラスA1の特徴量変換
FV_A1 = FeatureVector(Sig7_A1, NumClassA1, NumChannel, NumFeature, sf)

# クラスA2の特徴量変換
FV_A2 = FeatureVector(Sig7_A2, NumClassA2, NumChannel, NumFeature, sf)

これで、7チャンネルのデータを16種の特徴量に変換しました。すなわち、$7 \times 16 = 112$個の特徴量に変換されました。これは、1回の観測を112次元のベクトルに変換したことと等価です。1回の観測を112個の特徴で表現しました、ということです。

3. 特徴量の規格化

以上一連の流れは、クラス分類問題(PyLearnSIG04)とまったく同じです。異常値検知問題の場合は、ここからが少し異なりますので、注意してください。

ここから、特徴量同士のスケールの違いから、Z-score normalizationによる規格化を実施しました。これは、各特徴量の平均値と標準偏差を利用した規格化です。このとき、クラス分類問題の場合は、クラスN, A1, A2すべてのデータを使い、平均値と標準偏差を算出していたことを思い出してください。

異常値検出問題とは、正常データしか集められない場合において、異常状態を検出することを目標とします。すなわち、今は異常状態であるクラスA1, A2のデータがありますが、本来はないものです。これから作成する異常値検出モデルの精度評価を行うために用意しているだけです。したがって、規格化を含めたモデル構築のあらゆる処理において、異常状態であるクラスA1, A2を使用してはいけません。使っていいのは、モデルの検出精度を測る時だけです。

クラス分類問題の場合は、クラスN, A1, A2をすべて使い規格化に必要となる平均値と標準偏差を計算していましたが、異常値検知問題の場合は正常データしか集められないことを前提としているので、クラスNのみで平均値と標準偏差を算出することになります。

これを念頭に、Z-score normalizationを行います。表現は以下の通り。

  • Zscore[ j ][ k ][ 0 ]: 軸 j の特徴量 k の平均
  • Zscore[ j ][ k ][ 1 ]: 軸 j の特徴量 k の標準偏差
In [7]:
# 観測データ数の獲得
NumDatN = len(FV_N)
NumDatA1 = len(FV_A1)
NumDatA2 = len(FV_A2)
Zscore = []
for j in range(0, NumChannel):
    temp2 = []
    for k in range(0, NumFeature):
        
        temp1 = []
        temp_zscore = np.zeros([1, 2])
        
        # 軸 j , 特徴量 k を1列に並べる        
        for i in range(0, NumDatN):
            temp1.append(FV_N[i][j][k])
            
        ### --- ここはクラス分類問題での規格化 --- ###
        # for i in range(0, NumDatA1):
        #    temp1.append(FV_A1[i][j][k])

        #for i in range(0, NumDatA2):
        #    temp1.append(FV_A1[i][j][k])

        # 軸 j , 特徴量 k の 平均・標準偏差を取得
        Val_mean = np.mean(temp1)
        Val_std = np.std(temp1)
        temp_zscore[0, 0] = Val_mean
        temp_zscore[0, 1] = Val_std

        temp2.append(temp_zscore[0])

    Zscore.append(temp2)

正常データ(クラスN)のみを用いて、規格化を行うためのパラメータ(平均と標準偏差)を求めました。それでは、ここで得られた値を用いて規格化を行います。

In [8]:
# クラスNの特徴量の規格化
for i in range(0, NumClassN):
    for j in range(0, NumChannel):
        for k in range(0, NumFeature):
            FV_N[i][j][k] = (FV_N[i][j][k] - Zscore[j][k][0]) / Zscore[j][k][1]

# クラスA1の特徴量の規格化
for i in range(0, NumClassA1):
    for j in range(0, NumChannel):
        for k in range(0, NumFeature):
            FV_A1[i][j][k] = (FV_A1[i][j][k] - Zscore[j][k][0]) / Zscore[j][k][1]
            
# クラスA2の特徴量の規格化
for i in range(0, NumClassA2):
    for j in range(0, NumChannel):
        for k in range(0, NumFeature):
            FV_A2[i][j][k] = (FV_A2[i][j][k] - Zscore[j][k][0]) / Zscore[j][k][1]

正常データで算出した規格化の係数を用いて、クラスN, A1, A2を規格化していくわけです。これで規格化が行われました。どのような値になったか、X軸加速度の分散値を3例ずつみてみます。

In [9]:
for i in range(0, 3):
    print("クラスN  / Xacc / 分散: ", round(FV_N[i][0][1], 2))
    print("クラスA1 / Xacc / 分散: ", round(FV_A1[i][0][1], 2))
    print("クラスA2 / Xacc / 分散: ", round(FV_A2[i][0][1], 2))
クラスN  / Xacc / 分散:  1.48
クラスA1 / Xacc / 分散:  1816.82
クラスA2 / Xacc / 分散:  245.05
クラスN  / Xacc / 分散:  1.56
クラスA1 / Xacc / 分散:  3524.81
クラスA2 / Xacc / 分散:  253.58
クラスN  / Xacc / 分散:  2.77
クラスA1 / Xacc / 分散:  391.08
クラスA2 / Xacc / 分散:  208.63

クラスNはおとなしい値ですが、クラスA1, A2はかなり大きな値になっています。異常値検出において良い特徴量とは、正常データを用いて求めた規格化係数で規格化を行ったとき、異常データは吹っ飛ぶことと等価と言えるでしょう。

In [10]:
for i in range(0, 3):
    print("クラスN  / Xacc / エネルギー: ", round(FV_N[i][0][15], 2))
    print("クラスA1 / Xacc / エネルギー: ", round(FV_A1[i][0][15], 2))
    print("クラスA2 / Xacc / エネルギー: ", round(FV_A2[i][0][15], 2))
クラスN  / Xacc / エネルギー:  0.87
クラスA1 / Xacc / エネルギー:  1498.76
クラスA2 / Xacc / エネルギー:  181.93
クラスN  / Xacc / エネルギー:  1.73
クラスA1 / Xacc / エネルギー:  2513.77
クラスA2 / Xacc / エネルギー:  225.42
クラスN  / Xacc / エネルギー:  2.23
クラスA1 / Xacc / エネルギー:  338.42
クラスA2 / Xacc / エネルギー:  174.81

エネルギーも同様ですね。それが良い特徴量であれば、正常ならば0付近の値を示し、異常値の場合、吹っ飛びます。

4. 特徴量ベクトルへの加工

続いて、特徴量ベクトルに加工します。解説はPyLearnSIG04と同じ。

In [11]:
FV_N_ML = np.zeros([NumClassN, 112]) # <- 50回の観測、112次元の特徴量(7チャンネル * 16種の特徴量)
FV_A1_ML = np.zeros([NumClassA1, 112]) # <- 50回の観測、112次元の特徴量(7チャンネル * 16種の特徴量)
FV_A2_ML = np.zeros([NumClassA2, 112]) # <- 50回の観測、112次元の特徴量(7チャンネル * 16種の特徴量)

# クラスN
for i in range(0, NumClassN):
    FV_N_ML[i, :] = np.concatenate([
                    FV_N[i][0], # X軸加速度の特徴量 16種
                    FV_N[i][1], # Y軸加速度の特徴量 16種
                    FV_N[i][2], # Z軸加速度の特徴量 16種
                    FV_N[i][3], # X軸角速度の特徴量 16種
                    FV_N[i][4], # Y軸角速度の特徴量 16種
                    FV_N[i][5], # Z軸角速度の特徴量 16種
                    FV_N[i][6], # 合成加速度の特徴量 16種
                    ], 0)
    
# クラスA1
for i in range(0, NumClassA1):
    FV_A1_ML[i, :] = np.concatenate([
                    FV_A1[i][0], # X軸加速度の特徴量 16種
                    FV_A1[i][1], # Y軸加速度の特徴量 16種
                    FV_A1[i][2], # Z軸加速度の特徴量 16種
                    FV_A1[i][3], # X軸角速度の特徴量 16種
                    FV_A1[i][4], # Y軸角速度の特徴量 16種
                    FV_A1[i][5], # Z軸角速度の特徴量 16種
                    FV_A1[i][6], # 合成加速度の特徴量 16種
                    ], 0)
    
# クラスA2
for i in range(0, NumClassA2):
    FV_A2_ML[i, :] = np.concatenate([
                    FV_A2[i][0], # X軸加速度の特徴量 16種
                    FV_A2[i][1], # Y軸加速度の特徴量 16種
                    FV_A2[i][2], # Z軸加速度の特徴量 16種
                    FV_A2[i][3], # X軸角速度の特徴量 16種
                    FV_A2[i][4], # Y軸角速度の特徴量 16種
                    FV_A2[i][5], # Z軸角速度の特徴量 16種
                    FV_A2[i][6], # 合成加速度の特徴量 16種
                    ], 0)

# 特徴量IDと種別を表示
FeatureName = ["平均", "分散", "歪度", "尖度", "最小", "最大", "中央値", "RMS", "動的", 
                           "PB1", "PB2", "PB3", "PB4", "PB5", "エントロピー", "エネルギー"]
AxisName = ["Xacc", "Yacc", "Zacc", "Xang", "Yang", "Zang", "Mag"]
i=0
for j in range(0, NumChannel):
    for k in range(0, NumFeature):
        print("特徴量ID ", i, ": ", AxisName[j], FeatureName[k])
        i = i + 1
特徴量ID  0 :  Xacc 平均
特徴量ID  1 :  Xacc 分散
特徴量ID  2 :  Xacc 歪度
特徴量ID  3 :  Xacc 尖度
特徴量ID  4 :  Xacc 最小
特徴量ID  5 :  Xacc 最大
特徴量ID  6 :  Xacc 中央値
特徴量ID  7 :  Xacc RMS
特徴量ID  8 :  Xacc 動的
特徴量ID  9 :  Xacc PB1
特徴量ID  10 :  Xacc PB2
特徴量ID  11 :  Xacc PB3
特徴量ID  12 :  Xacc PB4
特徴量ID  13 :  Xacc PB5
特徴量ID  14 :  Xacc エントロピー
特徴量ID  15 :  Xacc エネルギー
特徴量ID  16 :  Yacc 平均
特徴量ID  17 :  Yacc 分散
特徴量ID  18 :  Yacc 歪度
特徴量ID  19 :  Yacc 尖度
特徴量ID  20 :  Yacc 最小
特徴量ID  21 :  Yacc 最大
特徴量ID  22 :  Yacc 中央値
特徴量ID  23 :  Yacc RMS
特徴量ID  24 :  Yacc 動的
特徴量ID  25 :  Yacc PB1
特徴量ID  26 :  Yacc PB2
特徴量ID  27 :  Yacc PB3
特徴量ID  28 :  Yacc PB4
特徴量ID  29 :  Yacc PB5
特徴量ID  30 :  Yacc エントロピー
特徴量ID  31 :  Yacc エネルギー
特徴量ID  32 :  Zacc 平均
特徴量ID  33 :  Zacc 分散
特徴量ID  34 :  Zacc 歪度
特徴量ID  35 :  Zacc 尖度
特徴量ID  36 :  Zacc 最小
特徴量ID  37 :  Zacc 最大
特徴量ID  38 :  Zacc 中央値
特徴量ID  39 :  Zacc RMS
特徴量ID  40 :  Zacc 動的
特徴量ID  41 :  Zacc PB1
特徴量ID  42 :  Zacc PB2
特徴量ID  43 :  Zacc PB3
特徴量ID  44 :  Zacc PB4
特徴量ID  45 :  Zacc PB5
特徴量ID  46 :  Zacc エントロピー
特徴量ID  47 :  Zacc エネルギー
特徴量ID  48 :  Xang 平均
特徴量ID  49 :  Xang 分散
特徴量ID  50 :  Xang 歪度
特徴量ID  51 :  Xang 尖度
特徴量ID  52 :  Xang 最小
特徴量ID  53 :  Xang 最大
特徴量ID  54 :  Xang 中央値
特徴量ID  55 :  Xang RMS
特徴量ID  56 :  Xang 動的
特徴量ID  57 :  Xang PB1
特徴量ID  58 :  Xang PB2
特徴量ID  59 :  Xang PB3
特徴量ID  60 :  Xang PB4
特徴量ID  61 :  Xang PB5
特徴量ID  62 :  Xang エントロピー
特徴量ID  63 :  Xang エネルギー
特徴量ID  64 :  Yang 平均
特徴量ID  65 :  Yang 分散
特徴量ID  66 :  Yang 歪度
特徴量ID  67 :  Yang 尖度
特徴量ID  68 :  Yang 最小
特徴量ID  69 :  Yang 最大
特徴量ID  70 :  Yang 中央値
特徴量ID  71 :  Yang RMS
特徴量ID  72 :  Yang 動的
特徴量ID  73 :  Yang PB1
特徴量ID  74 :  Yang PB2
特徴量ID  75 :  Yang PB3
特徴量ID  76 :  Yang PB4
特徴量ID  77 :  Yang PB5
特徴量ID  78 :  Yang エントロピー
特徴量ID  79 :  Yang エネルギー
特徴量ID  80 :  Zang 平均
特徴量ID  81 :  Zang 分散
特徴量ID  82 :  Zang 歪度
特徴量ID  83 :  Zang 尖度
特徴量ID  84 :  Zang 最小
特徴量ID  85 :  Zang 最大
特徴量ID  86 :  Zang 中央値
特徴量ID  87 :  Zang RMS
特徴量ID  88 :  Zang 動的
特徴量ID  89 :  Zang PB1
特徴量ID  90 :  Zang PB2
特徴量ID  91 :  Zang PB3
特徴量ID  92 :  Zang PB4
特徴量ID  93 :  Zang PB5
特徴量ID  94 :  Zang エントロピー
特徴量ID  95 :  Zang エネルギー
特徴量ID  96 :  Mag 平均
特徴量ID  97 :  Mag 分散
特徴量ID  98 :  Mag 歪度
特徴量ID  99 :  Mag 尖度
特徴量ID  100 :  Mag 最小
特徴量ID  101 :  Mag 最大
特徴量ID  102 :  Mag 中央値
特徴量ID  103 :  Mag RMS
特徴量ID  104 :  Mag 動的
特徴量ID  105 :  Mag PB1
特徴量ID  106 :  Mag PB2
特徴量ID  107 :  Mag PB3
特徴量ID  108 :  Mag PB4
特徴量ID  109 :  Mag PB5
特徴量ID  110 :  Mag エントロピー
特徴量ID  111 :  Mag エネルギー

これで特徴量ベクトルに加工できました。FV_N_MLには、50行112列のの数字が集まっています。行が50回分の個別の観測結果、列が112種の特徴量の値となります(A1, A2も同じ)。どんな値が得られているのか、imshow関数と呼ばれるヒートマップを作成する関数で可視化してみます。

In [13]:
c=(-10, 10) # 色の最小・最大範囲

from matplotlib import pyplot as plt

plt.figure(figsize=(50, 30)) # 描画領域の横幅, 縦幅

# クラスN
plt.subplot(3,1,1)
plt.imshow(FV_N_ML, clim=c)
plt.colorbar()
plt.title('Feature vector: Class N', size=30)
plt.xlabel('Feature ID', size=30)
plt.ylabel('Observation', size=30)
plt.xticks(np.arange(0, 112, 5), size = 15) # メモリ間隔
plt.yticks(np.arange(0, 50, 5), size = 15)

# クラスA1
plt.subplot(3,1,2)
plt.imshow(FV_A1_ML, clim=c)
plt.colorbar()
plt.title('Feature vector: Class A1', size=30)
plt.xlabel('Feature ID', size=30)
plt.ylabel('Observation', size=30)
plt.xticks(np.arange(0, 112, 5), size = 15) # メモリ間隔
plt.yticks(np.arange(0, 50, 5), size = 15)

# クラスA2
plt.subplot(3,1,3)
plt.imshow(FV_A2_ML, clim=c)
plt.colorbar()
plt.title('Feature vector: Class A2', size=30)
plt.xlabel('Feature ID', size=30)
plt.ylabel('Observation', size=30)
plt.xticks(np.arange(0, 112, 5), size = 15) # メモリ間隔
plt.yticks(np.arange(0, 50, 5), size = 15)

plt.show()

上から順に、クラスN, A1, A2です。縦軸が50回分の観測、横軸が112個の特徴量となります。これを見るだけで、正常なモータ、グリスを塗らない場合、ネジをゆるめた場合で、特徴量にどのような違いが出るか、かなりたくさんの考察ができそうです。

異常データの多くの特徴量が、吹き飛ぶことを確認できます。 特徴量10と91で散布図を書いてみましょう。

In [14]:
# N, A1, A2の順に散布図を作成
plt.figure(figsize=(14, 4)) # 描画領域の横幅, 縦幅

plt.subplot(1,2,1)
ax1 = 10
ax2 = 91
plt.scatter(FV_N_ML[:,ax1], FV_N_ML[:,ax2], s=50, c='blue', alpha=0.8)
plt.scatter(FV_A1_ML[:,ax1], FV_A1_ML[:,ax2], s=50, c='red', alpha=0.8)
plt.scatter(FV_A2_ML[:,ax1], FV_A2_ML[:,ax2], s=50, c='green', alpha=0.8)

plt.title("Feature space of F" + str(ax1) + " and F" + str(ax2))
plt.xlabel("Feature " + str(ax1))
plt.ylabel("Feature " + str(ax2))
plt.grid(True) #グリッド線(True:引く、False:引かない)

plt.legend(["N", "A1", "A2"]) # 凡例
Out[14]:
<matplotlib.legend.Legend at 0x1145e9240>

正常データ(クラスN)だけ落ち着いており、異常データは吹き飛んでいることがわかります。 とは言え、クラスNとクラスA2が混ざってそうにも見えるので、左下を拡大してみます。

In [15]:
# N, A1, A2の順に散布図を作成
plt.figure(figsize=(14, 4)) # 描画領域の横幅, 縦幅

plt.subplot(1,2,1)
ax1 = 10
ax2 = 91
plt.scatter(FV_N_ML[:,ax1], FV_N_ML[:,ax2], s=50, c='blue', alpha=0.8)
plt.scatter(FV_A1_ML[:,ax1], FV_A1_ML[:,ax2], s=50, c='red', alpha=0.8)
plt.scatter(FV_A2_ML[:,ax1], FV_A2_ML[:,ax2], s=50, c='green', alpha=0.8)

plt.title("Feature space of F" + str(ax1) + " and F" + str(ax2))
plt.xlabel("Feature " + str(ax1))
plt.ylabel("Feature " + str(ax2))
plt.grid(True) #グリッド線(True:引く、False:引かない)
plt.xlim([-10, 10]) # 範囲指定
plt.ylim([-10, 10]) # 範囲指定

plt.legend(["N", "A1", "A2"]) # 凡例
Out[15]:
<matplotlib.legend.Legend at 0x114667048>

クラスNはいい感じに分離できていることを確認できました。

5. 標準偏差距離による異常値検知

標準偏差距離による異常値検知とは、平均から標準偏差いくつ分よりも大きくデータが離れていれば、異常値だと解釈する、最もオーソドックスな異常値検知手法となります。

    1. 正常データの特徴量について、平均値と標準偏差を算出する。
    1. 入力された特徴量が、平均値 ー n 標準偏差 〜 平均値 + n 標準偏差に収まっているかチェックする。
    1. 収まっていれば正常、外れていれば異常と判断する。

これを特徴量10と特徴量91の2次元空間で判定するコードを書いてみます。今回は可視化したいので2つにしていますが、3つでも4つでも、端的に言えばいくつ使っても構いません。ただし、使用する特徴量を増やしすぎると次元呪いに触れる可能性があるので注意してください。

In [16]:
# 正常データについて、特徴量10と特徴量91の平均値を算出。
ax1, ax2 = 10, 91
M1 = np.mean(FV_N_ML[:,ax1])
M2 = np.mean(FV_N_ML[:,ax2])

# 標準偏差も算出。
S1 = np.std(FV_N_ML[:,ax1])
S2 = np.std(FV_N_ML[:,ax2]) 

# nは標準偏差の何倍を基準とするか。datは特徴量が格納された2次元のリスト
# 正常ならば1、異常ならば-1を出力
def AnomalyDetection_StdDist(M1, M2, S1, S2, n, dat):
    F1 = dat[0]
    F2 = dat[1]
    
    res = 0
    if F1 >= M1 - n * S1 and F1 <= M1 + n * S1: # 特徴量1は条件を満たすか
        if F2 >= M2 - n * S2 and F2 <= M2 + n * S2: # 特徴量2は条件を満たすか
            res = 1
        else:
            res = -1
    else: 
        res = -1
        
    return res

関数ができたところで、4標準偏差以内を正常としていくつか判定してみます。まずは、クラスNの0番目。

In [17]:
F1 = FV_N_ML[0, ax1]
F2 = FV_N_ML[0, ax2]
n=4 # 異常値検出の条件
res = AnomalyDetection_StdDist(M1, M2, S1, S2, n, [F1, F2])
print(res)
1

1なので、正常と判定してくれました。続いてクラスNの5番目です。

In [18]:
F1 = FV_N_ML[5, ax1]
F2 = FV_N_ML[5, ax2]
n=4 # 異常値検出の条件
res = AnomalyDetection_StdDist(M1, M2, S1, S2, n, [F1, F2])
print(res)
1

これも正常と判定してくれました。続いて異常値を入れてみます。

In [19]:
F1 = FV_A1_ML[0, ax1]
F2 = FV_A1_ML[0, ax2]
n=4 # 異常値検出の条件
res = AnomalyDetection_StdDist(M1, M2, S1, S2, n, [F1, F2])
print(res)

F1 = FV_A2_ML[0, ax1]
F2 = FV_A2_ML[0, ax2]
n=4 # 異常値検出の条件
res = AnomalyDetection_StdDist(M1, M2, S1, S2, n, [F1, F2])
print(res)
-1
-1

クラスA1, A2を入れたところ、どちらも異常と判定されました。いい感じです。 続いて、標準偏差距離による異常値検知は、如何なる境界を有しているのか可視化してみます。

In [20]:
Xmin, Ymin, Xmax, Ymax = -10, -10, 10, 10 # 空間の最小最大値
resolution = 0.2 # 細かさ
x_mesh, y_mesh = np.meshgrid(np.arange(Xmin, Xmax, resolution),
                             np.arange(Ymin, Ymax, resolution))
MeshDat = np.array([x_mesh.ravel(), y_mesh.ravel()]).T

ViewDat = np.zeros([len(MeshDat[:,0])])
for i in range(0, len(MeshDat[:,0])):
    F1 = MeshDat[i, 0]
    F2 = MeshDat[i, 1]
    ViewDat[i]=AnomalyDetection_StdDist(M1, M2, S1, S2, n, [F1, F2])
    
z = np.reshape(ViewDat, (len(x_mesh), len(y_mesh))) # 空間のためにデータ整形

# 正常・異常領域のメッシュプロット
plt.scatter(MeshDat[ViewDat==1,0], MeshDat[ViewDat==1,1], c="blue", s=10, alpha=0.1)
plt.scatter(MeshDat[ViewDat==-1,0], MeshDat[ViewDat==-1,1], c="red", s=10, alpha=0.1)

# 観測した正常・異常データのプロット
plt.scatter(FV_N_ML[:,ax1], FV_N_ML[:,ax2], s=50, c='blue', alpha=0.8)
plt.scatter(FV_A1_ML[:,ax1], FV_A1_ML[:,ax2], s=50, c='red', alpha=0.8)
plt.scatter(FV_A2_ML[:,ax1], FV_A2_ML[:,ax2], s=50, c='green', alpha=0.8)

plt.title("Feature space of F" + str(ax1) + " and F" + str(ax2))
plt.xlabel("Feature " + str(ax1))
plt.ylabel("Feature " + str(ax2))
plt.grid(True) #グリッド線(True:引く、False:引かない)
plt.xlim([Xmin, Ymax]) # 範囲指定
plt.ylim([Ymin, Ymax]) # 範囲指定

plt.legend(["Space: N", "Space: A", "N", "A1", "A2"]) # 凡例
Out[20]:
<matplotlib.legend.Legend at 0x11478b1d0>

薄い青が正常だと判定される部分空間、薄い赤が異常だと判定される部分空間です。標準偏差距離のnを小さくすると正常領域が狭まり、大きくすると広まることを意味します。すなわちnによって、判定の厳しさを制御することが可能になります。

今回は距離を「標準偏差何倍分だけ離れているか?」という基準で異常・正常の判定を行いましたが、ユークリッド距離なども使えます。そうすると円形の識別境界を生成することも可能になります。色々試してみるといいでしょう。

6. One Class SVMによる異常値検知

さて、5章にて2つの特徴量を用いてその空間上で、標準偏差に基づく距離を使用して異常値検出を行いました。ただしあの方法ですと、長方形や円、楕円のように、きれいな図形でしか境界を生成することができません(円や楕円を生成する方法は、高校数学をちょっと復習するとわかると思います)。世の中の問題を解いているときに、そんなにきれいな境界で検出を行えない場合もあります。すなわち、きれいな境界ではなく、ぐにゃぐにゃした識別境界を生成したい、という状況もあり得ます。

これを実現する一つの方法に、One class SVM (OC-SVM)と呼ばれる方法があります。本章では、この実現方法と識別境界について説明します。

まずはじめに使用する特徴量を決定します。今回は識別境界を可視化できるようにするため、2つとしてみます。5章で述べたように、可視化したいので2つにしているだけです。実運用の場合はもっとたくさん使ってください。

In [21]:
# 使用する特徴量の決定
ax1 = 10
ax2 = 91
X = np.concatenate([FV_N_ML[:,ax1:ax1+1], FV_N_ML[:,ax2:ax2+1]], 1)

これで、特徴量10と91のみが取り出せました。復習ですが、異常データは手元にないというシーンを想定しているので、正常データのみでokです。

続いてOC-SVMの実装です。pythonの場合、OneClassSVM関数を使用します。引数の意味は以下の通り。

  • kernel: 境界を曲げるかどうか。曲げたい場合はrbf、直線にしたい場合はlinearを採用します。ただし問題の性質上、linearカーネルを採用することはほとんどないです。
  • gamma: 小さいと単純な境界、大きいと複雑な境界になります。0より大きい実数。
  • nu: あらかじめ与えた正常データについて、何割を異常とみなすか。0より大きく、1以下。ただし1にするとすべて異常扱いされてしまうので注意が必要です。

今回は勉強として、以下4つのモデルを作ってみます。

  • OCSVM_LL: gamma低い、nu低い
  • OCSVM_LH: gamma低い、nu高い
  • OCSVM_HL: gamma高い、nu低い
  • OCSVM_HH: gamma高い、nu高い
In [22]:
from sklearn.svm import OneClassSVM

# モデル構築
OCSVM_LL = OneClassSVM(kernel='rbf', gamma=0.01, nu = 0.01)
OCSVM_LH = OneClassSVM(kernel='rbf', gamma=0.01, nu = 0.3)
OCSVM_HL = OneClassSVM(kernel='rbf', gamma=0.5, nu = 0.01)
OCSVM_HH = OneClassSVM(kernel='rbf', gamma=0.5, nu = 0.3)

# 学習
# 異常値検知(教師なし学習)なので、ラベルは不要
OCSVM_LL.fit(X)
OCSVM_LH.fit(X)
OCSVM_HL.fit(X)
OCSVM_HH.fit(X)
Out[22]:
OneClassSVM(cache_size=200, coef0=0.0, degree=3, gamma=0.5, kernel='rbf',
            max_iter=-1, nu=0.3, random_state=None, shrinking=True, tol=0.001,
            verbose=False)

これで学習ができました。特徴量10と91がそれぞれ0のとき、正常・異常どう判定されるかみてみます。このために、predict関数に2つの数字を入れてばokです。返り値は以下の通りです。

  • 正常: 1
  • 異常: -1
In [23]:
# 判定(1なら正常、-1なら異常)
F1, F2 = 0, 0
res_ocsvm = OCSVM_LL.predict([[F1, F2]])
print(res_ocsvm)
[1]

OCSVM_LLを使うと、1が返ってきましたので、特徴量10と91が0ならば、正常と判定されるようです。とはいえ、どのような境界が生成されたかわからないので、可視化してみます。

In [25]:
Xmin, Ymin, Xmax, Ymax = -10, -10, 10, 10 # 空間の最小最大値
resolution = 0.2 # 細かさ
x_mesh, y_mesh = np.meshgrid(np.arange(Xmin, Xmax, resolution),
                             np.arange(Ymin, Ymax, resolution))
MeshDat = np.array([x_mesh.ravel(), y_mesh.ravel()]).T

plt.figure(figsize=(10, 10)) # 描画領域の横幅, 縦幅
plt.subplots_adjust(wspace=0.4, hspace=0.3)

# --- LL ---
plt.subplot(2,2,1)
ViewDat_LL = np.zeros([len(MeshDat[:,0])])
for i in range(0, len(MeshDat[:,0])):
    F1 = MeshDat[i, 0]
    F2 = MeshDat[i, 1]
    ViewDat_LL[i]=OCSVM_LL.predict([[F1, F2]])

# 正常・異常領域のメッシュプロット
plt.scatter(MeshDat[ViewDat_LL==1,0], MeshDat[ViewDat_LL==1,1], c="blue", s=10, alpha=0.1)
plt.scatter(MeshDat[ViewDat_LL==-1,0], MeshDat[ViewDat_LL==-1,1], c="red", s=10, alpha=0.1)

# 観測した正常・異常データのプロット
plt.scatter(FV_N_ML[:,ax1], FV_N_ML[:,ax2], s=20, c='blue', alpha=0.8)
plt.scatter(FV_A1_ML[:,ax1], FV_A1_ML[:,ax2], s=20, c='red', alpha=0.8)
plt.scatter(FV_A2_ML[:,ax1], FV_A2_ML[:,ax2], s=20, c='green', alpha=0.8)

plt.title("OC-SVM LL: F" + str(ax1) + " and F" + str(ax2))
plt.xlabel("Feature " + str(ax1))
plt.ylabel("Feature " + str(ax2))
plt.grid(True) #グリッド線(True:引く、False:引かない)
plt.xlim([Xmin, Ymax]) # 範囲指定
plt.ylim([Ymin, Ymax]) # 範囲指定

plt.legend(["Space: N", "Space: A", "N", "A1", "A2"]) # 凡例

# --- LH ---
plt.subplot(2,2,2)
ViewDat_LH = np.zeros([len(MeshDat[:,0])])
for i in range(0, len(MeshDat[:,0])):
    F1 = MeshDat[i, 0]
    F2 = MeshDat[i, 1]
    ViewDat_LH[i]=OCSVM_LH.predict([[F1, F2]])

# 正常・異常領域のメッシュプロット
plt.scatter(MeshDat[ViewDat_LH==1,0], MeshDat[ViewDat_LH==1,1], c="blue", s=10, alpha=0.1)
plt.scatter(MeshDat[ViewDat_LH==-1,0], MeshDat[ViewDat_LH==-1,1], c="red", s=10, alpha=0.1)

# 観測した正常・異常データのプロット
plt.scatter(FV_N_ML[:,ax1], FV_N_ML[:,ax2], s=20, c='blue', alpha=0.8)
plt.scatter(FV_A1_ML[:,ax1], FV_A1_ML[:,ax2], s=20, c='red', alpha=0.8)
plt.scatter(FV_A2_ML[:,ax1], FV_A2_ML[:,ax2], s=20, c='green', alpha=0.8)

plt.title("OC-SVM LH: F" + str(ax1) + " and F" + str(ax2))
plt.xlabel("Feature " + str(ax1))
plt.ylabel("Feature " + str(ax2))
plt.grid(True) #グリッド線(True:引く、False:引かない)
plt.xlim([Xmin, Ymax]) # 範囲指定
plt.ylim([Ymin, Ymax]) # 範囲指定

plt.legend(["Space: N", "Space: A", "N", "A1", "A2"]) # 凡例

# --- HL ---
plt.subplot(2,2,3)
ViewDat_HL = np.zeros([len(MeshDat[:,0])])
for i in range(0, len(MeshDat[:,0])):
    F1 = MeshDat[i, 0]
    F2 = MeshDat[i, 1]
    ViewDat_HL[i]=OCSVM_HL.predict([[F1, F2]])

# 正常・異常領域のメッシュプロット
plt.scatter(MeshDat[ViewDat_HL==1,0], MeshDat[ViewDat_HL==1,1], c="blue", s=10, alpha=0.1)
plt.scatter(MeshDat[ViewDat_HL==-1,0], MeshDat[ViewDat_HL==-1,1], c="red", s=10, alpha=0.1)

# 観測した正常・異常データのプロット
plt.scatter(FV_N_ML[:,ax1], FV_N_ML[:,ax2], s=20, c='blue', alpha=0.8)
plt.scatter(FV_A1_ML[:,ax1], FV_A1_ML[:,ax2], s=20, c='red', alpha=0.8)
plt.scatter(FV_A2_ML[:,ax1], FV_A2_ML[:,ax2], s=20, c='green', alpha=0.8)

plt.title("OC-SVM HL: F" + str(ax1) + " and F" + str(ax2))
plt.xlabel("Feature " + str(ax1))
plt.ylabel("Feature " + str(ax2))
plt.grid(True) #グリッド線(True:引く、False:引かない)
plt.xlim([Xmin, Ymax]) # 範囲指定
plt.ylim([Ymin, Ymax]) # 範囲指定

plt.legend(["Space: N", "Space: A", "N", "A1", "A2"]) # 凡例

# --- HH ---
plt.subplot(2,2,4)
ViewDat_HH = np.zeros([len(MeshDat[:,0])])
for i in range(0, len(MeshDat[:,0])):
    F1 = MeshDat[i, 0]
    F2 = MeshDat[i, 1]
    ViewDat_HH[i]=OCSVM_HH.predict([[F1, F2]])

# 正常・異常領域のメッシュプロット
plt.scatter(MeshDat[ViewDat_HH==1,0], MeshDat[ViewDat_HH==1,1], c="blue", s=10, alpha=0.1)
plt.scatter(MeshDat[ViewDat_HH==-1,0], MeshDat[ViewDat_HH==-1,1], c="red", s=10, alpha=0.1)

# 観測した正常・異常データのプロット
plt.scatter(FV_N_ML[:,ax1], FV_N_ML[:,ax2], s=20, c='blue', alpha=0.8)
plt.scatter(FV_A1_ML[:,ax1], FV_A1_ML[:,ax2], s=20, c='red', alpha=0.8)
plt.scatter(FV_A2_ML[:,ax1], FV_A2_ML[:,ax2], s=20, c='green', alpha=0.8)

plt.title("OC-SVM HH: F" + str(ax1) + " and F" + str(ax2))
plt.xlabel("Feature " + str(ax1))
plt.ylabel("Feature " + str(ax2))
plt.grid(True) #グリッド線(True:引く、False:引かない)
plt.xlim([Xmin, Ymax]) # 範囲指定
plt.ylim([Ymin, Ymax]) # 範囲指定

plt.legend(["Space: N", "Space: A", "N", "A1", "A2"]) # 凡例
Out[25]:
<matplotlib.legend.Legend at 0x117286c88>

標準偏差距離の場合とは異なり、境界がぐにゃぐにゃと曲げられていますことが確認できます。gamma, nu、どのくらいが良いモデルでしょうか。人によって違いそうですね。

ところで、もうちょっと正常領域を広く撮りたいななどと考えはしないでしょうか。少し工夫をすると、この処理を実現できます。OC-SVMは、正常であれば1を、異常であれば-1を出力する性質がありますが、計算の過程で、一度実数値を求めてそれが0以上であれば1(正常)を、0未満であれば-1(異常)を出力するという構造をしています。

したがって、この実数値を0以上0未満で区切るのではなく、-0.5を基準とするなどのように、基準を下げてあげることで正常領域を広くすることができます。

これを、OCSVM_LLに実装してみたいと思います。1/-1の判定結果と判定前の実数値を取り出すためのメソッドはそれぞれ以下の通りです。

  • 判定結果: OCSVM_LL.predict([[F1, F2]])
  • 実数取得: OCSVM_LL.decision_function([[F1, F2]])

すなわち、decision_functionで得られた値に対し、基準としたい値でif文を書き、1/-1を返してあげれば良いということになります。これを利用して新たな判定関数を作ってみます。

In [26]:
# 特徴量1, 特徴量2, 判定基準, OC-SVMのモデルを入れると、1/-1を返す関数
def new_predict_OCSVM(F1, F2, th, model):
    val = model.decision_function([[F1, F2]])
    if val >= th:
        return 1
    else:
        return -1

# 使用する場合
F1, F2 = 0, 0
th = -0.1
new_predict_OCSVM(0, 0, 0, OCSVM_LL)
Out[26]:
1

-0.1以上を正常、それ未満を異常にするという、通常よりもゆるい判定を行うOC-SVMを構築しました。これできちんと正常領域が広がっているのか、確認してみます。

In [32]:
ViewDat_LL = np.zeros([len(MeshDat[:,0])])
for i in range(0, len(MeshDat[:,0])):
    F1 = MeshDat[i, 0]
    F2 = MeshDat[i, 1]
    # ViewDat_LL[i]=OCSVM_LL.predict([[F1, F2]]) # 普通のSVM
    ViewDat_LL[i] = new_predict_OCSVM(F1, F2, -0.1, OCSVM_LL) # ゆるいSVM

# 正常・異常領域のメッシュプロット
plt.scatter(MeshDat[ViewDat_LL==1,0], MeshDat[ViewDat_LL==1,1], c="blue", s=10, alpha=0.1)
plt.scatter(MeshDat[ViewDat_LL==-1,0], MeshDat[ViewDat_LL==-1,1], c="red", s=10, alpha=0.1)

# 観測した正常・異常データのプロット
plt.scatter(FV_N_ML[:,ax1], FV_N_ML[:,ax2], s=20, c='blue', alpha=0.8)
plt.scatter(FV_A1_ML[:,ax1], FV_A1_ML[:,ax2], s=20, c='red', alpha=0.8)
plt.scatter(FV_A2_ML[:,ax1], FV_A2_ML[:,ax2], s=20, c='green', alpha=0.8)

plt.title("OC-SVM LL: F" + str(ax1) + " and F" + str(ax2))
plt.xlabel("Feature " + str(ax1))
plt.ylabel("Feature " + str(ax2))
plt.grid(True) #グリッド線(True:引く、False:引かない)
plt.xlim([Xmin, Ymax]) # 範囲指定
plt.ylim([Ymin, Ymax]) # 範囲指定

plt.legend(["Space: N", "Space: A", "N", "A1", "A2"]) # 凡例
Out[32]:
<matplotlib.legend.Legend at 0x114f49198>

正常領域が、通常のOCSVM_LLよりも広がりました。性能を高めるため、こういった工夫も必要であることを念頭におくといいですね。正常領域が狭すぎると、正常を異常だとみなしてしまう場合が起こり得ます。そのため状況によっては正常領域を広げたくなります。ただし、正常領域を広めるということは、異常を正常と答える可能性が高まってしまうことに注意してください。トレードオフの関係ですので、試行錯誤的に正常領域の面積を操作することも必要でしょう。

7. ディープラーニングによる次元圧縮を利用した異常値検知

さて、これまで任意に取り出した特徴量を用いて、正常・異常を判定するモデルを構築してきました。距離を用いたモデルは、きれいな図形状の識別境界を生成できて、OC-SVMであればぐにゃぐにゃと曲げた識別境界を生成できることがわかりました。

ところで、一見して良い特徴量を発見できない場合はどうすれば良いでしょうか。この場合、すべての特徴量を用い、それを次元圧縮することで、正常と異常が分離された特徴量を取得するというアプローチがあります。

これを実現するには、入力されたデータを再現するディープラーニング、オートエンコーダ (Auto Encorder; AE) を活用します。距離やOC-SVMによる異常値検知は、すでに獲得されている空間に対して識別境界を生成するモデルであることに対し、オートエンコーダは空間を獲得する方法であるので、混同しないように注意してください。

オートエンコーダを理解するには、まずはじめに回帰型ニューラルネットワークについて理解せねばいけません。こちらのページの一番下にこの解説があるので、まずはこれを理解してください。

理解すると、回帰型ニューラルネットワークが実数にて定義づけられた値を推定するモデルであることがわかると思います。オートエンコーダとは、回帰型ニューラルネットワークの出力に対し、入力の推定値を割り当てたモデルとなります。すなわち、ある特徴量ベクトルを入力すると、その特徴量ベクトルとまったく同じ値を出力することを目標としています。

と聞いても、既に明らかになっている入力を推定しても何も嬉しくありません。ここで、3層階層型ニューラルネットワークの構造について考えてみます。

  • 入力層: n1次元
  • 中間層: n2次元
  • 出力層: n1次元(入力を再現するため、入力層と同じ次元になる)

ここで、入力ベクトルがn1次元だとすると、一度n2次元に変換されてから、n1次元に戻っていることがわかります。もしもここでn2次元をn1次元よりも小さい値に設定したならば、次元圧縮が行われていることになります。

オートエンコーダの出力は、入力を推定するわけですから、次元圧縮された中間層のベクトルは、入力の情報をできる限り保持したまま、次元だけが下がっていることになります。すなわち、オートエンコーダとは、入力の数字を再現できるような次元圧縮がなされていることになります。

これはとても便利です。例えば、データの転送速度が遅い状況を考えてみます。100個の数字を送りたい場合、100このまま送っていると、時間がたくさんかかってしまいます。しかし、転送側にオートエンコーダの入力層から中間層への変換、受信側にオートエンコーダの中間層から出力層への変換を実装すると、どうでしょうか。この場合、100個の入力をデータを転送する前に10個に圧縮するといったことができますので、転送速度が高速になります。100個を10個にしているので当然情報損失の懸念がありますが、受信側に入力を再現するような出力を行える変換があるわけですから、データ受信側で10個の数字を100個に戻せるわけです。

以上がオートエンコーダの利点ですが、実はこれを異常値検知やクラス分類問題など、様々なモデルに応用できることが明らかになっています。以前説明した通り、機械学習で良いモデルを作るには、入力次元をできるだけ下げた方が良いという考えがあります。次元の呪いに触れることから莫大なデータ数が必要になったり、高次元だと中身がブラックボックスになりがちになるためです。

特に教師なし学習による異常値検知問題の場合、正常データのみしか使えないわけですから、正常データを入力すると、正常データを出力するようなオートエンコーダが出来上がります。ここで、次元圧縮が行われた中間層に注目してみます。圧縮された空間上では、オートエンコーダ自身がよく知っている入力を与えると、ある一部分にデータが集まり、よく知っているデータから外れたデータ(つまり、異常データ)が入力されると、圧縮された空間において、正常データが集まる場所とは異なる部分にデータが集まりやすい、という特性が表出します。

つまり、オートエンコーダの中間層を取り出して得られる空間上では、非常に異常値検知が行いやすいことを意味します。 オートエンコーダにより次元圧縮された空間にて、OC-SVMなどで識別境界を生成すると、精度の良い異常値検出モデルができるのではないか?ということに着想するわけです。

具体的な構築の流れとして、以下を例にとってプログラムを組んでみます。

異常値検知モデルの構築:

  • 112次元の特徴量ベクトルを入力すると、それがそのまま推定された112次元の出力を行うオートエンコーダを構築する。
  • 中間層の次元は可視化するために、3次元とする。
  • すなわち、入力層112次元、中間層3次元、出力層112次元の回帰型ニューラルネットワーク(=オートエンコーダ)の構築を意味する。
  • この構築は、正常データのみで行う(異常データは集められない前提なので)。
  • オートエンコーダができた後は、112次元の正常データをそれに入力し、中間層の値を取得する。
  • つまり、112次元を3次元に圧縮した空間を獲得する。
  • この空間上で、正常データのみを使って、OC-SVMを構築する。

  • 112次元の特徴量ベクトルをオートエンコーダに入力することで、次元圧縮を行う。

  • 圧縮された空間で、OC-SVMや距離に基づく識別境界を生成する。

異常値検知モデルの使用:

  • 112次元の入力ベクトルをオートエンコーダに入力する。
  • 中間層3次元のベクトルを獲得する。
  • それをOC-SVMに代入し、正常・異常の判定結果を得る。

まずはじめに、回帰型ニューラルネットワーク(オートエンコーダ)を定義します。 活性化関数として、恒等写像を利用します。ReLu関数やシグモイド関数でもokですが、ReLu関数は最小値0、シグモイド関数は最小0最大1というように、値の範囲が定まってしまうので、その点も考慮してください。

In [33]:
from sklearn.neural_network import MLPRegressor
Hidden_neuron = 3 # 中間層の次元(何次元に圧縮したいか?)
AE = MLPRegressor(activation="identity", max_iter=1000, tol = pow(10, -20), 
                  hidden_layer_sizes=(Hidden_neuron,), 
                  learning_rate_init=0.01, random_state=1, verbose=False)

続いて、このAEを学習してみます。入力とまったく同じ出力を目指すわけですから、第一引数と第二引数が同じになります。また、正常データしか使ってはいけないので、正常データのみを指定します。

In [34]:
AE.fit(FV_N_ML, FV_N_ML) # 正常データのみを用いて、自分自身を再現するような学習
Out[34]:
MLPRegressor(activation='identity', alpha=0.0001, batch_size='auto', beta_1=0.9,
             beta_2=0.999, early_stopping=False, epsilon=1e-08,
             hidden_layer_sizes=(3,), learning_rate='constant',
             learning_rate_init=0.01, max_iter=1000, momentum=0.9,
             n_iter_no_change=10, nesterovs_momentum=True, power_t=0.5,
             random_state=1, shuffle=True, solver='adam', tol=1e-20,
             validation_fraction=0.1, verbose=False, warm_start=False)

AEが入力を再現するような出力をできているのか、チェックしてみます。 0回目の正常データの観測における、112次元の特徴量ベクトルと、AEの出力を見てみます。

In [36]:
# ちゃんと再現できているかチェック
resnn = AE.predict(FV_N_ML)
plt.plot(resnn[0,0:112], label = "Output of AE")
plt.plot(FV_N_ML[0,0:112], label = "Input")
plt.legend()
plt.xlabel("Elements of feature vector")
Out[36]:
<matplotlib.legend.Legend at 0x1150fbfd0>

完全に一致しているわけではありませんが、ある程度似た値を出力していますね。AEの出力は、112次元を3次元に落とし、112次元に復元しているのに、元々の112次元とある程度一致するというのは、なかなかすごいことだったりします。普通はここまで急激な次元圧縮は行いません。今回は可視化したいので3次元まで落としているだけです。もうちょっと再現したいと思う場合は、圧縮先を10次元くらいにするなどしてください。(Hidden_neuron=10とする)

さて、AEが出来上がったところで、次元圧縮されたベクトル、つまり中間層の値を取り出す関数を作ります。GetHiddenOutputLayer関数は、入力ベクトル, 隠れ層のウェイト, 隠れ層のバイアス, 出力層のウェイト, 出力層のバイアスを入力すると、中間層のベクトルと出力層のベクトルを返してくれます。

学習ずみのオートエンコーダのパラメータを得るには、.coefs、.interceptsメソッドを利用します。この意味は、ニューラルネットワークを手計算できるくらいに理解できなければわからないことですので、ニューラルネットの数式を読みたくない人は読み飛ばしてください。

In [40]:
# --- AEに入力ベクトルを入れると、中間層・隠れ層のベクトルを取り出す関数 ---
# 入力:
#   入力ベクトル, 隠れ層のウェイト, 隠れ層のバイアス, 出力層のウェイト, 出力層のバイアス
# 出力: 
#   隠れ層のベクトル, 出力層のベクトル
def GetHiddenOutputLayer(InputVal, AEcoef1, AEInter1, AEcoef2, AEInter2):
    
    # 隠れ層取得
    Hidden = np.dot(InputVal, AEcoef1) + AEInter1
    
    # relu関数の場合は以下が必要
    #for i in range(0, len(Hidden)):
    #   if Hidden[i] < 0:
    #        Hidden[i]=0

    # 出力層取得
    Out = np.dot(Hidden, AEcoef2) + AEInter2
    
    return [Hidden, Out]


# 学習済みAEのパラメータ取得
AEcoef1 = AE.coefs_[0]
AEInter1 = AE.intercepts_[0]
AEcoef2 = AE.coefs_[1]
AEInter2 = AE.intercepts_[1]

# 関数の使用
InputVal = FV_N_ML[0,:]
[Hidden, Out] = GetHiddenOutputLayer(InputVal, AEcoef1, AEInter1, AEcoef2, AEInter2)

# 検証
# python関数の出力
# resnn = AE.predict([FV_N_ML[0,:]])
# plt.scatter(Out, resnn)

AEにより次元圧縮された3次元ベクトルを得る関数を作成できた後は、正常・異常データをそれぞれプロットしてみます。3次元に圧縮しているわけですから、可視化できます。

In [42]:
# 正常データ
Hidden_N = []
Hidden_N_np = np.zeros([len(FV_N_ML[:,0]), Hidden_neuron])
for i in range(0, len(FV_N_ML[:,0])):
    InputVal = FV_N_ML[i,:]
    temp = GetHiddenOutputLayer(InputVal, AEcoef1, AEInter1, AEcoef2, AEInter2)[0] #only hidden
    Hidden_N.append(temp)
    for j in range(0, Hidden_neuron):
        Hidden_N_np[i,j] = Hidden_N[i][j]
        
# 異常1データ
Hidden_A1 = []
Hidden_A1_np = np.zeros([len(FV_A1_ML[:,0]), Hidden_neuron])
for i in range(0, len(FV_A1_ML[:,0])):
    InputVal = FV_A1_ML[i,:]
    temp = GetHiddenOutputLayer(InputVal, AEcoef1, AEInter1, AEcoef2, AEInter2)[0] #only hidden
    Hidden_A1.append(temp)
    for j in range(0, Hidden_neuron):
        Hidden_A1_np[i,j] = Hidden_A1[i][j]
        
# 異常2データ
Hidden_A2 = []
Hidden_A2_np = np.zeros([len(FV_A2_ML[:,0]), Hidden_neuron])
for i in range(0, len(FV_A2_ML[:,0])):
    InputVal = FV_A2_ML[i,:]
    temp = GetHiddenOutputLayer(InputVal, AEcoef1, AEInter1, AEcoef2, AEInter2)[0] #only hidden
    Hidden_A2.append(temp)
    for j in range(0, Hidden_neuron):
        Hidden_A2_np[i,j] = Hidden_A2[i][j]

続いて、可視化します。

In [66]:
from mpl_toolkits.mplot3d import Axes3D

fig = plt.figure(figsize=(14, 7))
plt.subplots_adjust(wspace=0.0, hspace=0.1)

# 全部見る
ax = fig.add_subplot(1,2,1, projection='3d')
ax.scatter(Hidden_N_np[:,0], Hidden_N_np[:,1], Hidden_N_np[:,2], label="Class N") # 正常
ax.scatter(Hidden_A1_np[:,0], Hidden_A1_np[:,1], Hidden_A1_np[:,2], label="Class A1") # 異常1
ax.scatter(Hidden_A2_np[:,0], Hidden_A2_np[:,1], Hidden_A2_np[:,2], label="Class A2") # 異常2

ax.set_xlabel("AE Feature 1")
ax.set_ylabel("AE Feature 2")
ax.set_zlabel("AE Feature 3")
ax.set_title("Feature space of AE: all view")
plt.legend()

# 正常箇所の拡大図
ax = fig.add_subplot(1,2,2, projection='3d')
ax.scatter(Hidden_N_np[:,0], Hidden_N_np[:,1], Hidden_N_np[:,2], label="Class N") # 正常
ax.scatter(Hidden_A1_np[:,0], Hidden_A1_np[:,1], Hidden_A1_np[:,2], label="Class A1") # 異常1
ax.scatter(Hidden_A2_np[:,0], Hidden_A2_np[:,1], Hidden_A2_np[:,2], label="Class A2") # 異常2
ax.set_xlabel("AE Feature 1")
ax.set_ylabel("AE Feature 2")
ax.set_zlabel("AE Feature 3")
ax.set_title("Feature space of AE: Zooming")
ax.set_xlim([0, 500])
ax.set_ylim([-50, 50])
ax.set_zlim([-20, 20])
plt.legend()

plt.show()

左が3クラスのプロット、右が正常箇所を拡大したプロットです。AEにより次元圧縮された空間は、当初の説明通り、正常データは一箇所に集まるが、正常とは離れたデータだと、離れた箇所に現れることがわかりました。

この空間ならば、容易に異常値検出を行えそうです。それでは、最後の仕上げにAEにより圧縮された空間でOC-SVMによる判定器を作ってみます。

In [67]:
OCSVM_AE = OneClassSVM(kernel='rbf', gamma=0.001, nu = 0.001)
OCSVM_AE.fit(Hidden_N_np) # 異常値検知(教師なし学習)なので、ラベルは不要
Out[67]:
OneClassSVM(cache_size=200, coef0=0.0, degree=3, gamma=0.001, kernel='rbf',
            max_iter=-1, nu=0.001, random_state=None, shrinking=True, tol=0.001,
            verbose=False)

モデルができたところで、手元にあるデータを判定させてみます。

In [69]:
res_N = OCSVM_AE.predict(Hidden_N_np)
res_A1 = OCSVM_AE.predict(Hidden_A1_np)
res_A2 = OCSVM_AE.predict(Hidden_A2_np)

print("正常データの判定結果: ")
print(res_N)
print("")
print("異常1データの判定結果: ")
print(res_A1)
print("")
print("異常2データの判定結果: ")
print(res_A2)
正常データの判定結果: 
[ 1  1  1  1 -1  1  1 -1  1  1  1  1  1 -1  1  1  1  1 -1  1  1  1  1  1
  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1  1
  1  1]

異常1データの判定結果: 
[-1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1
 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1
 -1 -1]

異常2データの判定結果: 
[-1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1
 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1 -1
 -1 -1]

1が正常で、-1が異常でしたね。正常データだけを使って、正常と異常を切り分けるモデルを作ることができました。しかも、概ね良い判定結果が得られています。オートエンコーダを利用すると少しややこしくなりますが、目で見て自分で良さそうな特徴量を選ばなくても、自発的に良い特徴量を獲得することができる(場合が多い)です。ここら辺が、ディープラーニング系のメリットですね。