めもめも

このブログに記載の内容は個人の見解であり、必ずしも所属組織の立場、戦略、意見を代表するものではありません。

MapReduce デザインパターン (5)

"Inverted Indexing" を行います。これは、文書番号がついた複数の文書がある時に、単語から文書を検索するための辞書を作成するものですが、最も簡単な辞書としては、

 (Key, Value) = (単語, その単語を含む文書の文書番号のリスト)

というデータの集まりが考えられます。この時、Value 部分のリストは、文書番号の順にソートされているものとします。

※ もう少し実用的な例としては、(文書番号, 該当文書のページランク)というタプルのリストを Value に保存して、ページランクでソートしておくなどが考えられますが、ここでは、簡単のために文書番号だけで考えています。

例えば、Map 関数で文書中の単語をスキャンして、(Key, Value) = (単語, 文書番号) というデータを出力すると、Shuffle 処理によって、Reduce 関数には、自動的に

 (Key, Value) = (単語, その単語を含む文書の文書番号のリスト)

というデータがやってきますが、残念ながら、Value 部分のリストが文書番号順にソートされているわけではありません。このソート処理を Reduce 関数内でやると、リストが巨大な場合、ソート用のメモリーが大量に必要になってしまいます。

そこで、『(1) Map の出力の Key に何をつかうか』というポイントを活用します。Map 関数の出力を

 (Key, Value) = ( (単語, 文書番号), null )

とすると、Reduce 関数には、

 ( (単語, 文書番号), (null, null,・・・・) )

というデータが、単語、および、文書番号の順にソートされて、やってきます。(『デザインパターン (4)』と同様に、同じ単語のデータが同じ Reduce 処理に振られるように、Reduce 処理分割用の Hash を操作するトリックを用います。)従って、Reduce 関数では、データを受け取る順番で、同じ単語に対する文書番号をリストに追加すればOKです。

これは、『ソートが必要なデータを Map 処理の結果の Key に押し込んで、Shuffle 処理にソートを任せてしまう』というテクニックと言えます。

テストデータの準備

先に準備した「カラマーゾフの兄弟」のテキスト 28054.txt を1段落を1行として、各行の先頭に行番号を付与した形式に変換します。このデータを1段落を1文書、行番号を文書番号として解釈します。

hoge.pl

#!/usr/bin/perl

my $s = "";
my $c = 1;
while (<>){
    chop;chop;
    if ( $_ eq "" ) {
        next if ( $s eq "" );
        print $c . $s . "\n";
        $s = ""; $c += 1;
    } else {
        $s .= " " . $_;
    }
}
$ ./hoge.pl < 28054.txt > Karamazov2.txt
$ hadoop fs -copyFromLocal Karamazov2.txt Karamazov2.txt

ソースコード

Map 関数の出力の Key となる (単語, 文書番号) を表す (Text, IntWritable) タプルのクラス TextIntPair を用意します。

invert/TextIntPair.java

package invert;

import java.io.*;
import org.apache.hadoop.io.*;

public class TextIntPair implements WritableComparable<TextIntPair> {

  private Text first;
  private IntWritable second;

  public TextIntPair() {
    set(new Text(), new IntWritable());
  }

  public TextIntPair(Text first, IntWritable second) {
    set(first, second);
  }

  public void set(Text first, IntWritable second) {
    this.first = first;
    this.second = second;
  }

  public Text getFirst() {
    return first;
  }

  public IntWritable getSecond() {
    return second;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    first.write(out);
    second.write(out);
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    first.readFields(in);
    second.readFields(in);
  }

  @Override
  public int hashCode() {
    return first.hashCode();
  }

  @Override
  public boolean equals(Object o) {
    if (o instanceof TextIntPair) {
      TextIntPair tp = (TextIntPair) o;
      return first.equals(tp.first) && second.equals(tp.second);
    }
    return false;
  }

  @Override
  public String toString() {
    return first.toString() + "\t" + second.toString();
  }

  @Override
  public int compareTo(TextIntPair tp) {
    int cmp = first.compareTo(tp.first);
    if (cmp != 0) {
      return cmp;
    }
    return second.compareTo(tp.second);
  }
}

invert/Invert.java

package invert;

import java.io.*;
import java.util.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

public class Invert extends Configured implements Tool {

    public static void main( String[] args ) throws Exception {
        int status = ToolRunner.run( new Invert(), args );
        System.exit( status );
    }

    @Override
    public int run( String[] args ) throws Exception {
        Job job = new Job( getConf(), "MapReduce Job" );
        job.setJarByClass( Invert.class );
        job.setMapperClass( Map.class );
        job.setReducerClass( Reduce.class );

        FileInputFormat.setInputPaths( job, new Path( args[ 0 ] ) );
        FileOutputFormat.setOutputPath( job, new Path( args[ 1 ] ) );

        job.setInputFormatClass( TextInputFormat.class );
        job.setOutputFormatClass( TextOutputFormat.class );

// Output format of Map
        job.setMapOutputKeyClass( TextIntPair.class );
        job.setMapOutputValueClass( NullWritable.class );

// Output format of Reduce
        job.setOutputKeyClass( Text.class );
        job.setOutputValueClass( Text.class );

        return job.waitForCompletion( true ) ? 0 : -1;
    }

