めもめも

このブログに記載の内容は個人の見解であり、必ずしも所属組織の立場、戦略、意見を代表するものではありません。

Using Cloud ML with distributed TensorFlow code

What is this?

enakai00.hatenablog.com
As I mentioned in the article above, you need to make some modifications on your TensorFlow codes when you train the model with the distributed mode. I will explain some key aspects you need know to write TensorFlow codes for the distributed training.

There are some strategies to train a model with multiple nodes, and I will focus on the simplest one, "asynchronous data parallel" where all nodes share the same neural network and calculate a gradient vector independently from some part of the training set (as in the same manner with the mini-batch process). Then variables are updated with the gradient vectors from those nodes. Roughly speaking, if you iterate 10,000 batches with 10 nodes, each node works on 1,000 batches in parallel. You can find more details in the Google research paper.

Note: The code snippets in this note are based on TensorFlow r0.12.

Basic architecture

The distributed training is done by the three players:

  • Parameter server:Update variables with gradient vectors from workers (and a master).
  • Worker:Calculate a gradient vector from the training set.
  • Master:Coordinates the operations of workers. (A master can be one of workers, and can do some additional house keeping tasks if necessary.)

In typical deployments, there are a few parameter servers, a single master and a bunch of workers. When you submit a job into Cloud ML, these nodes are created in containers and your code starts running on them.

Getting a cluster configuration and a role of the node

Since the same code runs in all nodes, it needs to branch the operation based on the node's role. So first of all, you need to get a cluster configuration and a role of the node with the following code.

  # Get cluster and node info
  env = json.loads(os.environ.get('TF_CONFIG', '{}'))
  cluster_info = env.get('cluster', None)
  cluster_spec = tf.train.ClusterSpec(cluster_info)
  task_info = env.get('task', None)
  job_name, task_index = task_info['type'], task_info['index']

It retrieves the necessary information from the environmental variable TF_CONFIG and stores the following information.

  • cluster_spec: the cluster configuration
  • job_name: the node's role (ps, master, worker)
  • task_info: a serial number starting from 0 to distinguish nodes with the same role.

By passing these data, you can create a server object which handles the communication with other nodes.

  server = tf.train.Server(cluster_spec,
                           job_name=job_name, task_index=task_index)

Code for parameter servers

If the job_name is 'ps', you simply start the server object and it works as a parameter server. That's it. The code never returns until terminated by an external signal.

  if job_name == "ps": # Parameter server
    server.join()

Code for workers

On workers and a master, you iterate the variable update loops with a coordination mechanism provided by a Supervisor object. Tasks which require a coordination such as creating a session and saving a checkpoint are done through the supervisor. Internally, a master node works as a chief coordinator.

The following code creates a Supervisor object.

  if job_name == "master" or job_name == "worker": # Worker node
    is_chief = (job_name == "master")
...
        global_step = tf.Variable(0, trainable=False)
        init_op = tf.global_variables_initializer()
        saver = tf.train.Saver()
        # Create a supervisor
        sv = tf.train.Supervisor(is_chief=is_chief, logdir=LOG_DIR,
                                 init_op=init_op, saver=saver, summary_op=None,
                                 global_step=global_step, save_model_secs=0)

Options:

  • is_chief:True if the node is a master.
  • logdir:a directory to store checkpoint files.
  • init_op:a variable initialization operation at the session creation.
  • saver:a Saver object to save checkpoint files.
  • global_step:a global counter of the training loop.
  • summary_op:a summary operation (used by TensorBoard). None if you save summary logs by hand without a supervisor.
  • save_model_secs:time interval to save checkpoints. None if you save checkpoints by hand without a supervisor.

When you define a model, it must be done inside a "with tf.device" clause which specifies the node's role through job_name and task_index.

  device_fn = tf.train.replica_device_setter(
    cluster=cluster_spec,
    worker_device="/job:%s/task:%d" % (job_name, task_index)
  )
...
  if job_name == "master" or job_name == "worker": # Worker node
    is_chief = (job_name == "master")

    with tf.Graph().as_default() as graph:
      with tf.device(device_fn):

Now you can create a session and start the training loop. The following is a simple example to clarify some key points.

        # Create a session and run training loops
        with sv.managed_session(server.target) as sess:
          reports, step = 0, 0
          start_time = time.time()
          while not sv.should_stop() and step < MAX_STEPS:
             images, labels = mnist_data.train.next_batch(BATCH_SIZE)
             feed_dict = {x:images, t:labels, keep_prob:0.5}
             _, loss_val, step = sess.run([train_step, loss, global_step],
                                          feed_dict=feed_dict)
             if step > CHECKPOINT * reports:
               reports += 1
               logging.info("Step: %d, Train loss: %f" % (step, loss_val))
               if is_chief:
                 # Save checkpoint
                 sv.saver.save(sess, sv.save_path, global_step=step)
...
                 # Save summary
                 feed_dict = {test_loss:loss_val, test_accuracy:acc_val}
                 sv.summary_computed(sess,
                   sess.run(summary, feed_dict=feed_dict), step)
                 sv.summary_writer.flush()

The first point is you evaluate the variable "global step" together with the optimization algorithm "train_step". It increases the global_step by one and you can see the total iteration count from the resulting value (stored in "step" in this example.) In this example, by checking the value of step, each node outputs the training step and training loss periodically with the interval of CHECKPOINT. In addition, it does some additional tasks on the master (by checking is_cheif variable defined before) such as saving a checkpoint file and saving a summary log. These tasks are done through the Supervisor object 'sv'. (sv.save_path corresponds to the directory specified with the logdir option.)

Exporting the trained model

One of the tricky points in the distributed training is to export the trained model. The model defined in the previous code has some additional cluster and node information, but these are unnecessary when you restore the model for predictions, especially, on a single node. So you need to build another model which can be restored on a single node, and then, export it.

This should be done at the exit point of the training loop. In the following example, it saves a checkpoint file with the latest variables and calls a model export function.

          if is_chief: # Export the final model
            sv.saver.save(sess, sv.save_path, global_step=sess.run(global_step))
            export_model(tf.train.latest_checkpoint(LOG_DIR))

