めもめも

このブログに記載の内容は個人の見解であり、必ずしも所属組織の立場、戦略、意見を代表するものではありません。

Cloud DatastoreのEventual Consistencyに関するメモ(google.cloudクライアント編)

何の話かというと

Cloud Datastoreに対するQueryは、"Ancestor Query" を使用する事でStrong Consistencyが保証されます。逆に Ancestor Query を使用しなかった場合にどのような現象が発生するのかを雑多にメモしておきます。ここでは、GCEのVMからDatastoreにアクセスする前提で、google.cloudクライアントを使用します。(現在、ndbクライアントはGAE限定なので。)

Ancestor Queryとは?

Cloud Datastoreに格納するEntityは、それぞれに「親」Entityを指定することで、ツリー状のグループを構成します。1つのツリーを「Entity Group」と呼びます。Ancestor Queryは、Queryの検索条件として「Ancestor Key」を指定して、検索範囲をその下にぶら下がったEntityに限定することを言います。(親Entityは、自分と同じKindである必要はありませんので、Entity Gorupには、さまざまなKindのEntityが入り交じる点に注意してください。)

一方、Ancestor Keyを指定しない場合は、Datastoreに格納されたすべてのEntityが検索対象となります。これを「Global Query」と呼びます。

プロパティによるGlobal Query

まず次のスクリプトで1秒ごとにEntityを生成します。EntityのID(Key name string)の他に、'myid' というプロパティに同じIDの文字列を格納します。プロパティ 'timestamp' にはEntityを生成したタイムスタンプを入れておきます。

writer.py

#!/usr/bin/python

from google.cloud import datastore
import time, datetime, os, uuid

project_id = os.environ.get('PROJECT_ID')
ds = datastore.Client(project_id)

os.remove('/tmp/tmp0')
for i in range(1000):
    myid = str(uuid.uuid4())
    key = ds.key('Kind01', myid)
    ent = datastore.Entity(key)
    ts = datetime.datetime.now()
    ent.update({'timestamp': ts, 'myid': myid})
    ds.put(ent)
    with open('/tmp/tmp0', 'a') as file:
        file.write('%s\n' % myid)
    print ts, myid 
    time.sleep(1)

実行するとこんな感じ・・・

$ ./writer.py 
2016-10-26 00:50:27.589180 36a1fca7-a4f8-4d74-b2e0-5a8521f68cff
2016-10-26 00:50:28.749832 e1cb00a0-9ccc-4558-b7d6-2bb41b10be56
2016-10-26 00:50:29.866750 8245581d-2b0c-4888-90e4-710f683837e9
2016-10-26 00:50:30.950333 b1058f07-3c58-4ad1-8ea1-c4191be2bcbe
2016-10-26 00:50:32.061670 9874503a-7318-4d96-a0ca-78668f736205
2016-10-26 00:50:33.229060 be9fd3f0-d2e6-4e53-a748-4d26b766e7d8
...

この時、ファイル /tmp/tmp0 にEntityのIDが追記されていきます。

同時に次のスクリプトを実行して、/tmp/tmp0に書き込まれたIDを用いて、'myid' = ID という条件でQuery(Global Query)を実行します。

reader.py

#!/usr/bin/python

from google.cloud import datastore
import time, datetime, os
import subprocess

project_id = os.environ.get('PROJECT_ID')
ds = datastore.Client(project_id)

f = subprocess.Popen(['tail','-F','/tmp/tmp0'],
                     stdout=subprocess.PIPE,stderr=subprocess.PIPE)

while True:
    myid = f.stdout.readline().strip()
    query = ds.query(kind='Kind01')
    query.add_filter('myid', '=', myid)
    ts = datetime.datetime.now()
    print ts, 'Trying to find ', myid
    while True:
        iter = query.fetch()
        ent, _, _ = iter.next_page()
        if len(ent) == 0:
          print 'Failed and retry...'
          continue
        ts = datetime.datetime.now()
        print ts, 'Succeeded.'
        print ent[0]['timestamp'], myid
        break

実行するとこんな感じ・・・

2016-10-26 00:50:27.748414 Trying to find  36a1fca7-a4f8-4d74-b2e0-5a8521f68cff
Failed and retry...
Failed and retry...
Failed and retry...
Failed and retry...
2016-10-26 00:50:27.948764 Succeeded.
2016-10-26 00:50:27.589180+00:00 36a1fca7-a4f8-4d74-b2e0-5a8521f68cff
2016-10-26 00:50:28.865377 Trying to find  e1cb00a0-9ccc-4558-b7d6-2bb41b10be56
Failed and retry...
Failed and retry...
Failed and retry...
Failed and retry...
2016-10-26 00:50:29.066650 Succeeded.
2016-10-26 00:50:28.749832+00:00 e1cb00a0-9ccc-4558-b7d6-2bb41b10be56
2016-10-26 00:50:29.948932 Trying to find  8245581d-2b0c-4888-90e4-710f683837e9
Failed and retry...
2016-10-26 00:50:30.004495 Succeeded.
2016-10-26 00:50:29.866750+00:00 8245581d-2b0c-4888-90e4-710f683837e9
2016-10-26 00:50:31.060321 Trying to find  b1058f07-3c58-4ad1-8ea1-c4191be2bcbe
2016-10-26 00:50:31.177282 Succeeded.
2016-10-26 00:50:30.950333+00:00 b1058f07-3c58-4ad1-8ea1-c4191be2bcbe

