ざっくり全体像
各ノードは、定期的に『ランダムに選んだノード、および、ランダムに選んだシード』と Gossip パケットを交換する。
Gossip パケットの 1 回のやりとりは、Syn → Ack → Ack2 の 3way で行われる。
このやりとりの中で、双方のノードは、相手が持っている『グローバル情報』と自分が持っている『グローバル情報』の Digest を比較して、自分の方が新しい情報を持っていれば、その差分を相手に送って、相手の情報をアップデートする。
※『グローバル情報』とは、各ノードが担当するトークンの一覧など、クラスタの全体像を表す情報のこと。各ノードは、それぞれ、ローカルでこれらの情報を保持しており、理想的にはすべてのノードが同じ情報を持っているべきもの。
仮に、シードが無かったとすると、各ノードは、まったくランダムな相手と情報を交換するので、確率論的には、永遠に最新情報をもらえないノードが発生することがおきる。
実際には、各ノードは、必ずシードとは Gossip をやりとりするので、シードを仲介にして、短時間で、すべてのノードが最新情報を持つことができる。(シードが少ないと、シードとの通信がボトルネックになる。しかし、シードが増えると、収束時間はながくなる。)
(参考資料)http://wiki.apache.org/cassandra/ArchitectureGossip_JP
主な登場人物
Gossipの基盤部分
Gossiper.instance : Gossip 処理を行う各ノードのシングルトン・インスタンス
GossipTimerTask : 定期的に Gossip の Syn 送信を行うタスク
endPointStateMap_ : IP から対応するノードの EndPointState を取り出す Map
EndPointState : Gossip で共有する 1 つのノードの情報を詰め込むオブジェクト
(Key, Value) 形式で情報を保存する。
ApplicationState : EndPointState の Value に入る情報。バージョン情報が付随している。
HeartBeatState : EndPointState に付随するハートビート情報。ジェネレーション情報法と、バージョン情報が付随しており、Syn を投げるごとにバージョンが増えていく。
※実際に使われている ApplicationState は、loadInfo_ と MOVE_STATE の2種類だけ。
./service/LoadDisseminator.java: Gossiper.instance.addLocalApplicationState(LoadDisseminator.loadInfo_, new ApplicationState(diskUtilization)); ./service/StorageService.java: Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_NORMAL + Delimiter + partitioner_.getTokenFactory().toString(getLocalToken()))); Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_NORMAL + Delimiter + partitioner_.getTokenFactory().toString(token))); Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_BOOTSTRAPPING + Delimiter + partitioner_.getTokenFactory().toString(token))); Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_LEAVING + Delimiter + getLocalToken().toString())); Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_LEFT + Delimiter + partitioner_.getTokenFactory().toString(getLocalToken()))); Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_NORMAL + Delimiter + partitioner_.getTokenFactory().toString(getLocalToken()) + Delimiter + REMOVE_TOKEN + Delimiter + partitioner_.getTokenFactory().toString(token)));
Gossip の 3way のやりとりを実際に行う人々
GossipDigestSynVerbHandler : Syn を受け取った時に起動するハンドラ(Ack を返す)
GossipDigestAckVerbHandler : Ack を受け取った時に起動するハンドラ(Ack2 を返す)
GossipDigestAck2VerbHandler : Ack2 を受け取った時に起動するハンドラ
Gossip メッセージのローレベルフォーマット
Message : (送信先IP, Stage, Verb, 本文) のフォーマットを持つメッセージ。メッセージを受け取ったノードは、Verb の種類ごとに登録されたハンドラでメッセージを処理する。
定期的に Syn を送る処理
gms/Gossiper.java
private class GossipTimerTask extends TimerTask public void run() endPointStateMap_.get(localEndPoint_).getHeartBeatState().updateHeartBeat(); // 自ノードの EndPointState に含まれるハートビート・カウンタをアップデートする。 List<GossipDigest> gDigests = new ArrayList<GossipDigest>(); Gossiper.instance.makeRandomGossipDigest(gDigests); // 各ノードについて、(IP, HeartBeat Generation, Max Version) の3つ組みのダイジェストからなるアレイを作る。 Message message = makeGossipDigestSynMessage(gDigests); // (自ノード, GOSSIP_STAGE, GOSSIP_DIGEST_SYN, ダイジェストのアレイ) のメッセージ /* Gossip to some random live member */ boolean gossipedToSeed = doGossipToLiveMember(message); // ランダムに選んだ1ノードにメッセージを送る /* Gossip to some unreachable member with some probability to check if he is back up */ doGossipToUnreachableMember(message); if (!gossipedToSeed || liveEndpoints_.size() < seeds_.size()) doGossipToSeed(message); // ある確率で、シードの1つにも同じメッセージを送る doStatusCheck();
Synを受け取った後の処理
gms/GossipDigestSynVerbHandler.java
public class GossipDigestSynVerbHandler implements IVerbHandler public void doVerb(Message message) InetAddress from = message.getFrom(); byte[] bytes = message.getMessageBody(); DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) ); GossipDigestSynMessage gDigestMessage = GossipDigestSynMessage.serializer().deserialize(dis); List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests(); /* Notify the Failure Detector */ Gossiper.instance.notifyFailureDetector(gDigestList); doSort(gDigestList); List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>(); // 差分のあるノードの GossipDigest を入れる。(相手が新しい情報を持っている時に情報をリクエストするため。) Map<InetAddress, EndPointState> deltaEpStateMap = new HashMap<InetAddress, EndPointState>(); // 差分のあるノードの EndPointState そのものを入れる。(こちらが新しい情報を持っている時に、情報を送るため。) Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap); // 情報の差分を発見して、deltaGossipDigestList, deltaEpStateMap を構成する。(1) GossipDigestAckMessage gDigestAck = new GossipDigestAckMessage(deltaGossipDigestList, deltaEpStateMap); Message gDigestAckMessage = Gossiper.instance.makeGossipDigestAckMessage(gDigestAck); MessagingService.instance.sendOneWay(gDigestAckMessage, from); // 差分情報を入れた ACK を送信
(1) examineGossiper
gms/Gossiper.java
synchronized void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest> deltaGossipDigestList, Map<InetAddress, EndPointState> deltaEpStateMap) { for ( GossipDigest gDigest : gDigestList ) // 受け取った各ノードの Digest ごとにループ { int remoteGeneration = gDigest.getGeneration(); int maxRemoteVersion = gDigest.getMaxVersion(); /* Get state associated with the end point in digest */ EndPointState epStatePtr = endPointStateMap_.get(gDigest.getEndPoint()); if ( epStatePtr != null ) // 知っているノードの場合は差分を調べる { int localGeneration = epStatePtr.getHeartBeatState().getGeneration(); int maxLocalVersion = getMaxEndPointStateVersion(epStatePtr); if ( remoteGeneration == localGeneration && maxRemoteVersion == maxLocalVersion ) continue; // 情報が同じ場合は何もしない。 if ( remoteGeneration > localGeneration ) // 相手のジェネレーションが新しい場合は情報を要求 { requestAll(gDigest, deltaGossipDigestList, remoteGeneration); // == deltaGossipDigestList.add( new GossipDigest(gDigest.getEndPoint(), remoteGeneration, 0) ); } if ( remoteGeneration < localGeneration ) // 自分のジェネレーションが新しい場合は情報を送る { /* send all data with generation = localgeneration and version > 0 */ sendAll(gDigest, deltaEpStateMap, 0); // == deltaEpStateMap.put(gDigest.getEndPoint(), localEpStatePtr); } if ( remoteGeneration == localGeneration ) { if ( maxRemoteVersion > maxLocalVersion ) // 相手のバージョンが新しい場合は情報を要求 { deltaGossipDigestList.add( new GossipDigest(gDigest.getEndPoint(), remoteGeneration, maxLocalVersion) ); } if ( maxRemoteVersion < maxLocalVersion ) // 自分のバージョンが新しい場合は情報を送る { /* send all data with generation = localgeneration and version > maxRemoteVersion */ sendAll(gDigest, deltaEpStateMap, maxRemoteVersion); // == deltaEpStateMap.put(gDigest.getEndPoint(), localEpStatePtr); } } } else // 知らないノードの場合は、情報を要求 { /* We are here since we have no data for this endpoint locally so request everything. */ requestAll(gDigest, deltaGossipDigestList, remoteGeneration); // == deltaGossipDigestList.add( new GossipDigest(gDigest.getEndPoint(), remoteGeneration, 0) ); } } }
Ackを受け取った後の処理
gms/GossipDigestAckVerbHandler.java
public class GossipDigestAckVerbHandler implements IVerbHandler { public void doVerb(Message message) InetAddress from = message.getFrom(); byte[] bytes = message.getMessageBody(); DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) ); GossipDigestAckMessage gDigestAckMessage = GossipDigestAckMessage.serializer().deserialize(dis); List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList(); Map<InetAddress, EndPointState> epStateMap = gDigestAckMessage.getEndPointStateMap(); if ( epStateMap.size() > 0 ) // 新しい情報が来たのでローカルの情報をアップデートする。 { /* Notify the Failure Detector */ Gossiper.instance.notifyFailureDetector(epStateMap); Gossiper.instance.applyStateLocally(epStateMap); // ここでアップデート (2) } Map<InetAddress, EndPointState> deltaEpStateMap = new HashMap<InetAddress, EndPointState>(); for( GossipDigest gDigest : gDigestList ) // 要求された情報を Ack2 に入れる { InetAddress addr = gDigest.getEndPoint(); EndPointState localEpStatePtr = Gossiper.instance.getStateForVersionBiggerThan(addr, gDigest.getMaxVersion()); if ( localEpStatePtr != null ) deltaEpStateMap.put(addr, localEpStatePtr); } GossipDigestAck2Message gDigestAck2 = new GossipDigestAck2Message(deltaEpStateMap); Message gDigestAck2Message = Gossiper.instance.makeGossipDigestAck2Message(gDigestAck2); MessagingService.instance.sendOneWay(gDigestAck2Message, from); // ACK2 を送る
Ack2を受け取った後の処理
gms/GossipDigestAck2VerbHandler.java
public class GossipDigestAck2VerbHandler implements IVerbHandler public void doVerb(Message message) InetAddress from = message.getFrom(); byte[] bytes = message.getMessageBody(); DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) ); GossipDigestAck2Message gDigestAck2Message; gDigestAck2Message = GossipDigestAck2Message.serializer().deserialize(dis); Map<InetAddress, EndPointState> remoteEpStateMap = gDigestAck2Message.getEndPointStateMap(); /* Notify the Failure Detector */ Gossiper.instance.notifyFailureDetector(remoteEpStateMap); Gossiper.instance.applyStateLocally(remoteEpStateMap); // 受け取った情報でアップデートする (2) } }
受け取った情報をアップデートする処理
特に他のノードの担当 token 情報を受け取る所をみてみます。
gms/Gossiper.java
synchronized void applyStateLocally(Map<InetAddress, EndPointState> epStateMap) for (Entry<InetAddress, EndPointState> entry : epStateMap.entrySet()) // 各ノードについてループ { InetAddress ep = entry.getKey(); EndPointState localEpStatePtr = endPointStateMap_.get(ep); EndPointState remoteState = entry.getValue(); if ( localEpStatePtr != null ) { int localGeneration = localEpStatePtr.getHeartBeatState().getGeneration(); int remoteGeneration = remoteState.getHeartBeatState().getGeneration(); if (remoteGeneration > localGeneration) { handleGenerationChange(ep, remoteState); } else if ( remoteGeneration == localGeneration ) { /* manage the membership state */ int localMaxVersion = getMaxEndPointStateVersion(localEpStatePtr); int remoteMaxVersion = getMaxEndPointStateVersion(remoteState); if ( remoteMaxVersion > localMaxVersion ) { markAlive(ep, localEpStatePtr); applyHeartBeatStateLocally(ep, localEpStatePtr, remoteState); /* apply ApplicationState */ applyApplicationStateLocally(ep, localEpStatePtr, remoteState); } } } else { handleNewJoin(ep, remoteState); // 新規のノード情報の場合 (3) } } } private void handleNewJoin(InetAddress ep, EndPointState epState) // (3) { if (justRemovedEndPoints_.containsKey(ep)) return; handleMajorStateChange(ep, epState, false); // (4) } private void handleMajorStateChange(InetAddress ep, EndPointState epState, boolean isKnownNode) // (4) { endPointStateMap_.put(ep, epState); // endPointStateMap_ に追加登録 isAlive(ep, epState, isKnownNode); for (IEndPointStateChangeSubscriber subscriber : subscribers_) subscriber.onJoin(ep, epState); // 各 subscriber の onJoin ハンドラを呼び出し (5) }
/service/StorageService.java
public void onJoin(InetAddress endpoint, EndPointState epState) // (5) { for (Map.Entry<String,ApplicationState> entry : epState.getSortedApplicationStates()) { onChange(endpoint, entry.getKey(), entry.getValue()); } } public void onChange(InetAddress endpoint, String apStateName, ApplicationState apState) // (6) { if (!MOVE_STATE.equals(apStateName)) return; // 正常起動中のノードは、EndPointState に ApplicationStatus : (MOVE_STATE, "STATE_NORMAL, Token") を登録している。 String apStateValue = apState.getValue(); String[] pieces = apStateValue.split(DelimiterStr, -1); assert (pieces.length > 0); String moveName = pieces[0]; if (moveName.equals(STATE_BOOTSTRAPPING)) handleStateBootstrap(endpoint, pieces); else if (moveName.equals(STATE_NORMAL)) // ここが呼ばれる (7) handleStateNormal(endpoint, pieces); else if (moveName.equals(STATE_LEAVING)) handleStateLeaving(endpoint, pieces); else if (moveName.equals(STATE_LEFT)) handleStateLeft(endpoint, pieces); } private void handleStateNormal(InetAddress endPoint, String[] pieces) // (7) { Token token = getPartitioner().getTokenFactory().fromString(pieces[1]); // we don't want to update if this node is responsible for the token and it has a later startup time than endpoint. InetAddress currentNode = tokenMetadata_.getEndPoint(token); if (currentNode == null || Gossiper.instance.compareEndpointStartup(endPoint, currentNode) > 0) tokenMetadata_.updateNormalToken(token, endPoint); // ここで tokenMetadata_ が更新される。 else logger_.info("Will not change my token ownership to " + endPoint); calculatePendingRanges(); if (!isClientMode) SystemTable.updateToken(endPoint, token); }
(補足)MessagingService の仕組み
Message フォーマット
(To, Stage, Verb, Body)
Message を受け取った側は、Verb ごとに登録された VerbHandler が受信後の処理をおこなう。
StorageService が登録する VerbHandler
StorageService が微妙に Gossiper と密結合している。Gossiper だけ独立したクラスタサービスになっていた方が気持ちいいのだけど。。。。
/* register the verb handlers */ MessagingService.instance.registerVerbHandlers(Verb.BINARY, new BinaryVerbHandler()); MessagingService.instance.registerVerbHandlers(Verb.MUTATION, new RowMutationVerbHandler()); MessagingService.instance.registerVerbHandlers(Verb.READ_REPAIR, new ReadRepairVerbHandler()); MessagingService.instance.registerVerbHandlers(Verb.READ, new ReadVerbHandler()); MessagingService.instance.registerVerbHandlers(Verb.RANGE_SLICE, new RangeSliceVerbHandler()); // see BootStrapper for a summary of how the bootstrap verbs interact MessagingService.instance.registerVerbHandlers(Verb.BOOTSTRAP_TOKEN, new BootStrapper.BootstrapTokenVerbHandler()); MessagingService.instance.registerVerbHandlers(Verb.STREAM_REQUEST, new StreamRequestVerbHandler() ); MessagingService.instance.registerVerbHandlers(Verb.STREAM_INITIATE, new StreamInitiateVerbHandler()); MessagingService.instance.registerVerbHandlers(Verb.STREAM_INITIATE_DONE, new StreamInitiateDoneVerbHandler()); MessagingService.instance.registerVerbHandlers(Verb.STREAM_FINISHED, new StreamFinishedVerbHandler()); MessagingService.instance.registerVerbHandlers(Verb.READ_RESPONSE, new ResponseVerbHandler()); MessagingService.instance.registerVerbHandlers(Verb.TREE_REQUEST, new TreeRequestVerbHandler()); MessagingService.instance.registerVerbHandlers(Verb.TREE_RESPONSE, new AntiEntropyService.TreeResponseVerbHandler()); MessagingService.instance.registerVerbHandlers(Verb.JOIN, new GossiperJoinVerbHandler()); MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_SYN, new GossipDigestSynVerbHandler()); MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK, new GossipDigestAckVerbHandler()); MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK2, new GossipDigestAck2VerbHandler());
Message送信
MessagingService.instance.sendOneWay(message, to); // MessagingService で送る。
Gosssipの受信部分
net/MessagingService.java
public static void receive(Message message) // TCP Socket で受信したメッセージはここに入る。 { message = SinkManager.processServerMessageSink(message); Runnable runnable = new MessageDeliveryTask(message); ExecutorService stage = StageManager.getStage(message.getMessageType()); if (stage == null) { defaultExecutor_.execute(runnable); } else { stage.execute(runnable); } }
net/MessageDeliveryTask.java
public class MessageDeliveryTask implements Runnable { private Message message_; public MessageDeliveryTask(Message message) { message_ = message; } public void run() { StorageService.Verb verb = message_.getVerb(); switch (verb) { case BINARY: case MUTATION: case READ: case RANGE_SLICE: case READ_REPAIR: IVerbHandler verbHandler = MessagingService.instance.getVerbHandler(verb); verbHandler.doVerb(message_); } }