めもめも

このブログに記載の内容は個人の見解であり、必ずしも所属組織の立場、戦略、意見を代表するものではありません。

MapReduce デザインパターン (6)

"Graph algorithms" を考えます。

下図の S, A, B, C, D は 5 つの Web ページで、矢印は、Web リンクを表しているとします。(このような関係は、『有向グラフ』と捉えることができます。)このとき、ページ S からスタートして、最低、何クリックで、A 〜 D の各ページに到達するかを計算します。

 ___________  ___________
|     ↓|          ↓
S → A → B → C ← D
↑____|↑__________|____↑

再帰的な計算をごにょごにょすれば、何とかなる気がしますが(普通は、ダイクストラのアルゴリズムを使います)、MapReduce の場合は、基本的には『端から順番になめていく』タイプの計算しかできないので、次のように考えます。

1. 各ページの求める答えを(最初は不明なので)『?』と定義します。(S 自身は、不明ではなくて、0 ですね。)

 ___________  ___________
|     ↓|          ↓
S → A → B → C ← D
0  ?  ?  ?  ?
↑____|↑__________|____↑

2. この状態で、MapReduce で全ページをなめて、答が x と分かっているページからリンクされているページの答を x+1 とします。(リンク先のページがすでに答を持っている場合は、何もしません。)

 ___________  ___________
|     ↓|          ↓
S → A → B → C ← D
0  1  1  ?  ?
↑____|↑__________|____↑

3. この MapReduce の計算を何度も繰り返すと、いつかは、全てのページの答が埋まります。(この例の場合は、2回で完了。)

 ___________  ___________
|     ↓|          ↓
S → A → B → C ← D
0  1  1  2  2
↑____|↑__________|____↑

※一般に、リンクが重み(距離)を持っている場合は、(x+リンクの重み)をリンク先の答にします。この場合、リンク先がすでに答を持っている場合でも、新しい答の方が値が小さければ、新しい答を採用します。

ツリー検索で言うところの『幅優先の総当たり検索』にあたるもので、何のひねりもありませんが、大量データをスケールする方法で計算するには、これも一つのテクニックと言えるのでしょう。。。

テストデータの準備

上記の図のリンク関係を下記のテキストで表現します。タブ区切りで、『サイト名、リンク先のサイトのリスト(コンマ区切り)、サイトSからのリンク数』を表します。リンク数がまだ不明のサイトは、リンク数を -1 としています。

$ cat link.txt
S       A,B,    0
A       B,      -1
B       C,D,    -1
C       A,D,    -1
D       C,      -1
$ hadoop fs -copyFromLocal link.txt link.txt

ソースコード

Map 関数は、『元データから読み込んだ各サイトの情報』と『新たに判明したサイト S からのリンク数』の 2 種類の情報を Reduce 関数に渡します。Reduce 関数は、受け取った情報から、各サイトの情報を更新して、最初のデータと同じフォーマットでファイルに書き出します。

Map 関数から 2 種類の情報を流すために、先に作成した TextIntPair クラスを Map 関数の出力 Value に使用して、

  • 元データから読み込んだ各サイトの情報 → (Key, Value) = (サイト名, (リンク先のサイトのリスト, サイトSからのリンク数) )
  • 新たに判明したサイト S からのリンク数 → (Key, Value) = (サイト名, ( "", サイトSからのリンク数 ) )

という形式で利用します。

graph/TextIntPair.java

ここで作成した TextIntPair.java の package 名を graph に変更して利用。

graph/Graph.java

package graph;

import java.io.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

public class Graph extends Configured implements Tool {

    public static void main( String[] args ) throws Exception {
        int status = ToolRunner.run( new Graph(), args );
        System.exit( status );
    }

    @Override
    public int run( String[] args ) throws Exception {
        Job job = new Job( getConf(), "MapReduce Job" );
        job.setJarByClass( Graph.class );
        job.setMapperClass( Map.class );
        job.setReducerClass( Reduce.class );

        FileInputFormat.setInputPaths( job, new Path( args[ 0 ] ) );
        FileOutputFormat.setOutputPath( job, new Path( args[ 1 ] ) );

        job.setInputFormatClass( TextInputFormat.class );
        job.setOutputFormatClass( TextOutputFormat.class );

// Output format of Map
        job.setMapOutputKeyClass( Text.class );             // Site name
        job.setMapOutputValueClass( TextIntPair.class );    // Links or Answer

// Output format of Reduce
        job.setOutputKeyClass( Text.class );                // Site name
        job.setOutputValueClass( TextIntPair.class );       // Links

        return job.waitForCompletion( true ) ? 0 : -1;
    }

