めもめも

このブログに記載の内容は個人の見解であり、必ずしも所属組織の立場、戦略、意見を代表するものではありません。

Cassandraソース解析(2) - Gossip による情報交換を整理する

ざっくり全体像

各ノードは、定期的に『ランダムに選んだノード、および、ランダムに選んだシード』と Gossip パケットを交換する。

Gossip パケットの 1 回のやりとりは、Syn → Ack → Ack2 の 3way で行われる。

このやりとりの中で、双方のノードは、相手が持っている『グローバル情報』と自分が持っている『グローバル情報』の Digest を比較して、自分の方が新しい情報を持っていれば、その差分を相手に送って、相手の情報をアップデートする。

※『グローバル情報』とは、各ノードが担当するトークンの一覧など、クラスタの全体像を表す情報のこと。各ノードは、それぞれ、ローカルでこれらの情報を保持しており、理想的にはすべてのノードが同じ情報を持っているべきもの。

仮に、シードが無かったとすると、各ノードは、まったくランダムな相手と情報を交換するので、確率論的には、永遠に最新情報をもらえないノードが発生することがおきる。

実際には、各ノードは、必ずシードとは Gossip をやりとりするので、シードを仲介にして、短時間で、すべてのノードが最新情報を持つことができる。(シードが少ないと、シードとの通信がボトルネックになる。しかし、シードが増えると、収束時間はながくなる。)

(参考資料)http://wiki.apache.org/cassandra/ArchitectureGossip_JP

主な登場人物

Gossipの基盤部分

Gossiper.instance : Gossip 処理を行う各ノードのシングルトン・インスタンス
GossipTimerTask : 定期的に Gossip の Syn 送信を行うタスク

endPointStateMap_ : IP から対応するノードの EndPointState を取り出す Map
EndPointState : Gossip で共有する 1 つのノードの情報を詰め込むオブジェクト
        (Key, Value) 形式で情報を保存する。
ApplicationState : EndPointState の Value に入る情報。バージョン情報が付随している。
HeartBeatState : EndPointState に付随するハートビート情報。ジェネレーション情報法と、バージョン情報が付随しており、Syn を投げるごとにバージョンが増えていく。

※実際に使われている ApplicationState は、loadInfo_ と MOVE_STATE の2種類だけ。

./service/LoadDisseminator.java:
Gossiper.instance.addLocalApplicationState(LoadDisseminator.loadInfo_, new ApplicationState(diskUtilization));

./service/StorageService.java:
Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_NORMAL + Delimiter + partitioner_.getTokenFactory().toString(getLocalToken())));
Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_NORMAL + Delimiter + partitioner_.getTokenFactory().toString(token)));
Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_BOOTSTRAPPING + Delimiter + partitioner_.getTokenFactory().toString(token)));
Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_LEAVING + Delimiter + getLocalToken().toString()));
Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_LEFT + Delimiter + partitioner_.getTokenFactory().toString(getLocalToken())));
Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_NORMAL + Delimiter + partitioner_.getTokenFactory().toString(getLocalToken()) + Delimiter + REMOVE_TOKEN + Delimiter + partitioner_.getTokenFactory().toString(token)));
Gossip の 3way のやりとりを実際に行う人々

GossipDigestSynVerbHandler : Syn を受け取った時に起動するハンドラ(Ack を返す)

GossipDigestAckVerbHandler : Ack を受け取った時に起動するハンドラ(Ack2 を返す)

GossipDigestAck2VerbHandler : Ack2 を受け取った時に起動するハンドラ

Gossip メッセージのローレベルフォーマット

Message : (送信先IP, Stage, Verb, 本文) のフォーマットを持つメッセージ。メッセージを受け取ったノードは、Verb の種類ごとに登録されたハンドラでメッセージを処理する。

定期的に Syn を送る処理

