めもめも

このブログに記載の内容は個人の見解であり、必ずしも所属組織の立場、戦略、意見を代表するものではありません。

MapReduce デザインパターン (2)

"word co-occurrence problem" (文書内の近くにペアで出現する単語の数をカウントする処理)の2つの基本パターンである "Pairs" と "Stripes" から、まずは、Pairs を見てみます。

事前準備

「カラマーゾフの兄弟」のテキストを HDFS に保存しておきます。

$ wget http://www.gutenberg.org/files/28054/28054.zip
$ unzip 28054.zip
$ hadoop fs -copyFromLocal 28054.txt Karamazov.txt

ソースコード

例えば、連続して出現する単語のペアをカウントする場合、次のようなコードが書けます。

pairs/TextPair.java

package pairs;
/*
テキストのタプルを Key に使用するためのクラスです。

像本のサンプル・コード
http://examples.oreilly.com/9780596521981/htdg-examples-0.1.1.tar.gz
より、下記のソースをここに挿入
htdg-examples-0.1.1/src/main/ch04/java/TextPair.java
*/

pairs/Pairs.java

package pairs;

import java.io.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

public class Pairs extends Configured implements Tool {

    public static void main( String[] args ) throws Exception {
        int status = ToolRunner.run( new Pairs(), args );
        System.exit( status );
    }

    @Override
    public int run( String[] args ) throws Exception {
        Job job = new Job( getConf(), "MapReduce Job" );
        job.setJarByClass( Pairs.class );
        job.setMapperClass( Map.class );
        job.setReducerClass( Reduce.class );

        FileInputFormat.setInputPaths( job, new Path( args[ 0 ] ) );
        FileOutputFormat.setOutputPath( job, new Path( args[ 1 ] ) );
        job.setInputFormatClass( TextInputFormat.class );
        job.setOutputFormatClass( TextOutputFormat.class );

// Output format of Map
        job.setMapOutputKeyClass( TextPair.class );
        job.setMapOutputValueClass( IntWritable.class );

// Output format of Reduce
        job.setOutputKeyClass( TextPair.class );
        job.setOutputValueClass( IntWritable.class );

        return job.waitForCompletion( true ) ? 0 : -1;
    }

    public static class Map extends
        Mapper<LongWritable, Text, TextPair, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private TextPair pair = new TextPair();

        protected void map( LongWritable key, Text value, Context context )
            throws IOException, InterruptedException {

            String line = value.toString();
            String[] words = line.split( "\\s+" );

            for ( String s : words ) {
                pair.set( pair.getSecond(), new Text( s ) );
                context.write( pair, one );
            }
        }
    }

    public static class Reduce extends
        Reducer<TextPair, IntWritable, TextPair, IntWritable> {

        protected void reduce( TextPair key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {

            int count = 0;
            for ( IntWritable i : values ) {
                count += i.get();
            }
            context.write( key, new IntWritable( count ) );
        }
    }
}

実行例

$ javac pairs/*java; jar -cf pairs.jar pairs/
$ hadoop fs -rmr testout; hadoop jar pairs.jar pairs.Pairs -D mapred.reduce.tasks=2 Karamazov.txt testout
$ hadoop fs -cat testout/part-* | grep -E "5[0-9][0-9]$"
                597
I       am      542
at      the     589

最後のコマンドは、500〜599回登場したペアを表示しています。

考察

(1) 連続する単語の組を Map の結果の Key にするために、像本の TextPair クラスを利用しています。Hadoop で使用する Key は、WritableComparable を implement している必要があるので、構造化データを利用するときは、そのためのクラスを作ることになります。(ちょっと面倒。)

(2) 検出した単語の組ごとに、真面目に、数字の 1 を Value として出力しています。これを Reduce で全部足し上げる事で、合計値を出しています。

(3) 細かい点ですが、Map 関数の "TextPair pair" をインスタンス変数にしていますので、新しいレコードの先頭の単語は、前のレコードの最後の単語のペアとしてカウントされます。レコード間の単語のつながりを考えない場合は、"TextPair pair" は、Map 関数内のローカル変数にします。