Fire Engine

化学の修士号→消防士→ITエンジニア(2016年11月〜)

Pythonで学ぶUnixプロセスの基礎

 先日、自身が主催するPyFukuokaというイベントにて「Pythonで学ぶUnixプロセスの基礎」というタイトルの発表をしました。スライドはこちらです。

speakerdeck.com

 内容としては「なるほどUnixプロセス」という本を参考にしています。本書は、Unixの基礎から、ファイルディスクリプタの話や、fork(2)を使って子プロセスを生成する方法など、非常に内容が濃く、超オススメです。本の中ではRubyを使って解説されていますが、私の発表では、この本の一部をPythonを使ってまとめました。
 以下、スライドの内容に沿って書いていきます。

動作環境

プロセスにはIDがある

プロセスはプロセスIDという値を持っています。プロセスIDはプロセスの中身とは関連づいていない単に連番になった数値のラベルです。
それではPythonインタプリタを起動して、プロセスに振られたIDを確認してみます。
(補足:Pythonでは「os」という標準ライブラリを利用することで、OS関連の機能を利用することができます。 )

$ python
>>> import os
>>> os.getpid()
1229

「1229」という数値が返ってきました。ターミナルの別ウィンドウでpsコマンドをたたくと、

$ ps -p 1229
  PID TTY           TIME CMD
 1229 ttys000    0:00.06 python

確かにPIDが1229でPythonのプロセス立ち上がっていることがわかります。(PIDはプロセスIDのこと)

プロセスには親がいる

すべてのプロセスには親となるプロセス(親プロセス)がいます。たいていの場合、親プロセスはそのプロセスを起動したプロセスとなります。
Pythonインタプリタを起動して、親プロセスのIDを確認してみます。

$ python
>>> import os
>>> os.getppid()
1165

親プロセスを確認する関数はgetppidで、pが一つ増えていることに注意してください。(parentのp)
Pythonプロセスの親はそれを起動しているbashプロセスになります。

プロセスは子プロセスを作れる

fork(2)システムコールを使うと、実行中のプロセスから新しいプロセスを生成できます。システムコールは、OSの機能を呼び出すための仕組みのことです。
以下、fork(2)システムコールの特徴を挙げます。

  • fork(2)を呼ぶ側のプロセスは「親プロセス」、生成されるプロセスは「子プロセス」となる。
  • 子プロセスは、親プロセスで使われているすべてのメモリのコピーと、親プロセスが開いているファイルディスクリプタを引き継ぐ。
  • 子プロセスがコピーしたメモリは、親プロセス側に影響を与えることなく自由に変更ができる。

それでは、fork(2)を使って、プロセスを生成してみましょう。Pythonからfork(2)を呼ぶためにもosライブラリを利用します。

import os

if os.fork():
    print("Here in the if block.")
else:
    print("Here in the else block.")

上記のプログラムに「fork.py」というファイル名をつけて実行すると、

$ python fork.py
Here in the if block.
Here in the else block.

if句の中もelse句の中もprint文が実行されており、すごく不思議な現象です。
なにが起きてるのか追ってみましょう。

forkの挙動を追ってみる

forkの挙動を追うためにif句・else句それぞれで、以下の2点を調べます。

  1. プロセスIDはどうなっているか
  2. fork関数の返値はどうなっているか
import os

print("---- before fork ----")
print("pid: {0}".format(os.getpid()))
ret = os.fork()

if ret:
    print("----- if block ------")
    print("pid: {0}".format(os.getpid()))
    print("return value: {0}".format(ret))
else:
    print("---- else block -----")
    print("pid: {0}".format(os.getpid()))
    print("return value: {0}".format(ret))

上記のプログラムに「fork_detail.py」というファイル名をつけて実行すると、下のような結果になります。

$ python fork_detail.py
---- before fork ----
pid: 1103
----- if block ------
pid: 1103
return value: 1104
---- else block -----
pid: 1104
return value: 0

まず、プロセスIDについて見てみると、forkする前(プロセスを生成する前)とif句の中が同じプロセスIDで、else句の中はif句に1足したプロセスIDを持っていることがわかります。これは、if句は親プロセス(プロセスID:1103)によって実行されており、else句はfork関数によって生成された子プロセス(プロセスID:1104)によって実行されていることを意味しています。
次に、fork関数の返値を見てみると、親プロセス側には「生成した子プロセスのID」を返し、子プロセス側には「0」を返しています。Pythonの条件式では、正の整数値はTrue、0はFalseと評価されるため、この返値によって、親プロセスはif句の中、子プロセスはfalse句の中と実行内容を分けて記述することができます。

プロセスは通信できる

