読者です 読者をやめる 読者になる 読者になる

めもめも

このブログに記載の内容は個人の見解であり、必ずしも所属組織の立場、戦略、意見を代表するものではありません。

分散学習用TensorFlowコードの書き方

何の話かというと

Google Cloud MLを利用して、TensorFlowの分散学習を行う方法です。取り急ぎ、自分用のメモとして公開しておきます。

分散学習にはいくつかのパターンがありますが、最もシンプルな「データ分散」の場合を説明します。各ノードは同じモデルに対して、個別に学習データを適用して、Variableを修正する勾配ベクトルを計算します。それぞれで計算した勾配ベクトルを用いて、共通のVariableを修正していきます。

前提知識

TensorFlowの分散学習処理を行う際は、3種類のノードを使用します。

・Parameter Server:Workerが計算した勾配ベクトルを用いて、Variableのアップデートを行います。
・Worker:教師データから勾配ベクトルを計算します。
・Master:Workerと同様の処理に加えて、学習済みモデルの保存やテストセットに対する評価などの追加処理を行います。

一般にParameter Serverは1〜2ノード、Workerは必要に応じた沢山のノード、Masterは1ノードだけで動かします。Cloud MLにジョブを投げると、これらのノード群がコンテナで生成されて、各ノードでコードの実行が行われます。

クラスター構成情報とノードの役割の取得

TensorFlowのコードは全ノードで共通ですが、コード内でノードの種類に応じて処理を分岐させます。コード内では環境変数 TF_CONFIG を通じて、クラスターの構成情報と自分自身の役割を取得します。次のコードは、クラスター構成情報オブジェクト cluster_spec を作成して、job_name と task_index にノードの役割を格納します。

  # Get cluster and node info
  env = json.loads(os.environ.get('TF_CONFIG', '{}'))
  cluster_info = env.get('cluster', None)
  cluster_spec = tf.train.ClusterSpec(cluster_info)
  task_info = env.get('task', None)
  job_name, task_index = task_info['type'], task_info['index']

job_name には、「ps」「master」「worker」のいずれかの文字列が入ります。task_index は同じ種類のノードが複数ある際に 0 からの通し番号が入ります。これらの情報を用いて、次のように Server オブジェクトを作成すると、このオブジェクトがクラスター内の他のノードとの通信処理を担います。

  server = tf.train.Server(cluster_spec,
                           job_name=job_name, task_index=task_index)

Parameter Serverの処理

Parameter Serverの場合は、次のコマンドで Server オブジェクトを起動すれば、それで必要な処理は終わりです。あとは Server オブジェクトが Parameter Server としての機能を提供してくれます。このコマンドは外部からプロセスを停止するまで、戻ってくることはありません。

  if job_name == "ps": # Parameter server
    server.join()

Workerの処理

Worker(および、Master)では、学習処理のループを回す必要がありますが、この際、他のノードと協調動作するために、Supervisorオブジェクトを生成した後に、このオブジェクトを経由して、チェックポイントファイルの保存やセッションの作成といった処理を実施します。

次は、Supervisorオブジェクトを生成するコードの例です。

  if job_name == "master" or job_name == "worker": # Worker node
    is_chief = (job_name == "master")
...
        global_step = tf.Variable(0, trainable=False)
        init_op = tf.global_variables_initializer()
        saver = tf.train.Saver()
        # Create a supervisor
        sv = tf.train.Supervisor(is_chief=is_chief, logdir=LOG_DIR,
                                 init_op=init_op, saver=saver, summary_op=None,
                                 global_step=global_step, save_model_secs=0)

・is_chief:Masterノードの場合に True を渡します。
・logdir:チェックポイントファイルの保存ディレクトリー
・init_op:セッション作成時のVariable初期化処理
・saver:チェックポイントファイルを保存するためのSaverオブジェクト
・global_step:最適化処理の実施回数をカウントするVariable
・summary_op:TensorBoard用のサマリーオブジェクト(Supervisorオブジェクトを介さずにサマリーを保存する際はNoneを指定)
・save_model_secs:チェックポイントの定期保存間隔(自動保存ではなく、明示的に保存する際はNoneを指定)

また、モデルを定義する際は、次の様に、job_name と task_index を用いて、自分の役割を tf.device で設定した with 構文の中で定義していきます。

  device_fn = tf.train.replica_device_setter(
    cluster=cluster_spec,
    worker_device="/job:%s/task:%d" % (job_name, task_index)
  )
...
  if job_name == "master" or job_name == "worker": # Worker node
    is_chief = (job_name == "master")

    with tf.Graph().as_default() as graph:
      with tf.device(device_fn):

