読者です 読者をやめる 読者になる 読者になる

めもめも

このブログに記載の内容は個人の見解であり、必ずしも所属組織の立場、戦略、意見を代表するものではありません。

Cassandraソース解析(1) - ノード起動時の動きから読んでいく

訳あって、Cassandra のソースを読んでいます。バージョンは 0.6.8 です。

(以下、執筆中です。日々変わります。)

主な登場人物

CassandraDaemon : 最初に起動するデーモン
StorageService : データアクセス処理の中心となるシングルトン
CustomTThreadPoolServer : thrift クライアントからの接続を受けるスレッド群

Gossiper : ゴシップで情報交換するためのシングルトン(いわゆるクラスタサービス)
 ⇒ Gossiper.instance.register( Object ) で Object を登録すると、Gossip が他ノードに関するイベントを検知すると、以下のハンドラを呼び出すようになる。

  • ./gms/Gossiper.java: subscriber.onRemove(endpoint);
  • ./gms/Gossiper.java: subscriber.onJoin(ep, epState);
  • ./gms/Gossiper.java: subscriber.onChange(addr, stateName, state);
  • ./gms/Gossiper.java: subscriber.onAlive(addr, epState);
  • ./gms/Gossiper.java: subscriber.onDead(addr, epState);
  • ./net/ResponseVerbHandler.java: subscriber.receiveTiming(host, latency);

 ⇒ Gossiper.instance.addLocalApplicationState() でばらまきたい情報を登録する

MessagingService : メッセージングサービス・シングルトン
StorageLoadBalancer : 各ノードの負荷情報を持っているシングルトン
 LoadDisseminator : StorageLoadBalancer の持つ付加情報を Gossip でばらまく人

Gossiper に登録する人々

StorageService(シングルトン)

ノード起動時の流れ

(1) CassandraDaemon の起動

thrift/CassandraDaemon.java

 main()
  daemon = new CassandraDaemon();
  daemon.setup()
   StorageService.instance.initServer() // StorageService をシングルトンで起動 (2)

       // now we start listening for clients
   cassandraServer = new CassandraServer();
   processor = new Cassandra.Processor(cassandraServer);
   serverEngine = new CustomTThreadPoolServer(new TProcessorFactory(processor),・・・

  daemon.start()
   serverEngine.serve(); // thrift クライアントからの接続を受けるループに入る
  // ここに来たらデーモン停止処理に入る。
(2) StorageService の起動

service/StorageService.java

 initServer()
  setMode("Joining: getting bootstrap token", true);
    // "管理ツールでステータスを表示した時に出るメッセージ", ログにも吐くかどうか?

  storageMetadata_ = SystemTable.initMetadata();

  // be certain that the recorded clustername matches what the user specified
     // Config に記載のクラスタ名と SSTable に記載のクラスタ名が一致することを確認

        DatabaseDescriptor.createAllDirectories();

        Gossiper.instance.register(this); // Gossiper に登録

        Gossiper.instance.start(FBUtilities.getLocalAddress(), storageMetadata_.getGeneration()); 
    // needed for node-ring gathering.  // Gossiper の起動 (4)
       // この段階でローカルノード用の EndPointState が用意されて、ハートビートのステータスが入る。

        MessagingService.instance.listen(FBUtilities.getLocalAddress()); // MessagingService 起動

        StorageLoadBalancer.instance.startBroadcasting(); // StorageLoadBalancer 起動 (3)
       // すでに用意済みの EndPointState に loadInfo_ を定期的に追加登録する。

// 新規ノードを起動してクラスタに参加する場合

     setMode("Joining: getting load information", true);
     StorageLoadBalancer.instance.waitForLoadInfo(); // 一瞬待つだけ。
 
     Token token = BootStrapper.getBootstrapToken(tokenMetadata_, StorageLoadBalancer.instance.getLoadInfo());
      // 担当する token をもらって、
     startBootstrap(token); // Bootstrap を開始。

            // don't finish startup (enabling thrift) until after bootstrap is done
            while (isBootstrapMode) { Thread.sleep(100); } // Bootstrap 完了を待って終了

// クラスタに登録済みのノードの場合
            SystemTable.setBootstrapped(true);
            Token token = storageMetadata_.getToken();
            tokenMetadata_.updateNormalToken(token, FBUtilities.getLocalAddress());
            Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_NORMAL + Delimiter + partitioner_.getTokenFactory().toString(token)));
            setMode("Normal", false);
(3) StorageLoadBalancer の起動処理

service/StorageLoadBalancer.java

 startBroadcasting()
  loadTimer_.schedule(new LoadDisseminator(), 2 * Gossiper.intervalInMillis_, BROADCAST_INTERVAL);
   // LoadDisseminator を定期的に実行

./service/LoadDisseminator.java

 private final static Logger logger_ = Logger.getLogger(LoadDisseminator.class);
 protected final static String loadInfo_= "LOAD-INFORMATION";

 run()
  String diskUtilization = String.valueOf(StorageService.instance.getLoad());
  Gossiper.instance.addLocalApplicationState(LoadDisseminator.loadInfo_, new ApplicationState(diskUtilization));
   // Gossiper の EndPointState に自ノードのデータ量(ディスク使用量)の情報を定期的に登録(更新)する。
   //   (Key, Value) = ("LOAD-INFORMATION", ApplicationState(diskUtilization))
   //    と、自動的に情報が Gossip で流れる