訳あって、Cassandra のソースを読んでいます。バージョンは 0.6.8 です。
(以下、執筆中です。日々変わります。)
主な登場人物
CassandraDaemon : 最初に起動するデーモン
StorageService : データアクセス処理の中心となるシングルトン
CustomTThreadPoolServer : thrift クライアントからの接続を受けるスレッド群
Gossiper : ゴシップで情報交換するためのシングルトン(いわゆるクラスタサービス)
⇒ Gossiper.instance.register( Object ) で Object を登録すると、Gossip が他ノードに関するイベントを検知すると、以下のハンドラを呼び出すようになる。
- ./gms/Gossiper.java: subscriber.onRemove(endpoint);
- ./gms/Gossiper.java: subscriber.onJoin(ep, epState);
- ./gms/Gossiper.java: subscriber.onChange(addr, stateName, state);
- ./gms/Gossiper.java: subscriber.onAlive(addr, epState);
- ./gms/Gossiper.java: subscriber.onDead(addr, epState);
- ./net/ResponseVerbHandler.java: subscriber.receiveTiming(host, latency);
⇒ Gossiper.instance.addLocalApplicationState() でばらまきたい情報を登録する
MessagingService : メッセージングサービス・シングルトン
StorageLoadBalancer : 各ノードの負荷情報を持っているシングルトン
LoadDisseminator : StorageLoadBalancer の持つ付加情報を Gossip でばらまく人
Gossiper に登録する人々
StorageService(シングルトン)
ノード起動時の流れ
(1) CassandraDaemon の起動
thrift/CassandraDaemon.java
main() daemon = new CassandraDaemon(); daemon.setup() StorageService.instance.initServer() // StorageService をシングルトンで起動 (2) // now we start listening for clients cassandraServer = new CassandraServer(); processor = new Cassandra.Processor(cassandraServer); serverEngine = new CustomTThreadPoolServer(new TProcessorFactory(processor),・・・ daemon.start() serverEngine.serve(); // thrift クライアントからの接続を受けるループに入る // ここに来たらデーモン停止処理に入る。
(2) StorageService の起動
service/StorageService.java
initServer() setMode("Joining: getting bootstrap token", true); // "管理ツールでステータスを表示した時に出るメッセージ", ログにも吐くかどうか? storageMetadata_ = SystemTable.initMetadata(); // be certain that the recorded clustername matches what the user specified // Config に記載のクラスタ名と SSTable に記載のクラスタ名が一致することを確認 DatabaseDescriptor.createAllDirectories(); Gossiper.instance.register(this); // Gossiper に登録 Gossiper.instance.start(FBUtilities.getLocalAddress(), storageMetadata_.getGeneration()); // needed for node-ring gathering. // Gossiper の起動 (4) // この段階でローカルノード用の EndPointState が用意されて、ハートビートのステータスが入る。 MessagingService.instance.listen(FBUtilities.getLocalAddress()); // MessagingService 起動 StorageLoadBalancer.instance.startBroadcasting(); // StorageLoadBalancer 起動 (3) // すでに用意済みの EndPointState に loadInfo_ を定期的に追加登録する。 // 新規ノードを起動してクラスタに参加する場合 setMode("Joining: getting load information", true); StorageLoadBalancer.instance.waitForLoadInfo(); // 一瞬待つだけ。 Token token = BootStrapper.getBootstrapToken(tokenMetadata_, StorageLoadBalancer.instance.getLoadInfo()); // 担当する token をもらって、 startBootstrap(token); // Bootstrap を開始。 // don't finish startup (enabling thrift) until after bootstrap is done while (isBootstrapMode) { Thread.sleep(100); } // Bootstrap 完了を待って終了 // クラスタに登録済みのノードの場合 SystemTable.setBootstrapped(true); Token token = storageMetadata_.getToken(); tokenMetadata_.updateNormalToken(token, FBUtilities.getLocalAddress()); Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_NORMAL + Delimiter + partitioner_.getTokenFactory().toString(token))); setMode("Normal", false);
(3) StorageLoadBalancer の起動処理
service/StorageLoadBalancer.java
startBroadcasting() loadTimer_.schedule(new LoadDisseminator(), 2 * Gossiper.intervalInMillis_, BROADCAST_INTERVAL); // LoadDisseminator を定期的に実行
./service/LoadDisseminator.java
private final static Logger logger_ = Logger.getLogger(LoadDisseminator.class); protected final static String loadInfo_= "LOAD-INFORMATION"; run() String diskUtilization = String.valueOf(StorageService.instance.getLoad()); Gossiper.instance.addLocalApplicationState(LoadDisseminator.loadInfo_, new ApplicationState(diskUtilization)); // Gossiper の EndPointState に自ノードのデータ量(ディスク使用量)の情報を定期的に登録(更新)する。 // (Key, Value) = ("LOAD-INFORMATION", ApplicationState(diskUtilization)) // と、自動的に情報が Gossip で流れる