    public static class Map extends
            Mapper<LongWritable, Text, Text, TextIntPair> {

        protected void map( LongWritable key, Text value, Context context )
            throws IOException, InterruptedException {

            String[] line = value.toString().split( "\\s+" );
            Text site = new Text( line[ 0 ] );
            String[] links = line[ 1 ].split( "," );
            IntWritable distance = new IntWritable( Integer.parseInt( line[ 2 ] ) );

            // re-emit the site data
            context.write( site,
                new TextIntPair( new Text( line[ 1 ] ), distance ) );

            if ( distance.get() >= 0 ) {
                // new distance to links
                for ( String link : links ) {
                    IntWritable linkdist = new IntWritable( distance.get() + 1 );
                    context.write( new Text( link ),
                        new TextIntPair( new Text ( "" ), linkdist ) );
                }
            }
        }
    }

    public static class Reduce extends
        Reducer<Text, TextIntPair, Text, TextIntPair> {

        protected void reduce( Text key, Iterable<TextIntPair> values,
            Context context) throws IOException, InterruptedException {

            Text links = new Text( "" );
            IntWritable distance = new IntWritable( -1 );
            IntWritable mindist = null;
            Counter counter = context.getCounter( "Update", "NEWDIST" );

            for ( TextIntPair value : values ) {
                if ( ! value.getFirst().toString().equals( "" ) ) {
                    // recover the site data
                    links = new Text( value.getFirst() );
                    distance = new IntWritable( value.getSecond().get() );
                } else {
                    // replace the distance
                    IntWritable newdist = new IntWritable( value.getSecond().get() );
                    if ( mindist == null || newdist.get() < mindist.get() ) {
                        mindist = newdist;
                    }
                }
            }

            if ( mindist != null ) {    // update distance
                if ( distance.get() < 0 || mindist.get() < distance.get() ) {
                    distance = mindist;
                    counter.increment( 1 );
                }
            }
            context.write( key, new TextIntPair( links, distance ) );
        }
    }
}

実行例

前述のように、この処理は何度も繰り返し行う必要があります。Reduce 関数の中でカウンターを定義して、新たに発見された答の数をカウントしており、このカウントが 0 になる(全ての答が見つかって、これ以上、新たな答はみつからない状態)まで、実行を繰り返します。

$ javac graph/*java; jar -cvf graph.jar graph/

$  hadoop fs -rmr testout; hadoop jar graph.jar graph.Graph -D mapred.deduce.tasks=3 link.txt testout
(中略)
10/07/14 16:15:55 INFO mapred.JobClient:   Update
10/07/14 16:15:55 INFO mapred.JobClient:     NEWDIST=2
(中略)

$  hadoop fs -rmr testout2; hadoop jar graph.jar graph.Graph -D mapred.deduce.tasks=3 testout/part-* testout2
(中略)
10/07/14 16:17:52 INFO mapred.JobClient:   Update
10/07/14 16:17:52 INFO mapred.JobClient:     NEWDIST=2
(中略)

$  hadoop fs -rmr testout3; hadoop jar graph.jar graph.Graph -D mapred.deduce.tasks=3 testout2/part-* testout3
(中略)
10/07/14 16:19:22 INFO mapred.JobClient:   Update
10/07/14 16:19:22 INFO mapred.JobClient:     NEWDIST=0
(中略)

$ hadoop fs -cat testout/part-*
A       B,      1
B       C,D,    1
C       A,D,    -1
D       C,      -1
S       A,B,    0

$ hadoop fs -cat testout2/part-*
A       B,      1
B       C,D,    1
C       A,D,    2
D       C,      2
S       A,B,    0

$ hadoop fs -cat testout3/part-*
A       B,      1
B       C,D,    1
C       A,D,    2
D       C,      2
S       A,B,    0

考察

(1) Map 関数は、『元データから読み込んだ各サイトの情報』と『新たに判明したサイト S からのリンク数』の 2 種類の情報を Reduce 関数に渡していますが、『元データ』の方は分散 KVS に格納しておいて、Reduce 関数の中で直接読み込む方式も考えられます。(このようなデータをサイド・データと言います。)

(2) 今回はカウンターの出力を見ながら、手で実行を繰り返していますが、実用化する際は、これを自動化するドライバー・プログラムを書くことになります。