midolmanが仮想ポートにトンネルIDを割り当てるタイミング
前回の勉強会で質問が出たので調べてみました。
結論:ポートを定義してZooKeeperに登録するタイミングで割り当てています。
./midolman/src/main/java/org/midonet/midolman/state/zkManagers/PortZkManager.java
private List<Op> prepareCreate(UUID id, PortDirectory.RouterPortConfig config) throws StateAccessException, SerializationException { List<Op> ops = new ArrayList<Op>(); // On create, make a tunnel key id. This won't end up being used if // the port becomes interior. int tunnelKeyId = tunnelZkManager.createTunnelKeyId(); // <---ココ config.tunnelKey = tunnelKeyId; ...
./midolman/src/main/java/org/midonet/midolman/state/zkManagers/TunnelZkManager.java
public int createTunnelKeyId() throws StateAccessException { String path = zk.addPersistentSequential(paths.getTunnelPath(), null); // <---(1) int key = extractTunnelKeyFromPath(path); // <---(2) ... return key; }
(1) zk.addPersistentSequential()は、ZooKeeperのツリー上にPERSISTENT_SEQUENTIALモードで、シーケンシャル番号を持ったノードを追加します。
(2) extractTunnelKeyFromPath()は、(1)で作成されたノードのパスから、ノードに振られたシーケンシャル番号を取り出して返します。この番号がそのままトンネルキーIDになります。
以上終了。
・・・ではつまらないので、MidoNetのNeutronプラグイン(NeutronPlugin)がVIFポートを作る所から、冒頭のprepareCreate()が呼ばれるまでの流れを見ておきます。
./midolman/src/main/java/org/midonet/cluster/data/neutron/NeutronPlugin.java
/** * MidoNet implementation of Neutron plugin interface. */ @SuppressWarnings("unused") public class NeutronPlugin implements NetworkApi, L3Api, SecurityGroupApi, LoadBalancerApi { ... private void createPortOps(List<Op> ops, Port port) throws SerializationException, StateAccessException { networkZkManager.prepareCreateNeutronPort(ops, port); if (port.isVif()) { PortConfig cfg = networkZkManager.prepareCreateVifPort(ops, port); // <---(1) securityGroupZkManager.preparePortSecurityGroupBindings(ops, port, cfg);
(1) で呼び出されているprepareCreateVifPort()の中身は以下の通りです。
./midolman/src/main/java/org/midonet/cluster/data/neutron/NetworkZkManager.java
public PortConfig prepareCreateVifPort(List<Op> ops, Port port) throws StateAccessException, SerializationException { BridgePortConfig cfg = prepareCreateBridgePort(ops, port); // <---(2) ...
(2) で呼び出されているprepareCreateBridgePort()の中身は以下の通りです。
./midolman/src/main/java/org/midonet/cluster/data/neutron/NetworkZkManager.java
public BridgePortConfig prepareCreateBridgePort(List<Op> ops, Port port) throws SerializationException, StateAccessException { BridgePortConfig cfg = new BridgePortConfig(port.networkId, port.adminStateUp); ops.addAll(portZkManager.prepareCreate(port.id, cfg)); // <--- (3) return cfg; }
(3)のところで、冒頭のprepareCreate()が呼び出されています。
パケットシミュレーションが始まるまでの部分
参考資料:Midolman fast path overview
./midolman/src/main/scala/org/midonet/midolman/DeduplicationActor.scala
class DeduplicationActor( // <-- パケットシミュレーションを実施するアクター val cookieGen: CookieGenerator, val dpConnPool: DatapathConnectionPool, val clusterDataClient: DataClient, val connTrackStateTable: FlowStateTable[ConnTrackKey, ConnTrackValue], val natStateTable: FlowStateTable[NatKey, NatBinding], val storage: FlowStateStorage, val natLeaser: NatLeaser, val metrics: PacketPipelineMetrics, val packetOut: Int => Unit) extends Actor with ActorLogWithoutPath { ... override def receive = { ... case HandlePackets(packets) => // パケット処理依頼メッセージ actionsCache.clearProcessedFlowMatches() connTrackStateTable.expireIdleEntries((), invalidateExpiredConnTrackKeys) natStateTable.expireIdleEntries((), invalidateExpiredNatKeys) natLeaser.obliterateUnusedBlocks() var i = 0 while (i < packets.length && packets(i) != null) { handlePacket(packets(i)) // パケットごとにhandlePacket()実施 --->(1) i += 1 } ... private def handlePacket(packet: Packet): Unit = { // (1) val flowMatch = packet.getMatch val actions = actionsCache.actions.get(flowMatch) if (actions != null) { // 後で調べる(in kernel tableからは消えているが、計算済みのアクションがユーザーランドのキャッシュに残っている場合と思われる。) log.debug("Got actions from the cache {} for match {}", actions, flowMatch) executePacket(packet, actions) metrics.packetsProcessed.mark() packetOut(1) } else if (FlowStatePackets.isStateMessage(packet)) { processPacket(packet) } else suspendedPackets.get(flowMatch) match { case null => processPacket(packet) // ---> (2) case packets => log.debug("A matching packet is already being handled") packets.add(packet) packetOut(1) giveUpWorkflows(waitingRoom.doExpirations()) } } ... // No simulation is in progress for the flow match. Create a new // cookie and start the packet workflow. 
private def processPacket(packet: Packet): Unit = { // (2) val newCookie = cookieGen.next startWorkflow(packet, Left(newCookie)) // ---> (3) } protected def startWorkflow(packet: Packet, // (3) cookieOrEgressPort: Either[Int, UUID], parentCookie: Option[Int] = None): Unit = try { val ctx = packetContext(packet, cookieOrEgressPort, parentCookie) MDC.put("cookie", ctx.cookieStr) log.debug(s"New cookie for new match ${packet.getMatch}") runWorkflow(ctx) // --> (4) } catch { case ex: Exception => log.error("Unable to execute workflow", ex) } finally { if (cookieOrEgressPort.isLeft) packetOut(1) MDC.remove("cookie") } ... protected def runWorkflow(pktCtx: PacketContext): Unit = try { complete(pktCtx, workflow.start(pktCtx)) // workflow.start()でシミュレーションが開始する。 } catch { case NotYetException(f, msg) => log.debug(s"Postponing simulation because: $msg") postponeOn(pktCtx, f) case ex: Exception => handleErrorOn(pktCtx, ex) } finally { flushTransactions() }
./midolman/src/main/scala/org/midonet/midolman/PacketWorkflow.scala
class PacketWorkflow(protected val dpState: DatapathState, // パケットのシミュレーションを扱うクラス val datapath: Datapath, val dataClient: DataClient, val dpConnPool: DatapathConnectionPool, val cbExecutor: CallbackExecutor, val actionsCache: ActionsCache, val replicator: FlowStateReplicator) (implicit val system: ActorSystem) extends FlowTranslator with PacketHandler { ... override def start(context: PacketContext): PipelinePath = { // シミュレーションの開始 context.prepareForSimulation(FlowController.lastInvalidationEvent) context.log.debug(s"Initiating processing, attempt: ${context.runs}") val res = if (context.ingressed) handlePacketWithCookie(context) else doEgressPortSimulation(context) // ingressedでない(仮想デバイス自身が生成した)パケットの場合 -->(1) res } ... private def doEgressPortSimulation(context: PacketContext) = { // (1) context.log.debug("Handling generated packet") processSimulationResult(context, runSimulation(context)) // -->(2) } ... def runSimulation(context: PacketContext): SimulationResult = // (2) new Coordinator(context).simulate() // ここで実際のシミュレーションが始まる
./midolman/src/main/scala/org/midonet/midolman/simulation/Coordinator.scala
/** * Simulate the packet moving through the virtual topology. The packet * begins its journey through the virtual topology in one of these ways: * 1) it ingresses an exterior port of a virtual device (in which case the * packet arrives via the datapath switch from an entity outside the * virtual topology). * 2) it egresses an interior port of a virtual device (in which case the * packet was generated by that virtual device). * * In case 1, the match object for the packet was computed by the * FlowController and must contain an inPortID. If a wildcard flow is * eventually installed in the FlowController, the match will be a subset * of the match originally provided to the simulation. Note that in this * case the generatedPacketEgressPort argument should be null and will be * ignored. * * In case 2, the match object for the packet was computed by the Device * that emitted the packet and must not contain an inPortID. If the packet * is not dropped, it will eventually result in a packet being emitted * from one or more of the datapath ports. However, a flow is never * installed as a result of such a simulation. Note that in this case the * generatedPacketEgressPort argument must not be null. * * When the future returned by this method completes all the actions * resulting from the simulation (install flow and/or execute packet) * have been completed. * * The resulting future is never in a failed state. */ def simulate(): SimulationResult = { // シミュレーションの開始 log.debug("Simulating a packet") context.cookieOrEgressPort match { case Left(_) => // This is a packet from the datapath val inPortId = context.inputPort packetIngressesPort(inPortId, getPortGroups = true) case Right(egressID) => packetEgressesPort(egressID) // 仮想デバイスが生成したパケット(上記Scaladocのケース2)の処理 } } ... /** * Simulate the packet egressing a virtual port.
This is NOT intended * for flooding bridges */ private def packetEgressesPort(portID: UUID): SimulationResult = { // ここから先はまた別途・・・。 val port = tryAsk[Port](portID) context.addFlowTag(port.deviceTag) ...