"word co-occurrence problem" の Pairs パターンの応用問題を考えます。
例えば、(You, are), (You, don't) ・・・など You から始まる単語のペアが全部で 120 回出現しており、その中で、(You, are) が 80 回だとすると、全体に対する割合は、80 / 120 * 100 = 66% と計算できます。
このような割合を MapReduce で計算する方法を考えます。
なお、以下の手法は、"order inversion" と呼ばれます。(この割合の分母の様に、全データを見て『最後』に決まる情報を Reduce 処理の『最初』に求めさせる手法、という意味だと思います。)
ソースコード
relfreq/TextPair.java
package relfreq; /* テキストのタプルを Key に使用するためのクラスです。 像本のサンプル・コード http://examples.oreilly.com/9780596521981/htdg-examples-0.1.1.tar.gz より、下記のソースをここに挿入 htdg-examples-0.1.1/src/main/ch04/java/TextPair.java */ /* 下記の hashCode() を修正します。*/ @Override public int hashCode() { // return first.hashCode() * 163 + second.hashCode(); return first.hashCode(); }
relfreq/RelFreq.java
package relfreq; import java.io.*; import java.util.HashMap; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.lib.output.*; import org.apache.hadoop.util.*; public class RelFreq extends Configured implements Tool { public static void main( String[] args ) throws Exception { int status = ToolRunner.run( new RelFreq(), args ); System.exit( status ); } @Override public int run( String[] args ) throws Exception { Job job = new Job( getConf(), "MapReduce Job" ); job.setJarByClass( RelFreq.class ); job.setMapperClass( Map.class ); job.setReducerClass( Reduce.class ); FileInputFormat.setInputPaths( job, new Path( args[ 0 ] ) ); FileOutputFormat.setOutputPath( job, new Path( args[ 1 ] ) ); job.setInputFormatClass( TextInputFormat.class ); job.setOutputFormatClass( TextOutputFormat.class ); // Output format of Map job.setMapOutputKeyClass( TextPair.class ); job.setMapOutputValueClass( IntWritable.class ); // Output format of Reduce job.setOutputKeyClass( TextPair.class ); job.setOutputValueClass( FloatWritable.class ); return job.waitForCompletion( true ) ? 0 : -1; } public static class Map extends Mapper<LongWritable, Text, TextPair, IntWritable> { private final static IntWritable one = new IntWritable(1); private TextPair pair = new TextPair(); private TextPair freq = new TextPair(); protected void map( LongWritable key, Text value, Context context ) throws IOException, InterruptedException { String line = value.toString(); String[] words = line.split( "\\s+" ); for ( String s : words ) { pair.set( pair.getSecond(), new Text( s ) ); context.write( pair, one ); freq.set( pair.getFirst(), new Text( "" ) ); context.write( freq, one ); } } } public static class Reduce extends Reducer<TextPair, IntWritable, TextPair, FloatWritable> { private HashMap<Text, Integer> total = new HashMap<Text, Integer>(); protected void reduce( TextPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; for ( IntWritable i : values ) { count += i.get(); } if ( key.getSecond().toString().compareTo( "" ) == 0 ) { total.put( key.getFirst(), count ); } else { context.write( key, new FloatWritable( count * 100 / total.get( key.getFirst() ).intValue() ) ); } } } }
実行例
$ javac relfreq/*java; jar -cf relfreq.jar relfreq/ $ hadoop fs -rmr testout; hadoop jar relfreq.jar relfreq.RelFreq -D mapred.reduce.tasks=3 Karamazov.txt testout $ hadoop fs -cat testout/part-r-00002 | grep "^Here" Here Alexey 2.0 Here Fyodor 2.0 Here I 10.0 Here I've 2.0 Here Ippolit 8.0 Here Ivan 2.0 Here and 2.0 Here are 10.0 Here at 2.0 Here he 8.0 Here in 2.0 Here is 10.0 Here it 12.0 Here or 2.0 Here she 2.0 Here the 4.0 Here they 2.0 Here was 4.0 Here we 4.0 Here when 2.0 Here you, 2.0
最後のコマンドは、(結果の数値的にわかりやすい例として)Here の後ろにペアで現れる単語を表示しています。
考察
for ( String s : words ) { pair.set( pair.getSecond(), new Text( s ) ); context.write( pair, one ); freq.set( pair.getFirst(), new Text( "" ) ); context.write( freq, one ); }
(1) Map の出力の際に ((A,B), 1) と同時に ((A,""), 1) という余計なエントリーを出力しています。この余計なエントリーを Reduce 側で合計すると、A で始まるペアー全体の合計数が計算できます。(「Map は、レコード毎の処理結果以外の追加情報を出力しても構わない。」の利用ですね。)
int count = 0; for ( IntWritable i : values ) { count += i.get(); } if ( key.getSecond().toString().compareTo( "" ) == 0 ) { total.put( key.getFirst(), count ); } else { context.write( key, new FloatWritable( count * 100 / total.get( key.getFirst() ).intValue() ) ); }
(2) こちらが対応する Reduce 処理。Map の結果は Key でソートされて Reduce に渡るので、A で始まるペアーの先頭に、(A, "") が必ず来るので、(A, B) のカウント時には、割り算の分母((A,"")の合計)は計算が終わっている、というトリックも使われています。今回は、(A, "") という Key が自然に、他の (A, B) より先にソートされましたが、Key のソート関数を明示的に定義することでコントロールする方法もあります。
/* 下記の hashCode() を修正します。*/ @Override public int hashCode() { // return first.hashCode() * 163 + second.hashCode(); return first.hashCode(); }
(3) Key に使っている TextPair の hashCode() を修正しています。これは、(A, ""), (A, B), (A, C) など、A で始まるペアーがまとめて同じ Reduce タスクに流れ込むためのトリックです。デフォルトでは、Key 全体のハッシュで複数の Reduce タスクにばらまかれるので、(A, ""), (A, B), (A, C) などが異なる Reduce タスクに行く可能性があり、Reduce タスクの最初に (A, "") の合計で分母を求める作戦が破綻してしまいます。ここでは、ハッシュ計算を (A, B) の A の部分だけで行うように修正して、A で始まるペアーが同じ Reduce タスクに振られるようにしています。
Reduce タスクにデータを分割する方法(ParttionerClass)を job.setPartitionerClass() で、明示的に指定することも可能です。(お作法的にはこちらが正解?)