"word co-occurrence problem" (文書内の近くにペアで出現する単語の数をカウントする処理)の2つの基本パターンである "Pairs" と "Stripes" から、Stripes を見てみます。
ソースコード
今度は、1つのレコード(1行)の中に同時に含まれる単語のペアの出現数をカウントすることを考えて見ます。Pairs パターンの様に、単語のペアを Map の出力の Key にすると、中間レコードの数が非常に多くなります。
そこで、1 つの単語を Key にして、「その単語とペアで出現する単語を Key にした Hash」にペアでの出現数を格納して、この Hash を対応する(Map の出力の) Value として利用します。
※ 以下のコードでは、(A,B)ペアと(B,A)ペアでダブルカウントしていますが、そこは気にしないで下さい。。。
stripes/TextPair.java
package stripes; /* テキストのタプルを Key に使用するためのクラスです。 像本のサンプル・コード http://examples.oreilly.com/9780596521981/htdg-examples-0.1.1.tar.gz より、下記のソースをここに挿入 htdg-examples-0.1.1/src/main/ch04/java/TextPair.java */
stripes/Stripes.java
package stripes; import java.io.*; import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.mapreduce.lib.output.*; import org.apache.hadoop.util.*; public class Stripes extends Configured implements Tool { public static void main( String[] args ) throws Exception { int status = ToolRunner.run( new Stripes(), args ); System.exit( status ); } @Override public int run( String[] args ) throws Exception { Job job = new Job( getConf(), "MapReduce Job" ); job.setJarByClass( Stripes.class ); job.setMapperClass( Map.class ); job.setReducerClass( Reduce.class ); FileInputFormat.setInputPaths( job, new Path( args[ 0 ] ) ); FileOutputFormat.setOutputPath( job, new Path( args[ 1 ] ) ); job.setInputFormatClass( TextInputFormat.class ); job.setOutputFormatClass( TextOutputFormat.class ); // Output format of Map job.setMapOutputKeyClass( Text.class ); job.setMapOutputValueClass( MapWritable.class ); // Output format of Reduce job.setOutputKeyClass( TextPair.class ); job.setOutputValueClass( IntWritable.class ); return job.waitForCompletion( true ) ? 0 : -1; } public static class Map extends Mapper<LongWritable, Text, Text, MapWritable> { protected void map( LongWritable key, Text value, Context context ) throws IOException, InterruptedException { String line = value.toString(); String[] words = line.split( "\\s+" ); MapWritable count = new MapWritable(); IntWritable v; for ( String first : words ) { int skipSelf = 0; for ( String second : words ) { if ( first.equals( second ) && skipSelf == 0 ) { skipSelf = 1; } else { v = (IntWritable)count.get( new Text( second ) ); int i = ( v != null ) ? v.get() + 1 : 1; count.put( new Text( second ), new IntWritable( i ) ); } } context.write( new Text( first ), count ); } } } public static class Reduce extends Reducer<Text, MapWritable, TextPair, IntWritable> { private MapWritable sum( MapWritable total, MapWritable count ) { int t, c; IntWritable v; for ( Writable word : count.keySet() ) { v = (IntWritable)total.get( word ); t = ( v != null ) ? v.get() : 0; v = (IntWritable)count.get( word ); c = v.get(); total.put( word, new IntWritable( t + c ) ); } return total; } protected void reduce( Text key, Iterable<MapWritable> values, Context context) throws IOException, InterruptedException { MapWritable total = new MapWritable(); for ( MapWritable count : values ) { total = sum( total, count ); } for ( Writable word : total.keySet() ) { context.write( new TextPair( key, (Text)word ), (IntWritable)total.get( word ) ); } } } }
実行例
$ javac stripes/*java; jar -cf stripes.jar stripes/ $ hadoop fs -rmr testout; hadoop jar stripes.jar stripes.Stripes -D mapred.reduce.tasks=3 Karamazov.txt testout $ hadoop fs -cat testout/part-*2 | grep "^Karamazo" | head Karamazov gained 6 Karamazov 10 Karamazov never 6 Karamazov That 3 Karamazov family 16 Karamazov household 4 Karamazov She 7 Karamazov character--that's 1 Karamazov _beyond_--I 9 Karamazov keenly 4
最後のコマンドは、Karamazov と同じ行に現れる単語の一部を表示しています。
考察
(1) Hash を Value に使うために MapWritable クラスを利用していますが、クラスの変換がなかなか面倒です。
(2) ロジックが固まれば、型を固定した専用のサブクラスを定義すればよいのですが、MapReduce を書く場合は、ロジックを考えながら書き下すことも多いので、このあたりは、軽量言語の方がいいのかも知れません。