めもめも

このブログに記載の内容は個人の見解であり、必ずしも所属組織の立場、戦略、意見を代表するものではありません。

Jaql入門(番外編) - バスケット解析

何をするかと言うと。。。

ASCII.technologies 2011年1月号で、リッテルの清田さんが、バスケット解析処理を Hive で記述した例を掲載しています。同じ事を Jaql でやって、Hive との違いを味わってみます。

前準備

使用するデータは、Pig に付属の Web 検索ログのサンプルです。タブ区切りで、「ユーザID, タイムスタンプ(YYMMDDhhmmss), 検索文字列」が並びます。

$ wget http://www.apache.org/dist/hadoop/pig/pig-0.7.0/pig-0.7.0.tar.gz
$ tar -xvzf pig-0.7.0.tar.gz
$ cd pig-0.7.0/tutorial/data
$ bunzip excite.log.bz2
$ head excite.log
9593C58F7C1C5CE4        970916072134    levis
9593C58F7C1C5CE4        970916072311    levis strause & co
9593C58F7C1C5CE4        970916072339    levis 501 jeans
45531846E8E7C127        970916065859
45531846E8E7C127        970916065935
45531846E8E7C127        970916070105    "brazillian soccer teams"
45531846E8E7C127        970916070248    "brazillian soccer"
45531846E8E7C127        970916071154    "population of maldives"
082A665972806A62        970916123431    pegasus
F6C8FFEAA26F1778        970916070130    "alicia silverstone" cutest crush batgirl babysitter clueless

不正データの事前クレンジングは、ちょっと手抜きで Perl でやってから、HDFS に格納します。

$ perl -nle '$_ =~ s/[^ \w\t\n]//g; $_ =~ m/(^\w+\t\d+\t[ \w]*$)/; print $1 if $1' excite.log > excite_clean.log
$ hadoop fs -copyFromLocal excite_clean.log excite.log

やりたいこと

第一段階として、[ユーザID, 検索語1, 検索語2] (検索語2は、検索語1の後、10分以内に同じユーザが検索したもの)という三つ組みのデータ群を作ります。この時、重複する三つ組みは1つにまとめます。(同じユーザが、「検索語1⇒検索語2」という検索を何度も繰り返しても、それは1回とカウントする。)

第二段階として、それぞれの [検索語1, 検索語2] のペアに対して、同じパターンの検索をしているユーザが何人いるのかをカウントして、[ユーザ数, 検索語1, 検索語2] というデータを作って、回数の降順にソートします。

この時、ユーザ数が多いデータは、沢山の人がその「検索語1⇒検索語2」のパターンで検索をしており、この2つの検索語は関連性が高いという事が結論づけられます。

処理のポイント

清田さんの記事に詳しい説明があるので、ぜひ、そちらを読んでください。

・・・で終わるとさみしいので、簡単に説明します。

第一段階の処理の際に、同じユーザの全ての検索データを付き合わせて、10分以内の検索ペアを発見しようとすると、例えば、そのユーザが 1000 回の検索をしてると、1000 * 1000 個のデータ比較が必要です。これでは、データ量が多くなるととても処理がスケールしません。

そこで、例えば、事前に各データを適当な時間帯でグループ分けして、各グループ内のデータのみを比較するという作戦が考えられます。が、この場合、隣の時間帯グループのデータが比較できないので、隣の時間帯同士で、お互いに10分以内の間隔のデータが抽出できなくなります。

そこで、あるデータを本来の時間帯のグループと、さらに、1つ後ろの時間帯のグループの両方に入れてあげます。そうすると、時間帯を跨った比較もできるようになります。

Jaqlで書いてみる

上述の処理をおもむろにJaqlで書いてみます。

$ cat basket.jql
// データ読み込み
$input =
  read(del("excite.log",
       { delimiter: '\t',
         schema: schema { id: string, ts: long, query: string? }
       }
  ));

// 時間帯を付与したデータを作成
$withSlot1 = $input
  -> transform
       [ { id: $.id, ts: $.ts, query: $.query, slot: $.ts / 1000 },    // 本来の時間帯
         { id: $.id, ts: $.ts, query: $.query, slot: $.ts / 1000 + 1 } // 1つ後ろの時間帯
       ]
  -> expand;

// 比較のために同じデータをもう一つ作成
$withSlot2 = $input
  -> transform
       [ { id: $.id, ts: $.ts, query: $.query, slot: $.ts / 1000 },
         { id: $.id, ts: $.ts, query: $.query, slot: $.ts / 1000 + 1 }
       ]
  -> expand;

// 第一段階の処理
$firstStage =
  join $withSlot1, $withSlot2
    where $withSlot1.slot == $withSlot2.slot           // 同じ時間帯で、
      and $withSlot1.id == $withSlot2.id               // 同じユーザのデータをまとめる。
    into { slot: $withSlot1.slot, id: $withSlot1.id,
           ts1: $withSlot1.ts, query1: $withSlot1.query,
           ts2: $withSlot2.ts, query2: $withSlot2.query }
  -> filter $.query1 != $.query2                       // 同じ検索語のペアを除外して、
        and $.ts1 < $.ts2 and $.ts2 < $.ts1 + 1000     // 検索語1⇒検索語2が10分以内を抽出。
  -> transform { triplet: [$.id, $.query1, $.query2] } // 三つ組み形式のデータを作る。
  -> group by $triplet = $.triplet                     // 同一の三つ組みは1つにまとめる。
       into { id: $triplet[ 0 ], queryPair: [ $triplet[ 1 ], $triplet[ 2 ] ] };

// 第二段階の処理
$secondStage = $firstStage
  -> group by $queryPair = $.queryPair                       // 同一の「検索語1,検索語2」でまとめて、
       into { count: count($[*].id), queryPair: $queryPair } // ユーザ数をカウントする。
  -> sort by [ $.count desc ];                               // ユーザ数の降順にソート。

// 最終結果を HDFS に保存
$secondStage -> write(hdfs("basket.out"));

quit;

実行方法

# jaqlshell -cb basket.jql

実行結果

メモリ1GB, 2vCPU(Xeon1.5GHz) の KVM の仮想マシン4台のクラスタで、5分弱。4段の MapReduce が実行されました。清田さんの記事とほぼ同等の結果です。恐らく、生成される MapReduce の中身のロジックは、ほとんど同じなのでしょう。

トップ5の結果を見ると、記事と同一の結果が得られていることが分かります。

# jaqlshell -c
jaql> read(hdfs("basket.out")) -> top 5;
[
  {
    "count": 53,
    "queryPair": [
      "chat",
      "chat rooms "
    ]
  },
  {
    "count": 44,
    "queryPair": [
      "playboy",
      "playboy playmates "
    ]
  },
  {
    "count": 38,
    "queryPair": [
      "yahoo",
      "yahoo yahoos "
    ]
  },
  {
    "count": 32,
    "queryPair": [
      "diana",
      "diana princess "
    ]
  },
  {
    "count": 30,
    "queryPair": [
      "playboy",
      "playboy nudity "
    ]
  }
]

所感

記事に掲載されている Hive のコードとぜひ比較してください。ロジックは同じなのですが、SQL そのままの Hive に比べて、Jaql の方は、手続き型の記述になっています。

好みの問題だと思いますが、SQL を書き慣れた人なら Hive がよいでしょうし、手続き型の思考になれた人なら Jaql の方がずっと手早く書けると思います。