そして次は、セッションを生成して、学習処理のループを回す部分です。

        # Create a session and run training loops
        with sv.managed_session(server.target) as sess:
          reports, step = 0, 0
          start_time = time.time()
          while not sv.should_stop() and step < MAX_STEPS:
             images, labels = mnist_data.train.next_batch(BATCH_SIZE)
             feed_dict = {x:images, t:labels, keep_prob:0.5}
             _, loss_val, step = sess.run([train_step, loss, global_step],
                                          feed_dict=feed_dict)
             if step > CHECKPOINT * reports:
               reports += 1
               logging.info("Step: %d, Train loss: %f" % (step, loss_val))
               if is_chief:
                 # Save checkpoint
                 sv.saver.save(sess, sv.save_path, global_step=step)
...
                 # Save summary
                 feed_dict = {test_loss:loss_val, test_accuracy:acc_val}
                 sv.summary_computed(sess,
                   sess.run(summary, feed_dict=feed_dict), step)
                 sv.summary_writer.flush()

ここでのポイントは、最適化アルゴリズム train_step を評価して Variable をアップデートする際に、global_step を一緒に評価する点です。これにより、global_step の値が 1 増加して、他のノードを含めた学習処理のトータルの回数が取得できます。この例では取得した値を変数 step に格納して、全体として CHECKPOINT 回評価するごとに進捗をログ出力するということを行っています。また、先に定義しておいた is_chief (Masterの場合に True)を用いて、Masterだけで追加の処理をすることもできます。この例では、サマリーの出力とチェックポイントの保存を行っています。sv.save_path には、Supervisorを作成した時に logdir で指定したディレクトリーが入ります。

学習済みモデルの保存

分散学習でトリッキーな点の1つが学習済みモデルの保存方法です。先に構築したモデルは、分散学習用にノードの情報がひも付いていますが、学習済みモデルで分類を行う際は、これらの情報は不要です。そこで、単体ノードでもリストア可能な分類処理専用のモデルを再構築して、それを保存するという処理を行います。

まず、学習処理のループを抜けた部分で次を実行します。これは、最終状態の Variable を一旦チェックポイントファイルに保存して、それからモデルの再構築・保存処理(export_model)を呼び出しています。

          if is_chief: # Export the final model
            sv.saver.save(sess, sv.save_path, global_step=sess.run(global_step))
            export_model(tf.train.latest_checkpoint(LOG_DIR))

そして、再構築・保存処理の例は次のようになります。

def export_model(last_checkpoint):
  # create a session with a new graph.
  with tf.Session(graph=tf.Graph()) as sess:
    x = tf.placeholder(tf.float32, [None, 784])
    p = mnist.get_model(x, None, training=False)

    # Define key elements
    input_key = tf.placeholder(tf.int64, [None,])
    output_key = tf.identity(input_key)

    # Define API inputs/outpus object
    inputs = {'key': input_key.name, 'image': x.name}
    outputs = {'key': output_key.name, 'scores': p.name}
    tf.add_to_collection('inputs', json.dumps(inputs))
    tf.add_to_collection('outputs', json.dumps(outputs))

    init_op = tf.global_variables_initializer()
    sess.run(init_op)

    # Restore the latest checkpoint and save the model
    saver = tf.train.Saver()
    saver.restore(sess, last_checkpoint)
    saver.export_meta_graph(filename=MODEL_DIR + '/export.meta')
    saver.save(sess, MODEL_DIR + '/export',
               write_meta_graph=False)

ここでは、新たなグラフとセッションを用意して、入力 x に対して、予測結果 p を計算する最低限の関係を定義した後に、先ほど保存したチェックポイントの内容をリストアしています。このセッションに含まれていない Variable の値は単純に無視されます。また、分類用コードに入出力変数を渡すために、入出力に関連した変数名を JSON にまとめたものを collection に入れてあります。output_key は、Placeholder の input_key にいれた値がそのまま出てくる変数で、複数データをバッチ処理する際にどの出力データがどの入力データに対応するかを紐付けるために使用します。

コードの全体像

モデル定義の中身は適当に用意した MNIST 用 CNN です。

trainer
├── __init__.py # 空ファイル
├── mnist.py # モデル定義
└── task.py # 学習処理用コード


trainer/task.py

import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
import numpy as np
import time, json, os, logging

import mnist

flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_integer('batch_size', 100,
                     'Batch size. Must divide evenly into the dataset sizes.')
flags.DEFINE_integer('max_steps', 10000, 'Number of steps to run trainer.')
flags.DEFINE_integer('checkpoint', 100, 'Interval steps to save checkpoint.')
flags.DEFINE_string('log_dir', '/tmp/logs',
                    'Directory to store checkpoints and summary logs')
flags.DEFINE_string('model_dir', '/tmp/model',
                    'Directory to store trained model')


