執筆中です。
ざっくり全体像
クラスタに未参加のノードを新規に起動して、クラスタに参加させる処理を Bootstrap と呼ぶ。ざっくりの流れは・・・
Bootstrap中のノードのリストに追加する。
↓
担当 Token をもらって、担当 Range が決まる。(基本的には、データ量の多いノードの手前に入る。)
↓
担当 Range のデータを既存のノードからコピーして受け取る。
Bootstrap 開始部分
service/StorageService.java
// この時点で Gossiper は稼働しており、他のノードの tokenMetadata_ は持っている。 setMode("Joining: getting load information", true); StorageLoadBalancer.instance.waitForLoadInfo(); // 一瞬待つだけ。 Token token = BootStrapper.getBootstrapToken(tokenMetadata_, StorageLoadBalancer.instance.getLoadInfo()); // 担当する token をもらって (1) startBootstrap(token); // Bootstrap を開始。 // don't finish startup (enabling thrift) until after bootstrap is done while (isBootstrapMode) { Thread.sleep(100); } // Bootstrap 完了を待って終了
(1)担当するtokenをもらう部分
dht/BootStrapper.java
public static Token getBootstrapToken(final TokenMetadata metadata, final Map<InetAddress, Double> load) throws IOException, ConfigurationException return getBalancedToken(metadata, load); // (2) public static Token getBalancedToken(TokenMetadata metadata, Map<InetAddress, Double> load) // (2) InetAddress maxEndpoint = getBootstrapSource(metadata, load); // 担当 Range 内で Bootstrap 中のノードが最も少なく、データ量がもっとも多いノードを返す。 // (既に引き継ぎ開始中のノードは除外するため。) Token<?> t = getBootstrapTokenFrom(maxEndpoint); // (3) // そのノードの Range の中間点の token が返る。 // つまり、そのノードの担当領域の半分を引き継ぐ return t; private static Token<?> getBootstrapTokenFrom(InetAddress maxEndpoint) // (3) Message message = new Message(FBUtilities.getLocalAddress(), "", StorageService.Verb.BOOTSTRAP_TOKEN, ArrayUtils.EMPTY_BYTE_ARRAY); // Verb = BOOTSTRAP_TOKEN を引き継ぎ元のノードになげて、 // (4) BootstrapTokenCallback btc = new BootstrapTokenCallback(); MessagingService.instance.sendRR(message, maxEndpoint, btc); return btc.getToken(); // 担当 token を受け取る
担当tokenを返答する側の処理
service/StorageService.java
MessagingService.instance.registerVerbHandlers(Verb.BOOTSTRAP_TOKEN, new BootStrapper.BootstrapTokenVerbHandler()); // (4) // Verb = BOOTSTRAP_TOKEN のハンドラ (5)
dht/BootStrapper.java
public static class BootstrapTokenVerbHandler implements IVerbHandler public void doVerb(Message message) // (5) StorageService ss = StorageService.instance; String tokenString = ss.getBootstrapToken().toString(); // 自ノードが担当する Range の中間点の token を返す Message response; response = message.getReply(FBUtilities.getLocalAddress(), tokenString.getBytes("UTF-8"));
Bootstrap 実施部分
※ Keyspace は Config ファイルで指定される事に注意。動的に追加されるものではない。
dht/BootStrapper.java
public void startBootstrap() throws IOException for (String table : DatabaseDescriptor.getNonSystemTables()) // table は Config で指定された Keyspace の事。 Multimap<Range, InetAddress> rangesWithSourceTarget = getRangesWithSources(table); // 指定の Keyspace について、<(Replica含めて)自ノードがこれから担当する Range, 今、そこのデータを持っている IP 一式>---(A) の組全体を返す。(1) /* Send messages to respective folks to stream data over to me */ for (Map.Entry<InetAddress, Collection<Range>> entry : getWorkMap(rangesWithSourceTarget).asMap().entrySet()) // entry は、(A) の Multimap をひっくり返して <IP, 送るべき Range一式> の Multimap にしたもの //(同じ Range については、最初の生きている IP のみが選択される。) InetAddress source = entry.getKey(); StorageService.instance.addBootstrapSource(source, table); // Bootstrap で引き継ぎ中のノードのリストに追加 StreamIn.requestRanges(source, table, entry.getValue()); // データを持っているノードに Keyspace と Ranges を指定して、データ転送を要求 (2) /** get potential sources for each range, ordered by proximity (as determined by EndPointSnitch) */ Multimap<Range, InetAddress> getRangesWithSources(String table) // (1) final AbstractReplicationStrategy strat = StorageService.instance.getReplicationStrategy(table); Collection<Range> myRanges = strat.getPendingAddressRanges(tokenMetadata, token, address, table); // 自ノードがこれから担当する Range が戻る Multimap<Range, InetAddress> myRangeAddresses = ArrayListMultimap.create(); Multimap<Range, InetAddress> rangeAddresses = strat.getRangeAddresses(tokenMetadata, table); // 既存の Range 一式が入る for (Range myRange : myRanges) for (Range range : rangeAddresses.keySet()) if (range.contains(myRange)) List<InetAddress> preferred = DatabaseDescriptor.getEndPointSnitch(table).getSortedListByProximity(address, rangeAddresses.get(range)); // 欲しいデータを持っているノード一式の IP が近い順のリスト myRangeAddresses.putAll(myRange, preferred); return myRangeAddresses;
(2)データ転送要求開始からデータ転送の実施の流れ
streaming/StreamIn.java
public static void requestRanges(InetAddress source, String tableName, Collection<Range> ranges) StreamRequestMetadata streamRequestMetadata = new StreamRequestMetadata(FBUtilities.getLocalAddress(), ranges, tableName); Message message = StreamRequestMessage.makeStreamRequestMessage(new StreamRequestMessage(streamRequestMetadata)); // StreamRequest メッセージを送付 (3) MessagingService.instance.sendOneWay(message, source);
streaming/StreamRequestVerbHandler.java
public class StreamRequestVerbHandler implements IVerbHandler // (3) StreamRequest メッセージを受信した側 public void doVerb(Message message) byte[] body = message.getMessageBody(); ByteArrayInputStream bufIn = new ByteArrayInputStream(body); StreamRequestMessage streamRequestMessage = StreamRequestMessage.serializer().deserialize(new DataInputStream(bufIn)); StreamRequestMetadata[] streamRequestMetadata = streamRequestMessage.streamRequestMetadata_; for (StreamRequestMetadata srm : streamRequestMetadata) { StreamOut.transferRanges(srm.target_, srm.table_, srm.ranges_, null); // (4) } }
streaming/StreamOut.java
public static void transferRanges(InetAddress target, String tableName, Collection<Range> ranges, Runnable callback) // (4) 要求されたデータを転送 // SSTable ファイルを分割して、転送部分のみのファイルを切り出して転送する。 /* * (1) dump all the memtables to disk. * (2) anticompaction -- split out the keys in the range specified * (3) transfer the data. */ /* Get the list of files that need to be streamed */ transferSSTables(target, table.forceAntiCompaction(ranges, target), tableName); // SSTR GC deletes the file when done
※ データ転送元にもデータはレプリカとしてそのまま残る。RackUnawareStrategy の場合は、再レプリケーションは不要。(おしりのレプリカが不要になるが、それは、Major Compaction で消されるはず???)
※ RackAwareStrategy の場合、再レプリケーションが必要な気がするが、そこのロジックは不明。
Bootstrap完了部分
./service/StorageService.java
public void removeBootstrapSource(InetAddress s, String table) // データ受け取りが終わるとこれが呼ばれる。 { if (table == null) bootstrapSet.removeAll(s); else bootstrapSet.remove(s, table); if (logger_.isDebugEnabled()) logger_.debug(String.format("Removed %s/%s as a bootstrap source; remaining is [%s]", s, table == null ? "<ALL>" : table, StringUtils.join(bootstrapSet.keySet(), ", "))); if (bootstrapSet.isEmpty()) { finishBootstrapping(); // 全部の転送元からファイルを受け取り終わると Bootstrap 完了 } } private void finishBootstrapping() { isBootstrapMode = false; SystemTable.setBootstrapped(true); setToken(getLocalToken()); Gossiper.instance.addLocalApplicationState(MOVE_STATE, new ApplicationState(STATE_NORMAL + Delimiter + partitioner_.getTokenFactory().toString(getLocalToken()))); logger_.info("Bootstrap/move completed! Now serving reads."); setMode("Normal", false); }