タイミングによって、Queryに失敗していることがわかります。これは、Global Queryでは、作成直後のEntityが発見されない可能性があることを示しています。(この例では、1秒以内には発見されています。)

(ちなみに、Ancestor Queryだとなぜこのような事が起こらないかというと、Datastoreの実装として、Ancestor Queryが発行された場合は、該当のEntity Groupに対して内部的にデータ同期状態のチェックが入るようになっているからです。Global Queryの場合、検索対象がすべてのEntityになるため、データ同期状態のチェックはコストが高すぎて実行できません。そのため、Eventual Consistencyな検索となります。)

ID指定によるEntityの取得

次のスクリプトは、プロパティではなく、明示的に ID を指定してEntityを取得します。

reader2.py

#!/usr/bin/python

from google.cloud import datastore
import time, datetime, os
import subprocess

project_id = os.environ.get('PROJECT_ID')
ds = datastore.Client(project_id)

f = subprocess.Popen(['tail','-F','/tmp/tmp0'],
                     stdout=subprocess.PIPE,stderr=subprocess.PIPE)

while True:
    myid = f.stdout.readline().strip()
    key = ds.key('Kind01', myid)
    ts = datetime.datetime.now()
    print ts, 'Trying to find ', myid
    while True:
        ent = ds.get(key)
        if not ent:
            print 'Failed and retry...'
            continue
        ts = datetime.datetime.now()
        print ts, 'Succeeded.'
        print ent['timestamp'], myid
        break

こちらを実行すると、次のようになります。

$ ./reader2.py
2016-10-26 00:59:28.266182 Trying to find  2d4ad50f-488c-466d-8273-3533c95de6ed
2016-10-26 00:59:28.378003 Succeeded.
2016-10-26 00:59:28.065682+00:00 2d4ad50f-488c-466d-8273-3533c95de6ed
2016-10-26 00:59:29.422306 Trying to find  1e1b6e94-0697-4a8f-9b2c-96696085e429
2016-10-26 00:59:29.560694 Succeeded.
2016-10-26 00:59:29.267432+00:00 1e1b6e94-0697-4a8f-9b2c-96696085e429
2016-10-26 00:59:30.570010 Trying to find  a159836f-cdf5-4148-a305-9b2f01e8f7d0
2016-10-26 00:59:30.679568 Succeeded.
2016-10-26 00:59:30.423487+00:00 a159836f-cdf5-4148-a305-9b2f01e8f7d0
2016-10-26 00:59:31.701340 Trying to find  add815ed-7cb9-4e61-8377-38eca8dac14d
2016-10-26 00:59:31.817533 Succeeded.
2016-10-26 00:59:31.571264+00:00 add815ed-7cb9-4e61-8377-38eca8dac14d
2016-10-26 00:59:32.844253 Trying to find  9ce71c69-d5e0-48c1-a065-ec41aad386d3
2016-10-26 00:59:32.919830 Succeeded.
2016-10-26 00:59:32.702581+00:00 9ce71c69-d5e0-48c1-a065-ec41aad386d3
2016-10-26 00:59:33.908359 Trying to find  5c250b18-4b03-45c7-bbc4-c8a703737148
2016-10-26 00:59:33.926057 Succeeded.
2016-10-26 00:59:33.845533+00:00 5c250b18-4b03-45c7-bbc4-c8a703737148

この場合は、Strong Consistencyとなり、かならず、Entityの最新の内容が取得されます。

Ancestor Queryの場合

Entityグループを構成して、親Entityを指定したAncestor Queryを実施する場合は、作成直後のEntityも必ず取得されるはずですが、念のために確認しておきましょう。

ここでは、次のようなEntityグループを構成します。

Kind00: 'grandpa'
    |
    |
Kind00: 'father'
    |
    ---------------------
    |                |
Kind01: uuid     Kind01: uuid ・・・

Entityを作成するスクリプトは、こんな感じ。(Kind00: 'grandpa' と Kind00: 'father' に対応するEntityは実際には作成していませんが、これは問題ありません。「Parent Key」は実際のところ、Entityグループのローカルインデックスの検索Keyとして使われるだけですので・・・)

writer3.py

#!/usr/bin/python

from google.cloud import datastore
import time, datetime, os, uuid

project_id = os.environ.get('PROJECT_ID')
ds = datastore.Client(project_id)

os.remove('/tmp/tmp0')

for i in range(1000):
    myid = str(uuid.uuid4())
    parent_key = ds.key('Kind00', 'grandpa', 'Kind00', 'father')
    key = ds.key('Kind01', myid, parent=parent_key)
    ent = datastore.Entity(key)
    ts = datetime.datetime.now()
    ent.update({'timestamp': ts, 'myid': myid})
    ds.put(ent)
    with open('/tmp/tmp0', 'a') as file:
        file.write('%s\n' % myid)
    print ts, myid 
    time.sleep(1)

Entityを検索する方はこんな感じ。ここでは、直近の親 ds.key('Kind00', 'grandpa', 'Kind00', 'father') を検索の起点としていますが、もちろんその上の ds.key('Kind00', 'grandpa') を起点にしても構いません。

#!/usr/bin/python

from google.cloud import datastore
import time, datetime, os
import subprocess

