めもめも

このブログに記載の内容は個人の見解であり、必ずしも所属組織の立場、戦略、意見を代表するものではありません。

MapReduce デザインパターン (4)

"word co-occurrence problem" の Pairs パターンの応用問題を考えます。

例えば、(You, are), (You, don't) ・・・など You から始まる単語のペアが全部で 120 回出現しており、その中で、(You, are) が 80 回だとすると、全体に対する割合は、80 / 120 * 100 = 66% と計算できます。

このような割合を MapReduce で計算する方法を考えます。

なお、以下の手法は、"order inversion" と呼ばれます。(この割合の分母の様に、全データを見て『最後』に決まる情報を Reduce 処理の『最初』に求めさせる手法、という意味だと思います。)

ソースコード

relfreq/TextPair.java

package relfreq;
/*
テキストのタプルを Key に使用するためのクラスです。

像本のサンプル・コード
http://examples.oreilly.com/9780596521981/htdg-examples-0.1.1.tar.gz
より、下記のソースをここに挿入
htdg-examples-0.1.1/src/main/ch04/java/TextPair.java
*/

/* 下記の hashCode() を修正します。*/
  @Override
  public int hashCode() {
//    return first.hashCode() * 163 + second.hashCode();
    return first.hashCode();
  }

relfreq/RelFreq.java

package relfreq;

import java.io.*;
import java.util.HashMap;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

public class RelFreq extends Configured implements Tool {

    public static void main( String[] args ) throws Exception {
        int status = ToolRunner.run( new RelFreq(), args );
        System.exit( status );
    }

    @Override
    public int run( String[] args ) throws Exception {
        Job job = new Job( getConf(), "MapReduce Job" );
        job.setJarByClass( RelFreq.class );
        job.setMapperClass( Map.class );
        job.setReducerClass( Reduce.class );

        FileInputFormat.setInputPaths( job, new Path( args[ 0 ] ) );
        FileOutputFormat.setOutputPath( job, new Path( args[ 1 ] ) );

        job.setInputFormatClass( TextInputFormat.class );
        job.setOutputFormatClass( TextOutputFormat.class );

// Output format of Map
        job.setMapOutputKeyClass( TextPair.class );
        job.setMapOutputValueClass( IntWritable.class );

// Output format of Reduce
        job.setOutputKeyClass( TextPair.class );
        job.setOutputValueClass( FloatWritable.class );

        return job.waitForCompletion( true ) ? 0 : -1;
    }

    public static class Map extends
        Mapper<LongWritable, Text, TextPair, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private TextPair pair = new TextPair();
        private TextPair freq = new TextPair();

        protected void map( LongWritable key, Text value, Context context )
            throws IOException, InterruptedException {

            String line = value.toString();
            String[] words = line.split( "\\s+" );

            for ( String s : words ) {
                pair.set( pair.getSecond(), new Text( s ) );
                context.write( pair, one );
                freq.set( pair.getFirst(), new Text( "" ) );
                context.write( freq, one );
            }
        }
    }

    public static class Reduce extends
        Reducer<TextPair, IntWritable, TextPair, FloatWritable> {

        private HashMap<Text, Integer> total = new HashMap<Text, Integer>();

        protected void reduce( TextPair key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {

            int count = 0;
            for ( IntWritable i : values ) {
                count += i.get();
            }
            if ( key.getSecond().toString().compareTo( "" ) == 0 ) {
                total.put( key.getFirst(), count );
            } else {
                context.write( key, new FloatWritable(
                    count * 100 / total.get( key.getFirst() ).intValue() ) );
            }
        }
    }
}

実行例

$ javac relfreq/*java; jar -cf relfreq.jar relfreq/
$ hadoop fs -rmr testout; hadoop jar relfreq.jar relfreq.RelFreq -D mapred.reduce.tasks=3 Karamazov.txt testout
$ hadoop fs -cat testout/part-r-00002 | grep "^Here"
Here    Alexey  2.0
Here    Fyodor  2.0
Here    I       10.0
Here    I've    2.0
Here    Ippolit 8.0
Here    Ivan    2.0
Here    and     2.0
Here    are     10.0
Here    at      2.0
Here    he      8.0
Here    in      2.0
Here    is      10.0
Here    it      12.0
Here    or      2.0
Here    she     2.0
Here    the     4.0
Here    they    2.0
Here    was     4.0
Here    we      4.0
Here    when    2.0
Here    you,    2.0

最後のコマンドは、(結果の数値的にわかりやすい例として)Here の後ろにペアで現れる単語を表示しています。

考察

            for ( String s : words ) {
                pair.set( pair.getSecond(), new Text( s ) );
                context.write( pair, one );
                freq.set( pair.getFirst(), new Text( "" ) );
                context.write( freq, one );
            }

(1) Map の出力の際に ((A,B), 1) と同時に ((A,""), 1) という余計なエントリーを出力しています。この余計なエントリーを Reduce 側で合計すると、A で始まるペアー全体の合計数が計算できます。(「Map は、レコード毎の処理結果以外の追加情報を出力しても構わない。」の利用ですね。)

            int count = 0;
            for ( IntWritable i : values ) {
                count += i.get();
            }
            if ( key.getSecond().toString().compareTo( "" ) == 0 ) {
                total.put( key.getFirst(), count );
            } else {
                context.write( key, new FloatWritable(
                    count * 100 / total.get( key.getFirst() ).intValue() ) );
            }

(2) こちらが対応する Reduce 処理。Map の結果は Key でソートされて Reduce に渡るので、A で始まるペアーの先頭に、(A, "") が必ず来るので、(A, B) のカウント時には、割り算の分母((A,"")の合計)は計算が終わっている、というトリックも使われています。今回は、(A, "") という Key が自然に、他の (A, B) より先にソートされましたが、Key のソート関数を明示的に定義することでコントロールする方法もあります。

/* 下記の hashCode() を修正します。*/
  @Override
  public int hashCode() {
//    return first.hashCode() * 163 + second.hashCode();
    return first.hashCode();
  }

(3) Key に使っている TextPair の hashCode() を修正しています。これは、(A, ""), (A, B), (A, C) など、A で始まるペアーがまとめて同じ Reduce タスクに流れ込むためのトリックです。デフォルトでは、Key 全体のハッシュで複数の Reduce タスクにばらまかれるので、(A, ""), (A, B), (A, C) などが異なる Reduce タスクに行く可能性があり、Reduce タスクの最初に (A, "") の合計で分母を求める作戦が破綻してしまいます。ここでは、ハッシュ計算を (A, B) の A の部分だけで行うように修正して、A で始まるペアーが同じ Reduce タスクに振られるようにしています。

Reduce タスクにデータを分割する方法(ParttionerClass)を job.setPartitionerClass() で、明示的に指定することも可能です。(お作法的にはこちらが正解?)