めもめも

このブログに記載の内容は個人の見解であり、必ずしも所属組織の立場、戦略、意見を代表するものではありません。

Cassandraソース解析(3) - Bootstrap 処理

執筆中です。

ざっくり全体像

クラスタに未参加のノードを新規に起動して、クラスタに参加させる処理を Bootstrap と呼ぶ。ざっくりの流れは・・・

Bootstrap中のノードのリストに追加する。

担当 Token をもらって、担当 Range が決まる。(基本的には、データ量の多いノードの手前に入る。)

担当 Range のデータを既存のノードからコピーして受け取る。

Bootstrap 開始部分

service/StorageService.java

// この時点で Gossiper は稼働しており、他のノードの tokenMetadata_ は持っている。
     setMode("Joining: getting load information", true);
     StorageLoadBalancer.instance.waitForLoadInfo(); // 一瞬待つだけ。
     Token token = BootStrapper.getBootstrapToken(tokenMetadata_, StorageLoadBalancer.instance.getLoadInfo());
      // 担当する token をもらって (1)
     startBootstrap(token); // Bootstrap を開始。

            // don't finish startup (enabling thrift) until after bootstrap is done
            while (isBootstrapMode) { Thread.sleep(100); } // Bootstrap 完了を待って終了
(1)担当するtokenをもらう部分

dht/BootStrapper.java

    public static Token getBootstrapToken(final TokenMetadata metadata, final Map<InetAddress, Double> load) throws IOException, ConfigurationException
        return getBalancedToken(metadata, load); // (2)

    public static Token getBalancedToken(TokenMetadata metadata, Map<InetAddress, Double> load) // (2)
        InetAddress maxEndpoint = getBootstrapSource(metadata, load);
           // 担当 Range 内で Bootstrap 中のノードが最も少なく、データ量がもっとも多いノードを返す。
           // (既に引き継ぎ開始中のノードは除外するため。)
        Token<?> t = getBootstrapTokenFrom(maxEndpoint); // (3)
      // そのノードの Range の中間点の token が返る。
      // つまり、そのノードの担当領域の半分を引き継ぐ
        return t;

    private static Token<?> getBootstrapTokenFrom(InetAddress maxEndpoint) // (3)
        Message message = new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.BOOTSTRAP_TOKEN, ArrayUtils.EMPTY_BYTE_ARRAY);
     // Verb = BOOTSTRAP_TOKEN を引き継ぎ元のノードになげて、 // (4)
        BootstrapTokenCallback btc = new BootstrapTokenCallback();
        MessagingService.instance.sendRR(message, maxEndpoint, btc);
        return btc.getToken(); // 担当 token を受け取る
担当tokenを返答する側の処理

service/StorageService.java

        MessagingService.instance.registerVerbHandlers(Verb.BOOTSTRAP_TOKEN, new BootStrapper.BootstrapTokenVerbHandler()); // (4)
      // Verb = BOOTSTRAP_TOKEN のハンドラ (5)

dht/BootStrapper.java

    public static class BootstrapTokenVerbHandler implements IVerbHandler 
        public void doVerb(Message message) // (5)
            StorageService ss = StorageService.instance;
            String tokenString = ss.getBootstrapToken().toString();
              // 自ノードが担当する Range の中間点の token を返す
            Message response;
                response = message.getReply(FBUtilities.getLocalAddress(), tokenString.getBytes("UTF-8"));

Bootstrap 実施部分

※ Keyspace は Config ファイルで指定される事に注意。動的に追加されるものではない。

