"Graph algorithms" を考えます。
下図の S, A, B, C, D は 5 つの Web ページで、矢印は、Web リンクを表しているとします。(このような関係は、『有向グラフ』と捉えることができます。)このとき、ページ S からスタートして、最低、何クリックで、A 〜 D の各ページに到達するかを計算します。
___________ ___________ | ↓| ↓ S → A → B → C ← D ↑____|↑__________|____↑
再帰的な計算をごにょごにょすれば、何とかなる気がしますが(普通は、ダイクストラのアルゴリズムを使います)、MapReduce の場合は、基本的には『端から順番になめていく』タイプの計算しかできないので、次のように考えます。
1. 各ページの求める答えを(最初は不明なので)『?』と定義します。(S 自身は、不明ではなくて、0 ですね。)
___________ ___________ | ↓| ↓ S → A → B → C ← D 0 ? ? ? ? ↑____|↑__________|____↑
2. この状態で、MapReduce で全ページをなめて、答が x と分かっているページからリンクされているページの答を x+1 とします。(リンク先のページがすでに答を持っている場合は、何もしません。)
___________ ___________ | ↓| ↓ S → A → B → C ← D 0 1 1 ? ? ↑____|↑__________|____↑
3. この MapReduce の計算を何度も繰り返すと、いつかは、全てのページの答が埋まります。(この例の場合は、2回で完了。)
___________ ___________ | ↓| ↓ S → A → B → C ← D 0 1 1 2 2 ↑____|↑__________|____↑
※一般に、リンクが重み(距離)を持っている場合は、(x+リンクの重み)をリンク先の答にします。この場合、リンク先がすでに答を持っている場合でも、新しい答の方が値が小さければ、新しい答を採用します。
ツリー検索で言うところの『幅優先の総当たり検索』にあたるもので、何のひねりもありませんが、大量データをスケールする方法で計算するには、これも一つのテクニックと言えるのでしょう。。。
テストデータの準備
上記の図のリンク関係を下記のテキストで表現します。タブ区切りで、『サイト名、リンク先のサイトのリスト(コンマ区切り)、サイトSからのリンク数』を表します。リンク数がまだ不明のサイトは、リンク数を -1 としています。
$ cat link.txt S A,B, 0 A B, -1 B C,D, -1 C A,D, -1 D C, -1 $ hadoop fs -copyFromLocal link.txt link.txt
ソースコード
Map 関数は、『元データから読み込んだ各サイトの情報』と『新たに判明したサイト S からのリンク数』の 2 種類の情報を Reduce 関数に渡します。Reduce 関数は、受け取った情報から、各サイトの情報を更新して、最初のデータと同じフォーマットでファイルに書き出します。
Map 関数から 2 種類の情報を流すために、先に作成した TextIntPair クラスを Map 関数の出力 Value に使用して、
- 元データから読み込んだ各サイトの情報 → (Key, Value) = (サイト名, (リンク先のサイトのリスト, サイトSからのリンク数) )
- 新たに判明したサイト S からのリンク数 → (Key, Value) = (サイト名, ( "", サイトSからのリンク数 ) )
という形式で利用します。
graph/TextIntPair.java
⇒ ここで作成した TextIntPair.java の package 名を graph に変更して利用。
graph/Graph.java
package graph; import java.io.*; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.lib.output.*; import org.apache.hadoop.util.*; public class Graph extends Configured implements Tool { public static void main( String[] args ) throws Exception { int status = ToolRunner.run( new Graph(), args ); System.exit( status ); } @Override public int run( String[] args ) throws Exception { Job job = new Job( getConf(), "MapReduce Job" ); job.setJarByClass( Graph.class ); job.setMapperClass( Map.class ); job.setReducerClass( Reduce.class ); FileInputFormat.setInputPaths( job, new Path( args[ 0 ] ) ); FileOutputFormat.setOutputPath( job, new Path( args[ 1 ] ) ); job.setInputFormatClass( TextInputFormat.class ); job.setOutputFormatClass( TextOutputFormat.class ); // Output format of Map job.setMapOutputKeyClass( Text.class ); // Site name job.setMapOutputValueClass( TextIntPair.class ); // Links or Answer // Output format of Reduce job.setOutputKeyClass( Text.class ); // Site name job.setOutputValueClass( TextIntPair.class ); // Links return job.waitForCompletion( true ) ? 0 : -1; } public static class Map extends Mapper<LongWritable, Text, Text, TextIntPair> { protected void map( LongWritable key, Text value, Context context ) throws IOException, InterruptedException { String[] line = value.toString().split( "\\s+" ); Text site = new Text( line[ 0 ] ); String[] links = line[ 1 ].split( "," ); IntWritable distance = new IntWritable( Integer.parseInt( line[ 2 ] ) ); // re-emit the site data context.write( site, new TextIntPair( new Text( line[ 1 ] ), distance ) ); if ( distance.get() >= 0 ) { // new distance to links for ( String link : links ) { IntWritable linkdist = new IntWritable( distance.get() + 1 ); context.write( new Text( link ), new TextIntPair( new Text ( "" ), linkdist ) ); } } } } public static class Reduce extends Reducer<Text, TextIntPair, Text, TextIntPair> { protected void reduce( Text key, Iterable<TextIntPair> values, Context context) throws IOException, InterruptedException { Text links = new Text( "" ); IntWritable distance = new IntWritable( -1 ); IntWritable mindist = null; Counter counter = context.getCounter( "Update", "NEWDIST" ); for ( TextIntPair value : values ) { if ( ! value.getFirst().toString().equals( "" ) ) { // recover the site data links = new Text( value.getFirst() ); distance = new IntWritable( value.getSecond().get() ); } else { // replace the distance IntWritable newdist = new IntWritable( value.getSecond().get() ); if ( mindist == null || newdist.get() < mindist.get() ) { mindist = newdist; } } } if ( mindist != null ) { // update distance if ( distance.get() < 0 || mindist.get() < distance.get() ) { distance = mindist; counter.increment( 1 ); } } context.write( key, new TextIntPair( links, distance ) ); } } }
実行例
前述のように、この処理は何度も繰り返し行う必要があります。Reduce 関数の中でカウンターを定義して、新たに発見された答の数をカウントしており、このカウントが 0 になる(全ての答が見つかって、これ以上、新たな答はみつからない状態)まで、実行を繰り返します。
$ javac graph/*java; jar -cvf graph.jar graph/ $ hadoop fs -rmr testout; hadoop jar graph.jar graph.Graph -D mapred.deduce.tasks=3 link.txt testout (中略) 10/07/14 16:15:55 INFO mapred.JobClient: Update 10/07/14 16:15:55 INFO mapred.JobClient: NEWDIST=2 (中略) $ hadoop fs -rmr testout2; hadoop jar graph.jar graph.Graph -D mapred.deduce.tasks=3 testout/part-* testout2 (中略) 10/07/14 16:17:52 INFO mapred.JobClient: Update 10/07/14 16:17:52 INFO mapred.JobClient: NEWDIST=2 (中略) $ hadoop fs -rmr testout3; hadoop jar graph.jar graph.Graph -D mapred.deduce.tasks=3 testout2/part-* testout3 (中略) 10/07/14 16:19:22 INFO mapred.JobClient: Update 10/07/14 16:19:22 INFO mapred.JobClient: NEWDIST=0 (中略) $ hadoop fs -cat testout/part-* A B, 1 B C,D, 1 C A,D, -1 D C, -1 S A,B, 0 $ hadoop fs -cat testout2/part-* A B, 1 B C,D, 1 C A,D, 2 D C, 2 S A,B, 0 $ hadoop fs -cat testout3/part-* A B, 1 B C,D, 1 C A,D, 2 D C, 2 S A,B, 0
考察
(1) Map 関数は、『元データから読み込んだ各サイトの情報』と『新たに判明したサイト S からのリンク数』の 2 種類の情報を Reduce 関数に渡していますが、『元データ』の方は分散 KVS に格納しておいて、Reduce 関数の中で直接読み込む方式も考えられます。(このようなデータをサイド・データと言います。)
(2) 今回はカウンターの出力を見ながら、手で実行を繰り返していますが、実用化する際は、これを自動化するドライバー・プログラムを書くことになります。