gms/Gossiper.java

    private class GossipTimerTask extends TimerTask
        public void run()
                    endPointStateMap_.get(localEndPoint_).getHeartBeatState().updateHeartBeat();
                    // 自ノードの EndPointState に含まれるハートビート・カウンタをアップデートする。

                    List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
                    Gossiper.instance.makeRandomGossipDigest(gDigests);
          // 各ノードについて、(IP, HeartBeat Generation, Max Version) の3つ組みのダイジェストからなるアレイを作る。
                        Message message = makeGossipDigestSynMessage(gDigests);
          // (自ノード, GOSSIP_STAGE, GOSSIP_DIGEST_SYN, ダイジェストのアレイ) のメッセージ

                        /* Gossip to some random live member */
                        boolean gossipedToSeed = doGossipToLiveMember(message);
              // ランダムに選んだ1ノードにメッセージを送る

                        /* Gossip to some unreachable member with some probability to check if he is back up */
                        doGossipToUnreachableMember(message);

                        if (!gossipedToSeed || liveEndpoints_.size() < seeds_.size())
                            doGossipToSeed(message);
               // ある確率で、シードの1つにも同じメッセージを送る
                        doStatusCheck();

Synを受け取った後の処理

gms/GossipDigestSynVerbHandler.java

public class GossipDigestSynVerbHandler implements IVerbHandler
    public void doVerb(Message message)
        InetAddress from = message.getFrom();
        byte[] bytes = message.getMessageBody();
        DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );

           GossipDigestSynMessage gDigestMessage = GossipDigestSynMessage.serializer().deserialize(dis);
            List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests();

            /* Notify the Failure Detector */
            Gossiper.instance.notifyFailureDetector(gDigestList);

            doSort(gDigestList);

            List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>();
            // 差分のあるノードの GossipDigest を入れる。(相手が新しい情報を持っている時に情報をリクエストするため。)
            Map<InetAddress, EndPointState> deltaEpStateMap = new HashMap<InetAddress, EndPointState>();
      // 差分のあるノードの EndPointState そのものを入れる。(こちらが新しい情報を持っている時に、情報を送るため。)

            Gossiper.instance.examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
      // 情報の差分を発見して、deltaGossipDigestList, deltaEpStateMap を構成する。(1)

            GossipDigestAckMessage gDigestAck = new GossipDigestAckMessage(deltaGossipDigestList, deltaEpStateMap);
            Message gDigestAckMessage = Gossiper.instance.makeGossipDigestAckMessage(gDigestAck);
            MessagingService.instance.sendOneWay(gDigestAckMessage, from); // 差分情報を入れた ACK を送信

