
The content of this blog represents my personal views and does not necessarily reflect the positions, strategies, or opinions of my employer.

Using Cloud Dataflow to run parallel predictions with your TensorFlow model

Suppose that you've finished training your prediction model with TensorFlow. Yay!

Now you have to make predictions with the trained model for tens of thousands of data points. How will you do it?

I will show you one possible choice: Cloud Dataflow. It is designed for general parallel batch processing, and it is not hard to apply a machine learning model within this framework.

What is Cloud Dataflow?

Cloud Dataflow is a runtime environment on Google Cloud Platform for parallel data processing. Traditionally you may have written custom MapReduce code with Hadoop, but such code often becomes too complicated to optimise when you have to combine multiple MapReduce jobs. Cloud Dataflow instead builds optimised MapReduce jobs from a pipeline description written with the Apache Beam SDK.

Here's a code snippet for the wordcount example.

import re

import apache_beam as beam

class WordExtractingDoFn(beam.DoFn):
  def process(self, context):
    text_line = context.element.strip()
    words = re.findall(r'[A-Za-z\']+', text_line)
    return words

p = beam.Pipeline()
lines = p | 'Step1. Read lines' >> beam.io.Read(beam.io.TextFileSource(input_file))
words = (lines | 'Step2. Split into words' >> beam.ParDo(WordExtractingDoFn()))
counts = (words | 'Step3. Assign one to each word' >> beam.Map(lambda x: (x, 1))
                | 'Step4. Group by word' >> beam.GroupByKey()
                | 'Step5. Do count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
         )
output = counts | 'Step6. Format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
output | 'Step7. Output results' >> beam.io.Write(beam.io.textio.WriteToText(output_file))
p.run()

Steps 2 and 3 correspond to Map jobs, step 4 corresponds to the Shuffle phase, and steps 5 and 6 correspond to Reduce jobs. When you run this code in your SDK environment (typically the Cloud Shell), it submits a job to the runtime environment, and the corresponding MapReduce jobs are executed automatically.
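
The pipeline code itself does not say where it runs; that is decided by the options passed to beam.Pipeline. Here is a minimal sketch, assuming the pre-Beam Dataflow Python SDK used in this article, of how the same wordcount pipeline can target either the local runner or the Dataflow service. The project ID and bucket names below are placeholders.

import apache_beam as beam

# Local test run: DirectPipelineRunner executes the whole pipeline in-process.
p_local = beam.Pipeline(argv=['--runner', 'DirectPipelineRunner'])

# Remote run: the job is submitted to the Dataflow service, and worker VMs are
# provisioned on Google Compute Engine automatically.
p_remote = beam.Pipeline(argv=[
    '--runner', 'BlockingDataflowPipelineRunner',
    '--project', 'your-project-id',              # placeholder project ID
    '--staging_location', 'gs://your-bucket/staging',
    '--temp_location', 'gs://your-bucket/temp',
    '--job_name', 'wordcount-example',
])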

How to write code for Cloud Dataflow jobs?

In the wordcount example, the WordExtractingDoFn class provides a custom Map job that splits a single line into a list of words. In the same manner, you can write a class that makes a prediction with a trained TensorFlow model for a single data item. The jobs are executed in multiple containers in the runtime environment, and you can add extra modules and binaries to the containers if necessary. In this case, I will add the TensorFlow module to the runtime.

In the following example, I use three files to run predictions for the MNIST dataset with Cloud Dataflow.

・setup.py : A boilerplate script to customise the runtime. I use this to install the TensorFlow module.
・run.py : A thin wrapper to kick off the job.
・modules/predict.py : The main code that builds and runs the dataflow pipeline.

In addition, a text file containing the MNIST dataset and a TensorFlow model checkpoint file are uploaded to Cloud Storage. (The bucket name dataflow99 is arbitrary; use your own.)

・MNIST dataset: gs://dataflow99/input/images.txt
・Checkpoint file: gs://dataflow99/input/cnn_session-20000

The MNIST dataset file is created with the following code. Each line contains a single image, with the data number prepended at the beginning of the line.

from tensorflow.examples.tutorials.mnist import input_data

mnist = input_data.read_data_sets("/tmp/data/", one_hot=True)
with open('images.txt', 'w') as file:
    for num, image in enumerate(mnist.test.images):
        file.write('%d,%s' % (num, ','.join(map(str,image.tolist()))))
        file.write('\n')
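
As a quick sanity check (my own addition, not part of the original workflow), you can read one line back and confirm that it holds the data number followed by 784 pixel values, which is exactly the format PredictDoFn parses later.

with open('images.txt') as f:
    fields = f.readline().strip().split(',')

num = int(fields[0])                      # data number prepended to the line
pixels = [float(v) for v in fields[1:]]   # 784 pixel values in [0, 1]
print('image %d has %d pixel values' % (num, len(pixels)))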

Now let's go through the actual codes.

setup.py

from distutils.command.build import build as _build
import setuptools
import subprocess

class build(_build):  # pylint: disable=invalid-name
  sub_commands = _build.sub_commands + [('CustomCommands', None)]

class CustomCommands(setuptools.Command):
  def initialize_options(self):
    pass

  def finalize_options(self):
    pass

  def RunCustomCommand(self, command_list):
    print 'Running command: %s' % command_list
    p = subprocess.Popen(
        command_list,
        stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # Can use communicate(input='y\n'.encode()) if the command run requires
    # some confirmation.
    stdout_data, _ = p.communicate()
    print 'Command output: %s' % stdout_data
    if p.returncode != 0:
      raise RuntimeError(
          'Command %s failed: exit code: %s' % (command_list, p.returncode))

  def run(self):
    for command in CUSTOM_COMMANDS:
      self.RunCustomCommand(command)


CUSTOM_COMMANDS = [['pip', 'install', 
  'https://storage.googleapis.com/tensorflow/linux/cpu/tensorflow-0.10.0-cp27-none-linux_x86_64.whl']]
REQUIRED_PACKAGES = []

setuptools.setup(
    name='tensorflow-module',
    version='0.0.1',
    description='TensorFlow model prediction package.',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    cmdclass={'build': build, 'CustomCommands': CustomCommands}
)

This is mostly boilerplate. The important parts are CUSTOM_COMMANDS and REQUIRED_PACKAGES. In this example, the TensorFlow module is installed explicitly with the pip command.
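
If your DoFn needed more than TensorFlow, the same mechanism covers it. The sketch below only illustrates the pattern; the commented-out commands and package names are placeholders, not requirements of this example. Plain PyPI dependencies can simply go into REQUIRED_PACKAGES instead.

CUSTOM_COMMANDS = [
    ['pip', 'install',
     'https://storage.googleapis.com/tensorflow/linux/cpu/tensorflow-0.10.0-cp27-none-linux_x86_64.whl'],
    # Additional setup commands run in the worker container, if you need them:
    # ['apt-get', 'update'],
    # ['apt-get', '--assume-yes', 'install', 'some-system-package'],  # placeholder
]

REQUIRED_PACKAGES = []   # e.g. ['numpy==1.11.2']  (placeholder version pin)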

run.py

import logging
from modules import predict
if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  predict.run()

This is a thin wrapper that runs the module modules/predict.py.

modules/predict.py

import tensorflow as tf
import numpy as np
import apache_beam as beam
import argparse
import logging

def singleton(cls):
  instances = {}
  def getinstance():
      if cls not in instances:
          instances[cls] = cls()
      return instances[cls]
  return getinstance

@singleton
class Model():
  def __init__(self):
    num_filters1 = 32
    x = tf.placeholder(tf.float32, [None, 784])
    x_image = tf.reshape(x, [-1,28,28,1])
    
    W_conv1 = tf.Variable(tf.truncated_normal([5,5,1,num_filters1],
                                              stddev=0.1))
    h_conv1 = tf.nn.conv2d(x_image, W_conv1,
                           strides=[1,1,1,1], padding='SAME')
    b_conv1 = tf.Variable(tf.constant(0.1, shape=[num_filters1]))
    h_conv1_cutoff = tf.nn.relu(h_conv1 + b_conv1)
    h_pool1 =tf.nn.max_pool(h_conv1_cutoff, ksize=[1,2,2,1],
                            strides=[1,2,2,1], padding='SAME')
    
    num_filters2 = 64
    W_conv2 = tf.Variable(
                tf.truncated_normal([5,5,num_filters1,num_filters2],
                                    stddev=0.1))
    h_conv2 = tf.nn.conv2d(h_pool1, W_conv2,
                           strides=[1,1,1,1], padding='SAME')
    b_conv2 = tf.Variable(tf.constant(0.1, shape=[num_filters2]))
    h_conv2_cutoff = tf.nn.relu(h_conv2 + b_conv2)
    h_pool2 =tf.nn.max_pool(h_conv2_cutoff, ksize=[1,2,2,1],
                            strides=[1,2,2,1], padding='SAME')
    h_pool2_flat = tf.reshape(h_pool2, [-1, 7*7*num_filters2])
    
    num_units1 = 7*7*num_filters2
    num_units2 = 1024
    w2 = tf.Variable(tf.truncated_normal([num_units1, num_units2]))
    b2 = tf.Variable(tf.constant(0.1, shape=[num_units2]))
    hidden2 = tf.nn.relu(tf.matmul(h_pool2_flat, w2) + b2)
    
    keep_prob = tf.placeholder(tf.float32)
    hidden2_drop = tf.nn.dropout(hidden2, keep_prob)
    
    w0 = tf.Variable(tf.zeros([num_units2, 10]))
    b0 = tf.Variable(tf.zeros([10]))
    p = tf.nn.softmax(tf.matmul(hidden2_drop, w0) + b0)
    
    sess = tf.InteractiveSession()
    sess.run(tf.initialize_all_variables())
    saver = tf.train.Saver()
    saver.restore(sess, 'gs://dataflow99/input/cnn_session-20000')

    self.sess, self.p, self.x, self.keep_prob = sess, p, x, keep_prob


class PredictDoFn(beam.DoFn):
  def process(self, context):
    model = Model()
    image = context.element.split(',')
    datanum = image.pop(0)
    pred = model.sess.run(model.p,
                          feed_dict={model.x:[image], model.keep_prob:1.0})[0]
    result = '%s: %s' % (datanum, ','.join(map(str, pred.tolist())))
    return [result]


def run(argv=None):
  parser = argparse.ArgumentParser()
  parser.add_argument('--input', dest='input', required=True,
                      help='Input file to process.')
  parser.add_argument('--output', dest='output', required=True,
                      help='Output file to write results to.')
  known_args, pipeline_args = parser.parse_known_args(argv)

  p = beam.Pipeline(argv=pipeline_args)
  images = p | 'ReadFromText' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
  predictions = (images | 'Prediction' >> beam.ParDo(PredictDoFn()))
  predictions | 'WriteToText' >> beam.io.Write(beam.io.textio.WriteToText(known_args.output))
  logging.getLogger().setLevel(logging.INFO)
  result = p.run()

This executes the actual prediction jobs. The Model class provides the two-layer CNN model explained in the TensorFlow tutorials. The pretrained checkpoint is restored from gs://dataflow99/input/cnn_session-20000.

Note that the Model class is implemented as a singleton. Since the PredictDoFn class can be instantiated multiple times in the same thread, the singleton is how you avoid creating multiple TensorFlow sessions.
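
Here is a tiny standalone illustration (my own, reusing the same decorator as above) of what the singleton buys you: __init__ runs only once per process, so only one TensorFlow graph and session would ever be created on a worker.

def singleton(cls):
  instances = {}
  def getinstance():
    if cls not in instances:
      instances[cls] = cls()
    return instances[cls]
  return getinstance

@singleton
class ExpensiveModel():
  def __init__(self):
    print('building the graph and session only once')

a = ExpensiveModel()   # prints the message and caches the instance
b = ExpensiveModel()   # returns the cached instance; __init__ is not called again
assert a is b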

Run it!

The Apache Beam SDK is preinstalled in the Cloud Shell, so you can run the script there directly. The following commands test the code by running it locally on the Cloud Shell. The results are written to /tmp/OUTPUT-00000-of-00001.

$ BUCKET=gs://dataflow99
$ PROJECT=<your project ID>
$ python run.py --input $BUCKET/input/images.txt --output /tmp/OUTPUT

The following runs the job on Google Cloud Platform. The runtime environment is automatically deployed on Google Compute Engine and destroyed after execution.

$ python run.py --runner BlockingDataflowPipelineRunner \
                --project=$PROJECT \
                --staging_location $BUCKET/staging \
                --temp_location $BUCKET/temp \
                --setup_file ./setup.py \
                --job_name $PROJECT-prediction \
                --input $BUCKET/input/images.txt \
                --output $BUCKET/output/predictions

The results are written to multiple files on Cloud Storage: gs://dataflow99/output/predictions-0000[0...9]-of-00009. Here are a few examples of the predictions.

7831: 4.27848138862e-11,4.68908603352e-13,2.48020860454e-05,0.999975204468,7.18904258557e-17,6.26192708797e-11,2.0732427408e-20,9.44890032883e-09,1.42062542285e-11,3.0826359243e-09
7832: 2.86281824913e-08,0.999999403954,1.5229447925e-08,2.67739736631e-12,5.05345951751e-07,2.36653296959e-09,1.38168243513e-09,7.38670680178e-09,3.35720322653e-08,1.15596296424e-10
7833: 1.49982966951e-19,2.73763130221e-12,1.07422761838e-15,2.25128758385e-19,1.0,4.67769159385e-15,3.64918932859e-17,2.55687953449e-11,1.83131521693e-14,5.16055809197e-11
7834: 4.4291605619e-20,4.20704703713e-13,4.15420612127e-16,5.78066899277e-17,1.0,2.73779564693e-14,5.10855258428e-16,8.55861992388e-10,4.34696029172e-11,4.51999015993e-09

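Each output line is the data number followed by the ten softmax probabilities, so the predicted digit is just the argmax of that vector. Below is a small post-processing sketch (my own addition; the shard name is one of the files listed above).

with open('predictions-00000-of-00009') as f:
    for line in f:
        datanum, probs = line.strip().split(': ')
        probs = [float(v) for v in probs.split(',')]
        print('%s -> predicted digit %d' % (datanum, probs.index(max(probs))))
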
Disclaimer: All code snippets are released under Apache 2.0 License. This is not an official Google product.