project_id = os.environ.get('PROJECT_ID')
ds = datastore.Client(project_id)

f = subprocess.Popen(['tail','-F','/tmp/tmp0'],
                     stdout=subprocess.PIPE,stderr=subprocess.PIPE)

while True:
    myid = f.stdout.readline().strip()
    ancestor_key = ds.key('Kind00', 'grandpa', 'Kind00', 'father')
    query = ds.query(kind='Kind01', ancestor=ancestor_key)
    query.add_filter('myid', '=', myid)
    ts = datetime.datetime.now()
    print ts, 'Trying to find ', myid
    while True:
        iter = query.fetch()
        ent, _, _ = iter.next_page()
        if len(ent) == 0:
          print 'Faild and retry...'
          continue
        ts = datetime.datetime.now()
        print ts, 'Succeeded.'
        print ent[0]['timestamp'], myid
        break

実行結果は次の通りで、予想通り、取りこぼしはありません。

$ ./reader3.py 
2016-11-02 07:21:02.236359 Trying to find  6b77f8da-0775-4f77-980e-14c5c59a451f
2016-11-02 07:21:02.392515 Succeeded.
2016-11-02 07:21:01.482085+00:00 6b77f8da-0775-4f77-980e-14c5c59a451f
2016-11-02 07:21:02.634073 Trying to find  3681841d-72e1-43ac-9d91-0dfcbf137713
2016-11-02 07:21:02.662415 Succeeded.
2016-11-02 07:21:02.582529+00:00 3681841d-72e1-43ac-9d91-0dfcbf137713
2016-11-02 07:21:03.682064 Trying to find  164ff921-bf65-4835-9a3b-6c0a5e7948d4
2016-11-02 07:21:03.719449 Succeeded.
2016-11-02 07:21:03.635356+00:00 164ff921-bf65-4835-9a3b-6c0a5e7948d4
2016-11-02 07:21:04.777847 Trying to find  eeb6702b-2f4b-4254-8957-1db2ae52a23e
2016-11-02 07:21:04.805840 Succeeded.
2016-11-02 07:21:04.683344+00:00 eeb6702b-2f4b-4254-8957-1db2ae52a23e
2016-11-02 07:21:05.852476 Trying to find  a514fe27-37df-4639-b269-5192b926624b
2016-11-02 07:21:05.946797 Succeeded.

Cloud ML Super Quick Tour

cloud.google.com

Background

Google Cloud ML is now available as a Beta release (as of 2016/10/11). Super simply stated, you can do the following things using Cloud ML.

 (1) Train your custom TensorFlow models on GCP.
 (2) Serve prediction API with your custom models.

Regarding the custom model training, you can use useful features such as hyper-parameter tuning and distributed training, but in this post, I will show you the minimum steps to migrate your existing TensorFlow models to Cloud ML. As an example, I will use the following code. It classifies the MNIST dataset with a single layer neural network.

MNIST single layer network.ipynb

Modification to the existing code

First, you have to create a library by putting all files in a single directory. If you have a single executable file 'task.py', your library directory is something like this:

trainer/
├── __init__.py   # Empty file
└── task.py       # Executable file

The name of library directory and executable file can be arbitrary.

Then you will add the following code at the end of the executable file:

if __name__ == '__main__':
    tf.app.run()

The run() method at the end implicitly calls the main() function. And you need to use Cloud Storage to exchange files with the runtime environment. It can be done by specifying the Cloud Storage URI "gs://..." for file paths in your code. Considering the use case where you test the code on your local machine before submitting it to Cloud ML, you'd better make your code such that you can specify the file paths through command line options. The followings are the typical directories you need to consider:

  • Directory to store checkpoint files during the training.
  • Directory to store the trained model binary (The filename should be 'export'.)
  • Directory to store log data for TensorBoard.
  • Directory to store training data.

Note that you don't necessarily have to use Cloud Storage for the training data. You can use other data sources such as Cloud Dataflow as training data.

In this example, I will make the entry point of my code 'task.py' like this:

def main(_):
    parser = argparse.ArgumentParser()
    parser.add_argument('--train_dir', type=str, default='/tmp/train')  # Checkpoint file
    parser.add_argument('--model_dir', type=str, default='/tmp/model')  # Model file
    parser.add_argument('--train_step', type=int, default=2000)         # Training steps
    args, _ = parser.parse_known_args()
    run_training(args)

if __name__ == '__main__':
    tf.app.run()

This enables users to specify directories for checkpoint files and model binary with the command line options '--train_dir' and '--model_dir'. In addition, the users can specify the number of training iterations with '--train_step'. In this example, the training data is directly fetched from the Internet using the TensorFlow library.

In addition, as a particular point in Cloud ML, you have to specify the input/output objects for the prediction API service using the Collection object of TensorFlow. Collection is a generic object to store arbitrary key-value style data. In Cloud ML, you store Placeholders as API inputs with the key 'inputs', and store prediction value objects as API outputs with the key 'outputs' like this:

input_key = tf.placeholder(tf.int64, [None,])
x = tf.placeholder(tf.float32, [None, 784])

inputs = {'key': input_key.name, 'image': x.name}
tf.add_to_collection('inputs', json.dumps(inputs))

p = tf.nn.softmax(tf.matmul(hidden1, w0) + b0)
output_key = tf.identity(input_key)