(1) examineGossiper
gms/Gossiper.java

    synchronized void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest> deltaGossipDigestList, Map<InetAddress, EndPointState> deltaEpStateMap)
    {
        for ( GossipDigest gDigest : gDigestList ) // 受け取った各ノードの Digest ごとにループ
        {
            int remoteGeneration = gDigest.getGeneration();
            int maxRemoteVersion = gDigest.getMaxVersion();
            /* Get state associated with the end point in digest */
            EndPointState epStatePtr = endPointStateMap_.get(gDigest.getEndPoint());
 
           if ( epStatePtr != null ) // 知っているノードの場合は差分を調べる
            {
                int localGeneration = epStatePtr.getHeartBeatState().getGeneration();
                int maxLocalVersion = getMaxEndPointStateVersion(epStatePtr);

                if ( remoteGeneration == localGeneration && maxRemoteVersion == maxLocalVersion )
                    continue; // 情報が同じ場合は何もしない。

                if ( remoteGeneration > localGeneration ) // 相手のジェネレーションが新しい場合は情報を要求
                {
                    requestAll(gDigest, deltaGossipDigestList, remoteGeneration);
          // == deltaGossipDigestList.add( new GossipDigest(gDigest.getEndPoint(), remoteGeneration, 0) );
                }
                if ( remoteGeneration < localGeneration ) // 自分のジェネレーションが新しい場合は情報を送る
                {
                    /* send all data with generation = localgeneration and version > 0 */
                    sendAll(gDigest, deltaEpStateMap, 0);
            // == deltaEpStateMap.put(gDigest.getEndPoint(), localEpStatePtr);
                }

                if ( remoteGeneration == localGeneration )
                {
                    if ( maxRemoteVersion > maxLocalVersion ) // 相手のバージョンが新しい場合は情報を要求
                    {
                        deltaGossipDigestList.add( new GossipDigest(gDigest.getEndPoint(), remoteGeneration, maxLocalVersion) );
                    }
                    if ( maxRemoteVersion < maxLocalVersion ) // 自分のバージョンが新しい場合は情報を送る
                    {
                        /* send all data with generation = localgeneration and version > maxRemoteVersion */
                        sendAll(gDigest, deltaEpStateMap, maxRemoteVersion);
            // == deltaEpStateMap.put(gDigest.getEndPoint(), localEpStatePtr);
                    }
                }
            }
            else // 知らないノードの場合は、情報を要求
            {
                /* We are here since we have no data for this endpoint locally so request everything. */
                requestAll(gDigest, deltaGossipDigestList, remoteGeneration);
        // == deltaGossipDigestList.add( new GossipDigest(gDigest.getEndPoint(), remoteGeneration, 0) );

            }
        }
    }

Ackを受け取った後の処理

gms/GossipDigestAckVerbHandler.java

public class GossipDigestAckVerbHandler implements IVerbHandler
{
    public void doVerb(Message message)
        InetAddress from = message.getFrom();
        byte[] bytes = message.getMessageBody();
        DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );

            GossipDigestAckMessage gDigestAckMessage = GossipDigestAckMessage.serializer().deserialize(dis);
            List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList();
            Map<InetAddress, EndPointState> epStateMap = gDigestAckMessage.getEndPointStateMap();

            if ( epStateMap.size() > 0 ) // 新しい情報が来たのでローカルの情報をアップデートする。
            {
                /* Notify the Failure Detector */
                Gossiper.instance.notifyFailureDetector(epStateMap);
                Gossiper.instance.applyStateLocally(epStateMap); // ここでアップデート (2)
            }

            Map<InetAddress, EndPointState> deltaEpStateMap = new HashMap<InetAddress, EndPointState>();
            for( GossipDigest gDigest : gDigestList ) // 要求された情報を Ack2 に入れる
            {
                InetAddress addr = gDigest.getEndPoint();
                EndPointState localEpStatePtr = Gossiper.instance.getStateForVersionBiggerThan(addr, gDigest.getMaxVersion());
                if ( localEpStatePtr != null )
                    deltaEpStateMap.put(addr, localEpStatePtr);
            }

            GossipDigestAck2Message gDigestAck2 = new GossipDigestAck2Message(deltaEpStateMap);
            Message gDigestAck2Message = Gossiper.instance.makeGossipDigestAck2Message(gDigestAck2);
            MessagingService.instance.sendOneWay(gDigestAck2Message, from); // ACK2 を送る

Ack2を受け取った後の処理

gms/GossipDigestAck2VerbHandler.java

public class GossipDigestAck2VerbHandler implements IVerbHandler
    public void doVerb(Message message)
        InetAddress from = message.getFrom();
        byte[] bytes = message.getMessageBody();
        DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
        GossipDigestAck2Message gDigestAck2Message;
            gDigestAck2Message = GossipDigestAck2Message.serializer().deserialize(dis);
        Map<InetAddress, EndPointState> remoteEpStateMap = gDigestAck2Message.getEndPointStateMap();
        /* Notify the Failure Detector */
        Gossiper.instance.notifyFailureDetector(remoteEpStateMap);
        Gossiper.instance.applyStateLocally(remoteEpStateMap); // 受け取った情報でアップデートする (2)
    }
}

受け取った情報をアップデートする処理

