めもめも

このブログに記載の内容は個人の見解であり、必ずしも所属組織の立場、戦略、意見を代表するものではありません。

MapReduce デザインパターン (3)

"word co-occurrence problem" (文書内の近くにペアで出現する単語の数をカウントする処理)の2つの基本パターンである "Pairs" と "Stripes" から、Stripes を見てみます。

ソースコード

今度は、1つのレコード(1行)の中に同時に含まれる単語のペアの出現数をカウントすることを考えて見ます。Pairs パターンの様に、単語のペアを Map の出力の Key にすると、中間レコードの数が非常に多くなります。

そこで、1 つの単語を Key にして、「その単語とペアで出現する単語を Key にした Hash」にペアでの出現数を格納して、この Hash を対応する(Map の出力の) Value として利用します。

※ 以下のコードでは、(A,B)ペアと(B,A)ペアでダブルカウントしていますが、そこは気にしないで下さい。。。

stripes/TextPair.java

package stripes;
/*
テキストのタプルを Key に使用するためのクラスです。

像本のサンプル・コード
http://examples.oreilly.com/9780596521981/htdg-examples-0.1.1.tar.gz
より、下記のソースをここに挿入
htdg-examples-0.1.1/src/main/ch04/java/TextPair.java
*/

stripes/Stripes.java

package stripes;

import java.io.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

public class Stripes extends Configured implements Tool {

    public static void main( String[] args ) throws Exception {
        int status = ToolRunner.run( new Stripes(), args );
        System.exit( status );
    }

    @Override
    public int run( String[] args ) throws Exception {
        Job job = new Job( getConf(), "MapReduce Job" );
        job.setJarByClass( Stripes.class );
        job.setMapperClass( Map.class );
        job.setReducerClass( Reduce.class );

        FileInputFormat.setInputPaths( job, new Path( args[ 0 ] ) );
        FileOutputFormat.setOutputPath( job, new Path( args[ 1 ] ) );

        job.setInputFormatClass( TextInputFormat.class );
        job.setOutputFormatClass( TextOutputFormat.class );

// Output format of Map
        job.setMapOutputKeyClass( Text.class );
        job.setMapOutputValueClass( MapWritable.class );

// Output format of Reduce
        job.setOutputKeyClass( TextPair.class );
        job.setOutputValueClass( IntWritable.class );

        return job.waitForCompletion( true ) ? 0 : -1;
    }

    public static class Map extends
        Mapper<LongWritable, Text, Text, MapWritable> {

        protected void map( LongWritable key, Text value, Context context )
            throws IOException, InterruptedException {

            String line = value.toString();
            String[] words = line.split( "\\s+" );
            MapWritable count = new MapWritable();
            IntWritable v;

            for ( String first : words ) {
                int skipSelf = 0;
                for ( String second : words ) {
                    if ( first.equals( second ) && skipSelf == 0 ) {
                        skipSelf = 1;
                    } else {
                        v = (IntWritable)count.get( new Text( second ) );
                        int i = ( v != null ) ? v.get() + 1 : 1;
                        count.put( new Text( second ), new IntWritable( i ) );
                    }
                }
                context.write( new Text( first ), count );
            }
        }
    }

    public static class Reduce extends
        Reducer<Text, MapWritable, TextPair, IntWritable> {

        private MapWritable sum( MapWritable total, MapWritable count ) {
            int t, c;
            IntWritable v;
            for ( Writable word : count.keySet() ) {
                v = (IntWritable)total.get( word );
                t = ( v != null ) ? v.get() : 0;
                v = (IntWritable)count.get( word );
                c = v.get();
                total.put( word, new IntWritable( t + c ) );
            }
            return total;
        }

        protected void reduce( Text key, Iterable<MapWritable> values,
            Context context) throws IOException, InterruptedException {

            MapWritable total = new MapWritable();
            for ( MapWritable count : values ) {
                total = sum( total, count );
            }

            for ( Writable word : total.keySet() ) {
                context.write( new TextPair( key, (Text)word ),
                    (IntWritable)total.get( word ) );
            }
        }
    }
}

実行例

$ javac stripes/*java; jar -cf stripes.jar stripes/
$ hadoop fs -rmr testout; hadoop jar stripes.jar stripes.Stripes   -D mapred.reduce.tasks=3 Karamazov.txt testout
$  hadoop fs -cat testout/part-*2 | grep "^Karamazo" | head
Karamazov       gained  6
Karamazov               10
Karamazov       never   6
Karamazov       That    3
Karamazov       family  16
Karamazov       household       4
Karamazov       She     7
Karamazov       character--that's       1
Karamazov       _beyond_--I     9
Karamazov       keenly  4

最後のコマンドは、Karamazov と同じ行に現れる単語の一部を表示しています。

考察

(1) Hash を Value に使うために MapWritable クラスを利用していますが、クラスの変換がなかなか面倒です。

(2) ロジックが固まれば、型を固定した専用のサブクラスを定義すればよいのですが、MapReduce を書く場合は、ロジックを考えながら書き下すことも多いので、このあたりは、軽量言語の方がいいのかも知れません。