めもめも

このブログに記載の内容は個人の見解であり、必ずしも所属組織の立場、戦略、意見を代表するものではありません。

データサイエンスに関する初心者向けの参考書

筆者が実際に読んだ(書いた)書籍の中で初心者向けのものを紹介しています。

※英語の書籍については、日本語版は読んでいないので、翻訳のクオリティなどは未確認です。

データサイエンスとは?

「そもそもデータサイエンスって何?」という事を理解するのに役立つ書籍です。

日本語版はこちらになります。

機械学習アルゴリズム

データサイエンスに必要なツールの1つに「機械学習」があります。上記の本にも機械学習アルゴリズムのふわっとした解説がありますが、もう少し、かっちりと理解したい方は下記をお勧めします。(手前味噌です。すいません。)

下記の本は、アルゴリズムに加えて、機械学習ライブラリ(scikit-learn)の使い方を学ぶことができます。

日本語版はこちらになります。

scikit-learnによる機械学習処理

scikit-learnを使って、具体的な機械学習処理を行う方法を学ぶには、こちらが最適です。

日本語版はこちらになります。

対話的分析ライブラリ

データサイエンスの手法である EDA(Exploratory Data Analysis)では、対話的にデータを分析していきます。下記は、EDA に必要となる Python のライブラリー(NumPy, pandas, matplotlib など)の使い方を学ぶ本です。

日本語版はこちらになります。

こちらも参考にしてください。

ディープラーニング

ディープラーニングに興味がある方は、こちらをどうぞ。

こちらは入門編です。

こちらはより本格的な解説書です。内容は高度ですが、前提知識のある方にはとても分かりやすくてお勧めです。

こちらは、後半部分でTensorFlowの基礎的な概念(グラフ、分散学習など)がきちんと説明されていて、DQN/RNNの実装例なども紹介されています。

Kerasの使い方が知りたい方は、こちらをどうぞ。

こちらは、自然言語処理に特化したニューラルネットワークの解説書です。RNNの構造なども詳しく解説されています。

強化学習

強化学習についてはこちらをどうぞ

「強化学習がなぜ上手くいくのか?」という理論的な基礎を根本から丁寧に解説しています。

より専門的な教科書です。

その他

決して初心者向けではありませんが、下記は一般的な機械学習の理論について網羅的に説明された書籍です。

日本語版はこちらになります。

Riemann幾何学ユーザーのための「双対平坦な多様体」入門

play.google.com

※ 本記事を大幅に加筆した「Riemann幾何学ユーザーのための情報幾何学入門」を上記の書籍に掲載しています。

何の話かというと

Riemann幾何学の知識を前提に、「双対平坦な多様体」を自然な形で導入する説明方法ってないかなーと考えていて、思いついた説明のストーリーです。

Riemann幾何学の復習

Riemann幾何学では、接続 ∇ に対して、計量的(平行移動が内積を保存する)、かつ、捩率 T がゼロという次の条件が課せられます。

X g(Y,Z) = g(\nabla_XY,Z)+g(Y,\nabla_XZ) ―― (1)

T=\nabla_XY-\nabla_YX - [X,Y] = 0 ―― (2)

これらの条件を満たす接続は計量から一意に決まり、これが、Riemann接続と呼ばれるものでした。

これらの条件は、成分表示で次のようにも書けます。

\partial_kg_{ij}=\Gamma_{ki,j} + \Gamma_{kj,i} ―― (3)

T_{ij}^{\,\,\,k} = \Gamma_{ij}^{\,\,\,k} - \Gamma_{ji}^{\,\,\,k} = 0 ―― (4)

ここでさらに、曲率 R がゼロという条件を付け足すと、よく知られているように、この空間は平坦なユークリッド空間になります。すなわち、大域的に \Gamma_{ij}^{\,\,\,k}=0 となる局所座標系(アファイン座標系)が存在することになります。この事を実際に証明してみると、接続 ∇ が計量的であるという条件が大きな役割を果たすことがわかります。それでは、仮に、接続 ∇ が計量的であるという条件をはずしてみると、いったいどのような幾何学を展開することができるのでしょうか・・・。

双対接続の導入

接続 ∇ が計量的であるという条件をただはずすだけでは、あまりにも条件がゆるくなるので、ここで、ちょっとユニークな次の条件を考えてみます。

計量 g を持つ多様体 M に対して、二種類の接続 \nabla,\,\nabla^* が定義されており、次の2つの条件を満たすものとします。

X g(Y,Z) = g(\nabla_XY,Z)+g(Y,\nabla^*_XZ) ―― (5)

 T + T^* = 0 ―― (6)

(5) は、ある点の2つの接ベクトルについて、一方を \nabla で平行移動して、もう一方を \nabla^* で平行移動した際に、その内積が変化しないという条件を表します。これより、\overline\nabla = \frac{1}{2}(\nabla + \nabla^*) で新しい接続を定義すると、\overline\nabla について (1) が成り立つことが計算で確認できます。