# Global flags
BATCH_SIZE = FLAGS.batch_size
MODEL_DIR = FLAGS.model_dir
LOG_DIR = FLAGS.log_dir
MAX_STEPS = FLAGS.max_steps
CHECKPOINT = FLAGS.checkpoint


def export_model(last_checkpoint):
  # create a session with a new graph.
  with tf.Session(graph=tf.Graph()) as sess:
    x = tf.placeholder(tf.float32, [None, 784])
    p = mnist.get_model(x, None, training=False)

    # Define key elements
    input_key = tf.placeholder(tf.int64, [None,])
    output_key = tf.identity(input_key)

    # Define API inputs/outpus object
    inputs = {'key': input_key.name, 'image': x.name}
    outputs = {'key': output_key.name, 'scores': p.name}
    tf.add_to_collection('inputs', json.dumps(inputs))
    tf.add_to_collection('outputs', json.dumps(outputs))

    init_op = tf.global_variables_initializer()
    sess.run(init_op)

    # Restore the latest checkpoint and save the model
    saver = tf.train.Saver()
    saver.restore(sess, last_checkpoint)
    saver.export_meta_graph(filename=MODEL_DIR + '/export.meta')
    saver.save(sess, MODEL_DIR + '/export',
               write_meta_graph=False)
  