特に他のノードの担当 token 情報を受け取る所をみてみます。

gms/Gossiper.java

    synchronized void applyStateLocally(Map<InetAddress, EndPointState> epStateMap)
        for (Entry<InetAddress, EndPointState> entry : epStateMap.entrySet()) // 各ノードについてループ
        {
            InetAddress ep = entry.getKey();
            EndPointState localEpStatePtr = endPointStateMap_.get(ep);
            EndPointState remoteState = entry.getValue();

            if ( localEpStatePtr != null )
            {
                int localGeneration = localEpStatePtr.getHeartBeatState().getGeneration();
                int remoteGeneration = remoteState.getHeartBeatState().getGeneration();

                if (remoteGeneration > localGeneration)
                {
                    handleGenerationChange(ep, remoteState);
                }
                else if ( remoteGeneration == localGeneration )
                {
                        /* manage the membership state */
                        int localMaxVersion = getMaxEndPointStateVersion(localEpStatePtr);
                        int remoteMaxVersion = getMaxEndPointStateVersion(remoteState);
                        if ( remoteMaxVersion > localMaxVersion )
                        {
                            markAlive(ep, localEpStatePtr);
                            applyHeartBeatStateLocally(ep, localEpStatePtr, remoteState);
                            /* apply ApplicationState */
                            applyApplicationStateLocally(ep, localEpStatePtr, remoteState);
                        }
                }
            }
            else
            {
                handleNewJoin(ep, remoteState); // 新規のノード情報の場合 (3)
            }
        }
    }

    private void handleNewJoin(InetAddress ep, EndPointState epState) // (3)
    {
        if (justRemovedEndPoints_.containsKey(ep))
            return;
        handleMajorStateChange(ep, epState, false); // (4)
    }

    private void handleMajorStateChange(InetAddress ep, EndPointState epState, boolean isKnownNode) // (4)
    {
        endPointStateMap_.put(ep, epState); // endPointStateMap_ に追加登録
        isAlive(ep, epState, isKnownNode);
        for (IEndPointStateChangeSubscriber subscriber : subscribers_)
            subscriber.onJoin(ep, epState); // 各 subscriber の onJoin ハンドラを呼び出し (5)
    }

/service/StorageService.java

    public void onJoin(InetAddress endpoint, EndPointState epState) // (5)
    {
        for (Map.Entry<String,ApplicationState> entry : epState.getSortedApplicationStates())
        {
            onChange(endpoint, entry.getKey(), entry.getValue());
        }
    }

    public void onChange(InetAddress endpoint, String apStateName, ApplicationState apState)  // (6)
    {
        if (!MOVE_STATE.equals(apStateName))
            return;
// 正常起動中のノードは、EndPointState に ApplicationStatus : (MOVE_STATE, "STATE_NORMAL, Token") を登録している。

        String apStateValue = apState.getValue();
        String[] pieces = apStateValue.split(DelimiterStr, -1);
        assert (pieces.length > 0);

        String moveName = pieces[0];

        if (moveName.equals(STATE_BOOTSTRAPPING))
            handleStateBootstrap(endpoint, pieces);
        else if (moveName.equals(STATE_NORMAL))  // ここが呼ばれる (7) 
            handleStateNormal(endpoint, pieces); 
        else if (moveName.equals(STATE_LEAVING))
            handleStateLeaving(endpoint, pieces);
        else if (moveName.equals(STATE_LEFT))
            handleStateLeft(endpoint, pieces);
    }

    private void handleStateNormal(InetAddress endPoint, String[] pieces) // (7)
    {
        Token token = getPartitioner().getTokenFactory().fromString(pieces[1]);
        // we don't want to update if this node is responsible for the token and it has a later startup time than endpoint.
        InetAddress currentNode = tokenMetadata_.getEndPoint(token);
        if (currentNode == null || Gossiper.instance.compareEndpointStartup(endPoint, currentNode) > 0)
            tokenMetadata_.updateNormalToken(token, endPoint); // ここで tokenMetadata_ が更新される。
        else
            logger_.info("Will not change my token ownership to " + endPoint);

        calculatePendingRanges();
        if (!isClientMode)
            SystemTable.updateToken(endPoint, token);
    }