    public static class Map extends
            Mapper<LongWritable, Text, TextIntPair, NullWritable> {

        private static final NullWritable dummy = NullWritable.get();

        protected void map( LongWritable key, Text value, Context context )
            throws IOException, InterruptedException {

            TextIntPair pair = new TextIntPair();
            String line = value.toString();
            String[] words = line.split( "\\s+" );

            int docid = Integer.parseInt( words[ 0 ] );
            for ( int i = 1; i < words.length; i++ ) {
                pair.set( new Text( words[ i ] ), new IntWritable ( docid ) );
                context.write( pair, dummy );
            }
        }
    }

    public static class Reduce extends
        Reducer<TextIntPair, NullWritable, Text, Text> {

        private String docidList = "";
        private Text preword = null;

        protected void reduce( TextIntPair key, Iterable<NullWritable> values,
            Context context) throws IOException, InterruptedException {

            Text word = key.getFirst();
            IntWritable docid = key.getSecond();

            if ( preword == null ) {
                preword = new Text( word.toString() );
            }

            if ( ! preword.equals( word ) ) {
                context.write( preword, new Text( docidList ) );
                preword = new Text( word.toString() );
                docidList = "";
            }
            docidList += docid.toString() + ",";
        }

        protected void cleanup( Context context )
                throws IOException, InterruptedException{
            context.write( preword, new Text( docidList ) );
        }
    }
}

実行例

$ javac invert/*java; jar -cvf invert.jar invert/
$ hadoop fs -rmr testout; hadoop jar invert.jar invert.Invert -D mapred.reduce.tasks=3 Karamazov2.txt testout
$ hadoop fs -cat testout/part-r-00000 | grep murder
murder  475,852,1141,2051,2053,2116,2280,2457,2566,2640,2851,3322,3342,3343,3355,3413,3428,3492,3546,3570,3571,3629,4493,4533,4548,4549,4586,4812,4883,4906,4909,4940,4971,4991,4992,4996,5030,5043,5094,5124,5314,5317,5417,5421,5455,5489,5519,5563,5564,5570,5577,5579,5580,5581,5583,5584,5585,5591,5609,5612,5616,5686,5696,5697,5700,5703,5704,5710,5714,5718,5719,5734,5735,5736,5744,5745,5762,5785,
murder!'        5564,
murder, 1698,2051,4691,4915,4964,4988,5090,5098,5108,5428,5502,5526,5534,5576,5577,5579,5581,5584,5676,5687,5696,5718,
murder--hatred, 5578,
murder; 419,2052,2122,
murdered        876,880,1655,1696,2044,2049,2051,2056,2104,2105,3355,3379,3384,3394,3432,3529,3571,3612,3629,3730,3735,4548,4550,4554,4781,4815,4883,4919,4954,4956,4958,4992,5037,5042,5293,5294,5464,5470,5471,5500,5502,5516,5519,5578,5579,5581,5583,5585,5590,5609,5612,5615,5677,5678,5697,5701,5704,5705,5743,5762,5845,5863,
murdered,       5584,5700,
murdered....    3384,
murderer!       4992,5801,
murderer!'      5601,
murderer,"      5504,5524,
murderer?       407,409,4709,5095,
murderers,      5090,5825,

考察

(1) Reduce 関数は、(Key, Value) を順次受け取りながら、Key に含まれる単語が変化するタイミングで、データの出力を行っています。最後の単語だけは、出力のタイミングがなくなるので、(Reduce 関数に渡すデータが無くなったタイミングで呼び出される)cleanup 関数で出力しています。

(2) TextIntPair クラスの Hash を Text 部分だけで計算することで、同じ単語のデータは、同じ Reduce タスクに流れるように操作しています。

            if ( preword == null ) {
                preword = new Text( word.toString() );
            }

            if ( ! preword.equals( word ) ) {
                context.write( preword, new Text( docidList ) );
                preword = new Text( word.toString() );
                docidList = "";
            }

(3) 細かい点ですが、Reduce 関数で、新しい処理対象の単語を変数 preword に代入する際に、preword = word とするのではなく、新しいオブジェクトを作って代入しています。これは、Reduce 関数の引数に与えられる (Key, Value) の値は、新しいオブジェクトが (Key, Value) に代入されるのではなく、(Key, Value) が指しているオブジェクトの内容が直接変更されるためです。preword = word とすると、word が変化すると、一緒に、preword も変化してうまく動きません。