めもめも

The content of this blog reflects my personal views and does not necessarily represent the positions, strategies, or opinions of my employer.

How to execute kernel code from JavaScript in a Jupyter notebook

qiita.com

This is the Day 10 entry for the jupyter notebook Advent Calendar 2016.

What this is about

Using the HTML function from the IPython.display module, you can execute JavaScript in a Jupyter notebook as follows.

from IPython.display import HTML

javascript = '''
<script type="text/javascript">
    alert("HOGE")
</script>
'''

HTML(javascript) # Display a popup window

On top of this, JavaScript can call the kernel running the notebook and execute arbitrary code in it. The JavaScript side can also receive the results of that code. Concretely, it works as follows.

import numpy as np
from IPython.display import HTML

javascript = '''
<script type="text/javascript">
var kernel = IPython.notebook.kernel;

var callback = function(output) {
  var res = output.content.data['text/plain'];
  alert(res);
};

var x = 2.0;
var command = 'np.sin(' + x + ')';
kernel.execute(command, {'iopub': {"output": callback}}, {silent:false});
</script>
'''

HTML(javascript)

In this example, the Python kernel computes np.sin(2.0) and returns the result to JavaScript. The kernel call is asynchronous: the callback function (callback in this example) is invoked when the kernel-side computation finishes. You can think of it as executing kernel code the way you would call a REST API with Ajax from JavaScript.
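
If you want to pass something more structured than a single number back to JavaScript, one convenient approach is to have the kernel return a JSON string. Here's a minimal sketch (compute_sin is a hypothetical helper, not part of the example above). Note that the 'text/plain' data the callback receives is the repr of the returned string, so the JavaScript side needs to strip the surrounding quotes before running it through JSON.parse.

import json
import numpy as np

def compute_sin(x):
    # Call from JavaScript with kernel.execute('compute_sin(2.0)', ...).
    # Returning a JSON string keeps the result easy to parse on the JS side.
    return json.dumps({'x': x, 'sin': float(np.sin(x))})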

Trying it out

As an application of this technique, I built an Othello (Reversi) game you can play in a Jupyter notebook. The board handling (placing a piece and flipping the opponent's pieces) and the computer's move-selection routine are implemented in Python and called from a JavaScript UI. The UI is implemented with enchant.js.

github.com

The game screen looks like this. I built it from parts for the game of Go, so it looks pretty odd, lol. Also, I skipped the end-of-game check and the "pass" button because they were a hassle to implement.

What doesn't work

After the game screen is drawn, scrolling the page up or down breaks the click-position detection. (The click targets stay fixed instead of scrolling with the page.)

Starting the enchant.js game loop disables the cursor keys.

Hmm. If anyone figures out a fix, please let me know.

Bonus

The computer's move-selection routine uses the minimax method. Specifically, the get_values() method of the following class computes an evaluation value for each board (how favorable the position is for the human player), and the computer plays so as to make this value as small as possible.

import numpy as np

class SimpleMiniMax:
    def get_values(self, boards):
        result = []
        for board in boards:
            score = 0.0
            # The value is defined from the player=1's point of view.
            for c in sum(board, []):
                if c == 1: score += 1
                if c == -1: score -= 1
            result.append([score])
        return np.array(result)

This implementation uses "the number of the human player's pieces minus the number of the computer's pieces" as the evaluation value, so it's a simple algorithm that tries to reduce the human player's pieces as much as possible (and increase the computer's). With the deep-something that's all the rage these days, doing something to the evaluation function might get you something amazing. Good luck.
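
For reference, move selection on top of get_values() can be a one-ply search like the sketch below. Here legal_moves() and apply_move() are hypothetical stand-ins for the board-handling code in the repository, passed in as functions.

import numpy as np

def choose_move(board, evaluator, legal_moves, apply_move):
    # Evaluate every board reachable in one move and pick the move
    # that minimizes the human player's advantage.
    moves = legal_moves(board)
    boards = [apply_move(board, move) for move in moves]
    values = evaluator.get_values(boards)  # shape: (len(moves), 1)
    return moves[int(np.argmin(values))]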

Using Cloud Dataflow to run parallel predictions with your TensorFlow model

Suppose that you've finished training your prediction model with TensorFlow, yay!

Now you have to make predictions with the trained model for tens of thousands of data points. How will you do it?

I will show you one possible choice. You can use Cloud Dataflow for general parallel batch processing, and it's not hard to apply machine learning models within this framework.

What is Cloud Dataflow?

Cloud Dataflow is a runtime environment on Google Cloud Platform for parallel data processing. Traditionally you may have written custom MapReduce code with Hadoop, but that often becomes too complicated to optimise when you have to combine multiple MapReduce jobs. Instead, Cloud Dataflow builds optimised MapReduce jobs from a dataflow description written with the Apache Beam SDK.

Here's a code snippet for the wordcount example.

import re

import apache_beam as beam

class WordExtractingDoFn(beam.DoFn):
  def process(self, context):
    text_line = context.element.strip()
    words = re.findall(r'[A-Za-z\']+', text_line)
    return words

p = beam.Pipeline()
lines = p | 'Step1. Read lines' >> beam.io.Read(beam.io.TextFileSource(input_file))
words = (lines | 'Step2. Split into words' >> beam.ParDo(WordExtractingDoFn()))
counts = (words | 'Step3. Assign one to each word' >> beam.Map(lambda x: (x, 1))
                | 'Step4. Group by word' >> beam.GroupByKey()
                | 'Step5. Do count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
         )
output = counts | 'Step6. Format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
output | 'Step7. Output results' >> beam.io.Write(beam.io.textio.WriteToText(output_file))
p.run()

Steps 2 and 3 correspond to Map jobs, step 4 corresponds to the shuffle phase, and steps 5 and 6 correspond to Reduce jobs. When you run this code in your SDK environment (typically on the Cloud Shell), it submits a job to the runtime environment, and the corresponding MapReduce jobs are executed automatically.

How do you write code for Cloud Dataflow jobs?

In the wordcount example, the WordExtractingDoFn class provides a custom Map job that splits a single line into a list of words. In the same manner, you can write a class that makes a prediction with a trained TensorFlow model for a single data element. The jobs are executed in multiple containers on the runtime environment, and you can add extra modules and binaries to the containers if necessary. In this case, I add the TensorFlow module to the runtime.

In the following example, I use three files to run predictions for the MNIST dataset with Cloud Dataflow.

・setup.py : A boilerplate script to customise the runtime. I use it to install the TensorFlow module.
・run.py : A thin wrapper to kick a job.
・modules/predict.py : The main code to build and run the dataflow pipeline.

In addition, a text file containing the MNIST dataset and a TensorFlow model checkpoint file are uploaded to Cloud Storage, as in the gsutil commands shown below the list. (The bucket name dataflow99 can be arbitrary.)

・MNIST dataset: gs://dataflow99/input/images.txt
・Checkpoint file: gs://dataflow99/input/cnn_session-20000
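
For reference, uploading them with the gsutil command might look like this (assuming both files are in the current directory):

$ gsutil mb gs://dataflow99
$ gsutil cp images.txt gs://dataflow99/input/images.txt
$ gsutil cp cnn_session-20000 gs://dataflow99/input/cnn_session-20000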

The MNIST dataset file is created with the following code. Each line contains a single image, with the data number prepended at the beginning of the line.

from tensorflow.examples.tutorials.mnist import input_data

mnist = input_data.read_data_sets("/tmp/data/", one_hot=True)
with open('images.txt', 'w') as file:
    for num, image in enumerate(mnist.test.images):
        file.write('%d,%s' % (num, ','.join(map(str,image.tolist()))))
        file.write('\n')

Now let's go through the actual codes.

setup.py

from distutils.command.build import build as _build
import setuptools
import subprocess

class build(_build):  # pylint: disable=invalid-name
  sub_commands = _build.sub_commands + [('CustomCommands', None)]

class CustomCommands(setuptools.Command):
  def initialize_options(self):
    pass

  def finalize_options(self):
    pass

  def RunCustomCommand(self, command_list):
    print 'Running command: %s' % command_list
    p = subprocess.Popen(
        command_list,
        stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # Can use communicate(input='y\n'.encode()) if the command run requires
    # some confirmation.
    stdout_data, _ = p.communicate()
    print 'Command output: %s' % stdout_data
    if p.returncode != 0:
      raise RuntimeError(
          'Command %s failed: exit code: %s' % (command_list, p.returncode))

  def run(self):
    for command in CUSTOM_COMMANDS:
      self.RunCustomCommand(command)


CUSTOM_COMMANDS = [['pip', 'install', 
  'https://storage.googleapis.com/tensorflow/linux/cpu/tensorflow-0.10.0-cp27-none-linux_x86_64.whl']]
REQUIRED_PACKAGES = []

setuptools.setup(
    name='tensorflow-module',
    version='0.0.1',
    description='TensorFlow model prediction package.',
    install_requires=REQUIRED_PACKAGES,
    packages=setuptools.find_packages(),
    cmdclass={'build': build, 'CustomCommands': CustomCommands}
)

This is almost all boilerplate. The important parts are CUSTOM_COMMANDS and REQUIRED_PACKAGES. In this example, the TensorFlow module is explicitly installed with the pip command.

run.py

import logging
from modules import predict
if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  predict.run()

This is a thin wrapper to run the module 'modules/predict.py'.

modules/predict.py

import tensorflow as tf
import numpy as np
import apache_beam as beam
import argparse
import logging

def singleton(cls):
  instances = {}
  def getinstance():
      if cls not in instances:
          instances[cls] = cls()
      return instances[cls]
  return getinstance

@singleton
class Model():
  def __init__(self):
    num_filters1 = 32
    x = tf.placeholder(tf.float32, [None, 784])
    x_image = tf.reshape(x, [-1,28,28,1])
    
    W_conv1 = tf.Variable(tf.truncated_normal([5,5,1,num_filters1],
                                              stddev=0.1))
    h_conv1 = tf.nn.conv2d(x_image, W_conv1,
                           strides=[1,1,1,1], padding='SAME')
    b_conv1 = tf.Variable(tf.constant(0.1, shape=[num_filters1]))
    h_conv1_cutoff = tf.nn.relu(h_conv1 + b_conv1)
    h_pool1 = tf.nn.max_pool(h_conv1_cutoff, ksize=[1,2,2,1],
                             strides=[1,2,2,1], padding='SAME')
    
    num_filters2 = 64
    W_conv2 = tf.Variable(
                tf.truncated_normal([5,5,num_filters1,num_filters2],
                                    stddev=0.1))
    h_conv2 = tf.nn.conv2d(h_pool1, W_conv2,
                           strides=[1,1,1,1], padding='SAME')
    b_conv2 = tf.Variable(tf.constant(0.1, shape=[num_filters2]))
    h_conv2_cutoff = tf.nn.relu(h_conv2 + b_conv2)
    h_pool2 = tf.nn.max_pool(h_conv2_cutoff, ksize=[1,2,2,1],
                             strides=[1,2,2,1], padding='SAME')
    h_pool2_flat = tf.reshape(h_pool2, [-1, 7*7*num_filters2])
    
    num_units1 = 7*7*num_filters2
    num_units2 = 1024
    w2 = tf.Variable(tf.truncated_normal([num_units1, num_units2]))
    b2 = tf.Variable(tf.constant(0.1, shape=[num_units2]))
    hidden2 = tf.nn.relu(tf.matmul(h_pool2_flat, w2) + b2)
    
    keep_prob = tf.placeholder(tf.float32)
    hidden2_drop = tf.nn.dropout(hidden2, keep_prob)
    
    w0 = tf.Variable(tf.zeros([num_units2, 10]))
    b0 = tf.Variable(tf.zeros([10]))
    p = tf.nn.softmax(tf.matmul(hidden2_drop, w0) + b0)
    
    sess = tf.InteractiveSession()
    sess.run(tf.initialize_all_variables())
    saver = tf.train.Saver()
    saver.restore(sess, 'gs://dataflow99/input/cnn_session-20000')

    self.sess, self.p, self.x, self.keep_prob = sess, p, x, keep_prob


class PredictDoFn(beam.DoFn):
  def process(self, context):
    model = Model()
    image = context.element.split(',')
    datanum = image.pop(0)
    pred = model.sess.run(model.p,
                          feed_dict={model.x:[image], model.keep_prob:1.0})[0]
    result = '%s: %s' % (datanum, ','.join(map(str, pred.tolist())))
    return [result]


def run(argv=None):
  parser = argparse.ArgumentParser()
  parser.add_argument('--input', dest='input', required=True,
                      help='Input file to process.')
  parser.add_argument('--output', dest='output', required=True,
                      help='Output file to write results to.')
  known_args, pipeline_args = parser.parse_known_args(argv)

  p = beam.Pipeline(argv=pipeline_args)
  images = p | 'ReadFromText' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
  predictions = (images | 'Prediction' >> beam.ParDo(PredictDoFn()))
  predictions | 'WriteToText' >> beam.io.Write(beam.io.textio.WriteToText(known_args.output))
  logging.getLogger().setLevel(logging.INFO)
  result = p.run()

This executes the actual prediction jobs. The Model class provides the two-layer CNN model explained in the TensorFlow tutorials. The pretrained checkpoint is restored from gs://dataflow99/input/cnn_session-20000.

Note that the Model class is implemented as a singleton. Since the PredictDoFn class could be instantiated multiple times in the same thread, you need to avoid creating multiple TensorFlow sessions, and the singleton pattern ensures the model and its session are built only once.
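
As a quick illustration of what the decorator buys us (a sketch using the singleton decorator defined above, not part of the job code): every call to a @singleton-decorated class returns the same instance, so the graph and session are built only once per worker.

@singleton
class Expensive():
  def __init__(self):
    print 'building...'  # executed only on the first call

a = Expensive()
b = Expensive()  # no 'building...' output this time
assert a is b    # both names refer to the single shared instance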

Run it!

The Apache Beam SDK is preinstalled in the Cloud Shell, so you can run the script directly there. The following runs the code locally on the Cloud Shell to test it. The results are written to /tmp/OUTPUT-00000-of-00001.

$ BUCKET=gs://dataflow99
$ PROJECT=<your project ID>
$ python run.py --input $BUCKET/input/images.txt --output /tmp/OUTPUT

The following runs the job on the Cloud Platform. The runtime environment is automatically deployed on Google Compute Engine and destroyed after the execution.

$ python run.py --runner BlockingDataflowPipelineRunner \
                --project=$PROJECT \
                --staging_location $BUCKET/staging \
                --temp_location $BUCKET/temp \
                --setup_file ./setup.py \
                --job_name $PROJECT-prediction \
                --input $BUCKET/input/images.txt \
                --output $BUCKET/output/predictions

The results are written to multiple files on Cloud Storage, gs://dataflow99/output/predictions-0000[0...8]-of-00009. Here are a few examples of the predictions.

7831: 4.27848138862e-11,4.68908603352e-13,2.48020860454e-05,0.999975204468,7.18904258557e-17,6.26192708797e-11,2.0732427408e-20,9.44890032883e-09,1.42062542285e-11,3.0826359243e-09
7832: 2.86281824913e-08,0.999999403954,1.5229447925e-08,2.67739736631e-12,5.05345951751e-07,2.36653296959e-09,1.38168243513e-09,7.38670680178e-09,3.35720322653e-08,1.15596296424e-10
7833: 1.49982966951e-19,2.73763130221e-12,1.07422761838e-15,2.25128758385e-19,1.0,4.67769159385e-15,3.64918932859e-17,2.55687953449e-11,1.83131521693e-14,5.16055809197e-11
7834: 4.4291605619e-20,4.20704703713e-13,4.15420612127e-16,5.78066899277e-17,1.0,2.73779564693e-14,5.10855258428e-16,8.55861992388e-10,4.34696029172e-11,4.51999015993e-09

Disclaimer: All code snippets are released under Apache 2.0 License. This is not an official Google product.

Using Cloud Vision API and Cloud Translation API from Google App Engine

qiita.com

This is the Day 7 entry for the TensorFlow Advent Calendar 2016 (in Japanese), organized by the Japan TensorFlow Users Group (TFUG) :)

Recently I had a chance to create a prototype of a photo album application emulating the auto-labeling feature of Google Photos. It runs on Google App Engine and uses the Cloud Vision API and the Cloud Translation API in the background. The Cloud Vision API detects appropriate labels for the uploaded photos, and the Cloud Translation API translates the labels from English into other languages. The target language is specified by the administrator as a deployment option.

Here's a screenshot of the prototype running in the Italian mode. You can see that the labels (tags) are described in Italian.

The full prototype code is now available on GitHub. I hope you're gonna love it!

github.com

In this note, I will show how you can use those APIs from Python code running on the GAE standard environment. The good thing is that you don't have to deal with API keys and credentials, since API authentication is done automatically on GAE.

First, since the example uses the google-cloud client library, add the following line to requirements.txt.

requirements.txt

google-cloud==0.21.0

Now here's the code snippet for label detection and label translation.

from google.cloud import vision, translate

def get_labels(bucket, filename, num=3):
    vision_client = vision.Client()
    image = vision_client.image(
                source_uri = 'gs://%s/%s' % (bucket, filename))
    return image.detect_labels(limit=num)

def translate_text(text, target='en'):
    if target == 'en':
        return text
    translate_client = translate.Client()
    result = translate_client.translate(text, target_language=target)
    return result['translatedText']

It's super easy, yay!

To use get_labels, you specify the location of an uploaded photo in Google Cloud Storage with the bucket and file names, and the maximum number of labels you want.

To use translate_text, you pass the text string of a label and specify the target language code from the supported languages.
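
Putting the two together, labeling a photo and showing the labels in Italian might look like the following sketch. The bucket and file names are placeholders, and it relies on the label objects returned by the client library exposing a description attribute.

labels = get_labels('my-photo-bucket', 'photo001.jpg', num=3)
for label in labels:
    print '%s -> %s' % (label.description,
                        translate_text(label.description, target='it'))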

That's all. More details can be found in the library documentation. Enjoy!