何の話かというと
TensorFlow Tutorialの最初に登場する「MNIST For ML Beginners」では、次の方針で手書き文字の分類器を作成しています。(今の段階では、下記が何を言ってるのか分からなくても大丈夫です。)
・28x28ピクセルの手書き文字画像を各ピクセルの濃度を並べた784次元ベクトルと見なす。
・784次元ベクトル空間を10箇所に分類する線形の多項分類器を用意する。
・多項分類器の出力値をsoftmax関数に入れて、784次元空間の各点について、「0」〜「9」のそれぞれの文字である確率を定義する。
・上記の定義の下で、トレーニングセットが得られる確率を最大にするよう、線形多項分類器のパラメーターを調整する。
これが一体何を言ってるのか・・・ということを数学的に理解していただくことが目標です。今回は、下準備として、より単純化したデータで上記と同じ処理を実装してみます。
平面データの分類問題
上記の例では、「784次元空間を10個に分類する」ということをやっているわけですが、これをぐっと単純化して、「2次元平面を2つに分類する」ということを考えます。これは、「機械学習理論入門」の「第3章 最尤推定法:確率を用いた推定理論」で解説しているものと同じ問題です。
- 作者: 中井悦司
- 出版社/メーカー: 技術評論社
- 発売日: 2015/10/17
- メディア: 単行本(ソフトカバー)
- この商品を含むブログを見る
先に結果を示すと、下図のようになります。
2次元平面に◯と×の2種類のデータがちらばっており、これらをできるだけうまく分類する直線を見つけ出します。青色は学習に使用するデータで赤色は学習結果をテストするためのデータです。この例では、(あくまで偶然ですが)テストデータに対して100%の正解率を達成しています。
これをTensorFlowで実装したものが下記になります。
logistic_regression.py
from __future__ import absolute_import from __future__ import division from __future__ import print_function import tensorflow as tf import numpy as np from numpy.random import rand, multivariate_normal import matplotlib.pyplot as plt # dataset class Dataset(): def __init__(self): variance = 40 n1, n2 = 60, 40 mu1, mu2 = [13,10], [0,-3] cov1 = np.array([[variance,0],[0,variance]]) cov2 = np.array([[variance,0],[0,variance]]) data1 = multivariate_normal(mu1,cov1,n1) data2 = multivariate_normal(mu2,cov2,n2) data = np.r_[np.c_[data1,np.ones(n1)], np.c_[data2,np.zeros(n2)]] np.random.shuffle(data) self.test_data, self.test_label = np.hsplit(data[0:20],[2]) self.train_data, self.train_label = np.hsplit(data[20:],[2]) self.index = 0 def next_batch(self, n): if self.index + n > len(self.train_data): self.index = 0 data = self.train_data[self.index:self.index+n] label = self.train_label[self.index:self.index+n] self.index += n return data, label def plot_result(dataset, weight, bias, mult): fig = plt.figure() subplot = fig.add_subplot(1,1,1) data0_x, data0_y, data1_x, data1_y = [], [], [], [] for i in range(len(dataset.train_data)): if dataset.train_label[i][0] == 0: data0_x.append(dataset.train_data[i][0]) data0_y.append(dataset.train_data[i][1]) else: data1_x.append(dataset.train_data[i][0]) data1_y.append(dataset.train_data[i][1]) subplot.scatter(data0_x, data0_y, marker='x', color='blue') subplot.scatter(data1_x, data1_y, marker='o', color='blue') data0_x, data0_y, data1_x, data1_y = [], [], [], [] for i in range(len(dataset.test_data)): if dataset.test_label[i][0] == 0: data0_x.append(dataset.test_data[i][0]) data0_y.append(dataset.test_data[i][1]) else: data1_x.append(dataset.test_data[i][0]) data1_y.append(dataset.test_data[i][1]) subplot.scatter(data0_x, data0_y, marker='x', color='red') subplot.scatter(data1_x, data1_y, marker='o', color='red') xs, ys = np.hsplit(dataset.train_data,[1]) wx, wy, b = weight[0][0], weight[1][0], bias[0] linex = np.arange(xs.min()-5, xs.max()+5) liney = - linex * wx/wy - b*mult/wy subplot.plot(linex, liney, color='red') plt.show() # Main if __name__ == '__main__': dataset = Dataset() sess = tf.InteractiveSession() writer = tf.train.SummaryWriter('/tmp/logistic_logs', sess.graph_def) mult = dataset.train_data.flatten().mean() # Create the model x = tf.placeholder(tf.float32, [None, 2]) w = tf.Variable(tf.zeros([2, 1])) b = tf.Variable(tf.zeros([1])) y = tf.sigmoid(tf.matmul(x, w) + b*mult) # Define loss and optimizer y_ = tf.placeholder(tf.float32, [None, 1]) log_probability = tf.reduce_sum(y_*tf.log(y) + (1-y_)*tf.log(1-y)) train_step = tf.train.GradientDescentOptimizer(0.001).minimize(-log_probability) correct_prediction = tf.equal(tf.sign(y-0.5), tf.sign(y_-0.5)) accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32)) # Logging data for TensorBoard _ = tf.histogram_summary('weight', w) _ = tf.histogram_summary('bias', b) _ = tf.histogram_summary('probability of training data', y) _ = tf.scalar_summary('log_probability', log_probability) _ = tf.scalar_summary('accuracy', accuracy) # Train tf.initialize_all_variables().run() for i in range(240): batch_xs, batch_ys = dataset.next_batch(10) feed = {x: batch_xs, y_: batch_ys} sess.run(train_step, feed_dict=feed) feed = {x: dataset.test_data, y_: dataset.test_label} summary_str, lp, acc = sess.run( [tf.merge_all_summaries(), log_probability, accuracy], feed_dict=feed) writer.add_summary(summary_str, i) print('LogProbability and Accuracy at step %s: %s, %s' % (i, lp, acc)) plot_result(dataset, w.eval(), b.eval(), mult)
少し長いコードですが、本質的には、「# Main」以下の部分を見ればOKです。その他の部分は、テストデータの生成とグラフの描画を行うサブルーチンになっています。
なお、TensorFlowの実行環境のセットアップは下記を参考にしてください。本記事では、Fedoraにvirtualenvで導入しています。
数学的背景
それでは、上記のコードの背景にある数学を解説します。まず、平面を直線で分割するわけですので、直線を表す式を用意します。
ここでは、平面上の座標を としています。 が分割線を表しており、 の符号で平面上の2つの領域が区別されます。分割線から遠くなるほど、 の絶対値が大きくなることに注意してください。
そこで下図のように、 の値を「データが◯である確率 」に変換することを考えます。
これは、図の下側にあるように、「0から1になめらかに増加する関数 」に を代入することで実現されます。このような形状の関数をシグモイド関数と呼びます。ここまでの準備が整うと、「トレーニング用のデータが得られる確率 」の計算が可能になります。
まず、番目のデータを とします。はデータの種類を表す変数で、◯は 、×は とします。この時、この特定のデータが得られる確率は、次になります。( と で場合分けして考えてください。)
したがって、トレーニングセット全体が得られる確率は、次になります。
ここで、次のように置いています。
―― (1)
あとは、最尤推定法の考え方に従って、この確率が最大になるように直線のパラメーター を調整すればそれが答えになります。計算上は、この対数をとった対数尤度関数を最大化するようなアルゴリズムを適用します。
―― (2)
TensorFlowには、このような関数を最大化するパラメーターを見つけ出すアルゴリズムが最初から内蔵されているので、アルゴリズムそのものを自分で記述する必要はありません。(1)(2)の定義だけ与えて、あとの計算はTensorFlowにまかせてしまいます。先ほどのコードの中では、まさにこれらの定義を行っています。
コードの解説
まず、(1)を定義します。TensorFlowでは、多次元配列(Tensor)を用いて計算するのが基本となるので、行列形式で計算を行ないます。具体的には次のコードになります。
# Create the model x = tf.placeholder(tf.float32, [None, 2]) w = tf.Variable(tf.zeros([2, 1])) b = tf.Variable(tf.zeros([1])) y = tf.sigmoid(tf.matmul(x, w) + b*mult)
まず、x は、トレーニングセットに含まれる を縦にならべた N×2 行列です。トレーニングセットのデータはこの後で注入するので、ここではデータの入れ物となるplaceholderクラスのインスタンスとしています。データ数はまだ分からないので縦方向のデータ数は None を指定しています。
w は、係数 を縦に並べた 2×1 行列です。変数を扱うVariableクラスのインスタンスです。同様に、b は、定数項に対応する 1×1 行列です。あくまで行列として扱う点に注意してください。
最後に y は、(1)の定義に対応します。matmul()は行列の掛け算です。bをmult倍しているのは、アルゴリズムを高速化するテクニックなので、ここでは気にしないでください。tf.matmul(x, w)の結果は、N×1 行列で、b は 1×1 行列なので、これらを足すのは(数学的には)変な操作ですが、これは、NumPyのブロードキャストルールと同じで、N×1 行列の各要素に b の値が足されます。sigmoidは、前述のシグモイド関数を返すメソッドです。このように定義された y は、「placeholderである x に M 個のデータを入れると、Mx1 行列の値が得られる関数」になっています。
続いて、(2)を定義します。
# Define loss and optimizer y_ = tf.placeholder(tf.float32, [None, 1]) log_probability = tf.reduce_sum(y_*tf.log(y) + (1-y_)*tf.log(1-y))
y_ は、トレーニングセットのラベル()の値を縦にならべた N×1 行列です。データはこれから注入するので、placeholderクラスのインスタンスです。そして、log_probabilityが、(2)に対応します。reduce_sum()は、行列の全ての要素を足し合わせる関数で、 の操作に対応します。
そして、(2)を最大化するアルゴリズムを呼び出すために、具体的なアルゴリズムの指定を行ないます。
train_step = tf.train.GradientDescentOptimizer(0.001).minimize(-log_probability) correct_prediction = tf.equal(tf.sign(y-0.5), tf.sign(y_-0.5)) accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
train_stepがアルゴリズムの指定になります。ここでは、勾配降下法を用いています。これには、最大化ではなくて、最小化するメソッド minimize() が用意されているので、-log_probability を最小化するという指定を行っています。
その後のcorrect_predictionとaccuracyは、テストセットに対する正解率を計算するための関数です。「◯である確率 y」が0.5より大きいかどうかで◯×の予測を行うという前提で、予測結果と実際のラベル y_(1または0)の比較をした結果(Boolean)を並べた行列が correct_prediction で、その平均を取って、正解率(accuracy)を計算しています。
そして、実際にアルゴリズムを実行するのが下記の部分です。
# Train tf.initialize_all_variables().run() for i in range(240): batch_xs, batch_ys = dataset.next_batch(10) feed = {x: batch_xs, y_: batch_ys} sess.run(train_step, feed_dict=feed)
変数を初期化した後、240回のループを回しています。1回のループでは、トレーニングセットのデータを10個取り出して、それを先に指定したアルゴリズム train_step と共に、セッションインスタンス sess に放り込んでいます。このインスタンスがアルゴリズムに含まれる変数(Variableインスタンス)を見つけ出して、その値を修正していきます。この例では、トレーニングセットのデータは80個ありますが、dataset.next_batch()は、すべてのデータを取り出したらまた最初から同じデータを返すように作ってあります。同じデータを何度も舐めながらパラメーターの修正を繰り返して、-log_probability が最小となるところを探っているというわけです。
同じループの中にある下記の部分は、TensorBoard(学習状況をWeb GUIで表示するツール)で利用するデータを出力しています。
feed = {x: dataset.test_data, y_: dataset.test_label} summary_str, lp, acc = sess.run( [tf.merge_all_summaries(), log_probability, accuracy], feed_dict=feed) writer.add_summary(summary_str, i) print('LogProbability and Accuracy at step %s: %s, %s' % (i, lp, acc))
その時点でのトレーニングセットに対する対数尤度(log_probability)とテストセットに対する正解率(accuracy)をデータとして出力します。TensorBoardに表示するデータの種類は、事前に下記の部分で登録されています。
# Logging data for TensorBoard _ = tf.histogram_summary('weight', w) _ = tf.histogram_summary('bias', b) _ = tf.histogram_summary('probability of training data', y) _ = tf.scalar_summary('log_probability', log_probability) _ = tf.scalar_summary('accuracy', accuracy)
データの出力先は、冒頭の下記の部分でディレクトリー「/tmp/logistic_logs」を指定しています。
writer = tf.train.SummaryWriter('/tmp/logistic_logs', sess.graph_def)
このディレクトリーを指定してTensorBoardを起動すると、次のような学習状況が確認できます。
python ~/tensorflow/lib/python2.7/site-packages/tensorflow/tensorboard/tensorboard.py --logdir=/tmp/logistic_logs/
まとめ
TensorFlowを利用して、二次元平面のデータを分類することができました。これを画像のピクセルデータを並べた784次元空間に拡張すれば、画像データの分類が可能になるというわけです。次回は、実際にこの拡張を行ってみます。
次回のエントリーはこちらです。