(補足)MessagingService の仕組み

Message フォーマット

(To, Stage, Verb, Body)

Message を受け取った側は、Verb ごとに登録された VerbHandler が受信後の処理をおこなう。

StorageService が登録する VerbHandler

StorageService が微妙に Gossiper と密結合している。Gossiper だけ独立したクラスタサービスになっていた方が気持ちいいのだけど。。。。

        /* register the verb handlers */
        MessagingService.instance.registerVerbHandlers(Verb.BINARY, new BinaryVerbHandler());
        MessagingService.instance.registerVerbHandlers(Verb.MUTATION, new RowMutationVerbHandler());
        MessagingService.instance.registerVerbHandlers(Verb.READ_REPAIR, new ReadRepairVerbHandler());
        MessagingService.instance.registerVerbHandlers(Verb.READ, new ReadVerbHandler());
        MessagingService.instance.registerVerbHandlers(Verb.RANGE_SLICE, new RangeSliceVerbHandler());
        // see BootStrapper for a summary of how the bootstrap verbs interact
        MessagingService.instance.registerVerbHandlers(Verb.BOOTSTRAP_TOKEN, new BootStrapper.BootstrapTokenVerbHandler());
        MessagingService.instance.registerVerbHandlers(Verb.STREAM_REQUEST, new StreamRequestVerbHandler() );
        MessagingService.instance.registerVerbHandlers(Verb.STREAM_INITIATE, new StreamInitiateVerbHandler());
        MessagingService.instance.registerVerbHandlers(Verb.STREAM_INITIATE_DONE, new StreamInitiateDoneVerbHandler());
        MessagingService.instance.registerVerbHandlers(Verb.STREAM_FINISHED, new StreamFinishedVerbHandler());
        MessagingService.instance.registerVerbHandlers(Verb.READ_RESPONSE, new ResponseVerbHandler());
        MessagingService.instance.registerVerbHandlers(Verb.TREE_REQUEST, new TreeRequestVerbHandler());
        MessagingService.instance.registerVerbHandlers(Verb.TREE_RESPONSE, new AntiEntropyService.TreeResponseVerbHandler());

        MessagingService.instance.registerVerbHandlers(Verb.JOIN, new GossiperJoinVerbHandler());
        MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_SYN, new GossipDigestSynVerbHandler());
        MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK, new GossipDigestAckVerbHandler());
        MessagingService.instance.registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK2, new GossipDigestAck2VerbHandler());
Message送信
        MessagingService.instance.sendOneWay(message, to); // MessagingService で送る。
Gosssipの受信部分

net/MessagingService.java

    public static void receive(Message message)   // TCP Socket で受信したメッセージはここに入る。
    {
        message = SinkManager.processServerMessageSink(message);
        Runnable runnable = new MessageDeliveryTask(message);
        ExecutorService stage = StageManager.getStage(message.getMessageType());
        if (stage == null)
        {
            defaultExecutor_.execute(runnable);
        }
        else
        {
            stage.execute(runnable);
        }
    }

net/MessageDeliveryTask.java

public class MessageDeliveryTask implements Runnable
{
    private Message message_;
    public MessageDeliveryTask(Message message)
    {
        message_ = message;
    }
    public void run()
    {
        StorageService.Verb verb = message_.getVerb();
        switch (verb)
        {
            case BINARY:
            case MUTATION:
            case READ:
            case RANGE_SLICE:
            case READ_REPAIR:
        IVerbHandler verbHandler = MessagingService.instance.getVerbHandler(verb);
        verbHandler.doVerb(message_);
    }
}