複数のプロセス間でデータをやりとりするための仕組みのことをプロセス間通信(IPC: Inter Process Communication)と呼びます。 IPCには様々な手法がありますが、今回は比較的単純な「パイプ」を使った方法を紹介します。
パイプは親子関係にあるプロセス間の単方向の通信を実現します。
まず、パイプの概略図を示します。(参考リンク

f:id:hirotsuru314:20171224144448p:plain

forkによって生成された子プロセスは、親プロセスが開いているファイルディスクリプタを引き継ぎます。ファイルディスクリプタとは、OSがカーネルのレイヤーで用意している抽象化の仕組みです。Unixの世界ではファイルシステム上のファイルだけでなく、デバイス、ソケット、パイプなどもファイルとして扱われるため、ファイルディスクリプタが割り振られます。
プロセスがパイプを作ったあとでforkを呼び出した場合、生成された子プロセスは同じパイプを読み書きできるため、親子間でデータを渡すことができます。

import os
reader, writer = os.pipe()
if os.fork():
    os.close(reader)
    write_pipe = os.fdopen(writer, 'w')
    write_pipe.write('Hello child!')
    write_pipe.close()
else:
    os.close(writer)
    read_pipe = os.fdopen(reader, 'r')
    message= read_pipe.readline()
    read_pipe.close()
    print(message)

上記のプログラムに「pipe.py」というファイル名をつけて実行すると、

$ python pipe.py
Hello child!

親から子にメッセージを渡せていることがわかります。

まとめ

プロセスは、IDがあって、親がいて、子を作れて、通信ができます。そして、その様子をPythonから確認できます。今回紹介した内容は、特にPythonである必要はありません。しかし、Pythonなどの高級言語を使って、OS周りの理解を深めることは、学び方の一つのアプローチとして有効だと私は考えます。
なるほどUnixプロセス」という本には今回の内容以外にも様々なプロセスの特徴について解説しています。 下記がその例です。

  • 孤児プロセス
  • ゾンビプロセス
  • デーモンプロセス
  • ソケット通信

この辺りに興味がある方はぜひ本を読まれることをオススメします!

消防士からエンジニアに転職して1年が経った

 消防士として働いていた私がゼロからプログラミングを始めて、ITエンジニアに転職してから1年が経ちました。今回は、1年間エンジニアとして働いた今の率直な思いと、1年の振り返りをしていこうと思います。

転職してどうだったか

 私はエンジニアという仕事は自分に合っているし、転職して本当によかったと思っています。しかしそれは、すべての人に「ITエンジニアっていいよ!」って勧められるというわけではなく、当たり前のことですが、合う・合わないがあると思います。
 私が思うエンジニアの仕事が合う人の特徴は「学ぶことが好きだ」ということに尽きると思っています。正直、エンジニアとして1年間働いて、未経験でもそこそこいけるなっという感覚の方が強かったです。それは、目の前の業務にだけ集中し、業務で必要な技術だけを追えばなんとか仕事はこなせるようになるからです。しかし、そのような場合は大抵、業務で使うフレームワークやライブラリの使い方の暗記ゲームをしているだけで、自分自身にはそんなに力がついてない状態だと思います。(私も最初の半年は完全にこの状態でした。。)
 私は目指すべきゴールを、所属している組織で活躍することではなく、エンジニアとしての市場価値を高めるところに置くようにしました。技術が高く、市場価値が高い状態がエンジニアとしての安定なんだと思います。

エンジニアとして大切にしていること

 私がエンジニアとして働く上でテーマに掲げているのが「継続的なインプットとアウトプット」です。出社前や退社後に毎日勉強し、定期的にイベントへの登壇やブログへのアウトプットを心がけています。インプットとアウトプットはどちらも大切ですが、両者のバランスに悩むことはしばしばあります。その両者のバランスについて、ある方から「アウトプットするのはインプットが溢れ出たとき」という話を聞きました。なるほど、確かにインプットして新しいことを覚えると、人に話したくなりますもんね。私はこれからはとにかくインプットしまくって、それが溢れ出るように、開発・登壇・ブログなどへアウトプットできるようにしたいと思います。意識の上で重きを置くのはアウトプットではなくインプットの量です。

1年間でやったことのまとめ

ライブラリ開発

Banpeiという異常検知ライブラリを作りました。特異スペクトル変換という手法を用いて時系列データの変化点検知が行えるのが特徴です。

github.com

f:id:hirotsuru314:20171115011620p:plain

<デモ動画>


特異スペクトル変換による傾きの変化点検知

<関連記事>
異常検知ライブラリを作ってみた - Fire Engine
特異スペクトル変換法による時系列データの異常検知(Python) - Fire Engine

イベント登壇

Date Event Slide
2017/11/18 九州 Data Scientist MeetUp 2017 異常検知ライブラリを作った話
2017/10/26 PyFukuoka #2 Pythonによるパッケージ開発〜異常検知パッケージをつくってみた〜  
2017/09/20 プログラマのための数学勉強会@福岡 #6 異常検知の基礎と実践 〜正規分布による異常検知〜
2017/09/01 PyFukuoka #1 Pythonによる可視化まわりの話
2017/06/17 第3回データサイエンスLT&勉強会 in LINE福岡! 時系列データ分析とPython〜カルマンフィルタによる状態推定〜
2017/01/30 Oisix機械学習勉強会 トピックモデルでテキストをクラスタリングしてみた
2017/01/15 第2回データサイエンスLT&勉強会 ニューラルネットワークでニュース記事を自動分類してみた

コミュニティ活動

PyFukuokaというコミュニティを立ち上げ、福岡でPythonを盛り上げるために活動中です。定期的にLTイベントを開催し、私も登壇してます!

f:id:hirotsuru314:20171126223507p:plain

正直どれをとっても納得する部分はないですが、こうやって過去の活動をまとめるのは自分の現在の状況を顧みるいい機会になりそうです。

これからやりたいこと

 私はこれまでの1年間は主に機械学習統計学といったデータサイエンスの分野に取り組んできました。その中で感じたことは、データサイエンスは課題解決の強力なツールの一つだということです。私はデータサイエンスに主軸を置くのではなく、興味のある分野の仕事をしながら、そこで出てきた課題をデータサイエンスの力で解決することがおもしろいと思っています。
 現在一番興味がある分野が、これまで全くといっていいほど触れてこなかったインフラです。アプリケーション側よりもそれらを支えるインフラ側に興味がシフトしてきました。また、ITインフラの現場においても、サーバのCPUやメモリ等の各種リソースの使用量、ネットワークトラフィック、I/O要求といった時系列データが時々刻々と蓄積されており、データサイエンスの応用の幅は広いと思います。中でも、システム監視の自動化に興味があります。(Banpeiを使って挑戦してみたい)データサイエンス抜きに考えても、ネットワークやOSまわりにはかなり興味が惹かれています。将来的にはインフラとデータサイエンスの二つの知識・技術をもったエンジニアになりたいと考え始めました。

さいごに

 本当の天才達からすると、多くの人間は地頭の差なんて誤差の範囲であり、一番大事なのはマインドの部分だと思います。さいごに、未熟者の私のマインドを常に押し上げてくれた方々の記事を紹介させていただきます。どの記事も本当に素晴らしく俄然モチベーションがあがります。

異常検知ライブラリを作ってみた

今回の記事は、前職消防士でゼロからプログラミングを始めた超未熟者の私が、異常検知ライブラリを作った話です。

なぜ作ったか

マインド的背景

消防士を辞めてエンジニアに転職してから1年、いろんな技術に触れました。TensorFlow、scikit-learn、Dockerなどなど、必死になって使い方を覚えました。しかしだんだん、「これ、コマンド覚えてるだけで自分に何も技術身についてなくない?」という疑問や焦りが湧いてきて、自分は便利なツールを使いこなせるようになりたいんじゃなくて、いつの日かみんなが使って便利なツールを作る側になりたいんだ、ということに気づきました。そのような思いから今回初めてライブラリを作り、公開してみました。

データサイエンス的背景

 世の中は時系列データで溢れています。ビジネスの場において、データの何かしらの変化の兆候を捉えて、いち早く意思決定をしたいという場面がよくあります。例えば、観測データが商品の受注データであれば、急に売れなくなった(もしくは売れ出した)変化をいち早く検知し、次の一手を打てると機会損失を防げるかもしれません。私も実際に業務で時系列データの変化を捉えたい場面がありましたが、変化の兆候というのはデータによって様々であり、明らかなものもあれば、そうでないものもありました。様々なタイプの変化に対して柔軟に検知するためには、閾値やルール設定といった力技では難しく、コンピュータに「いつもと違う状態」を自動に検知させることが求められています。これがいわゆる異常検知(ここでは特に変化点検知)の技術です。

作ったもの

ソースコード

Banpei(番兵)
github.com

使い方

インストールは下記のコマンドで行います。(Pythonパッケージ化はしていますが、PyPIには登録していないのでgit cloneしたのちにpip installする必要があります。)

git clone https://github.com/tsurubee/banpei.git
cd banpei
pip install .

一番簡単な使い方は、異常を検知したいデータをPythonのリスト型やNumPy arrayなどの配列にして、detect関数に渡してやると、データの変化度を計算して配列を返します。
下記のコードのように3行のみで異常検知できます。まず、Banpeiをimportし、SSTという特異スペクトル変換法(異常検知アルゴリズムの一つ)を使うためのインスタンスを作成し、detect関数にインプットデータを渡すだけです。インスタンス作成時の引数wはスライド窓の幅であり、チューニングが必要なパラメータです。(詳しくは以前の記事を参照ください)

import banpei
model   = banpei.SST(w=50)
results = model.detect(data)

resultsには変化度が入っており、インプットであるdataと、アウトプットであるresultsを可視化すると、
(data: 緑、results: 赤)※見やすいように縦軸は調整しています。

f:id:hirotsuru314:20171115011620p:plain

このようにデータの兆候が変化する点で、変化度である赤線が立ち上がっているのが見て取れます。この赤の立ち上がりをみることで変化点検知ができそうです。

リアルタイム異常監視への挑戦

もともと私がBanpeiをつくったときは、リアルタイム監視の環境を作りたいというモチベーションがありました。(売り上げのリアルタイム監視やサーバリソースのリアルタイム監視などなど)そこで今回、ブラウザを通してリアルタイム異常監視ができる簡単な仕組みを構築しました。
可視化のツールとしてBokehというPython製の可視化ライブラリを採用しました。簡単なデモ動画を載せておきます。動画の10秒あたりで変化点検知をしています。


特異スペクトル変換による傾きの変化点検知

このようなリアルタイム監視環境を構築したければ、pip install bokehでBokehをインストールして、下のようなスクリプトで可視化できます。(私の環境ではbokeh==0.12.10)

import banpei
import numpy as np
import pandas as pd
from bokeh.io import curdoc
from bokeh.models import ColumnDataSource, DatetimeTickFormatter
from bokeh.plotting import figure
from datetime import datetime
from math import radians
from pytz import timezone

data = []
results = []

def get_new_data():
    global data
    #ここでデータ取得(例:new_data = psutil.cpu_percent())
    data.append(new_data)

def update_data():
    global results
    get_new_data()
    ret = model.stream_detect(data)
    results.append(ret)
    now = datetime.now(tz=timezone("Asia/Tokyo"))
    new_data = dict(x=[now], y=[data[-1]], ret=[results[-1]])
    source.stream(new_data, rollover=500)

# Create Data Source
source = ColumnDataSource(dict(x=[], y=[], ret=[]))
# Create Banpei instance
model = banpei.SST(w=30)
# Draw a graph
fig = figure(x_axis_type="datetime",
             x_axis_label="Datetime",
             plot_width=950,
             plot_height=650)
fig.title.text = "Realtime monitoring with Banpei"
fig.line(source=source, x='x', y='y', line_width=2, alpha=.85, color='blue', legend='observed data')
fig.line(source=source, x='x', y='ret', line_width=2, alpha=.85, color='red', legend='change-point score')
fig.circle(source=source, x='x', y='y', line_width=2, line_color='blue', color='blue')
fig.legend.location = "top_left"
# Configuration of the axis
format = "%Y-%m-%d-%H-%M-%S"
fig.xaxis.formatter = DatetimeTickFormatter(
    seconds=[format],
    minsec =[format],
    minutes=[format],
    hourmin=[format],
    hours  =[format],
    days   =[format],
    months =[format],
    years  =[format]
)
fig.xaxis.major_label_orientation=radians(90)
# Configuration of the callback
curdoc().add_root(fig)
curdoc().add_periodic_callback(update_data, 1000) #ms

get_new_data関数の部分をカスタマイズして、独自のデータ取得の処理を書けば、bokeh serve file名.pylocalhostが立ち上がりブラウザ上で上記の動画のようなものが見れます。Banpei側ではstream_detectという関数を使っていますが、これは渡されたインプットデータのうち、最新の1点の変化度を計算して、数値を返す関数です。

Banpeiの異常検知アルゴリズム

 Banpeiには異常検知アルゴリズムとして特異スペクトル変換を採用しています。概要は以前記事に書きました。

www.hirotsuru.com

特異スペクトル変換の特徴としては、特定の確率分布を仮定していないため、①多様な変化に頑強に対応できる、②パラメータチューニングが容易、などが挙げられます。パラメータに関しては、スライド窓のサイズwの実質1個のみのチューニングで済みます。(他の必要なパラメータはwから経験的に算出)特異スペクトル変換は愚直に線形計算しているだけ、というところがすごく素敵です。
一方、内部の計算過程で特異分解と呼ばれる行列計算を用いており、これがかなり計算コストが高いです。したがって、検知を行いたい時間間隔や、パラメータwの最適値によっては、リアルタイム計算が厳しくなるかもしれません。
今後、特異スペクトル変換の高速化のために、クリロフ部分空間を用いた下の論文を実装したいと思ってます。

http://epubs.siam.org/doi/pdf/10.1137/1.9781611972771.54

さいごに

 作った感想としては、やっぱりモノ作り楽しいな、です。一旦、画面上で動くものまで作れたのですごく達成感はありました。ここからどのように実際の現場で使えるように持っていくかは真剣に考えています。例えば、前述のアルゴリズムの高速化でよりリアルタイム性を高めたり、メールやSlackに異常通知を送れる機能を実装したりと・・まだまだやりたいことが山積みなので、着実に開発を進めていきたいと思っています。

参考資料

異常検知って?

時系列データって?

ライブラリ(パッケージ)開発するには?

Bokehって?

特異スペクトル変換って?

Bokehを用いたビットコイン価格のリアルタイム可視化(Python)

 今回はBokehというライブラリを使って、ビットコイン価格のリアルタイム可視化を行う方法について書いていきます。Bokehを使うと、Pythonオンリーで可視化までできるので、非常に便利です。

Bokehとは

 BokehとはPython製の対話的可視化ライブラリです。対話的とはどういうことかというと、作成したグラフのズームアップ・ズームアウト・軸の調整といった様々な操作を手元で簡単に行えるということです。

f:id:hirotsuru314:20171111151354p:plain

これは、Bokeh内部で、HTML・CSSJavaScriptを自動で生成する仕組みがあることで達成されており、ユーザは基本的にはPythonコードしか書く必要がありません。
Bokehの公式のギャラリーを見るとすごくファンシーな可視化がたくさんあるので見るだけでも楽しいです!(実際に対話的な操作もできます)

Gallery — Bokeh 0.12.10 documentation

以前の記事に対話的なグラフの作成方法を書きましたので、併せてどうぞ

Bokehの特徴

 Bokehの特徴をいくつか挙げると、

  • 対話的な可視化ができる
  • ストリーミングデータのリアルタイム可視化ができる
  • グラフを作るためにHTML・CSSJavaScriptを書かなくてもよい

といった感じです。
今回は『ストリーミングデータのリアルタイム可視化ができる』に着目した記事になります。
対話的というだけでもすごいのですが、動的に流れてくるデータに対しても、Bokehにデータを流すことでリアルタイム可視化ができ、これが本当に便利です!

用いるデータについて

 ストリーミングデータの例として『ビットコイン価格』を取り上げて見ることにします。(私自身はシステムトレードなどは全くやっていません。笑)
 下記リンクの中のLast Tradeという部分を取得します。この部分は、数秒ごとにリロードすると値が変わるので、定期的にスクレイピングすることで、時系列データを得ることができます。

Bitcoincharts | bitFlyer - JPY Summary

f:id:hirotsuru314:20171111151600p:plain

ビットコイン価格のリアルタイム可視化

開発環境
Python 3.6.3
bokeh==0.12.10
beautifulsoup4==4.6.0
requests==2.13.0

作成プログラム

from bokeh.io import curdoc
from bokeh.models import ColumnDataSource, DatetimeTickFormatter
from bokeh.plotting import figure
import requests
from bs4 import BeautifulSoup
from datetime import datetime
from math import radians
from pytz import timezone

URL = "https://bitcoincharts.com/markets/"
MARKET="bitflyerJPY"

def get_data():
    res = requests.get(URL + MARKET + ".html", headers={"User-Agent": "Mozilla/5.0"})
    content = res.content
    soup = BeautifulSoup(content, "html.parser")
    last_trade = int(soup.find_all("p")[0].span.text)
    return last_trade

def update_data():
    now = datetime.now(tz=timezone("Asia/Tokyo"))
    new_data = dict(x=[now], y=[get_data()])
    source.stream(new_data, rollover=200)

#データソースの作成
source = ColumnDataSource(dict(x=[], y=[]))

#グラフの作成
fig = figure(x_axis_type="datetime",
             x_axis_label="Datetime",
             y_axis_label="Last Trade",
             plot_width=800,
             plot_height=600)
fig.title.text = "Bitcoin Charts"
fig.line(source=source, x="x", y="y", line_width=2, alpha=.85, color="blue")
fig.circle(source=source, x="x", y="y", line_width=2, color="blue")

#軸の設定
format = "%Y-%m-%d-%H-%M-%S"
fig.xaxis.formatter = DatetimeTickFormatter(
    seconds=[format],
    minsec =[format],
    minutes=[format],
    hourmin=[format],
    hours  =[format],
    days   =[format],
    months =[format],
    years  =[format]
)
fig.xaxis.major_label_orientation=radians(90)

#コールバックの設定
curdoc().add_root(fig)
curdoc().add_periodic_callback(update_data, 3000) #ms単位

上記プログラムにbitcoin_streaming.pyというファイル名をつけ、コマンドラインから下のように実行すると、私の場合

$ bokeh serve bitcoin_streaming.py
2017-11-11 12:24:32,513 Starting Bokeh server version 0.12.2
2017-11-11 12:24:32,520 Starting Bokeh server on port 5006 with applications at paths ['/bitcoin_streaming']
2017-11-11 12:24:32,520 Starting Bokeh server with process id: 1995

のように表示され、http://localhost:5006/bitcoin_streaming にアクセスすると、下のようにグラフが表示されます。

f:id:hirotsuru314:20171111151837p:plain

実際はリアルタイムで動くグラフなので、動画にしてみました。


Bokehを用いたビットコイン価格のリアルタイム可視化

ちょっとわかりづらいですが、3秒に1回スクレイピングビットコイン価格を取得して、リアルタイムに可視化しています。

プログラムについての補足

Beautiful Soupドキュメント — BeautifulSoup Document 3.0 ドキュメント

  • ColumnarDataSource
    BokehにはColumnarDataSourceというデータ構造があります。ColumnarDataSourceは 列指向でデータを保持します。
    リアルタイム可視化の場合には、このデータ構造にstream関数を使って、データを流し込むとBokehの方でデータを保持から可視化までやってくれます。
    ちなみに
def update_data():
    now = datetime.now(tz=timezone("Asia/Tokyo"))
    new_data = dict(x=[now], y=[get_data()])
    source.stream(new_data, rollover=200)

ここのstream関数の引数であるrolloverは、保持するデータ数の上限を決めるもので、その上限がそのまま可視化にも反映されるので、重要です。

  • コールバック
    作成したプログラムの最後の行の
curdoc().add_periodic_callback(update_data, 3000)

部分で、update_data関数を3秒(3000ms)に1回呼び出す設定を書いていて、update_data関数が実行されると、ColumnarDataSourceのインスタンスに新しいデータが追加され、可視化される仕組みになっています。

参考

おわりに

 上記のソースコードはデータ取得のget_data関数の部分だけ変えるといろんなリアルタイム可視化ができます。例えば、CPU使用率を監視しようと思うと、こんな感じに入れ替えればできます

import psutil
def get_new_data():
    return psutil.cpu_percent()

 いろいろ楽しいことができそうですね!

ちなみに・・・

 私は最近個人で『Banpei』という異常検知ライブラリを作っており、Bokehと連携させ、リアルタイム異常監視などができるように開発を進めています。リアルタイム異常監視については近日中にまた記事にします!
 異常検知ライブラリBanpeiはGitHubで公開しているので見ていただけると嬉しいです!

github.com

Pythonによる「べき乗法(Power Method)」の実装 - 最大特異値・特異ベクトルを求める

はじめに

 理工学の分野において、行列の固有値固有ベクトルを求める問題に直面することが多々あります。一般に、固有値固有ベクトル固有値分解という行列分解で求められますが、これは正方行列に対してのみ定義されています。特異値分解は、固有値分解を長方形行列に拡張したものであり、より適用範囲が広く、画像圧縮や自然言語処理における潜在意味解析などにも応用されています。特異値分解は比較的計算コストが高いため、例えば絶対値最大の特異値やその特異ベクトルを求めたいだけの場合は、他の手法を使った方が計算効率が高いです。
 今回はべき乗法(Power method)と呼ばれる数値計算アルゴリズムPythonで実装し、行列の最大特異値および特異ベクトルを求める方法について書いていきます。

Pythonによる実装

 べき乗法の理論については、下記リンクの資料(P5〜)を参考にしました。

http://www.cs.yale.edu/homes/el327/datamining2013aFiles/07_singular_value_decomposition.pdf

import numpy as np

def power_method(A, iter_num=1):
    """
    Calculate the first singular vector/value of a target matrix based on the power method.
    Parameters
    ----------
    A : numpy array
        Target matrix
    iter_num : int
               Number of iterations

    Returns
    -------
    u : numpy array
        first left singular vector of A
    s : float
        first singular value of A
    v : numpy array
        first right singular vector of A
    """
    # set initial vector q
    q = np.random.normal(size=A.shape[1])
    q = q / np.linalg.norm(q)

    for i in range(iter_num):
        q = np.dot(np.dot(A.T, A), q)

    v = q / np.linalg.norm(q)
    Av = np.dot(A, v)
    s = np.linalg.norm(Av)
    u = Av / s

    return u, s, v


特異値分解との比較

 特異値分解の結果と一致しているか確かめます。Python特異値分解を行うにはNumPyのlinalg.svgを使うとよいでしょう。
以下のように4×3行列を作成します。

A = np.array([[1,2,3],[4,5,6],[7,8,9],[10,11,12]])

power_method関数を使うと、

u, s, v = power_method(A, iter_num=1)

u
#-> array([-0.14091245, -0.34396479, -0.54701712, -0.75006945])
s
#-> 25.462398134435947
v
#-> array([-0.50388184, -0.57446659, -0.64505134])

NumPyで特異値分解を行うと

U, S, V = np.linalg.svd(A, full_matrices=False)

U
#-> array([[-0.14087668,  0.82471435,  0.54771037],
#         [-0.34394629,  0.42626394, -0.72755711],
#         [-0.54701591,  0.02781353, -0.1880169 ],
#         [-0.75008553, -0.37063688,  0.36786364]])
S
#-> array([  2.54624074e+01,   1.29066168e+00,   1.97614008e-15])
V
#-> array([[-0.50453315, -0.5745157 , -0.64449826],
#         [-0.76077568, -0.05714052,  0.64649464],
#         [-0.40824829,  0.81649658, -0.40824829]])

両者を比較すると、左特異ベクトル、特異値、右特異ベクトルがほぼ一致していることがわかります。

特異スペクトル変換法による時系列データの異常検知(Python)

はじめに

 今回は、特異スペクトル変換法というアルゴリズムPythonで実装します。このアルゴリズムは時系列データの異常検知に対して非常に強い力を発揮します。また、ハイパーパラメータ(人が調整する必要のあるパラメータ)が少なく、比較的チューニングが容易であることも特徴の一つです。数学の理論については深追いはせず、アルゴリズムの概要と実装まで書いていきたいと思います。

【目次】

時系列データについて

 時系列データとは、時間の推移ととともに観測されるデータのことです。昨今、様々な企業がデータ活用を推進していますが、世の中の実務の現場に貯まっていく多くのデータは時系列のデータです。
 データ分析において、時系列データの取り扱いは、他のデータ(クロスセクションデータなどと言われる)に比べて、理論的にも実銭的にも難しいです。例えば、分析をする上で、ヒストグラムを描くということをよくやると思いますが、ヒストグラムを描くということは、暗にデータの相関を無視し、互いに独立であることを仮定しています。 時系列データでは、隣り合う時刻の観測値同士には明らかに相関関係があり、「各観測値が互いに独立である」という仮定は成り立ちません。このようなデータをきちんと取り扱えることは非常に有用です。

【参考】 実務の現場に多い時系列データ分析の際に注意しておきたい点を列挙してみる - 六本木で働くデータサイエンティストのブログ

時系列データの異常と変化点検知 

 一口に異常と言ってもいろいろありますが、明らかに他の大多数のデータから外れた値を持っている場合は非常に検知が容易です。このような異常を検知することは外れ値検知と呼ばれ、正規分布を用いた単純な外れ値検知の話を以前記事に書きました。

www.hirotsuru.com

一方、時系列データの異常パターンの特徴としては、各観測データの時刻(横軸)を適当にシャッフルすると検知できなくなるということです。実際に例を見てみましょう。

f:id:hirotsuru314:20171011215303p:plain

 データの真ん中あたりで波が激しくなって、また戻っているのがわかると思います。変化点という観点で言うと、波の周波数が激しくなっている点(横軸が30のあたり)と、またゆるやかに戻っている点(横軸が60のあたり)が変化点です。このように人間の目でデータを見ると、どこが異常かだいたいわかりますが、コンピュータにこの異常を検知させるのはけっこう難しいです。これを自動で検知してやろうというのが今回の目的です。今回は、1変数の時系列データにおける異常検知の話です。

特異スペクトル変換法の概要

 時系列データの異常検知の手法はいろいろありますが、今回は特異スペクトル変換というアルゴリズムを実装してみます。特異スペクトル変換法の概略は下の図に集約されます。

f:id:hirotsuru314:20171011214314p:plain

(出典:上の図は井手剛氏の著書「入門 機械学習による異常検知―Rによる実践ガイド」のP200 図7.4を元に作成しました。)

履歴行列とテスト行列

 一次元時系列データの分析をする際に、ある任意の幅 wを設定して、 w個の隣接した観測データをまとめて w次元のベクトルとして取り出すということをよくやります。さらに wを時間の経過とともにずらしながら取り出すことで、時系列片をつくっていきます。この取り出す領域のことをスライド窓と呼び、スライド窓により取り出したベクトルを部分時系列と呼びます。
 上の概略図からわかるように、特異スペクトル変換法ではこの部分時系列をさらにまとめて行列としてデータを取り出します。この行列自体の特徴が、ある時刻の時系列データの特徴となるわけです。ある時刻 tのまわりデータを用いて生成した行列をテスト行列と呼び、時刻 tより以前のデータを用いて生成した行列を履歴行列と呼びます。特異スペクトル変換法では、この二つの行列の食い違いの大きさを変化度として異常検知を行います。

特異値分解

 特異スペクトル変換法の理論の理解には線形代数の知識がある程度必要なので、場合によっては「特異スペクトル変換法は現在と過去の行列を取り出して、その食い違いの大きさをもとに変化度を計算しているのだな」くらいの理解でよいかもしれません。(後述のPythonによる実装まで読み飛ばしていただいてかまいません。)
 特異値分解は、固有値分解を長方形行列に拡張したものと見なすことができます。詳細な説明については下記の参考リンクがわかりやすいと思います。固有値分解ってなに?という方にはプログラミングのための線形代数という本が超絶おすすめです。

特異値分解の定義,性質,具体例 | 高校数学の美しい物語

http://www.misojiro.t.u-tokyo.ac.jp/~murota/lect-kisosuri/singularvalue050129.pdf

 数式を追うことも大事ですが、今回やっていることをざっと書くと、前述の履歴行列とテスト行列に特異値分解を行い、特異値上位の左特異ベクトルを取り出し行列としてまとめてます。(行列のランクより少ない数を取り出す) これはマイナーな方向を無視し、ノイズ除去または次元削除をしたことに相当すると考えることができます。また、取り出したそれぞれの列ベクトルで張られる空間を主部分空間などと呼びます。
 後に出てくるコードを見ればわかりますが、いかにも難しそうな特異値分解もNumPyの力を借りれば、関数を1個呼び出すだけで計算結果が返ってきます。

numpy.linalg.svd — NumPy v1.13 Manual

変化度の定義

 時刻 tにおける過去と現在の行列の主部分空間から、両者の食い違いを定量化することで、時刻 tの変化度を定義します。これは部分空間同士の距離を求める問題になるのですが、空間同士の距離を求めるというのはすごく理論的にも難しいです。今回は、行列2ノルムが行列の最大特異値に等しいということを利用して、異常度を算出しています。
 まとめると、特異スペクトル変換法は、まずスライド窓を使って、現在と過去のデータを行列として取り出し、特異値分解に基づき行列の特徴パターンを求め、そこから両者の変化度を算出するアルゴリズムです。

Pythonによる実装

 作りたいものは、入力として1次元データのリスト(Pythonの組み込みリストやNumPyのarray)を渡すと、出力として特異スペクトル変換法に基づき変化度に変換した1次元データリストを返すというものです。
※今回処理の流れを掴みやすくするため、あえて2つの関数に詰め込んで実装しています。

import numpy as np

def extract_matrix(data, start, end, w):
    row = w
    column = end - start + 1
    matrix = np.empty((row, column))
    i = 0
    for t in range(start, end+1):
        matrix[:, i] = data[t-1:t-1+row]
        i += 1

    return matrix

def sst(data, w, m=2, k=None, L=None):
    """
    Parameters
    ----------
    data : array_like
           Input array or object that can be converted to an array.
    w    : int
           Window size
    m    : int
           Number of basis vectors
    k    : int
           Number of columns for the trajectory and test matrices
    L    : int
           Lag time

    Returns
    -------
    Numpy array contains the degree of change.
    """
    # Set variables
    if not isinstance(data, np.ndarray):
        data = np.array(data)
    if k is None:
        k = w // 2
    if L is None:
        L = k // 2
    T = len(data)

    # Calculation range
    start_cal = k + w
    end_cal = T - L + 1

    # Calculate the degree of change
    change_scores = np.zeros(len(data))
    for t in range(start_cal, end_cal + 1):
        # Trajectory matrix
        start_tra = t - w - k + 1
        end_tra = t - w
        tra_matrix = extract_matrix(data, start_tra, end_tra, w)

        # Test matrix
        start_test = start_tra + L
        end_test = end_tra + L
        test_matrix = extract_matrix(data, start_test, end_test, w)

        # Singular value decomposition(SVD)
        U_tra, _, _  = np.linalg.svd(tra_matrix, full_matrices=False)
        U_test, _, _ = np.linalg.svd(test_matrix, full_matrices=False)
        U_tra_m  = U_tra[:, :m]
        U_test_m = U_test[:, :m]
        s = np.linalg.svd(np.dot(U_tra_m.T, U_test_m), full_matrices=False, compute_uv=False)
        change_scores[t] = 1 - s[0]

    return change_scores

実際にこのコードを使って、先に挙げた周波数異常のデータを変化度に変換し、描画した結果が下記になります。(縦軸は見やすいようにスケールを調整しています。)

change_scores = sst(data, 50)

f:id:hirotsuru314:20171011214524p:plain

変化度を示している赤のラインを見ると周波数変化が起こっている2点で、きわめて明瞭にピークをもつことがわかります。このように、波の周波数が激しくなっている点も、ゆるやかに戻っている点のどちらも検知できています。若干遅れて検知しているのは、パラメータの調整で改善できる部分でもありますが、ある程度限界もあります。
 今回の例は周波数の異常パターンでしたが、同アルゴリズムを使って、データの傾きの変化点なども検知することができます。

特異スペクトル変換法の課題

 特異スペクトル変換法の課題のひとつとして、計算コストが高いことが挙げられます。一連の処理の中でも特に特異値分解の部分の計算量がネックになります。計算が重く時間を要すると言うことは、データのリアルタイム処理には使えないということであり、時系列解析の幅を制限する要因にもなり得ます。
 この課題を解決する方法の一つとして、クリロフ部分空間法という手法と併用することで計算の高速化することができるようです。また、そもそもリアルタイム処理には別のアルゴリズムを使うという選択肢もあるでしょう。例えば、ChangeFinderは逐次的な計算アルゴリズムになっているため、リアルタイム処理には向いています。(一方、ハイパーパラメータの闇は深そうなので、要は適材適所を見極める必要があるということですね。)

おわりに

 現在、今回記事に書いた特異スペクトル変換法による異常検知も実装した『Banpei』という異常検知パッケージを絶賛開発中です。
  github.com

 今後、様々なアルゴリズムを実装したり、ストリーミングデータの取り扱いができるように機能拡張したり、などいろいろ計画中です。
 私自身エンジニア歴がまだ1年にも満たない未熟者なので、書き方がおかしいところなどあればぜひご指導をよろしくお願いいたしますorz

プログラマのための数学勉強会@福岡 #6に登壇しました

先日「プログラマのための数学勉強会@福岡 #6」に登壇しました。

maths4pg-fuk.connpass.com

こちらがそのときの発表スライドになります。

speakerdeck.com

内容は、正規分布を使って1次元データの異常検知をする話で、理論・実装の詳細は下記の記事に書きました。

www.hirotsuru.com

このイベントは、@tkengoさんが主催されており、@tkengoさんは最近機械学習の数学の本も出版されたとてもすごい方です!

私のような末端エンジニアが@tkengoさんから登壇のお誘いをいただき大変恐縮でしたが、発表準備の過程で、私自身がかなり勉強させていただきました。また、発表のおかげで、今回のテーマである「異常検知」についてかなり興味を持ち、継続して勉強中なので、そのあたりはまたアウトプットできればと思います!