また、捩率の成分表示 (4) を見ると \overline\nabla の捩率は \overline T = \frac{1}{2}(T + T^*) になることがわかります。つまり、(6) は、\overline\nabla の捩率がゼロであることを要請しており、結局は、\overline\nabla がRiemann接続であることになります。

このような条件を満たす接続の組 (\nabla,\,\nabla^*) を双対接続と呼ぶことにします。(厳密には、単に「双対接続」と言った場合は、(6)の条件は課せられません。ただし、本稿の主題となる「双対平坦な空間」では必ず (6) の条件が満たされます。)

双対平坦な空間

ここで、双対接続における、それぞれの接続についての曲率、および、捩率について考えると、次の面白い関係が証明されます。

[定理 1] \nabla に関する曲率 R がゼロであることと、\nabla^* に関する曲率 R^* がゼロであることは同値である。

(証明)
点 P で接ベクトル A と B を取って、これらを閉曲線にそって、A は\nabla に関する平行移動、B は\nabla^* に関する平行移動を行う。点 P に戻ってきた際の接ベクトルを A' および B' とすると、(5) の条件より次が成り立つ。

 g(A,B) = g(A',B')

したがって、R=0 とすると、A=A' より、任意の A について、g(A,B) = g(A,B')、すなわち、g(A,B-B') = 0 が成立する。計量の正定値性から、これは、B=B'、すなわち、R'=0 を意味する。(証明終わり)

これと同様に、(6) の条件から、 \nabla に関する捩率 T がゼロであることと、\nabla^* に関する捩率 T^* がゼロであることが同値になることもすぐにわかります。つまり、 \nabla が(R=0, T=0 という意味で)平坦ならば、 \nabla^* も(R^*=0, T^*=0 という意味で)平坦になるのです。このように、双対接続を持っており、さらに、両方の接続について平坦な空間を「双対平坦な空間」と呼ぶことにします。

双対平坦な空間では、それぞれの接続についてアファイン座標系、すなわち、大域的に \Gamma_{ij}^{\,\,\,k}=0 となる局所座標系が存在します。ただし、一方の接続に対するアファイン座標系は、他方の接続に対するアファイン座標系にはなりません。それぞれのアファイン座標系の間には、何か特別な関係があるのでしょうか・・・?

双対平坦な空間の構成方法

一般の双対平坦な空間を調べる前に、具体的な双対平坦な空間(双対接続)を作り出す「3分クッキング」の例を紹介します。

まずはじめに、内積も接続も持たない多様体 M を持ってきて、この上に、適当な座標系 \theta と、この座標系で見た時に、微分可能で凸関数になっている関数 \psi(\theta) を用意します。\psi(\theta) が凸関数であることから、ヘッセ行列は正定値になるので、これをこの空間の計量として導入します。

 g_{ij}(\theta) = \frac{\partial^2}{\partial \theta_i \partial \theta_j}\psi(\theta) ―― (7)

さらに、この座標系がアファイン座標系になるように、接続 \nabla を導入しておきます。

つづいて、双対座標の方を用意します。\psi(\theta) が凸関数であることから、次のルジャンドル変換によって、双対座標 {\eta} と双対凸関数 \varphi({\eta}) を定義することができます。

 \eta_i := \partial_i\psi({\theta}) ―― (8)

 \varphi({\eta}) := \max_{{\theta}'}\left\{\theta'^i\eta_i-\psi({\theta}')\right\} ―― (9)

以下の議論では、\theta{\eta} は (8) の関係で互いの関数になっていると理解します。

ちなみに、ルジャンドル変換の性質として、次の双対関係が成り立つことも証明されます。

 \theta^i = \partial^i\varphi({\eta})

 \psi({\theta}) = \max_{{\eta}'}\left\{\theta^i\eta'_i-\varphi({\eta}')\right\}

 \psi({\theta})+\varphi({\eta})=\theta^i\eta_i

(証明は下記を参照)
enakai00.hatenablog.com

そして、双対座標 \eta がアファイン座標系になるように、接続 \nabla^* を導入します。

そうすると、接続の組 (\nabla,\,\nabla^*) は、(5) の条件を満たす「双対接続」になることが証明されます。

[証明]
\theta 座標系では、\Gamma の成分はゼロになるので、(3) と同様の成分表示を考えて、(5) は次と同値になる。

 \partial_kg_{ij}=\Gamma^*_{kj,i} ―― (10)

一方、\eta 座標系では \Gamma^* の成分はゼロになることから、\Gamma^* の成分を \eta 座標系から \theta 座標系に変換する公式を書き下すと次になる。

 {\Gamma^*}_{ij}^{\,\,\,k} = \frac{\partial^2\eta_l}{\partial\theta^i\partial\theta^j}\frac{\partial \theta^k}{\partial \eta_l}

ここで g_{km} = \partial_k\partial_m\psi(\theta) = \frac{\partial\eta_m}{\partial\theta_k} を両辺にかけると

 (左辺)= {\Gamma^*}_{ij}^{\,\,\,k}g_{km} = {\Gamma^*}_{ij,m}

 (右辺)= \frac{\partial^2\eta_l}{\partial\theta^i\partial\theta^j}\frac{\partial \theta^k}{\partial \eta_l}\frac{\partial\eta_m}{\partial\theta_k}
=\frac{\partial^2\eta_l}{\partial\theta^i\partial\theta^j}\delta_m^l
=\frac{\partial^2\eta_m}{\partial\theta^i\partial\theta^j}
=\partial_i\partial_j\partial_m\psi(\theta)
=\partial_ig_{mj}

これで (10) が示された。(証明終わり)

ちなみに、この証明の中で使った関係式 g_{ij} = \frac{\partial\eta_i}{\partial\theta_j} は、g_{ij} が座標変換 \eta \rightarrow \theta のヤコビ行列になっていることを示しており、その逆行列は逆変換のヤコビ行列 g^{ij}= \frac{\partial\theta_i}{\partial\eta_j} で与えられることになります。これはちょうど、座標変換の公式と g_{ij}g^{ij} による足の上げ下げが一致することを示しています。

 \eta_i = g_{ij}\theta^j,\,\,\theta^i = g^{ij}\eta_j

同様に \partial_i\theta^i による偏微分、\partial^i\eta_i による偏微分とみなすことにすれば、偏微分演算子についても足の上げ下げが可能となります。

すると次の計算からわかるように、それぞれの座標系における基底ベクトル \{\partial_i\}\{\partial^i\} は互いに直行する座標系になっていることもわかります。

 g(\partial_i,\partial^j) = g(\partial_i,g^{jk}\partial_k) = g^{jk}g_{ik} = \delta^j_i ―― (11)

ここから、\partial_i\partial^i の内積は座標に依存しない定数ということになりますが、これは、双対接続の意味を考えると当然です。座標系 \theta は接続 \nabla に対するアファイン座標系ですので、定数成分の接ベクトル場 a^i\partial_i は接続 \nabla に関して並行なベクトルの集まりになります。同様に、定数成分の接ベクトル場 b_j\partial^j は接続 \nabla^* に関して並行です。したがって、これらの各点での内積はすべて同じ値になるはずで、それが g(a^i\partial_i, b_j\partial^j) = a^ib_i になるというわけです。

見方を変えると、この構成方法のポイントは、(7)(8) に集約されると言えます。これらから、g_{ij} = \frac{\partial\eta_i}{\partial\theta_j} という「計量=ヤコビ行列」という関係が生まれて、その結果、(11) のようにそれぞれの平行移動で正規直行性(すなわち計量)が保存されるような基底ベクトルが作られたというわけです。

以上から、任意の座標系 \theta と任意の凸関数 \psi(\theta) があれば、そこからルジャンドル変換を通して、自然な形で、双対平坦な空間とそれぞれの接続に対するアファイン座標系が構成できることがわかりました。凸関数の数だけ双対平坦な空間があるというわけです。

それでは逆に、任意の双対平坦な空間があった時、逆にそれを導くような凸関数を見つけることはできるのでしょうか・・・? 実はこれができてしまうのです。どどーーーん。

任意の双対平坦な空間に対応する凸関数がある事の証明

(7)(8)から(11)に至る道筋を逆にたどっていきます・・・。

任意の双対平坦な多様体 M があるとして、その計量と双対接続をまとめて (g,\nabla,\nabla^*) と表記します。

[補題 1] 双対平坦な多様体 M に対して、次を満たす \nabla アファイン座標系 \theta\nabla^* アファイン座標系 \eta を取ることができる。

 g(\partial_i,\partial^j) = g(\partial_i,g^{jk}\partial_k) = g^{jk}g_{ik} = \delta^j_i ―― (11)

ここで、\partial_i=\frac{\partial}{\partial \theta^i},\,\,\partial^i=\frac{\partial}{\partial \eta_i} とする。

(証明)
M が双対平坦であることから、何らかの \nabla アファイン座標系 x\nabla^* アファイン座標系 y が存在する。M 上の点 P を固定して、点 P における内積値を用いて、定数行列 G を次で定義する。

 G_{ij} = g\left(\left(\frac{\partial}{\partial x^i}\right)_P, \left(\frac{\partial}{\partial y^j}\right)_P\right)

ここで新しい座標系 \theta\eta を次で定義する。

 \theta^i = x^i,\,\,\eta_i=G_{ij}y^j

アファイン座標系の定数行列 G による一次変換は再びアファイン座標系になるので、\eta\nabla^* アファイン座標系である。これらが (11) の関係を満たすことを示す。

まず、点 P において考えると、次が自明に成り立つ。

 g\left(\left(\frac{\partial}{\partial \theta^i}\right)_P, \left(\frac{\partial}{\partial \eta_j}\right)_P\right)
= g\left(\left(\frac{\partial}{\partial x^i}\right)_P, \left(\sum_k (G^{-1})_{jk}\frac{\partial}{\partial y_k}\right)_P\right)
= G^{-1}_{jk}G_{ik} = \delta^j_i

一方、\frac{\partial}{\partial \theta^i}\frac{\partial}{\partial \eta_j} を定数係数 1 を持った定数成分の接ベクトル場と考えると、 \theta\eta がアファイン座標系であることから、これらは、それぞれ、\nabla\nabla^* に関して平行な接ベクトル場と言える。したがって、双対接続の定義よりすべての点で内積が同じになることから、すべての点で (11) が成り立つことが言える。(証明終わり)

[補題 2] 補題 1 を満たす座標系 \theta,\,\,\eta について、次が成り立つ。

 g_{ij} = g(\partial_i, \partial_j) = \frac{\partial\eta_i}{\partial\theta^j} ―― (12)

 g^{ij} = g(\partial^i, \partial^j) = \frac{\partial\theta^i}{\partial\eta_j} ―― (13)

(証明)
g(\partial_i, \partial_j)=g\left(\frac{\partial\eta_k}{\partial\theta_i}\partial^k, \partial_j\right)
= \frac{\partial\eta_k}{\partial\theta_i} \delta^k_j = \frac{\partial\eta_j}{\partial\theta_i}

g の対称性より、これは (12) に等しい。(13) も同様の計算になる。(証明終わり)

[補題 3] 補題 1 を満たす座標系 \theta,\,\,\eta について、2つの凸関数 \psi(\theta), \,\,\varphi(\eta) が存在して、次が成立する。

 \eta_i = \partial_i\psi(\theta),\,\,\theta^i = \partial^i\varphi(\eta)

 \psi(\theta) + \varphi(\eta) - \theta^i\eta_i = 0

(証明)
(12) において g の対称性より、\partial_j\eta_i = \partial_i\eta_j となるので、両辺を \theta_j で積分して

 \eta_i = \int^{\theta_j}\partial_i\eta_jd\theta_j + c=\partial_i\left( \int^{\theta_j}\eta_jd\theta_j + c\theta_i \right)c は積分の始点に依存する定数)

従って、\psi(\theta) = \int^{\theta_j}\eta_jd\theta_j + c\theta_i +CC は任意の定数)として、\eta_i = \partial_i\psi(\theta) が成り立つ。\theta^i = \partial^i\varphi(\eta) についても同様。

また、(12)(13) より、\psi(\theta), \,\,\varphi(\eta) のヘッセ行列は、計量の行列(およびその逆行列)に一致するので、正定値行列であり、これらは凸関数になっている。

さらに、次の計算から、関数 \psi(\theta) + \varphi(\eta) - \theta^i\eta_i の全微分は 0 になることがわかる。

 d\left(\psi(\theta) + \varphi(\eta) - \theta^i\eta_i\right) = (\partial_i\psi)d\theta^i + (\partial^i\varphi)d\eta_i -\eta_id\theta^i - \theta^id\eta_i = 0

従って、この関数は定数関数であり、\psi(\theta) が持つ任意定数 C を調整すれば、\psi(\theta) + \varphi(\eta) - \theta^i\eta_i = 0 にできる。(証明終わり)

補題 1 〜補題 3 により、任意の双対平坦な空間 M に対して、凸関数 \psi(\theta) によるルジャンドル変換で結びついた、双対アファイン座標系が構成できることがわかりました。

以上の議論を振り返ると、「双対的な意味で計量が保存される」ことと、「双対的な意味でのアファイン座標系(双対アファイン座標系)が存在」することから、「計量=ヤコビ行列」という計量に対する強い縛りが得られたことがわかります。これは、Riemann多様体において「計量の保存」と「アファイン座標系の存在」という条件から「計量=単位行列」(つまりユークリッド空間)という縛りが得られたことに対応すると考えられます。

Riemann多様体の場合、ユークリッド空間になってしまえばそれ以上の広がりはありませんが、双対平坦な空間の場合は、「計量=ヤコビ行列」において、計量の対称性からヤコビ行列の対称性という奇妙な可積分条件が生まれて、そこからポテンシャル関数 \psi(\theta) の存在とルジャンドル変換による座標系の繋がりが生み出されたことになります。

で・・・

ここから先は、ポテンシャル関数からダイバージェンスを導入して、拡張ピタゴラスの定理へと標準的にすすめばよいのかと。

Using Cloud ML with distributed TensorFlow code

What is this?

enakai00.hatenablog.com
As I mentioned in the article above, you need to make some modifications on your TensorFlow codes when you train the model with the distributed mode. I will explain some key aspects you need know to write TensorFlow codes for the distributed training.

There are some strategies to train a model with multiple nodes, and I will focus on the simplest one, "asynchronous data parallel" where all nodes share the same neural network and calculate a gradient vector independently from some part of the training set (as in the same manner with the mini-batch process). Then variables are updated with the gradient vectors from those nodes. Roughly speaking, if you iterate 10,000 batches with 10 nodes, each node works on 1,000 batches in parallel. You can find more details in the Google research paper.

Note: The code snippets in this note are based on TensorFlow r0.12.

Basic architecture

The distributed training is done by the three players:

  • Parameter server:Update variables with gradient vectors from workers (and a master).
  • Worker:Calculate a gradient vector from the training set.
  • Master:Coordinates the operations of workers. (A master can be one of workers, and can do some additional house keeping tasks if necessary.)

In typical deployments, there are a few parameter servers, a single master and a bunch of workers. When you submit a job into Cloud ML, these nodes are created in containers and your code starts running on them.

Getting a cluster configuration and a role of the node

Since the same code runs in all nodes, it needs to branch the operation based on the node's role. So first of all, you need to get a cluster configuration and a role of the node with the following code.

  # Get cluster and node info
  env = json.loads(os.environ.get('TF_CONFIG', '{}'))
  cluster_info = env.get('cluster', None)
  cluster_spec = tf.train.ClusterSpec(cluster_info)
  task_info = env.get('task', None)
  job_name, task_index = task_info['type'], task_info['index']

It retrieves the necessary information from the environmental variable TF_CONFIG and stores the following information.

  • cluster_spec: the cluster configuration
  • job_name: the node's role (ps, master, worker)
  • task_info: a serial number starting from 0 to distinguish nodes with the same role.

By passing these data, you can create a server object which handles the communication with other nodes.

  server = tf.train.Server(cluster_spec,
                           job_name=job_name, task_index=task_index)

Code for parameter servers

If the job_name is 'ps', you simply start the server object and it works as a parameter server. That's it. The code never returns until terminated by an external signal.

  if job_name == "ps": # Parameter server
    server.join()

Code for workers

On workers and a master, you iterate the variable update loops with a coordination mechanism provided by a Supervisor object. Tasks which require a coordination such as creating a session and saving a checkpoint are done through the supervisor. Internally, a master node works as a chief coordinator.

The following code creates a Supervisor object.

  if job_name == "master" or job_name == "worker": # Worker node
    is_chief = (job_name == "master")
...
        global_step = tf.Variable(0, trainable=False)
        init_op = tf.global_variables_initializer()
        saver = tf.train.Saver()
        # Create a supervisor
        sv = tf.train.Supervisor(is_chief=is_chief, logdir=LOG_DIR,
                                 init_op=init_op, saver=saver, summary_op=None,
                                 global_step=global_step, save_model_secs=0)

Options:

  • is_chief:True if the node is a master.
  • logdir:a directory to store checkpoint files.
  • init_op:a variable initialization operation at the session creation.
  • saver:a Saver object to save checkpoint files.
  • global_step:a global counter of the training loop.
  • summary_op:a summary operation (used by TensorBoard). None if you save summary logs by hand without a supervisor.
  • save_model_secs:time interval to save checkpoints. None if you save checkpoints by hand without a supervisor.

When you define a model, it must be done inside a "with tf.device" clause which specifies the node's role through job_name and task_index.

  device_fn = tf.train.replica_device_setter(
    cluster=cluster_spec,
    worker_device="/job:%s/task:%d" % (job_name, task_index)
  )
...
  if job_name == "master" or job_name == "worker": # Worker node
    is_chief = (job_name == "master")

    with tf.Graph().as_default() as graph:
      with tf.device(device_fn):

Now you can create a session and start the training loop. The following is a simple example to clarify some key points.

        # Create a session and run training loops
        with sv.managed_session(server.target) as sess:
          reports, step = 0, 0
          start_time = time.time()
          while not sv.should_stop() and step < MAX_STEPS:
             images, labels = mnist_data.train.next_batch(BATCH_SIZE)
             feed_dict = {x:images, t:labels, keep_prob:0.5}
             _, loss_val, step = sess.run([train_step, loss, global_step],
                                          feed_dict=feed_dict)
             if step > CHECKPOINT * reports:
               reports += 1
               logging.info("Step: %d, Train loss: %f" % (step, loss_val))
               if is_chief:
                 # Save checkpoint
                 sv.saver.save(sess, sv.save_path, global_step=step)
...
                 # Save summary
                 feed_dict = {test_loss:loss_val, test_accuracy:acc_val}
                 sv.summary_computed(sess,
                   sess.run(summary, feed_dict=feed_dict), step)
                 sv.summary_writer.flush()

The first point is you evaluate the variable "global step" together with the optimization algorithm "train_step". It increases the global_step by one and you can see the total iteration count from the resulting value (stored in "step" in this example.) In this example, by checking the value of step, each node outputs the training step and training loss periodically with the interval of CHECKPOINT. In addition, it does some additional tasks on the master (by checking is_cheif variable defined before) such as saving a checkpoint file and saving a summary log. These tasks are done through the Supervisor object 'sv'. (sv.save_path corresponds to the directory specified with the logdir option.)

Exporting the trained model

One of the tricky points in the distributed training is to export the trained model. The model defined in the previous code has some additional cluster and node information, but these are unnecessary when you restore the model for predictions, especially, on a single node. So you need to build another model which can be restored on a single node, and then, export it.

This should be done at the exit point of the training loop. In the following example, it saves a checkpoint file with the latest variables and calls a model export function.

          if is_chief: # Export the final model
            sv.saver.save(sess, sv.save_path, global_step=sess.run(global_step))
            export_model(tf.train.latest_checkpoint(LOG_DIR))

The following is an example of the model exporter which builds a new model for predictions and restores the latest checkpoint. The extra variables in the checkpoint (which are not used in the new model) are simply ignored. Then the model is exported with the saver object. The API inputs/outputs objects stored in a collection are required by the Cloud ML's prediction service.

def export_model(last_checkpoint):
  # create a session with a new graph.
  with tf.Session(graph=tf.Graph()) as sess:
    x = tf.placeholder(tf.float32, [None, 784])
    p = mnist.get_model(x, None, training=False)

    # Define key elements
    input_key = tf.placeholder(tf.int64, [None,])
    output_key = tf.identity(input_key)

    # Define API inputs/outpus object
    inputs = {'key': input_key.name, 'image': x.name}
    outputs = {'key': output_key.name, 'scores': p.name}
    tf.add_to_collection('inputs', json.dumps(inputs))
    tf.add_to_collection('outputs', json.dumps(outputs))

    init_op = tf.global_variables_initializer()
    sess.run(init_op)

    # Restore the latest checkpoint and save the model
    saver = tf.train.Saver()
    saver.restore(sess, last_checkpoint)
    saver.export_meta_graph(filename=MODEL_DIR + '/export.meta')
    saver.save(sess, MODEL_DIR + '/export',
               write_meta_graph=False)

The full example code.

trainer
├── __init__.py # empty file.
├── mnist.py # model definition.
└── task.py # training code.

Note that the model definition is just a quick example of CNN for MNIST dataset.

trainer/task.py

import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
import numpy as np
import time, json, os, logging

import mnist

flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_integer('batch_size', 100,
                     'Batch size. Must divide evenly into the dataset sizes.')
flags.DEFINE_integer('max_steps', 10000, 'Number of steps to run trainer.')
flags.DEFINE_integer('checkpoint', 100, 'Interval steps to save checkpoint.')
flags.DEFINE_string('log_dir', '/tmp/logs',
                    'Directory to store checkpoints and summary logs')
flags.DEFINE_string('model_dir', '/tmp/model',
                    'Directory to store trained model')


# Global flags
BATCH_SIZE = FLAGS.batch_size
MODEL_DIR = FLAGS.model_dir
LOG_DIR = FLAGS.log_dir
MAX_STEPS = FLAGS.max_steps
CHECKPOINT = FLAGS.checkpoint


def export_model(last_checkpoint):
  # create a session with a new graph.
  with tf.Session(graph=tf.Graph()) as sess:
    x = tf.placeholder(tf.float32, [None, 784])
    p = mnist.get_model(x, None, training=False)

    # Define key elements
    input_key = tf.placeholder(tf.int64, [None,])
    output_key = tf.identity(input_key)

    # Define API inputs/outpus object
    inputs = {'key': input_key.name, 'image': x.name}
    outputs = {'key': output_key.name, 'scores': p.name}
    tf.add_to_collection('inputs', json.dumps(inputs))
    tf.add_to_collection('outputs', json.dumps(outputs))

    init_op = tf.global_variables_initializer()
    sess.run(init_op)

    # Restore the latest checkpoint and save the model
    saver = tf.train.Saver()
    saver.restore(sess, last_checkpoint)
    saver.export_meta_graph(filename=MODEL_DIR + '/export.meta')
    saver.save(sess, MODEL_DIR + '/export',
               write_meta_graph=False)
  

def run_training():
  # Get cluster and node info
  env = json.loads(os.environ.get('TF_CONFIG', '{}'))
  cluster_info = env.get('cluster', None)
  cluster_spec = tf.train.ClusterSpec(cluster_info)
  task_info = env.get('task', None)
  job_name, task_index = task_info['type'], task_info['index']

  device_fn = tf.train.replica_device_setter(
    cluster=cluster_spec,
    worker_device="/job:%s/task:%d" % (job_name, task_index)
  )

  logging.info('Start job:%s, index:%d' % (job_name, task_index))

  # Create server
  server = tf.train.Server(cluster_spec,
                           job_name=job_name, task_index=task_index)

  if job_name == "ps": # Parameter server
    server.join()

  if job_name == "master" or job_name == "worker": # Worker node
    is_chief = (job_name == "master")

    with tf.Graph().as_default() as graph:
      with tf.device(device_fn):

        # Prepare training data
        mnist_data = input_data.read_data_sets("/tmp/data/", one_hot=True)

        # Create placeholders
        x = tf.placeholder_with_default(
          tf.zeros([BATCH_SIZE, 784], tf.float32), shape=[None, 784])
        t = tf.placeholder_with_default(
          tf.zeros([BATCH_SIZE, 10], tf.float32), shape=[None, 10])
        keep_prob = tf.placeholder_with_default(
          tf.zeros([], tf.float32), shape=[])
        global_step = tf.Variable(0, trainable=False)

        # Add test loss and test accuracy to summary
        test_loss = tf.placeholder_with_default(
          tf.zeros([], tf.float32), shape=[])
        test_accuracy = tf.placeholder_with_default(
          tf.zeros([], tf.float32), shape=[])
        tf.summary.scalar("Test_loss", test_loss) 
        tf.summary.scalar("Test_accuracy", test_accuracy) 

        # Define a model
        p = mnist.get_model(x, keep_prob, training=True)
        train_step, loss, accuracy = mnist.get_trainer(p, t, global_step)

        init_op = tf.global_variables_initializer()
        saver = tf.train.Saver()
        summary = tf.summary.merge_all()

        # Create a supervisor
        sv = tf.train.Supervisor(is_chief=is_chief, logdir=LOG_DIR,
                                 init_op=init_op, saver=saver, summary_op=None,
                                 global_step=global_step, save_model_secs=0)
    
        # Create a session and run training loops
        with sv.managed_session(server.target) as sess:
          reports, step = 0, 0
          start_time = time.time()
          while not sv.should_stop() and step < MAX_STEPS:
             images, labels = mnist_data.train.next_batch(BATCH_SIZE)
             feed_dict = {x:images, t:labels, keep_prob:0.5}
             _, loss_val, step = sess.run([train_step, loss, global_step],
                                          feed_dict=feed_dict)
             if step > CHECKPOINT * reports:
               reports += 1
               logging.info("Step: %d, Train loss: %f" % (step, loss_val))
               if is_chief:
                 # Save checkpoint
                 sv.saver.save(sess, sv.save_path, global_step=step)

                 # Evaluate the test loss and test accuracy
                 loss_vals, acc_vals = [], []
                 for _ in range(len(mnist_data.test.labels) // BATCH_SIZE):
                   images, labels = mnist_data.test.next_batch(BATCH_SIZE)
                   feed_dict = {x:images, t:labels, keep_prob:1.0}
                   loss_val, acc_val = sess.run([loss, accuracy],
                                                feed_dict=feed_dict)
                   loss_vals.append(loss_val)
                   acc_vals.append(acc_val)
                 loss_val, acc_val = np.sum(loss_vals), np.mean(acc_vals)

                 # Save summary
                 feed_dict = {test_loss:loss_val, test_accuracy:acc_val}
                 sv.summary_computed(sess,
                   sess.run(summary, feed_dict=feed_dict), step)
                 sv.summary_writer.flush()

                 logging.info("Time elapsed: %d" % (time.time() - start_time))
                 logging.info("Step: %d, Test loss: %f, Test accuracy: %f" %
                              (step, loss_val, acc_val))

          # Finish training
          if is_chief: # Export the final model
            sv.saver.save(sess, sv.save_path, global_step=sess.run(global_step))
            export_model(tf.train.latest_checkpoint(LOG_DIR))

        sv.stop()  


def main(_):
  run_training()


if __name__ == '__main__':
  logging.basicConfig(level=logging.INFO) 
  tf.app.run()

trainer/mnist.py

import tensorflow as tf
import json

def get_model(x, keep_prob, training=True):
  num_filters1 = 32
  num_filters2 = 64

  with tf.name_scope('cnn'):
    with tf.name_scope('convolution1'):
      x_image = tf.reshape(x, [-1,28,28,1])
      
      W_conv1 = tf.Variable(tf.truncated_normal([5,5,1,num_filters1],
                                                stddev=0.1))
      h_conv1 = tf.nn.conv2d(x_image, W_conv1,
                             strides=[1,1,1,1], padding='SAME')
      
      b_conv1 = tf.Variable(tf.constant(0.1, shape=[num_filters1]))
      h_conv1_cutoff = tf.nn.relu(h_conv1 + b_conv1)
      
      h_pool1 = tf.nn.max_pool(h_conv1_cutoff, ksize=[1,2,2,1],
                               strides=[1,2,2,1], padding='SAME')

    with tf.name_scope('convolution2'):
      W_conv2 = tf.Variable(
                  tf.truncated_normal([5,5,num_filters1,num_filters2],
                                      stddev=0.1))
      h_conv2 = tf.nn.conv2d(h_pool1, W_conv2,
                             strides=[1,1,1,1], padding='SAME')
      
      b_conv2 = tf.Variable(tf.constant(0.1, shape=[num_filters2]))
      h_conv2_cutoff = tf.nn.relu(h_conv2 + b_conv2)
      
      h_pool2 = tf.nn.max_pool(h_conv2_cutoff, ksize=[1,2,2,1],
                               strides=[1,2,2,1], padding='SAME')

    with tf.name_scope('fully-connected'):
      h_pool2_flat = tf.reshape(h_pool2, [-1, 7*7*num_filters2])
      num_units1 = 7*7*num_filters2
      num_units2 = 1024
      w2 = tf.Variable(tf.truncated_normal([num_units1, num_units2]))
      b2 = tf.Variable(tf.constant(0.1, shape=[num_units2]))
      hidden2 = tf.nn.relu(tf.matmul(h_pool2_flat, w2) + b2)

    with tf.name_scope('output'):
      if training:
        hidden2_drop = tf.nn.dropout(hidden2, keep_prob)
      else:
        hidden2_drop = hidden2
      w0 = tf.Variable(tf.zeros([num_units2, 10]))
      b0 = tf.Variable(tf.zeros([10]))
      p = tf.nn.softmax(tf.matmul(hidden2_drop, w0) + b0)

  tf.summary.histogram("conv_filters1", W_conv1)
  tf.summary.histogram("conv_filters2", W_conv2)

  return p

  
def get_trainer(p, t, global_step):
  with tf.name_scope('optimizer'):
    loss = -tf.reduce_sum(t * tf.log(p), name='loss')
    train_step = tf.train.AdamOptimizer(0.0001).minimize(loss, global_step=global_step)
      
  with tf.name_scope('evaluator'):
    correct_prediction = tf.equal(tf.argmax(p, 1), tf.argmax(t, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction,
                                      tf.float32), name='accuracy')

  return train_step, loss, accuracy

You can train the model on Cloud ML by running the following commands from the Cloud Shell. You should run the commands in the parent directory of "trainer".

$ PROJECT_ID=project01 # your project ID
$ TRAIN_BUCKET="gs://$PROJECT_ID-mldata"
$ gsutil mkdir $TRAIN_BUCKET

$ cat << EOF > config.yaml
trainingInput:
  # Use a cluster with many workers and a few parameter servers.
  scaleTier: STANDARD_1
EOF

$ JOB_NAME="job01"
$ gsutil rm -rf $TRAIN_BUCKET/$JOB_NAME
$ touch .dummy
$ gsutil cp .dummy $TRAIN_BUCKET/$JOB_NAME/train/
$ gsutil cp .dummy $TRAIN_BUCKET/$JOB_NAME/model/

$ gcloud beta ml jobs submit training ${JOB_NAME} \
  --package-path=trainer \
  --module-name=trainer.task \
  --staging-bucket="${TRAIN_BUCKET}" \
  --region=us-central1 \
  --config=config.yaml \
  -- \
  --log_dir=$TRAIN_BUCKET/$JOB_NAME/train \
  --model_dir=$TRAIN_BUCKET/$JOB_NAME/model \
  --max_steps=10000

You can also see the training process on TensorBoard with the following command.

$  tensorboard --port 8080 --logdir $TRAIN_BUCKET/$JOB_NAME/train

After the training, the resulting model is stored under $TRAIN_BUCKET/$JOB_NAME/model. The following is a quick example of using the exported model for a prediction.

#!/usr/bin/python
import tensorflow as tf
import numpy as np
import json
from tensorflow.examples.tutorials.mnist import input_data

model_meta = 'gs://project01-mldata/job01/model/export.meta'
model_param = 'gs://project01-mldata/job01/model/export'

with tf.Graph().as_default() as graph:
  sess = tf.InteractiveSession()
  saver = tf.train.import_meta_graph(model_meta)
  saver.restore(sess, model_param)

  inputs = json.loads(tf.get_collection('inputs')[0])
  outputs = json.loads(tf.get_collection('outputs')[0])
  x = graph.get_tensor_by_name(inputs['image'])
  input_key = graph.get_tensor_by_name(inputs['key'])
  p = graph.get_tensor_by_name(outputs['scores'])
  output_key = graph.get_tensor_by_name(outputs['key'])

  mnist_data = input_data.read_data_sets("/tmp/data/", one_hot=True)
  images, labels = mnist_data.test.next_batch(10)
  index = range(10)
  keys, preds = sess.run([output_key, p], feed_dict={input_key:index, x:images})
  for key, pred, label in zip(keys, preds, labels):
    print key, np.argmax(pred), np.argmax(label)

The more sophisticated example

The example used in this note is straightforward but may not practical. If you are interested in a more sophisticated example, see the following one.

github.com

Disclaimer: All code snippets are released under Apache 2.0 License. This is not an official Google product.