outputs = {'key': output_key.name, 'scores': p.name}
tf.add_to_collection('outputs', json.dumps(outputs))

More precisely, you create dictionaries containing the name attributes of input/output objects and store JSON serialization of them in the Collection object using the tf.add_to_collection() method. The keys in the dictionaries are used as the name attributes in the API request/response. In this case, in addition to the input image 'x' and the prediction result 'p' (list of probabilities for each category), 'input_key' and 'output_key' are included in the input/output objects. The 'output_key' simply returns the same value as the 'input_key'. When you send multiple entries to the prediction API, you can match the entries in the response using these key values.

That's all. The following is the modified code considering what I have explained so far:

task.py

import tensorflow as tf
import numpy as np
from tensorflow.examples.tutorials.mnist import input_data
import argparse, os, json
mnist = input_data.read_data_sets("/tmp/data/", one_hot=True)

def run_training(args):
    # Define filepath for checkpoint and final model
    checkpoint_path = os.path.join(args.train_dir, 'checkpoint')
    model_path = os.path.join(args.model_dir, 'export') # Filename should be 'export'.
    num_units = 1024
    
    x = tf.placeholder(tf.float32, [None, 784])
    
    w1 = tf.Variable(tf.truncated_normal([784, num_units]))
    b1 = tf.Variable(tf.zeros([num_units]))
    hidden1 = tf.nn.relu(tf.matmul(x, w1) + b1)
    
    w0 = tf.Variable(tf.zeros([num_units, 10]))
    b0 = tf.Variable(tf.zeros([10]))
    p = tf.nn.softmax(tf.matmul(hidden1, w0) + b0)
    
    t = tf.placeholder(tf.float32, [None, 10])
    loss = -tf.reduce_sum(t * tf.log(p))
    train_step = tf.train.AdamOptimizer().minimize(loss)
    correct_prediction = tf.equal(tf.argmax(p, 1), tf.argmax(t, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
    # Define key element
    input_key = tf.placeholder(tf.int64, [None,], name='key')
    output_key = tf.identity(input_key)

    # Define API inputs/outpus object
    inputs = {'key': input_key.name, 'image': x.name}
    outputs = {'key': output_key.name, 'scores': p.name}
    tf.add_to_collection('inputs', json.dumps(inputs))
    tf.add_to_collection('outputs', json.dumps(outputs))
    
    saver = tf.train.Saver()
    sess = tf.InteractiveSession()
    sess.run(tf.initialize_all_variables())

    i = 0
    for _ in range(args.train_step):
        i += 1
        batch_xs, batch_ts = mnist.train.next_batch(100)
        sess.run(train_step, feed_dict={x: batch_xs, t: batch_ts})
        if i % 100 == 0:
            loss_val, acc_val = sess.run([loss, accuracy],
                feed_dict={x:mnist.test.images, t: mnist.test.labels})
            print ('Step: %d, Loss: %f, Accuracy: %f'
                   % (i, loss_val, acc_val))
            saver.save(sess, checkpoint_path, global_step=i)

    # Export the final model.
    saver.save(sess, model_path)


def main(_):
    parser = argparse.ArgumentParser()
    parser.add_argument('--train_dir', type=str, default='/tmp/train')  # Checkpoint directory
    parser.add_argument('--model_dir', type=str, default='/tmp/model')  # Model directory
    parser.add_argument('--train_step', type=int, default=2000)         # Training steps
    args, _ = parser.parse_known_args()
    run_training(args)


if __name__ == '__main__':
    tf.app.run()

Running the code on Cloud ML

To submit a job to Cloud ML, you need a local machine with Cloud ML SDK, or you can use Cloud Shell as a local environment. I will use Cloud Shell here. Please refer to the official document for other environments.

At first, you create a new project and enable Cloud ML API through the API Manager. Then you launch Cloud Shell and install the SDK.

$ curl https://storage.googleapis.com/cloud-ml/scripts/setup_cloud_shell.sh | bash
$ export PATH=${HOME}/.local/bin:${PATH}
$ curl https://storage.googleapis.com/cloud-ml/scripts/check_environment.py | python
Success! Your environment is configured correctly.

The following command sets the 'editor' authority of the project to a service account. This is necessary to submit jobs using the service account.

$ gcloud beta ml init-project

Prepare the TensorFlow codes (which I explained in the previous section) in the 'trainer' directory under you home directory.

trainer/
├── __init__.py   # Empty file
└── task.py       # Executable file

Before submitting a job, try to run the code on the local environment with a small number of iterations to see there's no obvious mistakes.

$ mkdir -p /tmp/train /tmp/model
$ cd $HOME
$ python -m trainer.task --train_step=200
Extracting /tmp/data/train-images-idx3-ubyte.gz
Extracting /tmp/data/train-labels-idx1-ubyte.gz
Extracting /tmp/data/t10k-images-idx3-ubyte.gz
Extracting /tmp/data/t10k-labels-idx1-ubyte.gz
Step: 100, Loss: 3183.995850, Accuracy: 0.903500
Step: 200, Loss: 2237.709229, Accuracy: 0.934500

$ ls -l /tmp/train /tmp/model/
/tmp/model/:
total 9584
-rw-r--r-- 1 enakai enakai     203 Oct  5 17:14 checkpoint
-rw-r--r-- 1 enakai enakai 9770436 Oct  5 17:14 export
-rw-r--r-- 1 enakai enakai   35514 Oct  5 17:14 export.meta
/tmp/train:
total 28744
-rw-r--r-- 1 enakai enakai     163 Oct  5 17:14 checkpoint
-rw-r--r-- 1 enakai enakai 9770436 Oct  5 17:14 checkpoint-100
-rw-r--r-- 1 enakai enakai   35514 Oct  5 17:14 checkpoint-100.meta
-rw-r--r-- 1 enakai enakai 9770436 Oct  5 17:14 checkpoint-200
-rw-r--r-- 1 enakai enakai   35514 Oct  5 17:14 checkpoint-200.meta

Looks good. Now let's run the code on the cloud. First, you create a Cloud Storage bucket to store data. The bucket name can be arbitrary, but you'd better include the project name following the convention.

$ PROJECT_ID=project01 # your project ID
$ TRAIN_BUCKET="gs://$PROJECT_ID-mldata"
$ gsutil mkdir $TRAIN_BUCKET

Decide the job name ('job01' in this example), and submit it to Cloud ML.

$ JOB_NAME="job01"
$ touch .dummy
$ gsutil cp .dummy $TRAIN_BUCKET/$JOB_NAME/train/
$ gsutil cp .dummy $TRAIN_BUCKET/$JOB_NAME/model/
$ gcloud beta ml jobs submit training $JOB_NAME \
  --region=us-central1 \
  --package-path=trainer --module-name=trainer.task \
  --staging-bucket=$TRAIN_BUCKET \
  -- \
  --train_dir="$TRAIN_BUCKET/$JOB_NAME/train" \
  --model_dir="$TRAIN_BUCKET/$JOB_NAME/model"

createTime: '2016-10-05T08:53:35Z'
jobId: job01
state: QUEUED
trainingInput:
  args:
  - --train_dir=gs://project01/job01/train
  - --model_dir=gs://project01/job01/model
  packageUris:
  - gs://project01/cloudmldist/1475657612/trainer-0.0.0.tar.gz
  pythonModule: trainer.task
  region: us-central1

Folder 'cloudmldist' will be created under the bucket specified with '--staging-bucket', and your codes will be placed under it. Then Cloud ML starts the execution of the code. In the steps above, you explicitly create folders to store checkpoint files and model binary with the gsutil command. You can automate it in your code if you prefer.

Monitor the job execution with the following command:

$ watch -n1 gcloud beta ml jobs describe --project $PROJECT_ID $JOB_NAME
createTime: '2016-10-05T08:53:35Z'
jobId: job01
startTime: '2016-10-05T08:53:45Z'
state: RUNNING
trainingInput:
  args:
  - --train_dir=gs://project01/job01/train
  - --model_dir=gs://project01/job01/model
  packageUris:
  - gs://project01/cloudmldist/1475657612/trainer-0.0.0.tar.gz
  pythonModule: trainer.task
  region: us-central1

The 'state' becomes 'SUCCEEDED' when the job has been completed. You can see the stdout/stderr logs on the Stackdriver's log management console by selecting the 'Cloud Machine Learning' log.

On successful job completion, the model binary 'export' is created as below:

$ gsutil ls $TRAIN_BUCKET/$JOB_NAME/model/export*
gs://project01/job01/model/export
gs://project01/job01/model/export.meta

Serving you model with the prediction API

Now you can start the prediction API service using the trained model binary 'export' by executing the following commands:

$ MODEL_NAME="MNIST"
$ gcloud beta ml models create $MODEL_NAME
$ gcloud beta ml versions create \
  --origin=$TRAIN_BUCKET/$JOB_NAME/model --model=$MODEL_NAME v1
$ gcloud beta ml versions set-default --model=$MODEL_NAME v1

You specify the model name with the environment variable 'MODEL_NAME'. And you can manage multiple versions of the model. In this case, you created a service with 'v1' version model, and made it the default version.

You need to wait for a few minutes until the service becomes available. So while this time, let's create a test dataset with the following python script:

import json
from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets("/tmp/data/", one_hot=True)
with open("data.json", "w") as file:
    for i in range(10):
        data = {"image": mnist.test.images[i].tolist(), "key": i}
        file.write(json.dumps(data)+'\n')

It generates a JSON file 'data.json' containing a pair of image and key value per line. You can submit the date to the prediction API with the following command:

$ gcloud beta ml predict --model=${MODEL_NAME} --json-instances=data.json
predictions:
- key: 0
  scores:
  - 2.53733e-08
  - 6.47722e-09
  - 2.23573e-06
  - 5.32844e-05
  - 3.08012e-10
  - 1.33022e-09
  - 1.55983e-11
  - 0.99991
  - 4.39428e-07
  - 3.38841e-05
- key: 1
  scores:
  - 1.98303e-08
  - 2.84799e-07
  - 0.999985
  - 1.47131e-05
  - 1.45546e-13
  - 1.90945e-09
  - 3.50033e-09
  - 2.24941e-18
  - 2.60025e-07
  - 1.45738e-14
- key: 2
  scores:
  - 3.63027e-09
...

You can see the response on the command line. Please refer to the official document for URLs to directly submit REST requests.

Note on the distributed training

In this example, I used the sample code using the low level TensorFlow APIs. So you need additional modifications to the code following the Distributed TensorFlow if you want to distribute the training jobs on Cloud ML. It's not a trivial change, unfortunately. Some basic points are explained in the following article.

enakai00.hatenablog.com

But don't worry. The TensorFlow team is planning to provide high level TensorFlow APIs so that you can write TensorFlow codes automatically executed in a distributed manner on Cloud ML.

Stay tuned!

Disclaimer: All code snippets are released under Apache 2.0 License. This is not an official Google product.

(日本語版) Cloud ML Super Quick Tour

cloud.google.com

何の話かというと

先日、Google Cloud MLがベータ公開されました。超ざっくりまとめると、GCPのクラウド上で次のことができるようになります。

 (1) TensorFlowのコードを実行して学習済みモデルを作成する
 (2) 学習済みモデルをAPIサービスとして公開する(現在はアルファ版)

(1)については、ハイパーパラメーターの自動チューニングや分散学習処理なども利用できるのですが、ここでは、単純に、既存のTensorFlowのコードをCloud MLに載せるための最低限の手順を説明します。

例として、下記のサンプルコードを使用します。全結合層のみの単層ニューラルネットワークでMNISTデータセットを分類する簡単な例です。

MNIST single layer network.ipynb

ローカルで実行する場合からの修正点

まず、コードの実行に必要なファイルを1つのディレクトリーにまとめて、ライブラリー化します。実行ファイルが「task.py」の1つであれば、次のような構成になります。ディレクトリー名などは任意です。

trainer/
├── __init__.py   # 空ファイル
└── task.py       # 実行ファイル

そして、実行ファイルの末尾に次のコードを挿入します。最後の run メソッドにより、main() 関数の実行が開始されます。

if __name__ == '__main__':
    tf.app.run()

外部とのファイルのやり取りは、Cloud Storageを経由する必要があります。これは、コード内のファイルパスをCloud StorageのURI「gs://...」に書き換えればOKです。ただし、ローカルでテストした上でCloud MLに投げる場合を考えて、実行時のコマンドラインオプションでパスを指定できるようにするとよいでしょう。具体的には、次のようなディレクトリーが考えられます。

・学習中のチェックポイントファイルを出力するディレクトリー
・学習済みモデルのバイナリーをファイルとして出力するディレクトリー(ファイル名は「export」に固定)
・TensorBoard用のログデータを出力するディレクトリー
・トレーニング用のデータを読み込むディレクトリー

(トレーニング用のデータについては、Cloud Storageの利用が必須というわけではありません。Cloud Dataflowなど、他のサービスからのデータを入力ソースにする方法も用意されるようです。)

今回の例であれば、実行ファイル task.py の末尾を次のようにしておきます。オプション「--train_dir」と「--model_dir」でチェックポイントファイルとモデルファイルを出力するディレクトリーを指定します。ついでに「--train_step」で学習処理のループ数も指定できるようにしています。トレーニング用のデータは、TensorFlowのライブラリー機能で、インターネット上のデータを直接に取得します。

def main(_):
    parser = argparse.ArgumentParser()
    parser.add_argument('--train_dir', type=str, default='/tmp/train')  # Checkpoint file
    parser.add_argument('--model_dir', type=str, default='/tmp/model')  # Model file
    parser.add_argument('--train_step', type=int, default=2000)         # Training steps
    args, _ = parser.parse_known_args()
    run_training(args)

if __name__ == '__main__':
    tf.app.run()

そして、ここが一番特殊なのですが、学習済みモデルをAPIサービス化するために、APIの入出力となる要素をTensorFlowのCollectionオブジェクトで指定します。Collectionオブジェクトは、Key-Valueスタイルで任意のオブジェクトを格納するものですが、特に「inputs」というキーで入力を受け付けるPlaceholder一式、「outputs」というキーでAPIから返却する値の一式を指定します。今の例であれば、次のようになります。

input_key = tf.placeholder(tf.int64, [None,])
x = tf.placeholder(tf.float32, [None, 784])

inputs = {'key': input_key.name, 'image': x.name}
tf.add_to_collection('inputs', json.dumps(inputs))

p = tf.nn.softmax(tf.matmul(hidden1, w0) + b0)
output_key = tf.identity(input_key)

outputs = {'key': output_key.name, 'scores': p.name}
tf.add_to_collection('outputs', json.dumps(outputs))

「inputs」「outputs」共に、指定するオブジェクト(のname属性)をディクショナリーにまとめた上で、それをJSONにシリアライズしたものをCollectionオブジェクトに突っ込んでおきます。ディクショナリーのキーは、APIでやり取りする際の名前になります。この例では、入力画像「x」と予測結果(確率のリスト)「p」の他に、入力値をそのまま出力する「input_key」と「output_key」を入出力要素に加えています。これは複数のデータをまとめてAPIに投げた時に、返ってきた結果のそれぞれが、どの入力データに対応するものかを区別するために加えています。

以上を考慮して修正したコードがこちらになります。

task.py

import tensorflow as tf
import numpy as np
from tensorflow.examples.tutorials.mnist import input_data
import argparse, os, json
mnist = input_data.read_data_sets("/tmp/data/", one_hot=True)

def run_training(args):
    # Define filepath for checkpoint and final model
    checkpoint_path = os.path.join(args.train_dir, 'checkpoint')
    model_path = os.path.join(args.model_dir, 'export') # Filename should be 'export'.
    num_units = 1024
    
    x = tf.placeholder(tf.float32, [None, 784])
    
    w1 = tf.Variable(tf.truncated_normal([784, num_units]))
    b1 = tf.Variable(tf.zeros([num_units]))
    hidden1 = tf.nn.relu(tf.matmul(x, w1) + b1)
    
    w0 = tf.Variable(tf.zeros([num_units, 10]))
    b0 = tf.Variable(tf.zeros([10]))
    p = tf.nn.softmax(tf.matmul(hidden1, w0) + b0)
    
    t = tf.placeholder(tf.float32, [None, 10])
    loss = -tf.reduce_sum(t * tf.log(p))
    train_step = tf.train.AdamOptimizer().minimize(loss)
    correct_prediction = tf.equal(tf.argmax(p, 1), tf.argmax(t, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
    # Define key element
    input_key = tf.placeholder(tf.int64, [None,], name='key')
    output_key = tf.identity(input_key)

    # Define API inputs/outpus object
    inputs = {'key': input_key.name, 'image': x.name}
    outputs = {'key': output_key.name, 'scores': p.name}
    tf.add_to_collection('inputs', json.dumps(inputs))
    tf.add_to_collection('outputs', json.dumps(outputs))
    
    saver = tf.train.Saver()
    sess = tf.InteractiveSession()
    sess.run(tf.initialize_all_variables())

    i = 0
    for _ in range(args.train_step):
        i += 1
        batch_xs, batch_ts = mnist.train.next_batch(100)
        sess.run(train_step, feed_dict={x: batch_xs, t: batch_ts})
        if i % 100 == 0:
            loss_val, acc_val = sess.run([loss, accuracy],
                feed_dict={x:mnist.test.images, t: mnist.test.labels})
            print ('Step: %d, Loss: %f, Accuracy: %f'
                   % (i, loss_val, acc_val))
            saver.save(sess, checkpoint_path, global_step=i)

    # Export the final model.
    saver.save(sess, model_path)


def main(_):
    parser = argparse.ArgumentParser()
    parser.add_argument('--train_dir', type=str, default='/tmp/train')  # Checkpoint directory
    parser.add_argument('--model_dir', type=str, default='/tmp/model')  # Model directory
    parser.add_argument('--train_step', type=int, default=2000)         # Training steps
    args, _ = parser.parse_known_args()
    run_training(args)


if __name__ == '__main__':
    tf.app.run()

Cloud MLの環境準備

それでは、Cloud MLの環境を用意して、先のコードを実行します。Cloud MLにジョブを投げるには、Cloud ML用のSDKを導入したローカル環境が必要となりますが、ここでは、Cloud Shellから実行することにします。Cloud Shell以外の環境を準備する方法は、こちらを参照してください。

新規プロジェクトを作成して、API ManagerからCloud MLのAPIを有効化したら、Cloud Shellを起動してSDKを導入します。

$ curl https://storage.googleapis.com/cloud-ml/scripts/setup_cloud_shell.sh | bash
$ export PATH=${HOME}/.local/bin:${PATH}
$ curl https://storage.googleapis.com/cloud-ml/scripts/check_environment.py | python
Success! Your environment is configured correctly.

サービスアカウントからジョブを投げるため、次のコマンドでサービスアカウントに対してプロジェクトの「編集者」権限を付与します。

$ gcloud beta ml init-project

先程のコードをホームディレクトリーの下の「trainer」ディレクトリー以下にに用意します。

$HOME/trainer/
├── __init__.py   # 空ファイル
└── task.py       # 実行ファイル

まずは、ローカルで実行してみます。テスト実行なので、ループ数は少なめにします。

$ mkdir -p /tmp/train /tmp/model
$ cd $HOME
$ python -m trainer.task --train_step=200
Extracting /tmp/data/train-images-idx3-ubyte.gz
Extracting /tmp/data/train-labels-idx1-ubyte.gz
Extracting /tmp/data/t10k-images-idx3-ubyte.gz
Extracting /tmp/data/t10k-labels-idx1-ubyte.gz
Step: 100, Loss: 3183.995850, Accuracy: 0.903500
Step: 200, Loss: 2237.709229, Accuracy: 0.934500

$ ls -l /tmp/train /tmp/model/
/tmp/model/:
total 9584
-rw-r--r-- 1 enakai enakai     203 Oct  5 17:14 checkpoint
-rw-r--r-- 1 enakai enakai 9770436 Oct  5 17:14 export
-rw-r--r-- 1 enakai enakai   35514 Oct  5 17:14 export.meta
/tmp/train:
total 28744
-rw-r--r-- 1 enakai enakai     163 Oct  5 17:14 checkpoint
-rw-r--r-- 1 enakai enakai 9770436 Oct  5 17:14 checkpoint-100
-rw-r--r-- 1 enakai enakai   35514 Oct  5 17:14 checkpoint-100.meta
-rw-r--r-- 1 enakai enakai 9770436 Oct  5 17:14 checkpoint-200
-rw-r--r-- 1 enakai enakai   35514 Oct  5 17:14 checkpoint-200.meta

Cloud MLによるモデルの学習

クラウド上で学習処理を実行します。はじめに、データ保存用のバケットを作成しておきます。バケット名は任意ですが、お作法としてプロジェクト名を含めておくと良いでしょう。

$ PROJECT_ID=project01 # your project ID
$ TRAIN_BUCKET="gs://$PROJECT_ID-mldata"
$ gsutil mkdir $TRAIN_BUCKET

ジョブ名を決めて、Cloud MLのAPIにジョブを投げつけます。「--staging-bucket」オプションで指定したバケットの下に「cloudmldist」フォルダーが作成されて、その中にコード一式が転送されて、処理が開始します。チェックポイントファイルとモデルファイルを出力するフォルダーを事前にgsutilコマンドで作成している点に注意してください。(gsutilコマンドは空のフォルダーを作ることができないので、ダミーファイルをフォルダー内にコピーしています。本当は、コードの中で自動作成するようにした方が便利ですが、出力先フォルダーが必要な点を強調するために手動作成しています。)

$ JOB_NAME="job01"
$ touch .dummy
$ gsutil cp .dummy $TRAIN_BUCKET/$JOB_NAME/train/
$ gsutil cp .dummy $TRAIN_BUCKET/$JOB_NAME/model/
$ gcloud beta ml jobs submit training $JOB_NAME \
  --region=us-central1 \
  --package-path=trainer --module-name=trainer.task \
  --staging-bucket=$TRAIN_BUCKET \
  -- \
  --train_dir="$TRAIN_BUCKET/$JOB_NAME/train" \
  --model_dir="$TRAIN_BUCKET/$JOB_NAME/model"

createTime: '2016-10-05T08:53:35Z'
jobId: job01
state: QUEUED
trainingInput:
  args:
  - --train_dir=gs://project01/job01/train
  - --model_dir=gs://project01/job01/model
  packageUris:
  - gs://project01/cloudmldist/1475657612/trainer-0.0.0.tar.gz
  pythonModule: trainer.task
  region: us-central1

次のコマンドでジョブの実行を見守ります。最後に「state: SUCCEEDED」になれば完了です。

$ watch -n1 gcloud beta ml jobs describe --project $PROJECT_ID $JOB_NAME
createTime: '2016-10-05T08:53:35Z'
jobId: job01
startTime: '2016-10-05T08:53:45Z'
state: RUNNING
trainingInput:
  args:
  - --train_dir=gs://project01/job01/train
  - --model_dir=gs://project01/job01/model
  packageUris:
  - gs://project01/cloudmldist/1475657612/trainer-0.0.0.tar.gz
  pythonModule: trainer.task
  region: us-central1

実行中のログは、Stackdriverのログ管理画面から「Cloud ML」のログを選択して確認することができます。

正常終了した場合は、次のようにモデルファイル「export」が作成されています。

$ gsutil ls $TRAIN_BUCKET/$JOB_NAME/model/export*
gs://project01/job01/model/export
gs://project01/job01/model/export.meta

学習済みモデルのサービス化

学習済みモデルファイル「export」を利用して、APIサービスを立ち上げます。モデル名を指定して、次のコマンドを実行します。複数のモデルファイルをバージョン管理することが可能で、ここでは、バージョン名「v1」で立ち上げた上で、これをデフォルトサービスに指定しています。

$ MODEL_NAME="MNIST"
$ gcloud beta ml models create $MODEL_NAME
$ gcloud beta ml versions create \
  --origin=$TRAIN_BUCKET/$JOB_NAME/model --model=$MODEL_NAME v1
$ gcloud beta ml versions set-default --model=$MODEL_NAME v1

サービスが起動するまで、1〜2分かかるので、その間に、次のPythonスクリプトを実行して、テスト用データ「data.json」を作成します。1行に1つのイメージデータとkey番号が含まれるJSONファイルです。

import json
from tensorflow.examples.tutorials.mnist import input_data
mnist = input_data.read_data_sets("/tmp/data/", one_hot=True)
with open("data.json", "w") as file:
    for i in range(10):
        data = {"image": mnist.test.images[i].tolist(), "key": i}
        file.write(json.dumps(data)+'\n')

gcloudコマンドでAPIにデータを投げつけると、それぞれのデータに対する予測結果が返って来ます。

$ gcloud beta ml predict --model=${MODEL_NAME} --json-instances=data.json
predictions:
- key: 0
  scores:
  - 2.53733e-08
  - 6.47722e-09
  - 2.23573e-06
  - 5.32844e-05
  - 3.08012e-10
  - 1.33022e-09
  - 1.55983e-11
  - 0.99991
  - 4.39428e-07
  - 3.38841e-05
- key: 1
  scores:
  - 1.98303e-08
  - 2.84799e-07
  - 0.999985
  - 1.47131e-05
  - 1.45546e-13
  - 1.90945e-09
  - 3.50033e-09
  - 2.24941e-18
  - 2.60025e-07
  - 1.45738e-14
- key: 2
  scores:
  - 3.63027e-09
...

REST APIに直接にアクセスする際のURLは、こちらを参照してください。

分散学習について

現状では、分散学習を意識したTensorFlowのコードを用意する必要があります。

・参考:Distributed TensorFlow

今後、分散学習用のコードをもう少し簡単に書けるようにするライブラリーなどが登場するものと期待されます。

分散学習用のコードを今すぐ書いてみたい!という方は下記を参考にしてください。

enakai00.hatenablog.com

Disclaimer: All code snippets are released under Apache 2.0 License. This is not an official Google product.