dht/BootStrapper.java

    public void startBootstrap() throws IOException
        for (String table : DatabaseDescriptor.getNonSystemTables())
        // table は Config で指定された Keyspace の事。
            Multimap<Range, InetAddress> rangesWithSourceTarget = getRangesWithSources(table);
            // 指定の Keyspace について、<(Replica含めて)自ノードがこれから担当する Range, 今、そこのデータを持っている IP 一式>---(A) の組全体を返す。(1)

            /* Send messages to respective folks to stream data over to me */
            for (Map.Entry<InetAddress, Collection<Range>> entry : getWorkMap(rangesWithSourceTarget).asMap().entrySet())
              // entry は、(A) の Multimap をひっくり返して <IP, 送るべき Range一式> の Multimap にしたもの
       //(同じ Range については、最初の生きている IP のみが選択される。)
                InetAddress source = entry.getKey();
                StorageService.instance.addBootstrapSource(source, table);
         // Bootstrap で引き継ぎ中のノードのリストに追加
                StreamIn.requestRanges(source, table, entry.getValue());
         // データを持っているノードに Keyspace と Ranges を指定して、データ転送を要求 (2)

   /** get potential sources for each range, ordered by proximity (as determined by EndPointSnitch) */
    Multimap<Range, InetAddress> getRangesWithSources(String table) // (1)
        final AbstractReplicationStrategy strat = StorageService.instance.getReplicationStrategy(table);
        Collection<Range> myRanges = strat.getPendingAddressRanges(tokenMetadata, token, address, table);
          // 自ノードがこれから担当する Range が戻る

        Multimap<Range, InetAddress> myRangeAddresses = ArrayListMultimap.create();
        Multimap<Range, InetAddress> rangeAddresses = strat.getRangeAddresses(tokenMetadata, table);
     // 既存の Range 一式が入る
        for (Range myRange : myRanges)
            for (Range range : rangeAddresses.keySet())
                if (range.contains(myRange))
                    List<InetAddress> preferred = DatabaseDescriptor.getEndPointSnitch(table).getSortedListByProximity(address, rangeAddresses.get(range));
          // 欲しいデータを持っているノード一式の IP が近い順のリスト
                    myRangeAddresses.putAll(myRange, preferred);
        return myRangeAddresses;
(2)データ転送要求開始からデータ転送の実施の流れ

streaming/StreamIn.java

    public static void requestRanges(InetAddress source, String tableName, Collection<Range> ranges)
        StreamRequestMetadata streamRequestMetadata = new StreamRequestMetadata(FBUtilities.getLocalAddress(), ranges, tableName);
        Message message = StreamRequestMessage.makeStreamRequestMessage(new StreamRequestMessage(streamRequestMetadata));
        // StreamRequest メッセージを送付 (3)
        MessagingService.instance.sendOneWay(message, source);

streaming/StreamRequestVerbHandler.java

public class StreamRequestVerbHandler implements IVerbHandler
// (3) StreamRequest メッセージを受信した側
    public void doVerb(Message message)
        byte[] body = message.getMessageBody();
        ByteArrayInputStream bufIn = new ByteArrayInputStream(body);

            StreamRequestMessage streamRequestMessage = StreamRequestMessage.serializer().deserialize(new DataInputStream(bufIn));
            StreamRequestMetadata[] streamRequestMetadata = streamRequestMessage.streamRequestMetadata_;

            for (StreamRequestMetadata srm : streamRequestMetadata)
            {
                StreamOut.transferRanges(srm.target_, srm.table_, srm.ranges_, null);
                // (4)
            }
        }

streaming/StreamOut.java

    public static void transferRanges(InetAddress target, String tableName, Collection<Range> ranges, Runnable callback)
      // (4) 要求されたデータを転送
      // SSTable ファイルを分割して、転送部分のみのファイルを切り出して転送する。
        /*
         * (1) dump all the memtables to disk.
         * (2) anticompaction -- split out the keys in the range specified
         * (3) transfer the data.
        */
            /* Get the list of files that need to be streamed */
            transferSSTables(target, table.forceAntiCompaction(ranges, target), tableName); // SSTR GC deletes the file when done

※ データ転送元にもデータはレプリカとしてそのまま残る。RackUnawareStrategy の場合は、再レプリケーションは不要。(おしりのレプリカが不要になるが、それは、Major Compaction で消されるはず???)
※ RackAwareStrategy の場合、再レプリケーションが必要な気がするが、そこのロジックは不明。

Bootstrap完了部分

./service/StorageService.java

    public void removeBootstrapSource(InetAddress s, String table)
    // データ受け取りが終わるとこれが呼ばれる。
    {
        if (table == null)
            bootstrapSet.removeAll(s);
        else
            bootstrapSet.remove(s, table);
        if (logger_.isDebugEnabled())
            logger_.debug(String.format("Removed %s/%s as a bootstrap source; remaining is [%s]", s, table == null ? "<ALL>" : table, StringUtils.join(bootstrapSet.keySet(), ", ")));

        if (bootstrapSet.isEmpty())
        {
            finishBootstrapping();
            // 全部の転送元からファイルを受け取り終わると Bootstrap 完了
        }
    }

    private void finishBootstrapping()
    {
        isBootstrapMode = false;
        SystemTable.setBootstrapped(true);
        setToken(getLocalToken());
        Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_NORMAL + Delimiter + partitioner_.getTokenFactory().toString(getLocalToken())));
        logger_.info("Bootstrap/move completed! Now serving reads.");
        setMode("Normal", false);
    }