def run_training():
  # Get cluster and node info
  env = json.loads(os.environ.get('TF_CONFIG', '{}'))
  cluster_info = env.get('cluster', None)
  cluster_spec = tf.train.ClusterSpec(cluster_info)
  task_info = env.get('task', None)
  job_name, task_index = task_info['type'], task_info['index']

  device_fn = tf.train.replica_device_setter(
    cluster=cluster_spec,
    worker_device="/job:%s/task:%d" % (job_name, task_index)
  )

  logging.info('Start job:%s, index:%d' % (job_name, task_index))

  # Create server
  server = tf.train.Server(cluster_spec,
                           job_name=job_name, task_index=task_index)

  if job_name == "ps": # Parameter server
    server.join()

  if job_name == "master" or job_name == "worker": # Worker node
    is_chief = (job_name == "master")

    with tf.Graph().as_default() as graph:
      with tf.device(device_fn):

        # Prepare training data
        mnist_data = input_data.read_data_sets("/tmp/data/", one_hot=True)

        # Create placeholders
        x = tf.placeholder_with_default(
          tf.zeros([BATCH_SIZE, 784], tf.float32), shape=[None, 784])
        t = tf.placeholder_with_default(
          tf.zeros([BATCH_SIZE, 10], tf.float32), shape=[None, 10])
        keep_prob = tf.placeholder_with_default(
          tf.zeros([], tf.float32), shape=[])
        global_step = tf.Variable(0, trainable=False)

        # Add test loss and test accuracy to summary
        test_loss = tf.placeholder_with_default(
          tf.zeros([], tf.float32), shape=[])
        test_accuracy = tf.placeholder_with_default(
          tf.zeros([], tf.float32), shape=[])
        tf.summary.scalar("Test_loss", test_loss) 
        tf.summary.scalar("Test_accuracy", test_accuracy) 

        # Define a model
        p = mnist.get_model(x, keep_prob, training=True)
        train_step, loss, accuracy = mnist.get_trainer(p, t, global_step)

        init_op = tf.global_variables_initializer()
        saver = tf.train.Saver()
        summary = tf.summary.merge_all()

        # Create a supervisor
        sv = tf.train.Supervisor(is_chief=is_chief, logdir=LOG_DIR,
                                 init_op=init_op, saver=saver, summary_op=None,
                                 global_step=global_step, save_model_secs=0)
    
        # Create a session and run training loops
        with sv.managed_session(server.target) as sess:
          reports, step = 0, 0
          start_time = time.time()
          while not sv.should_stop() and step < MAX_STEPS:
             images, labels = mnist_data.train.next_batch(BATCH_SIZE)
             feed_dict = {x:images, t:labels, keep_prob:0.5}
             _, loss_val, step = sess.run([train_step, loss, global_step],
                                          feed_dict=feed_dict)
             if step > CHECKPOINT * reports:
               reports += 1
               logging.info("Step: %d, Train loss: %f" % (step, loss_val))
               if is_chief:
                 # Save checkpoint
                 sv.saver.save(sess, sv.save_path, global_step=step)

                 # Evaluate the test loss and test accuracy
                 loss_vals, acc_vals = [], []
                 for _ in range(len(mnist_data.test.labels) // BATCH_SIZE):
                   images, labels = mnist_data.test.next_batch(BATCH_SIZE)
                   feed_dict = {x:images, t:labels, keep_prob:1.0}
                   loss_val, acc_val = sess.run([loss, accuracy],
                                                feed_dict=feed_dict)
                   loss_vals.append(loss_val)
                   acc_vals.append(acc_val)
                 loss_val, acc_val = np.sum(loss_vals), np.mean(acc_vals)

                 # Save summary
                 feed_dict = {test_loss:loss_val, test_accuracy:acc_val}
                 sv.summary_computed(sess,
                   sess.run(summary, feed_dict=feed_dict), step)
                 sv.summary_writer.flush()

                 logging.info("Time elapsed: %d" % (time.time() - start_time))
                 logging.info("Step: %d, Test loss: %f, Test accuracy: %f" %
                              (step, loss_val, acc_val))

          # Finish training
          if is_chief: # Export the final model
            sv.saver.save(sess, sv.save_path, global_step=sess.run(global_step))
            export_model(tf.train.latest_checkpoint(LOG_DIR))

        sv.stop()  


def main(_):
  run_training()


if __name__ == '__main__':
  logging.basicConfig(level=logging.INFO) 
  tf.app.run()

trainer/mnist.py

import tensorflow as tf
import json

def get_model(x, keep_prob, training=True):
  num_filters1 = 32
  num_filters2 = 64

  with tf.name_scope('cnn'):
    with tf.name_scope('convolution1'):
      x_image = tf.reshape(x, [-1,28,28,1])
      
      W_conv1 = tf.Variable(tf.truncated_normal([5,5,1,num_filters1],
                                                stddev=0.1))
      h_conv1 = tf.nn.conv2d(x_image, W_conv1,
                             strides=[1,1,1,1], padding='SAME')
      
      b_conv1 = tf.Variable(tf.constant(0.1, shape=[num_filters1]))
      h_conv1_cutoff = tf.nn.relu(h_conv1 + b_conv1)
      
      h_pool1 = tf.nn.max_pool(h_conv1_cutoff, ksize=[1,2,2,1],
                               strides=[1,2,2,1], padding='SAME')

    with tf.name_scope('convolution2'):
      W_conv2 = tf.Variable(
                  tf.truncated_normal([5,5,num_filters1,num_filters2],
                                      stddev=0.1))
      h_conv2 = tf.nn.conv2d(h_pool1, W_conv2,
                             strides=[1,1,1,1], padding='SAME')
      
      b_conv2 = tf.Variable(tf.constant(0.1, shape=[num_filters2]))
      h_conv2_cutoff = tf.nn.relu(h_conv2 + b_conv2)
      
      h_pool2 = tf.nn.max_pool(h_conv2_cutoff, ksize=[1,2,2,1],
                               strides=[1,2,2,1], padding='SAME')

    with tf.name_scope('fully-connected'):
      h_pool2_flat = tf.reshape(h_pool2, [-1, 7*7*num_filters2])
      num_units1 = 7*7*num_filters2
      num_units2 = 1024
      w2 = tf.Variable(tf.truncated_normal([num_units1, num_units2]))
      b2 = tf.Variable(tf.constant(0.1, shape=[num_units2]))
      hidden2 = tf.nn.relu(tf.matmul(h_pool2_flat, w2) + b2)

    with tf.name_scope('output'):
      if training:
        hidden2_drop = tf.nn.dropout(hidden2, keep_prob)
      else:
        hidden2_drop = hidden2
      w0 = tf.Variable(tf.zeros([num_units2, 10]))
      b0 = tf.Variable(tf.zeros([10]))
      p = tf.nn.softmax(tf.matmul(hidden2_drop, w0) + b0)

  tf.summary.histogram("conv_filters1", W_conv1)
  tf.summary.histogram("conv_filters2", W_conv2)

  return p

  
def get_trainer(p, t, global_step):
  with tf.name_scope('optimizer'):
    loss = -tf.reduce_sum(t * tf.log(p), name='loss')
    train_step = tf.train.AdamOptimizer(0.0001).minimize(loss, global_step=global_step)
      
  with tf.name_scope('evaluator'):
    correct_prediction = tf.equal(tf.argmax(p, 1), tf.argmax(t, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction,
                                      tf.float32), name='accuracy')

  return train_step, loss, accuracy

このコードを使って、Cloud MLで学習する際は、Cloud Shellから次のコマンドを実行していきます。(trainerディレクトリーの親ディクレクトリーで実行します。)

$ PROJECT_ID=project01 # your project ID
$ TRAIN_BUCKET="gs://$PROJECT_ID-mldata"
$ gsutil mkdir $TRAIN_BUCKET

$ cat << EOF > config.yaml
trainingInput:
  # Use a cluster with many workers and a few parameter servers.
  scaleTier: STANDARD_1
EOF

$ JOB_NAME="job01"
$ gsutil rm -rf $TRAIN_BUCKET/$JOB_NAME
$ touch .dummy
$ gsutil cp .dummy $TRAIN_BUCKET/$JOB_NAME/train/
$ gsutil cp .dummy $TRAIN_BUCKET/$JOB_NAME/model/

$ gcloud beta ml jobs submit training ${JOB_NAME} \
  --package-path=trainer \
  --module-name=trainer.task \
  --staging-bucket="${TRAIN_BUCKET}" \
  --region=us-central1 \
  --config=config.yaml \
  -- \
  --log_dir=$TRAIN_BUCKET/$JOB_NAME/train \
  --model_dir=$TRAIN_BUCKET/$JOB_NAME/model \
  --max_steps=10000

TensorBoardで進捗を見る時は、次を実行します。

$  tensorboard --port 8080 --logdir $TRAIN_BUCKET/$JOB_NAME/train

学習が終わると、「$TRAIN_BUCKET/$JOB_NAME/model」以下に学習済みモデル(export.meta、および、exprot-data.xxxx)が出力されます。

学習済みモデルをリストアして、予測処理を行うコードの例は次になります。

#!/usr/bin/python
import tensorflow as tf
import numpy as np
import json
from tensorflow.examples.tutorials.mnist import input_data

model_meta = 'gs://project01-mldata/job01/model/export.meta'
model_param = 'gs://project01-mldata/job01/model/export'

with tf.Graph().as_default() as graph:
  sess = tf.InteractiveSession()
  saver = tf.train.import_meta_graph(model_meta)
  saver.restore(sess, model_param)

  inputs = json.loads(tf.get_collection('inputs')[0])
  outputs = json.loads(tf.get_collection('outputs')[0])
  x = graph.get_tensor_by_name(inputs['image'])
  input_key = graph.get_tensor_by_name(inputs['key'])
  p = graph.get_tensor_by_name(outputs['scores'])
  output_key = graph.get_tensor_by_name(outputs['key'])

  mnist_data = input_data.read_data_sets("/tmp/data/", one_hot=True)
  images, labels = mnist_data.test.next_batch(10)
  index = range(10)
  keys, preds = sess.run([output_key, p], feed_dict={input_key:index, x:images})
  for key, pred, label in zip(keys, preds, labels):
    print key, np.argmax(pred), np.argmax(label)


Disclaimer: All code snippets are released under Apache 2.0 License. This is not an official Google product.

ネイピア数の導出と極限計算に関する補足

何の話かというと

www.yukisako.xyz

こちらの記事では、

 \lim_{h\to 0}\frac{a^h-1}{h}=1 ―― (1)

を満たす a は、

 a=\lim_{h\to 0}(1+h)^{\frac{1}{h}} ―― (2)

であることを導いて、これがネイピア数 e の定義に一致することを示しています。

この結論は正しいのですが、(1) から (2) を導出する課程で、極限の計算の取扱に不正確な部分があるので、その点を補足してみます。

まず上記の記事では、(1) から

 \lim_{h\to 0}a^h = 1 + \lim_{h\to 0}h ―― (3)

という式を導いて、さらにこれを

 a=\lim_{h\to 0}(1+h)^{\frac{1}{h}} ―― (4)

と変形しています。

ここで、(1) → (3)、および、(3) → (4) の式変形をそれぞれ詳しく吟味してみます。

導出における問題点(その1)

まず、(1) → (3) は同値変形ではありません。つまり、(3) → (1) の逆変形ができません。

なぜなら、(3) の両辺を個別に計算すると、これはどちらも 1 になります。つまり、(3) は  1=1 という当たり前のことを言っており、よく考えると a の値が何であっても成り立ってしまいます。したがって、この後で説明するように、(3) → (4) を導くこともできません。

それでは、なぜ、(3) → (1) の逆変形はできないのでしょうか?

まず、(3) 右辺の 1 を左辺に移項すると次になります。これは同値変形です。

 \lim_{h\to 0}(a^h-1)= \lim_{h\to 0}h ―― (5)

では、この式の両辺を右辺で割って、(1) を導くことはできるでしょうか? 残念ながらそれはできません。

たとえば、単純に割り算すると次になります。

 \frac{\lim_{h\to 0}(a^h-1)}{\lim_{h\to 0}h} = 1 ―― (6)

ですが、よく考えると (5) の右辺は値で言うと 0 です。これは両辺を 0 で割るという、やってはいけない計算になってしまいます。

(1) の意味は「割り算した後に h\to 0 の極限をとる」という操作であって、極限を取る前の (1) 左辺の分母は 0 ではありませんので、これは許される計算です。一方、(6) は極限を取った後に割り算をしているため、0 で割るという禁止事項に触れてしまっています。(1) と (6) は同じ内容を表わすものではない、という点をよく味わってみてください。

違う言い方をすると、(1) から (3) を導く際は、「極限を取る前の計算」を「極限を取った後の計算」に置き換えてしまっているというわけです。極限を含む式を扱う際は、このような「極限を取るタイミング」によく注意する必要があります。

導出における問題点(その2)

(3) から (4) の変形では、両辺を \frac{1}{h} 乗するという操作をしていますが、ここでも「極限を取った後の計算」と「極限を取る前の計算」に注意する必要があります。(3) の両辺を \frac{1}{h} 乗した結果は、あくまで次のものです。

 \left(\lim_{h\to 0}a^h\right)^{\frac{1}{h}} = \left(1 + \lim_{h\to 0}h\right)^{\frac{1}{h}} ―― (7)

ここで「極限を取った後に \frac{1}{h} 乗する」という操作が「 \frac{1}{h} 乗した後に極限を取る」という操作と同じであれば、これを (4) に書き直すことができるのですが、残念ながら、この2つの操作は同じではありません。(もし同じだとすると、任意の a で成り立つ (3) から、特定の a のみで成り立つ (4) が導かれるというおかしな話になってしまいます。そもそも (7) は、\frac{1}{h} 乗の中身ですでに \lim h\to 0 の極限をとっているので、\frac{1}{h} 乗の h はいったい何者かよくわかりません。)

ちなみに、それでは、(3) からはどのような変形が可能なのでしょうか? (3) には極限が入っているために、次の式変形が難しくなっています。そこで、(3) を極限をとる前の関係式に直してみます。

 a^h = 1 + h + f(h) ―― (8)

極限を取る前は、両辺の値はかならずしも一致しませんので、その差分を f(h) と置いています。極限で一致することから \lim_{h\to 0}f(h) = 0 が成り立ちます。

(8) は「極限を取る前」の関係ですので、安心して、両辺を \frac{1}{h} 乗することができます。

 a = \left\{1+h+f(h)\right\}^{\frac{1}{h}} ―― (9)

この後、両辺について、h\to 0 の極限をとると次が得られます。

 a = \lim_{h\to 0}\left\{1+h+f(h)\right\}^{\frac{1}{h}} ―― (10)

仮に、f(h) \equiv 0 (すべての h について f(h)=0)であれば、(10) は (4) に一致しますが、残念ながらそうではありません。f(h) をいろいろ取り替えることで、a はいろいろな異なる値になります。(3) が任意の a で成り立つという事実が、この f(h) の自由度に反映されているわけです。

 「えー。\lim_{h\to 0}f(h) = 0 やねんから、(10) と (4) はおんなじとちゃうん???」

ちゃいます。その理屈が通るのであれば、\lim_{h\to 0}h=0 ですので、(10) は、そもそも a=\lim_{h\to 0}\left\{1\right\}^{\frac{1}{h}} と同じになってしまいます。そんなわけはありません。あくまで、\left\{1+h+f(h)\right\}^{\frac{1}{h}} という計算を一式終わらせた後に、h \to 0 の極限をとるというのが (10) の意味であって、個別のパーツだけで先に極限を取ってしまうと、同じ計算にはなりません。「計算してから極限をとる」と「極限をとってから計算する」の違いをもう一度味わってみてください。

(1)から(4)を導く厳密な計算

・・・と、不正確な点を指摘するだけでは不親切なので、上記の問題点をさけて、厳密に (1) から (4) を導く方法を考えてみましょう。いろんなやり方が考えられますが、根本原理に立ち戻って、ε-δ 論法を使ってみましょう。先ほど関数 f(x) を導入したように、ε-δ 論法を用いると、「極限をとる前の関係式で、極限の様子を表現する」ということが出来るので、極限のことを気にせずに自由に式変形ができるようになります。これこそが、ε-δ 論法が有用な理由なのです。

まず、(1) の関係を ε-δ 論法で述べると次になります。

「(十分に小さな)任意の \epsilon > 0 に対して、ある \delta > 0 をうまく選択すると、 \left|h\right| < \delta を満たすすべての h に対して  \left| \frac{a^h-1}{h} - 1 \right| < \epsilon が成立する」 ―― (11)

いかがでしょうか。(11) は極限を取る前の関係式ですので、やや複雑ではありますが、通常の関係式として、安心して式変形することができます。

そして、最終的に示したい事実 (4) は、ε-δ 論法で述べると次になります。

「(十分に小さな)任意の \epsilon' > 0 に対して、ある \delta' > 0 をうまく選択すると、 \left|h\right| < \delta' を満たすすべての h に対して \left|a - (1+h)^{\frac{1}{h}}\right| < \epsilon' が成立する」―― (12)

(11)と(12)に含まれる \epsilon\delta は異なるものなので、混乱しないように記号をわけておきました。

そこで、(11) から出発して、(12) を示すことができれば、 (1) から (4) が導けたことになるわけです。




・・・約3時間の沈黙・・・



ぐああああああああ。


すいません。紙の上で証明はできたのですが、ノート5ページぐらいになってしまって、とてもTeX記法で整理して書く気がしません。。。。くやしぃ。

代わりに、ε-δ 論法をつかわずに極限の落とし穴をすりぬけてうまいこと証明する方法を紹介しておきます。。。。。

まず、(1) において、t=\frac{1}{a^h-1} という変数変換を行います。これは、h について解くと h=\log_a\left(1+\frac{1}{t}\right) になります。さらに、h\to 0 の時 t\to\infty であることに注意すると、(1) は、次の式と同値です。

 \lim_{t\to \infty}\frac{1}{t\log_a\left(1+\frac{1}{t}\right)}=1

さらに、この左辺は次のように変形できます。

 \lim_{t\to \infty}\frac{1}{\log_a\left(1+\frac{1}{t}\right)^t} = 1

ここで、左辺に含まれる関数 \frac{1}{\log_a x} は、x について連続関数なので、極限を対数の中に入れることができます。

 \frac{1}{\log_a\lim_{t\to\infty}\left(1+\frac{1}{t}\right)^t} = 1

つまり、

 \log_a\lim_{t\to\infty}\left(1+\frac{1}{t}\right)^t = 1

となります。最後にこの両辺を a の肩にのせると、a^{\log_ax} = x に注意して、

 \lim_{t\to\infty}\left(1+\frac{1}{t}\right)^t =a

が得られます。

来ました! ここで h' = \frac{1}{t} で新たに h' を定義すると、結局、次が成り立つことがわかります。

 a=\lim_{h'\to 0}(1+h')^{\frac{1}{h'}}

この h' はいわゆる束縛変数なので、好きな文字に置き換えられます。h' をあらたに h に置き直すことで (4) が導かれました。

この証明のミソは、連続関数 f(x) については、\lim f(\cdots) = f(\lim\cdots) が成立するという性質にあるわけです。

捩率が0でない「平面」の例

何の話かというと

みんな大好きRiemann幾何学では、計量を保存する曲率のある空間(平面)を取り扱いますが、この際、(Riemann多様体の前提として)捩率は必ず0になります。

このような捩率0で曲率だけを持つ平面(二次元多様体)の例は、球面などを容易に想像することができます。接ベクトルを閉曲線にそって平行移動したときに元にもどらないという話も、経線方向と赤道方向の移動を組み合わせて、簡単に確認することができます。

その一方で、より一般的な捩率を持つ「平面(二次元多様体)」は、意外と簡単な例が思いつかないのではないでしょうか? よくあるのは、接ベクトルがプロペラのように回転しながら移動する下記のような例ですが、これは、三次元多様体であって、決して平面ではありません。

https://upload.wikimedia.org/wikipedia/commons/thumb/e/e5/Torsion_along_a_geodesic.svg/877px-Torsion_along_a_geodesic.svg.png

はたして、捩率を持つ平面にはどのような構造があるのでしょうか?!

というわけで、捩率を持つわかりやすい平面の例を手作業で作ってみました。

事前準備

なるべくシンプルな例にするために、局所座標系を1つ固定して、この座標系において計量テンソルは一定 g = \begin{pmatrix}1 & 0 \\ 0 & 1\end{pmatrix} とします。

この時、接続 \nabla が計量的である(計量を保存する)という次の条件から、接続係数 \Gamma_{ij}^{\,\,\,k} の形がかなり限定されます。

 Xg(Y,Z) = g(\nabla_XY,Z)+g(Y,\nabla_XZ)

まず、上記を成分表記すると次になります。

 \partial_k g_{ij} = \Gamma_{ki,j}+\Gamma_{kj,i}

今の場合、左辺は0になることから、これを全成分について書き下すと次が得られます。

 \Gamma_{00,0} + \Gamma_{00,0} = 0
 \Gamma_{01,0} + \Gamma_{00,1} = 0
 \Gamma_{10,0} + \Gamma_{10,0} = 0
 \Gamma_{11,0} + \Gamma_{10,1} = 0
 \Gamma_{00,1} + \Gamma_{01,0} = 0
 \Gamma_{01,1} + \Gamma_{01,1} = 0
 \Gamma_{10,1} + \Gamma_{11,0} = 0
 \Gamma_{11,1} + \Gamma_{11,1} = 0

これを整理すると次になります。

 \Gamma_{00,0}  = 0
 \Gamma_{10,0}  = 0
 \Gamma_{01,1}  = 0
 \Gamma_{11,1}  = 0
 \Gamma_{01,0} = - \Gamma_{00,1}
 \Gamma_{11,0} = - \Gamma_{10,1}

つまり、接続係数は、本質的に2つのパラメータで決定されることになります。

ちなみに、この上に、さらに捩率0という条件 \Gamma_{ij,k} = \Gamma_{ji,k} を課すと接続係数はすべて0になって、平坦なユークリッド平面になってしまいます。今の場合、捩率0という条件を課す必要はありませんので、一般に、接続係数は次のように決まります。

 \Gamma^0 = \begin{pmatrix}0 & p \\ 0 & q\end{pmatrix},\,\,\,\,\Gamma^1 = \begin{pmatrix}-p & 0 \\ -q & 0\end{pmatrix}

捩率テンソル T_{ij}^{\,\,\,k} = \Gamma_{ij}^{\,\,\,k} - \Gamma_{ji}^{\,\,\,k} で表記すると次になります。

 T^0 = \begin{pmatrix}0 & p \\ -p & 0\end{pmatrix},\,\,\,\,T^1 = \begin{pmatrix}0 & q \\ -q & 0\end{pmatrix}

この平面の性質

それでは、上記の接続係数を持つ平面はどのように「捩れて」いるのか、接ベクトルを平行移動して確かめてみましょう。

まず、原点における座標軸方向の2本の接ベクトル Z=\partial_0,\,W=\partial_1 を0軸方向に平行移動した際の成分の変化を次のように表します。

 \partial_0 \rightarrow Z = z^0(x_0,0)\partial_0 + z^1(x_0, 0)\partial_1

 \partial_1  \rightarrow W = w^0(x_0,0)\partial_0 + w^1(x_0, 0)\partial_1

これを0軸方向の平行移動の条件 \nabla_0 Z = 0,\,\nabla_0 W = 0 に代入します。まず、Z から計算すると・・・

 0 = \nabla_0 Z = (\partial_0 Z^l + Z^m\Gamma_{0m}^{\,\,\,l})\partial_l =(\partial_0 Z^l + Z^0\Gamma_{00}^{\,\,\,l}+Z^1\Gamma_{01}^{\,\,\,l})\partial_l
  =(\partial_0z^0+pz_1)\partial_0 + (\partial_0z^1-pz^0)\partial_1

したがって、次の連立偏微分方程式が得られます。

 \partial_0 z^0 = -p z^1,\,\,\,\,\partial_0 z^1 = pz^0

原点における初期条件を考慮すると、次の解が得られます。

  z^0 = \cos(px_0),\,\,\,\,z^1 = \sin(px_0)

つまり、接ベクトル Z は、0軸方向にすすむと角速度 p で反時計回り方向に回転していくのです。W についても同様の成分計算をしても構いませんが、計量が保存されることを考えると、W も同様の回転をすることがわかります。なかなかユニークな平面です。

それでは同様に、1軸方向に平行移動するとどうなるでしょうか? 先ほどと同様の成分表示を用いて計算します。

 \partial_0 \rightarrow Z = z^0(0,x_1)\partial_0 + z^1(0, x_1)\partial_1

 \partial_1  \rightarrow W = w^0(0,x_1)\partial_0 + w^1(0, x_1)\partial_1

これを1軸方向の平行移動の条件 \nabla_0 Z = 0,\,\nabla_0 W = 0 に代入して、先と同様に Z から計算します。

 0 = \nabla_1 Z = (\partial_1 Z^l + Z^m\Gamma_{1m}^{\,\,\,l})\partial_l =(\partial_1 Z^l + Z^0\Gamma_{10}^{\,\,\,l}+Z^1\Gamma_{11}^{\,\,\,l})\partial_l
  =(\partial_1z^0+qz_1)\partial_0 + (\partial_1z^1-qz^0)\partial_1

 \partial_1 z^0 = -q z^1,\,\,\,\,\partial_1 z^1 = qz^0

したがって、初期条件を考慮して、次の解が得られます。

  z^0 = \cos(qx_1),\,\,\,\,z^1 = \sin(qx_1)

これより、接ベクトル Z は、1軸方向にすすむと角速度 q で反時計回り方向に回転することがわかります。計量が保存されることから、W も同様の回転を行います。

以上より一般に、この平面上では、0軸方向に進むと角速度 p、1軸方向に進むと角速度 q で回転するような「捩れ」が存在することがわかります。

この時、計量ベクトルと接続係数は座標に依存しない定数ですので、原点以外のどの点から出発しても同様になる点に注意してください。これは、適当な長方形にそって一周回った場合、行きと帰りで回転方向が反対になることから、元の接ベクトルに戻ることを意味しています。つまり、この平面は、捩率だけが存在して、曲率は0になっているのです。「ほんまかいな」と思う方は、曲率テンソルを成分計算すると、すべて0になることが確認できるはずです。

捩れ具合をベクトル場であらわすとこんな感じでしょうかね。


参考資料

曲率と捩率の直感的解釈を厳密に計算で示した例がありました。
情報幾何ゼミ Torsion に関する補足(PDF)