The following is an example of the model exporter which builds a new model for predictions and restores the latest checkpoint. The extra variables in the checkpoint (which are not used in the new model) are simply ignored. Then the model is exported with the saver object. The API inputs/outputs objects stored in a collection are required by the Cloud ML's prediction service.

def export_model(last_checkpoint):
  # create a session with a new graph.
  with tf.Session(graph=tf.Graph()) as sess:
    x = tf.placeholder(tf.float32, [None, 784])
    p = mnist.get_model(x, None, training=False)

    # Define key elements
    input_key = tf.placeholder(tf.int64, [None,])
    output_key = tf.identity(input_key)

    # Define API inputs/outpus object
    inputs = {'key': input_key.name, 'image': x.name}
    outputs = {'key': output_key.name, 'scores': p.name}
    tf.add_to_collection('inputs', json.dumps(inputs))
    tf.add_to_collection('outputs', json.dumps(outputs))

    init_op = tf.global_variables_initializer()
    sess.run(init_op)

    # Restore the latest checkpoint and save the model
    saver = tf.train.Saver()
    saver.restore(sess, last_checkpoint)
    saver.export_meta_graph(filename=MODEL_DIR + '/export.meta')
    saver.save(sess, MODEL_DIR + '/export',
               write_meta_graph=False)

The full example code.

trainer
├── __init__.py # empty file.
├── mnist.py # model definition.
└── task.py # training code.

Note that the model definition is just a quick example of CNN for MNIST dataset.

trainer/task.py

import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
import numpy as np
import time, json, os, logging

import mnist

flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_integer('batch_size', 100,
                     'Batch size. Must divide evenly into the dataset sizes.')
flags.DEFINE_integer('max_steps', 10000, 'Number of steps to run trainer.')
flags.DEFINE_integer('checkpoint', 100, 'Interval steps to save checkpoint.')
flags.DEFINE_string('log_dir', '/tmp/logs',
                    'Directory to store checkpoints and summary logs')
flags.DEFINE_string('model_dir', '/tmp/model',
                    'Directory to store trained model')


# Global flags
BATCH_SIZE = FLAGS.batch_size
MODEL_DIR = FLAGS.model_dir
LOG_DIR = FLAGS.log_dir
MAX_STEPS = FLAGS.max_steps
CHECKPOINT = FLAGS.checkpoint


def export_model(last_checkpoint):
  # create a session with a new graph.
  with tf.Session(graph=tf.Graph()) as sess:
    x = tf.placeholder(tf.float32, [None, 784])
    p = mnist.get_model(x, None, training=False)

    # Define key elements
    input_key = tf.placeholder(tf.int64, [None,])
    output_key = tf.identity(input_key)

    # Define API inputs/outpus object
    inputs = {'key': input_key.name, 'image': x.name}
    outputs = {'key': output_key.name, 'scores': p.name}
    tf.add_to_collection('inputs', json.dumps(inputs))
    tf.add_to_collection('outputs', json.dumps(outputs))

    init_op = tf.global_variables_initializer()
    sess.run(init_op)

    # Restore the latest checkpoint and save the model
    saver = tf.train.Saver()
    saver.restore(sess, last_checkpoint)
    saver.export_meta_graph(filename=MODEL_DIR + '/export.meta')
    saver.save(sess, MODEL_DIR + '/export',
               write_meta_graph=False)
  

