"Inverted Indexing" を行います。これは、文書番号がついた複数の文書がある時に、単語から文書を検索するための辞書を作成するものですが、最も簡単な辞書としては、
(Key, Value) = (単語, その単語を含む文書の文書番号のリスト)
というデータの集まりが考えられます。この時、Value 部分のリストは、文書番号の順にソートされているものとします。
※ もう少し実用的な例としては、(文書番号, 該当文書のページランク)というタプルのリストを Value に保存して、ページランクでソートしておくなどが考えられますが、ここでは、簡単のために文書番号だけで考えています。
例えば、Map 関数で文書中の単語をスキャンして、(Key, Value) = (単語, 文書番号) というデータを出力すると、Shuffle 処理によって、Reduce 関数には、自動的に
(Key, Value) = (単語, その単語を含む文書の文書番号のリスト)
というデータがやってきますが、残念ながら、Value 部分のリストが文書番号順にソートされているわけではありません。このソート処理を Reduce 関数内でやると、リストが巨大な場合、ソート用のメモリーが大量に必要になってしまいます。
そこで、『(1) Map の出力の Key に何をつかうか』というポイントを活用します。Map 関数の出力を
(Key, Value) = ( (単語, 文書番号), null )
とすると、Reduce 関数には、
( (単語, 文書番号), (null, null,・・・・) )
というデータが、単語、および、文書番号の順にソートされて、やってきます。(『デザインパターン (4)』と同様に、同じ単語のデータが同じ Reduce 処理に振られるように、Reduce 処理分割用の Hash を操作するトリックを用います。)従って、Reduce 関数では、データを受け取る順番で、同じ単語に対する文書番号をリストに追加すればOKです。
これは、『ソートが必要なデータを Map 処理の結果の Key に押し込んで、Shuffle 処理にソートを任せてしまう』というテクニックと言えます。
テストデータの準備
先に準備した「カラマーゾフの兄弟」のテキスト 28054.txt を1段落を1行として、各行の先頭に行番号を付与した形式に変換します。このデータを1段落を1文書、行番号を文書番号として解釈します。
hoge.pl
#!/usr/bin/perl my $s = ""; my $c = 1; while (<>){ chop;chop; if ( $_ eq "" ) { next if ( $s eq "" ); print $c . $s . "\n"; $s = ""; $c += 1; } else { $s .= " " . $_; } }
$ ./hoge.pl < 28054.txt > Karamazov2.txt $ hadoop fs -copyFromLocal Karamazov2.txt Karamazov2.txt
ソースコード
Map 関数の出力の Key となる (単語, 文書番号) を表す (Text, IntWritable) タプルのクラス TextIntPair を用意します。
invert/TextIntPair.java
package invert; import java.io.*; import org.apache.hadoop.io.*; public class TextIntPair implements WritableComparable<TextIntPair> { private Text first; private IntWritable second; public TextIntPair() { set(new Text(), new IntWritable()); } public TextIntPair(Text first, IntWritable second) { set(first, second); } public void set(Text first, IntWritable second) { this.first = first; this.second = second; } public Text getFirst() { return first; } public IntWritable getSecond() { return second; } @Override public void write(DataOutput out) throws IOException { first.write(out); second.write(out); } @Override public void readFields(DataInput in) throws IOException { first.readFields(in); second.readFields(in); } @Override public int hashCode() { return first.hashCode(); } @Override public boolean equals(Object o) { if (o instanceof TextIntPair) { TextIntPair tp = (TextIntPair) o; return first.equals(tp.first) && second.equals(tp.second); } return false; } @Override public String toString() { return first.toString() + "\t" + second.toString(); } @Override public int compareTo(TextIntPair tp) { int cmp = first.compareTo(tp.first); if (cmp != 0) { return cmp; } return second.compareTo(tp.second); } }
invert/Invert.java
package invert; import java.io.*; import java.util.*; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.lib.output.*; import org.apache.hadoop.util.*; public class Invert extends Configured implements Tool { public static void main( String[] args ) throws Exception { int status = ToolRunner.run( new Invert(), args ); System.exit( status ); } @Override public int run( String[] args ) throws Exception { Job job = new Job( getConf(), "MapReduce Job" ); job.setJarByClass( Invert.class ); job.setMapperClass( Map.class ); job.setReducerClass( Reduce.class ); FileInputFormat.setInputPaths( job, new Path( args[ 0 ] ) ); FileOutputFormat.setOutputPath( job, new Path( args[ 1 ] ) ); job.setInputFormatClass( TextInputFormat.class ); job.setOutputFormatClass( TextOutputFormat.class ); // Output format of Map job.setMapOutputKeyClass( TextIntPair.class ); job.setMapOutputValueClass( NullWritable.class ); // Output format of Reduce job.setOutputKeyClass( Text.class ); job.setOutputValueClass( Text.class ); return job.waitForCompletion( true ) ? 0 : -1; } public static class Map extends Mapper<LongWritable, Text, TextIntPair, NullWritable> { private static final NullWritable dummy = NullWritable.get(); protected void map( LongWritable key, Text value, Context context ) throws IOException, InterruptedException { TextIntPair pair = new TextIntPair(); String line = value.toString(); String[] words = line.split( "\\s+" ); int docid = Integer.parseInt( words[ 0 ] ); for ( int i = 1; i < words.length; i++ ) { pair.set( new Text( words[ i ] ), new IntWritable ( docid ) ); context.write( pair, dummy ); } } } public static class Reduce extends Reducer<TextIntPair, NullWritable, Text, Text> { private String docidList = ""; private Text preword = null; protected void reduce( TextIntPair key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { Text word = key.getFirst(); IntWritable docid = key.getSecond(); if ( preword == null ) { preword = new Text( word.toString() ); } if ( ! preword.equals( word ) ) { context.write( preword, new Text( docidList ) ); preword = new Text( word.toString() ); docidList = ""; } docidList += docid.toString() + ","; } protected void cleanup( Context context ) throws IOException, InterruptedException{ context.write( preword, new Text( docidList ) ); } } }
実行例
$ javac invert/*java; jar -cvf invert.jar invert/ $ hadoop fs -rmr testout; hadoop jar invert.jar invert.Invert -D mapred.reduce.tasks=3 Karamazov2.txt testout $ hadoop fs -cat testout/part-r-00000 | grep murder murder 475,852,1141,2051,2053,2116,2280,2457,2566,2640,2851,3322,3342,3343,3355,3413,3428,3492,3546,3570,3571,3629,4493,4533,4548,4549,4586,4812,4883,4906,4909,4940,4971,4991,4992,4996,5030,5043,5094,5124,5314,5317,5417,5421,5455,5489,5519,5563,5564,5570,5577,5579,5580,5581,5583,5584,5585,5591,5609,5612,5616,5686,5696,5697,5700,5703,5704,5710,5714,5718,5719,5734,5735,5736,5744,5745,5762,5785, murder!' 5564, murder, 1698,2051,4691,4915,4964,4988,5090,5098,5108,5428,5502,5526,5534,5576,5577,5579,5581,5584,5676,5687,5696,5718, murder--hatred, 5578, murder; 419,2052,2122, murdered 876,880,1655,1696,2044,2049,2051,2056,2104,2105,3355,3379,3384,3394,3432,3529,3571,3612,3629,3730,3735,4548,4550,4554,4781,4815,4883,4919,4954,4956,4958,4992,5037,5042,5293,5294,5464,5470,5471,5500,5502,5516,5519,5578,5579,5581,5583,5585,5590,5609,5612,5615,5677,5678,5697,5701,5704,5705,5743,5762,5845,5863, murdered, 5584,5700, murdered.... 3384, murderer! 4992,5801, murderer!' 5601, murderer," 5504,5524, murderer? 407,409,4709,5095, murderers, 5090,5825,
考察
(1) Reduce 関数は、(Key, Value) を順次受け取りながら、Key に含まれる単語が変化するタイミングで、データの出力を行っています。最後の単語だけは、出力のタイミングがなくなるので、(Reduce 関数に渡すデータが無くなったタイミングで呼び出される)cleanup 関数で出力しています。
(2) TextIntPair クラスの Hash を Text 部分だけで計算することで、同じ単語のデータは、同じ Reduce タスクに流れるように操作しています。
if ( preword == null ) { preword = new Text( word.toString() ); } if ( ! preword.equals( word ) ) { context.write( preword, new Text( docidList ) ); preword = new Text( word.toString() ); docidList = ""; }
(3) 細かい点ですが、Reduce 関数で、新しい処理対象の単語を変数 preword に代入する際に、preword = word とするのではなく、新しいオブジェクトを作って代入しています。これは、Reduce 関数の引数に与えられる (Key, Value) の値は、新しいオブジェクトが (Key, Value) に代入されるのではなく、(Key, Value) が指しているオブジェクトの内容が直接変更されるためです。preword = word とすると、word が変化すると、一緒に、preword も変化してうまく動きません。