def run_training():
  # Get cluster and node info
  env = json.loads(os.environ.get('TF_CONFIG', '{}'))
  cluster_info = env.get('cluster', None)
  cluster_spec = tf.train.ClusterSpec(cluster_info)
  task_info = env.get('task', None)
  job_name, task_index = task_info['type'], task_info['index']

  device_fn = tf.train.replica_device_setter(
    cluster=cluster_spec,
    worker_device="/job:%s/task:%d" % (job_name, task_index)
  )

  logging.info('Start job:%s, index:%d' % (job_name, task_index))

  # Create server
  server = tf.train.Server(cluster_spec,
                           job_name=job_name, task_index=task_index)

  if job_name == "ps": # Parameter server
    server.join()

  if job_name == "master" or job_name == "worker": # Worker node
    is_chief = (job_name == "master")

    with tf.Graph().as_default() as graph:
      with tf.device(device_fn):

        # Prepare training data
        mnist_data = input_data.read_data_sets("/tmp/data/", one_hot=True)

        # Create placeholders
        x = tf.placeholder_with_default(
          tf.zeros([BATCH_SIZE, 784], tf.float32), shape=[None, 784])
        t = tf.placeholder_with_default(
          tf.zeros([BATCH_SIZE, 10], tf.float32), shape=[None, 10])
        keep_prob = tf.placeholder_with_default(
          tf.zeros([], tf.float32), shape=[])
        global_step = tf.Variable(0, trainable=False)

        # Add test loss and test accuracy to summary
        test_loss = tf.placeholder_with_default(
          tf.zeros([], tf.float32), shape=[])
        test_accuracy = tf.placeholder_with_default(
          tf.zeros([], tf.float32), shape=[])
        tf.summary.scalar("Test_loss", test_loss) 
        tf.summary.scalar("Test_accuracy", test_accuracy) 

        # Define a model
        p = mnist.get_model(x, keep_prob, training=True)
        train_step, loss, accuracy = mnist.get_trainer(p, t, global_step)

        init_op = tf.global_variables_initializer()
        saver = tf.train.Saver()
        summary = tf.summary.merge_all()

        # Create a supervisor
        sv = tf.train.Supervisor(is_chief=is_chief, logdir=LOG_DIR,
                                 init_op=init_op, saver=saver, summary_op=None,
                                 global_step=global_step, save_model_secs=0)
    
        # Create a session and run training loops
        with sv.managed_session(server.target) as sess:
          reports, step = 0, 0
          start_time = time.time()
          while not sv.should_stop() and step < MAX_STEPS:
             images, labels = mnist_data.train.next_batch(BATCH_SIZE)
             feed_dict = {x:images, t:labels, keep_prob:0.5}
             _, loss_val, step = sess.run([train_step, loss, global_step],
                                          feed_dict=feed_dict)
             if step > CHECKPOINT * reports:
               reports += 1
               logging.info("Step: %d, Train loss: %f" % (step, loss_val))
               if is_chief:
                 # Save checkpoint
                 sv.saver.save(sess, sv.save_path, global_step=step)

                 # Evaluate the test loss and test accuracy
                 loss_vals, acc_vals = [], []
                 for _ in range(len(mnist_data.test.labels) // BATCH_SIZE):
                   images, labels = mnist_data.test.next_batch(BATCH_SIZE)
                   feed_dict = {x:images, t:labels, keep_prob:1.0}
                   loss_val, acc_val = sess.run([loss, accuracy],
                                                feed_dict=feed_dict)
                   loss_vals.append(loss_val)
                   acc_vals.append(acc_val)
                 loss_val, acc_val = np.sum(loss_vals), np.mean(acc_vals)

                 # Save summary
                 feed_dict = {test_loss:loss_val, test_accuracy:acc_val}
                 sv.summary_computed(sess,
                   sess.run(summary, feed_dict=feed_dict), step)
                 sv.summary_writer.flush()

                 logging.info("Time elapsed: %d" % (time.time() - start_time))
                 logging.info("Step: %d, Test loss: %f, Test accuracy: %f" %
                              (step, loss_val, acc_val))

          # Finish training
          if is_chief: # Export the final model
            sv.saver.save(sess, sv.save_path, global_step=sess.run(global_step))
            export_model(tf.train.latest_checkpoint(LOG_DIR))

        sv.stop()  


def main(_):
  run_training()


if __name__ == '__main__':
  logging.basicConfig(level=logging.INFO) 
  tf.app.run()

trainer/mnist.py

import tensorflow as tf
import json

def get_model(x, keep_prob, training=True):
  num_filters1 = 32
  num_filters2 = 64

  with tf.name_scope('cnn'):
    with tf.name_scope('convolution1'):
      x_image = tf.reshape(x, [-1,28,28,1])
      
      W_conv1 = tf.Variable(tf.truncated_normal([5,5,1,num_filters1],
                                                stddev=0.1))
      h_conv1 = tf.nn.conv2d(x_image, W_conv1,
                             strides=[1,1,1,1], padding='SAME')
      
      b_conv1 = tf.Variable(tf.constant(0.1, shape=[num_filters1]))
      h_conv1_cutoff = tf.nn.relu(h_conv1 + b_conv1)
      
      h_pool1 = tf.nn.max_pool(h_conv1_cutoff, ksize=[1,2,2,1],
                               strides=[1,2,2,1], padding='SAME')

    with tf.name_scope('convolution2'):
      W_conv2 = tf.Variable(
                  tf.truncated_normal([5,5,num_filters1,num_filters2],
                                      stddev=0.1))
      h_conv2 = tf.nn.conv2d(h_pool1, W_conv2,
                             strides=[1,1,1,1], padding='SAME')
      
      b_conv2 = tf.Variable(tf.constant(0.1, shape=[num_filters2]))
      h_conv2_cutoff = tf.nn.relu(h_conv2 + b_conv2)
      
      h_pool2 = tf.nn.max_pool(h_conv2_cutoff, ksize=[1,2,2,1],
                               strides=[1,2,2,1], padding='SAME')

    with tf.name_scope('fully-connected'):
      h_pool2_flat = tf.reshape(h_pool2, [-1, 7*7*num_filters2])
      num_units1 = 7*7*num_filters2
      num_units2 = 1024
      w2 = tf.Variable(tf.truncated_normal([num_units1, num_units2]))
      b2 = tf.Variable(tf.constant(0.1, shape=[num_units2]))
      hidden2 = tf.nn.relu(tf.matmul(h_pool2_flat, w2) + b2)

    with tf.name_scope('output'):
      if training:
        hidden2_drop = tf.nn.dropout(hidden2, keep_prob)
      else:
        hidden2_drop = hidden2
      w0 = tf.Variable(tf.zeros([num_units2, 10]))
      b0 = tf.Variable(tf.zeros([10]))
      p = tf.nn.softmax(tf.matmul(hidden2_drop, w0) + b0)

  tf.summary.histogram("conv_filters1", W_conv1)
  tf.summary.histogram("conv_filters2", W_conv2)

  return p

  
def get_trainer(p, t, global_step):
  with tf.name_scope('optimizer'):
    loss = -tf.reduce_sum(t * tf.log(p), name='loss')
    train_step = tf.train.AdamOptimizer(0.0001).minimize(loss, global_step=global_step)
      
  with tf.name_scope('evaluator'):
    correct_prediction = tf.equal(tf.argmax(p, 1), tf.argmax(t, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction,
                                      tf.float32), name='accuracy')

  return train_step, loss, accuracy

You can train the model on Cloud ML by running the following commands from the Cloud Shell. You should run the commands in the parent directory of "trainer".

$ PROJECT_ID=project01 # your project ID
$ TRAIN_BUCKET="gs://$PROJECT_ID-mldata"
$ gsutil mkdir $TRAIN_BUCKET

$ cat << EOF > config.yaml
trainingInput:
  # Use a cluster with many workers and a few parameter servers.
  scaleTier: STANDARD_1
EOF

$ JOB_NAME="job01"
$ gsutil rm -rf $TRAIN_BUCKET/$JOB_NAME
$ touch .dummy
$ gsutil cp .dummy $TRAIN_BUCKET/$JOB_NAME/train/
$ gsutil cp .dummy $TRAIN_BUCKET/$JOB_NAME/model/

$ gcloud beta ml jobs submit training ${JOB_NAME} \
  --package-path=trainer \
  --module-name=trainer.task \
  --staging-bucket="${TRAIN_BUCKET}" \
  --region=us-central1 \
  --config=config.yaml \
  -- \
  --log_dir=$TRAIN_BUCKET/$JOB_NAME/train \
  --model_dir=$TRAIN_BUCKET/$JOB_NAME/model \
  --max_steps=10000

You can also see the training process on TensorBoard with the following command.

$  tensorboard --port 8080 --logdir $TRAIN_BUCKET/$JOB_NAME/train

After the training, the resulting model is stored under $TRAIN_BUCKET/$JOB_NAME/model. The following is a quick example of using the exported model for a prediction.

#!/usr/bin/python
import tensorflow as tf
import numpy as np
import json
from tensorflow.examples.tutorials.mnist import input_data

model_meta = 'gs://project01-mldata/job01/model/export.meta'
model_param = 'gs://project01-mldata/job01/model/export'

with tf.Graph().as_default() as graph:
  sess = tf.InteractiveSession()
  saver = tf.train.import_meta_graph(model_meta)
  saver.restore(sess, model_param)

  inputs = json.loads(tf.get_collection('inputs')[0])
  outputs = json.loads(tf.get_collection('outputs')[0])
  x = graph.get_tensor_by_name(inputs['image'])
  input_key = graph.get_tensor_by_name(inputs['key'])
  p = graph.get_tensor_by_name(outputs['scores'])
  output_key = graph.get_tensor_by_name(outputs['key'])

  mnist_data = input_data.read_data_sets("/tmp/data/", one_hot=True)
  images, labels = mnist_data.test.next_batch(10)
  index = range(10)
  keys, preds = sess.run([output_key, p], feed_dict={input_key:index, x:images})
  for key, pred, label in zip(keys, preds, labels):
    print key, np.argmax(pred), np.argmax(label)

The more sophisticated example

The example used in this note is straightforward but may not practical. If you are interested in a more sophisticated example, see the following one.

github.com

Disclaimer: All code snippets are released under Apache 2.0 License. This is not an official Google product.

分散学習用TensorFlowコードの書き方

何の話かというと

Google Cloud MLを利用して、TensorFlowの分散学習を行う方法です。取り急ぎ、自分用のメモとして公開しておきます。

分散学習にはいくつかのパターンがありますが、最もシンプルな「データ分散」の場合を説明します。各ノードは同じモデルに対して、個別に学習データを適用して、Variableを修正する勾配ベクトルを計算します。それぞれで計算した勾配ベクトルを用いて、共通のVariableを修正していきます。

前提知識

TensorFlowの分散学習処理を行う際は、3種類のノードを使用します。

・Parameter Server:Workerが計算した勾配ベクトルを用いて、Variableのアップデートを行います。
・Worker:教師データから勾配ベクトルを計算します。
・Master:Workerと同様の処理に加えて、学習済みモデルの保存やテストセットに対する評価などの追加処理を行います。

一般にParameter Serverは1〜2ノード、Workerは必要に応じた沢山のノード、Masterは1ノードだけで動かします。Cloud MLにジョブを投げると、これらのノード群がコンテナで生成されて、各ノードでコードの実行が行われます。

クラスター構成情報とノードの役割の取得

TensorFlowのコードは全ノードで共通ですが、コード内でノードの種類に応じて処理を分岐させます。コード内では環境変数 TF_CONFIG を通じて、クラスターの構成情報と自分自身の役割を取得します。次のコードは、クラスター構成情報オブジェクト cluster_spec を作成して、job_name と task_index にノードの役割を格納します。

  # Get cluster and node info
  env = json.loads(os.environ.get('TF_CONFIG', '{}'))
  cluster_info = env.get('cluster', None)
  cluster_spec = tf.train.ClusterSpec(cluster_info)
  task_info = env.get('task', None)
  job_name, task_index = task_info['type'], task_info['index']

job_name には、「ps」「master」「worker」のいずれかの文字列が入ります。task_index は同じ種類のノードが複数ある際に 0 からの通し番号が入ります。これらの情報を用いて、次のように Server オブジェクトを作成すると、このオブジェクトがクラスター内の他のノードとの通信処理を担います。

  server = tf.train.Server(cluster_spec,
                           job_name=job_name, task_index=task_index)

Parameter Serverの処理

Parameter Serverの場合は、次のコマンドで Server オブジェクトを起動すれば、それで必要な処理は終わりです。あとは Server オブジェクトが Parameter Server としての機能を提供してくれます。このコマンドは外部からプロセスを停止するまで、戻ってくることはありません。

  if job_name == "ps": # Parameter server
    server.join()

Workerの処理

Worker(および、Master)では、学習処理のループを回す必要がありますが、この際、他のノードと協調動作するために、Supervisorオブジェクトを生成した後に、このオブジェクトを経由して、チェックポイントファイルの保存やセッションの作成といった処理を実施します。

次は、Supervisorオブジェクトを生成するコードの例です。

  if job_name == "master" or job_name == "worker": # Worker node
    is_chief = (job_name == "master")
...
        global_step = tf.Variable(0, trainable=False)
        init_op = tf.global_variables_initializer()
        saver = tf.train.Saver()
        # Create a supervisor
        sv = tf.train.Supervisor(is_chief=is_chief, logdir=LOG_DIR,
                                 init_op=init_op, saver=saver, summary_op=None,
                                 global_step=global_step, save_model_secs=0)

・is_chief:Masterノードの場合に True を渡します。
・logdir:チェックポイントファイルの保存ディレクトリー
・init_op:セッション作成時のVariable初期化処理
・saver:チェックポイントファイルを保存するためのSaverオブジェクト
・global_step:最適化処理の実施回数をカウントするVariable
・summary_op:TensorBoard用のサマリーオブジェクト(Supervisorオブジェクトを介さずにサマリーを保存する際はNoneを指定)
・save_model_secs:チェックポイントの定期保存間隔(自動保存ではなく、明示的に保存する際はNoneを指定)

また、モデルを定義する際は、次の様に、job_name と task_index を用いて、自分の役割を tf.device で設定した with 構文の中で定義していきます。

  device_fn = tf.train.replica_device_setter(
    cluster=cluster_spec,
    worker_device="/job:%s/task:%d" % (job_name, task_index)
  )
...
  if job_name == "master" or job_name == "worker": # Worker node
    is_chief = (job_name == "master")

    with tf.Graph().as_default() as graph:
      with tf.device(device_fn):

そして次は、セッションを生成して、学習処理のループを回す部分です。

        # Create a session and run training loops
        with sv.managed_session(server.target) as sess:
          reports, step = 0, 0
          start_time = time.time()
          while not sv.should_stop() and step < MAX_STEPS:
             images, labels = mnist_data.train.next_batch(BATCH_SIZE)
             feed_dict = {x:images, t:labels, keep_prob:0.5}
             _, loss_val, step = sess.run([train_step, loss, global_step],
                                          feed_dict=feed_dict)
             if step > CHECKPOINT * reports:
               reports += 1
               logging.info("Step: %d, Train loss: %f" % (step, loss_val))
               if is_chief:
                 # Save checkpoint
                 sv.saver.save(sess, sv.save_path, global_step=step)
...
                 # Save summary
                 feed_dict = {test_loss:loss_val, test_accuracy:acc_val}
                 sv.summary_computed(sess,
                   sess.run(summary, feed_dict=feed_dict), step)
                 sv.summary_writer.flush()

ここでのポイントは、最適化アルゴリズム train_step を評価して Variable をアップデートする際に、global_step を一緒に評価する点です。これにより、global_step の値が 1 増加して、他のノードを含めた学習処理のトータルの回数が取得できます。この例では取得した値を変数 step に格納して、全体として CHECKPOINT 回評価するごとに進捗をログ出力するということを行っています。また、先に定義しておいた is_chief (Masterの場合に True)を用いて、Masterだけで追加の処理をすることもできます。この例では、サマリーの出力とチェックポイントの保存を行っています。sv.save_path には、Supervisorを作成した時に logdir で指定したディレクトリーが入ります。

学習済みモデルの保存

分散学習でトリッキーな点の1つが学習済みモデルの保存方法です。先に構築したモデルは、分散学習用にノードの情報がひも付いていますが、学習済みモデルで分類を行う際は、これらの情報は不要です。そこで、単体ノードでもリストア可能な分類処理専用のモデルを再構築して、それを保存するという処理を行います。

まず、学習処理のループを抜けた部分で次を実行します。これは、最終状態の Variable を一旦チェックポイントファイルに保存して、それからモデルの再構築・保存処理(export_model)を呼び出しています。

          if is_chief: # Export the final model
            sv.saver.save(sess, sv.save_path, global_step=sess.run(global_step))
            export_model(tf.train.latest_checkpoint(LOG_DIR))

そして、再構築・保存処理の例は次のようになります。

def export_model(last_checkpoint):
  # create a session with a new graph.
  with tf.Session(graph=tf.Graph()) as sess:
    x = tf.placeholder(tf.float32, [None, 784])
    p = mnist.get_model(x, None, training=False)

    # Define key elements
    input_key = tf.placeholder(tf.int64, [None,])
    output_key = tf.identity(input_key)

    # Define API inputs/outpus object
    inputs = {'key': input_key.name, 'image': x.name}
    outputs = {'key': output_key.name, 'scores': p.name}
    tf.add_to_collection('inputs', json.dumps(inputs))
    tf.add_to_collection('outputs', json.dumps(outputs))

    init_op = tf.global_variables_initializer()
    sess.run(init_op)

    # Restore the latest checkpoint and save the model
    saver = tf.train.Saver()
    saver.restore(sess, last_checkpoint)
    saver.export_meta_graph(filename=MODEL_DIR + '/export.meta')
    saver.save(sess, MODEL_DIR + '/export',
               write_meta_graph=False)

ここでは、新たなグラフとセッションを用意して、入力 x に対して、予測結果 p を計算する最低限の関係を定義した後に、先ほど保存したチェックポイントの内容をリストアしています。このセッションに含まれていない Variable の値は単純に無視されます。また、分類用コードに入出力変数を渡すために、入出力に関連した変数名を JSON にまとめたものを collection に入れてあります。output_key は、Placeholder の input_key にいれた値がそのまま出てくる変数で、複数データをバッチ処理する際にどの出力データがどの入力データに対応するかを紐付けるために使用します。

コードの全体像

モデル定義の中身は適当に用意した MNIST 用 CNN です。

trainer
├── __init__.py # 空ファイル
├── mnist.py # モデル定義
└── task.py # 学習処理用コード


trainer/task.py

import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
import numpy as np
import time, json, os, logging

import mnist

flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_integer('batch_size', 100,
                     'Batch size. Must divide evenly into the dataset sizes.')
flags.DEFINE_integer('max_steps', 10000, 'Number of steps to run trainer.')
flags.DEFINE_integer('checkpoint', 100, 'Interval steps to save checkpoint.')
flags.DEFINE_string('log_dir', '/tmp/logs',
                    'Directory to store checkpoints and summary logs')
flags.DEFINE_string('model_dir', '/tmp/model',
                    'Directory to store trained model')


# Global flags
BATCH_SIZE = FLAGS.batch_size
MODEL_DIR = FLAGS.model_dir
LOG_DIR = FLAGS.log_dir
MAX_STEPS = FLAGS.max_steps
CHECKPOINT = FLAGS.checkpoint


def export_model(last_checkpoint):
  # create a session with a new graph.
  with tf.Session(graph=tf.Graph()) as sess:
    x = tf.placeholder(tf.float32, [None, 784])
    p = mnist.get_model(x, None, training=False)

    # Define key elements
    input_key = tf.placeholder(tf.int64, [None,])
    output_key = tf.identity(input_key)

    # Define API inputs/outpus object
    inputs = {'key': input_key.name, 'image': x.name}
    outputs = {'key': output_key.name, 'scores': p.name}
    tf.add_to_collection('inputs', json.dumps(inputs))
    tf.add_to_collection('outputs', json.dumps(outputs))

    init_op = tf.global_variables_initializer()
    sess.run(init_op)

    # Restore the latest checkpoint and save the model
    saver = tf.train.Saver()
    saver.restore(sess, last_checkpoint)
    saver.export_meta_graph(filename=MODEL_DIR + '/export.meta')
    saver.save(sess, MODEL_DIR + '/export',
               write_meta_graph=False)
  

def run_training():
  # Get cluster and node info
  env = json.loads(os.environ.get('TF_CONFIG', '{}'))
  cluster_info = env.get('cluster', None)
  cluster_spec = tf.train.ClusterSpec(cluster_info)
  task_info = env.get('task', None)
  job_name, task_index = task_info['type'], task_info['index']

  device_fn = tf.train.replica_device_setter(
    cluster=cluster_spec,
    worker_device="/job:%s/task:%d" % (job_name, task_index)
  )

  logging.info('Start job:%s, index:%d' % (job_name, task_index))

  # Create server
  server = tf.train.Server(cluster_spec,
                           job_name=job_name, task_index=task_index)

  if job_name == "ps": # Parameter server
    server.join()

  if job_name == "master" or job_name == "worker": # Worker node
    is_chief = (job_name == "master")

    with tf.Graph().as_default() as graph:
      with tf.device(device_fn):

        # Prepare training data
        mnist_data = input_data.read_data_sets("/tmp/data/", one_hot=True)

        # Create placeholders
        x = tf.placeholder_with_default(
          tf.zeros([BATCH_SIZE, 784], tf.float32), shape=[None, 784])
        t = tf.placeholder_with_default(
          tf.zeros([BATCH_SIZE, 10], tf.float32), shape=[None, 10])
        keep_prob = tf.placeholder_with_default(
          tf.zeros([], tf.float32), shape=[])
        global_step = tf.Variable(0, trainable=False)

        # Add test loss and test accuracy to summary
        test_loss = tf.placeholder_with_default(
          tf.zeros([], tf.float32), shape=[])
        test_accuracy = tf.placeholder_with_default(
          tf.zeros([], tf.float32), shape=[])
        tf.summary.scalar("Test_loss", test_loss) 
        tf.summary.scalar("Test_accuracy", test_accuracy) 

        # Define a model
        p = mnist.get_model(x, keep_prob, training=True)
        train_step, loss, accuracy = mnist.get_trainer(p, t, global_step)

        init_op = tf.global_variables_initializer()
        saver = tf.train.Saver()
        summary = tf.summary.merge_all()

        # Create a supervisor
        sv = tf.train.Supervisor(is_chief=is_chief, logdir=LOG_DIR,
                                 init_op=init_op, saver=saver, summary_op=None,
                                 global_step=global_step, save_model_secs=0)
    
        # Create a session and run training loops
        with sv.managed_session(server.target) as sess:
          reports, step = 0, 0
          start_time = time.time()
          while not sv.should_stop() and step < MAX_STEPS:
             images, labels = mnist_data.train.next_batch(BATCH_SIZE)
             feed_dict = {x:images, t:labels, keep_prob:0.5}
             _, loss_val, step = sess.run([train_step, loss, global_step],
                                          feed_dict=feed_dict)
             if step > CHECKPOINT * reports:
               reports += 1
               logging.info("Step: %d, Train loss: %f" % (step, loss_val))
               if is_chief:
                 # Save checkpoint
                 sv.saver.save(sess, sv.save_path, global_step=step)

                 # Evaluate the test loss and test accuracy
                 loss_vals, acc_vals = [], []
                 for _ in range(len(mnist_data.test.labels) // BATCH_SIZE):
                   images, labels = mnist_data.test.next_batch(BATCH_SIZE)
                   feed_dict = {x:images, t:labels, keep_prob:1.0}
                   loss_val, acc_val = sess.run([loss, accuracy],
                                                feed_dict=feed_dict)
                   loss_vals.append(loss_val)
                   acc_vals.append(acc_val)
                 loss_val, acc_val = np.sum(loss_vals), np.mean(acc_vals)

                 # Save summary
                 feed_dict = {test_loss:loss_val, test_accuracy:acc_val}
                 sv.summary_computed(sess,
                   sess.run(summary, feed_dict=feed_dict), step)
                 sv.summary_writer.flush()

                 logging.info("Time elapsed: %d" % (time.time() - start_time))
                 logging.info("Step: %d, Test loss: %f, Test accuracy: %f" %
                              (step, loss_val, acc_val))

          # Finish training
          if is_chief: # Export the final model
            sv.saver.save(sess, sv.save_path, global_step=sess.run(global_step))
            export_model(tf.train.latest_checkpoint(LOG_DIR))

        sv.stop()  


def main(_):
  run_training()


if __name__ == '__main__':
  logging.basicConfig(level=logging.INFO) 
  tf.app.run()

trainer/mnist.py

import tensorflow as tf
import json

def get_model(x, keep_prob, training=True):
  num_filters1 = 32
  num_filters2 = 64

  with tf.name_scope('cnn'):
    with tf.name_scope('convolution1'):
      x_image = tf.reshape(x, [-1,28,28,1])
      
      W_conv1 = tf.Variable(tf.truncated_normal([5,5,1,num_filters1],
                                                stddev=0.1))
      h_conv1 = tf.nn.conv2d(x_image, W_conv1,
                             strides=[1,1,1,1], padding='SAME')
      
      b_conv1 = tf.Variable(tf.constant(0.1, shape=[num_filters1]))
      h_conv1_cutoff = tf.nn.relu(h_conv1 + b_conv1)
      
      h_pool1 = tf.nn.max_pool(h_conv1_cutoff, ksize=[1,2,2,1],
                               strides=[1,2,2,1], padding='SAME')

    with tf.name_scope('convolution2'):
      W_conv2 = tf.Variable(
                  tf.truncated_normal([5,5,num_filters1,num_filters2],
                                      stddev=0.1))
      h_conv2 = tf.nn.conv2d(h_pool1, W_conv2,
                             strides=[1,1,1,1], padding='SAME')
      
      b_conv2 = tf.Variable(tf.constant(0.1, shape=[num_filters2]))
      h_conv2_cutoff = tf.nn.relu(h_conv2 + b_conv2)
      
      h_pool2 = tf.nn.max_pool(h_conv2_cutoff, ksize=[1,2,2,1],
                               strides=[1,2,2,1], padding='SAME')

    with tf.name_scope('fully-connected'):
      h_pool2_flat = tf.reshape(h_pool2, [-1, 7*7*num_filters2])
      num_units1 = 7*7*num_filters2
      num_units2 = 1024
      w2 = tf.Variable(tf.truncated_normal([num_units1, num_units2]))
      b2 = tf.Variable(tf.constant(0.1, shape=[num_units2]))
      hidden2 = tf.nn.relu(tf.matmul(h_pool2_flat, w2) + b2)

    with tf.name_scope('output'):
      if training:
        hidden2_drop = tf.nn.dropout(hidden2, keep_prob)
      else:
        hidden2_drop = hidden2
      w0 = tf.Variable(tf.zeros([num_units2, 10]))
      b0 = tf.Variable(tf.zeros([10]))
      p = tf.nn.softmax(tf.matmul(hidden2_drop, w0) + b0)

  tf.summary.histogram("conv_filters1", W_conv1)
  tf.summary.histogram("conv_filters2", W_conv2)

  return p

  
def get_trainer(p, t, global_step):
  with tf.name_scope('optimizer'):
    loss = -tf.reduce_sum(t * tf.log(p), name='loss')
    train_step = tf.train.AdamOptimizer(0.0001).minimize(loss, global_step=global_step)
      
  with tf.name_scope('evaluator'):
    correct_prediction = tf.equal(tf.argmax(p, 1), tf.argmax(t, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction,
                                      tf.float32), name='accuracy')

  return train_step, loss, accuracy

このコードを使って、Cloud MLで学習する際は、Cloud Shellから次のコマンドを実行していきます。(trainerディレクトリーの親ディクレクトリーで実行します。)

$ PROJECT_ID=project01 # your project ID
$ TRAIN_BUCKET="gs://$PROJECT_ID-mldata"
$ gsutil mkdir $TRAIN_BUCKET

$ cat << EOF > config.yaml
trainingInput:
  # Use a cluster with many workers and a few parameter servers.
  scaleTier: STANDARD_1
EOF

$ JOB_NAME="job01"
$ gsutil rm -rf $TRAIN_BUCKET/$JOB_NAME
$ touch .dummy
$ gsutil cp .dummy $TRAIN_BUCKET/$JOB_NAME/train/
$ gsutil cp .dummy $TRAIN_BUCKET/$JOB_NAME/model/

$ gcloud beta ml jobs submit training ${JOB_NAME} \
  --package-path=trainer \
  --module-name=trainer.task \
  --staging-bucket="${TRAIN_BUCKET}" \
  --region=us-central1 \
  --config=config.yaml \
  -- \
  --log_dir=$TRAIN_BUCKET/$JOB_NAME/train \
  --model_dir=$TRAIN_BUCKET/$JOB_NAME/model \
  --max_steps=10000

TensorBoardで進捗を見る時は、次を実行します。

$  tensorboard --port 8080 --logdir $TRAIN_BUCKET/$JOB_NAME/train

学習が終わると、「$TRAIN_BUCKET/$JOB_NAME/model」以下に学習済みモデル(export.meta、および、exprot-data.xxxx)が出力されます。

学習済みモデルをリストアして、予測処理を行うコードの例は次になります。

#!/usr/bin/python
import tensorflow as tf
import numpy as np
import json
from tensorflow.examples.tutorials.mnist import input_data

model_meta = 'gs://project01-mldata/job01/model/export.meta'
model_param = 'gs://project01-mldata/job01/model/export'

with tf.Graph().as_default() as graph:
  sess = tf.InteractiveSession()
  saver = tf.train.import_meta_graph(model_meta)
  saver.restore(sess, model_param)

  inputs = json.loads(tf.get_collection('inputs')[0])
  outputs = json.loads(tf.get_collection('outputs')[0])
  x = graph.get_tensor_by_name(inputs['image'])
  input_key = graph.get_tensor_by_name(inputs['key'])
  p = graph.get_tensor_by_name(outputs['scores'])
  output_key = graph.get_tensor_by_name(outputs['key'])

  mnist_data = input_data.read_data_sets("/tmp/data/", one_hot=True)
  images, labels = mnist_data.test.next_batch(10)
  index = range(10)
  keys, preds = sess.run([output_key, p], feed_dict={input_key:index, x:images})
  for key, pred, label in zip(keys, preds, labels):
    print key, np.argmax(pred), np.argmax(label)


Disclaimer: All code snippets are released under Apache 2.0 License. This is not an official Google product.

ネイピア数の導出と極限計算に関する補足

何の話かというと

www.yukisako.xyz

こちらの記事では、

 \lim_{h\to 0}\frac{a^h-1}{h}=1 ―― (1)

を満たす a は、

 a=\lim_{h\to 0}(1+h)^{\frac{1}{h}} ―― (2)

であることを導いて、これがネイピア数 e の定義に一致することを示しています。

この結論は正しいのですが、(1) から (2) を導出する課程で、極限の計算の取扱に不正確な部分があるので、その点を補足してみます。

まず上記の記事では、(1) から

 \lim_{h\to 0}a^h = 1 + \lim_{h\to 0}h ―― (3)

という式を導いて、さらにこれを

 a=\lim_{h\to 0}(1+h)^{\frac{1}{h}} ―― (4)

と変形しています。

ここで、(1) → (3)、および、(3) → (4) の式変形をそれぞれ詳しく吟味してみます。

導出における問題点(その1)

まず、(1) → (3) は同値変形ではありません。つまり、(3) → (1) の逆変形ができません。

なぜなら、(3) の両辺を個別に計算すると、これはどちらも 1 になります。つまり、(3) は  1=1 という当たり前のことを言っており、よく考えると a の値が何であっても成り立ってしまいます。したがって、この後で説明するように、(3) → (4) を導くこともできません。

それでは、なぜ、(3) → (1) の逆変形はできないのでしょうか?

まず、(3) 右辺の 1 を左辺に移項すると次になります。これは同値変形です。

 \lim_{h\to 0}(a^h-1)= \lim_{h\to 0}h ―― (5)

では、この式の両辺を右辺で割って、(1) を導くことはできるでしょうか? 残念ながらそれはできません。

たとえば、単純に割り算すると次になります。

 \frac{\lim_{h\to 0}(a^h-1)}{\lim_{h\to 0}h} = 1 ―― (6)

ですが、よく考えると (5) の右辺は値で言うと 0 です。これは両辺を 0 で割るという、やってはいけない計算になってしまいます。

(1) の意味は「割り算した後に h\to 0 の極限をとる」という操作であって、極限を取る前の (1) 左辺の分母は 0 ではありませんので、これは許される計算です。一方、(6) は極限を取った後に割り算をしているため、0 で割るという禁止事項に触れてしまっています。(1) と (6) は同じ内容を表わすものではない、という点をよく味わってみてください。

違う言い方をすると、(1) から (3) を導く際は、「極限を取る前の計算」を「極限を取った後の計算」に置き換えてしまっているというわけです。極限を含む式を扱う際は、このような「極限を取るタイミング」によく注意する必要があります。

導出における問題点(その2)

(3) から (4) の変形では、両辺を \frac{1}{h} 乗するという操作をしていますが、ここでも「極限を取った後の計算」と「極限を取る前の計算」に注意する必要があります。(3) の両辺を \frac{1}{h} 乗した結果は、あくまで次のものです。

 \left(\lim_{h\to 0}a^h\right)^{\frac{1}{h}} = \left(1 + \lim_{h\to 0}h\right)^{\frac{1}{h}} ―― (7)

ここで「極限を取った後に \frac{1}{h} 乗する」という操作が「 \frac{1}{h} 乗した後に極限を取る」という操作と同じであれば、これを (4) に書き直すことができるのですが、残念ながら、この2つの操作は同じではありません。(もし同じだとすると、任意の a で成り立つ (3) から、特定の a のみで成り立つ (4) が導かれるというおかしな話になってしまいます。そもそも (7) は、\frac{1}{h} 乗の中身ですでに \lim h\to 0 の極限をとっているので、\frac{1}{h} 乗の h はいったい何者かよくわかりません。)

ちなみに、それでは、(3) からはどのような変形が可能なのでしょうか? (3) には極限が入っているために、次の式変形が難しくなっています。そこで、(3) を極限をとる前の関係式に直してみます。

 a^h = 1 + h + f(h) ―― (8)

極限を取る前は、両辺の値はかならずしも一致しませんので、その差分を f(h) と置いています。極限で一致することから \lim_{h\to 0}f(h) = 0 が成り立ちます。

(8) は「極限を取る前」の関係ですので、安心して、両辺を \frac{1}{h} 乗することができます。

 a = \left\{1+h+f(h)\right\}^{\frac{1}{h}} ―― (9)

この後、両辺について、h\to 0 の極限をとると次が得られます。

 a = \lim_{h\to 0}\left\{1+h+f(h)\right\}^{\frac{1}{h}} ―― (10)

仮に、f(h) \equiv 0 (すべての h について f(h)=0)であれば、(10) は (4) に一致しますが、残念ながらそうではありません。f(h) をいろいろ取り替えることで、a はいろいろな異なる値になります。(3) が任意の a で成り立つという事実が、この f(h) の自由度に反映されているわけです。

 「えー。\lim_{h\to 0}f(h) = 0 やねんから、(10) と (4) はおんなじとちゃうん???」

ちゃいます。その理屈が通るのであれば、\lim_{h\to 0}h=0 ですので、(10) は、そもそも a=\lim_{h\to 0}\left\{1\right\}^{\frac{1}{h}} と同じになってしまいます。そんなわけはありません。あくまで、\left\{1+h+f(h)\right\}^{\frac{1}{h}} という計算を一式終わらせた後に、h \to 0 の極限をとるというのが (10) の意味であって、個別のパーツだけで先に極限を取ってしまうと、同じ計算にはなりません。「計算してから極限をとる」と「極限をとってから計算する」の違いをもう一度味わってみてください。

(1)から(4)を導く厳密な計算

・・・と、不正確な点を指摘するだけでは不親切なので、上記の問題点をさけて、厳密に (1) から (4) を導く方法を考えてみましょう。いろんなやり方が考えられますが、根本原理に立ち戻って、ε-δ 論法を使ってみましょう。先ほど関数 f(x) を導入したように、ε-δ 論法を用いると、「極限をとる前の関係式で、極限の様子を表現する」ということが出来るので、極限のことを気にせずに自由に式変形ができるようになります。これこそが、ε-δ 論法が有用な理由なのです。

まず、(1) の関係を ε-δ 論法で述べると次になります。

「(十分に小さな)任意の \epsilon > 0 に対して、ある \delta > 0 をうまく選択すると、 \left|h\right| < \delta を満たすすべての h に対して  \left| \frac{a^h-1}{h} - 1 \right| < \epsilon が成立する」 ―― (11)

いかがでしょうか。(11) は極限を取る前の関係式ですので、やや複雑ではありますが、通常の関係式として、安心して式変形することができます。

そして、最終的に示したい事実 (4) は、ε-δ 論法で述べると次になります。

「(十分に小さな)任意の \epsilon' > 0 に対して、ある \delta' > 0 をうまく選択すると、 \left|h\right| < \delta' を満たすすべての h に対して \left|a - (1+h)^{\frac{1}{h}}\right| < \epsilon' が成立する」―― (12)

(11)と(12)に含まれる \epsilon\delta は異なるものなので、混乱しないように記号をわけておきました。

そこで、(11) から出発して、(12) を示すことができれば、 (1) から (4) が導けたことになるわけです。




・・・約3時間の沈黙・・・



ぐああああああああ。


すいません。紙の上で証明はできたのですが、ノート5ページぐらいになってしまって、とてもTeX記法で整理して書く気がしません。。。。くやしぃ。

代わりに、ε-δ 論法をつかわずに極限の落とし穴をすりぬけてうまいこと証明する方法を紹介しておきます。。。。。

まず、(1) において、t=\frac{1}{a^h-1} という変数変換を行います。これは、h について解くと h=\log_a\left(1+\frac{1}{t}\right) になります。さらに、h\to 0 の時 t\to\infty であることに注意すると、(1) は、次の式と同値です。

 \lim_{t\to \infty}\frac{1}{t\log_a\left(1+\frac{1}{t}\right)}=1

さらに、この左辺は次のように変形できます。

 \lim_{t\to \infty}\frac{1}{\log_a\left(1+\frac{1}{t}\right)^t} = 1

ここで、左辺に含まれる関数 \frac{1}{\log_a x} は、x について連続関数なので、極限を対数の中に入れることができます。

 \frac{1}{\log_a\lim_{t\to\infty}\left(1+\frac{1}{t}\right)^t} = 1

つまり、

 \log_a\lim_{t\to\infty}\left(1+\frac{1}{t}\right)^t = 1

となります。最後にこの両辺を a の肩にのせると、a^{\log_ax} = x に注意して、

 \lim_{t\to\infty}\left(1+\frac{1}{t}\right)^t =a

が得られます。

来ました! ここで h' = \frac{1}{t} で新たに h' を定義すると、結局、次が成り立つことがわかります。

 a=\lim_{h'\to 0}(1+h')^{\frac{1}{h'}}

この h' はいわゆる束縛変数なので、好きな文字に置き換えられます。h' をあらたに h に置き直すことで (4) が導かれました。

この証明のミソは、連続関数 f(x) については、\lim f(\cdots) = f(\lim\cdots) が成立するという性質にあるわけです。