めもめも

このブログに記載の内容は個人の見解であり、必ずしも所属組織の立場、戦略、意見を代表するものではありません。

MidoNetのソースを読んだ時の俺メモ

参考資料

Midolman code walkthrough

・ソースコードの取得

# git clone https://github.com/midonet/midonet.git

midolmanが仮想ポートにトンネルIDを割り当てるタイミング

前回の勉強会で質問が出たので調べてみました。

結論:ポートを定義してZooKeeperに登録するタイミングで割り当てています。

./midolman/src/main/java/org/midonet/midolman/state/zkManagers/PortZkManager.java

    private List<Op> prepareCreate(UUID id,
            PortDirectory.RouterPortConfig config) throws StateAccessException,
            SerializationException {
        List<Op> ops = new ArrayList<Op>();
        // On create, make a tunnel key id. This won't end up being used if
        // the port becomes interior.
        int tunnelKeyId = tunnelZkManager.createTunnelKeyId();     // <---ココ
        config.tunnelKey = tunnelKeyId;
...

./midolman/src/main/java/org/midonet/midolman/state/zkManagers/TunnelZkManager.java

    public int createTunnelKeyId()
            throws StateAccessException {
        String path = zk.addPersistentSequential(paths.getTunnelPath(), null);  // <---(1)
        int key = extractTunnelKeyFromPath(path); // <---(2)
...
        return key;
    }

(1) zk.addPersistentSequential()は、ZooKeeperのツリー上にPERSISTENT_SEQUENTIALモードで、シーケンシャル番号を持ったノードを追加します。

(2) extractTunnelKeyFromPath()は、ノードに振られた番号を返します。

以上終了。

・・・ではつまらないので、Neutronプラグイン(MidoNet側の実装であるNeutronPlugin)がVifPortを作る所から見ておきます。

./midolman/src/main/java/org/midonet/cluster/data/neutron/NeutronPlugin.java

/**
 * MidoNet implementation of Neutron plugin interface.
 */
@SuppressWarnings("unused")
public class NeutronPlugin implements NetworkApi, L3Api, SecurityGroupApi,
                                      LoadBalancerApi {
...
    private void createPortOps(List<Op> ops, Port port)
        throws SerializationException, StateAccessException {
        networkZkManager.prepareCreateNeutronPort(ops, port);
        if (port.isVif()) {
            PortConfig cfg = networkZkManager.prepareCreateVifPort(ops, port);  // <---(1)
            securityGroupZkManager.preparePortSecurityGroupBindings(ops, port,
                                                                    cfg);

(1)
./midolman/src/main/java/org/midonet/cluster/data/neutron/NetworkZkManager.java

    public PortConfig prepareCreateVifPort(List<Op> ops, Port port)
        throws StateAccessException, SerializationException {
        BridgePortConfig cfg = prepareCreateBridgePort(ops, port); // <---(2)
...

(2)
./midolman/src/main/java/org/midonet/cluster/data/neutron/NetworkZkManager.java

    public BridgePortConfig prepareCreateBridgePort(List<Op> ops, Port port)
        throws SerializationException, StateAccessException {
        BridgePortConfig cfg = new BridgePortConfig(port.networkId,
                                                    port.adminStateUp);
        ops.addAll(portZkManager.prepareCreate(port.id, cfg));   // <--- (3)
        return cfg;
    }

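(3)のところで、冒頭で見たPortZkManagerのprepareCreate()が呼び出されています。(厳密には、冒頭で引用したのはRouterPortConfigを受け取るオーバーロードで、ここではBridgePortConfigを渡しているため、呼ばれるのは同名の別オーバーロードと思われます。要確認。)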

パケットシミュレーションが始まるまでの部分

参考資料:Midolman fast path overview

./midolman/src/main/scala/org/midonet/midolman/DeduplicationActor.scala

class DeduplicationActor(    // <-- パケットシミュレーションを実施するアクター
            val cookieGen: CookieGenerator,
            val dpConnPool: DatapathConnectionPool,
            val clusterDataClient: DataClient,
            val connTrackStateTable: FlowStateTable[ConnTrackKey, ConnTrackValue],
            val natStateTable: FlowStateTable[NatKey, NatBinding],
            val storage: FlowStateStorage,
            val natLeaser: NatLeaser,
            val metrics: PacketPipelineMetrics,
            val packetOut: Int => Unit)
            extends Actor with ActorLogWithoutPath {
...
    override def receive = {
...
        case HandlePackets(packets) =>   // パケット処理依頼メッセージ
            actionsCache.clearProcessedFlowMatches()
            connTrackStateTable.expireIdleEntries((), invalidateExpiredConnTrackKeys)
            natStateTable.expireIdleEntries((), invalidateExpiredNatKeys)
            natLeaser.obliterateUnusedBlocks()
            var i = 0
            while (i < packets.length && packets(i) != null) {
                handlePacket(packets(i))   // パケットごとにhandlePacket()実施 --->(1)
                i += 1
            }
...
    private def handlePacket(packet: Packet): Unit = {  // (1)
        val flowMatch = packet.getMatch
        val actions = actionsCache.actions.get(flowMatch)
        if (actions != null) {  // 後で調べる(in kernel tableからは消えているが、計算済みのアクションがユーザーランドのキャッシュに残っている場合と思われる。)
            log.debug("Got actions from the cache {} for match {}",
                       actions, flowMatch)
            executePacket(packet, actions)
            metrics.packetsProcessed.mark()
            packetOut(1)
        } else if (FlowStatePackets.isStateMessage(packet)) {
            processPacket(packet)
        } else suspendedPackets.get(flowMatch) match {
            case null =>
                processPacket(packet)  // ---> (2)
            case packets =>
                log.debug("A matching packet is already being handled")
                packets.add(packet)
                packetOut(1)
                giveUpWorkflows(waitingRoom.doExpirations())
        }
    }
...
    // No simulation is in progress for the flow match. Create a new
    // cookie and start the packet workflow.
    private def processPacket(packet: Packet): Unit = {  // (2)
        val newCookie = cookieGen.next
        startWorkflow(packet, Left(newCookie))  // ---> (3)
    }
    protected def startWorkflow(packet: Packet, // (3)
                                cookieOrEgressPort: Either[Int, UUID],
                                parentCookie: Option[Int] = None): Unit =
        try {
            val ctx = packetContext(packet, cookieOrEgressPort, parentCookie)
            MDC.put("cookie", ctx.cookieStr)
            log.debug(s"New cookie for new match ${packet.getMatch}")
            runWorkflow(ctx)  // --> (4)
        } catch {
            case ex: Exception =>
                log.error("Unable to execute workflow", ex)
        } finally {
            if (cookieOrEgressPort.isLeft)
                packetOut(1)
            MDC.remove("cookie")
        }
...
    protected def runWorkflow(pktCtx: PacketContext): Unit =
        try {
            complete(pktCtx, workflow.start(pktCtx))   // workflow.start()でシミュレーションが開始する。
        } catch {
            case NotYetException(f, msg) =>
                log.debug(s"Postponing simulation because: $msg")
                postponeOn(pktCtx, f)
            case ex: Exception => handleErrorOn(pktCtx, ex)
        } finally {
            flushTransactions()
        }

./midolman/src/main/scala/org/midonet/midolman/PacketWorkflow.scala

class PacketWorkflow(protected val dpState: DatapathState, // パケットのシミュレーションを扱うクラス
                     val datapath: Datapath,
                     val dataClient: DataClient,
                     val dpConnPool: DatapathConnectionPool,
                     val cbExecutor: CallbackExecutor,
                     val actionsCache: ActionsCache,
                     val replicator: FlowStateReplicator)
                    (implicit val system: ActorSystem)
        extends FlowTranslator with PacketHandler {
...
    override def start(context: PacketContext): PipelinePath = {  // シミュレーションの開始
        context.prepareForSimulation(FlowController.lastInvalidationEvent)
        context.log.debug(s"Initiating processing, attempt: ${context.runs}")
        val res = if (context.ingressed)
                    handlePacketWithCookie(context)
                  else
                    doEgressPortSimulation(context) // データパスから入ってきたパケットではなく、仮想デバイス自身が生成したパケット(generated packet)の場合 -->(1)
        res
    }
...
    private def doEgressPortSimulation(context: PacketContext) = {  // (1)
        context.log.debug("Handling generated packet")
        processSimulationResult(context, runSimulation(context))    // -->(2)
    }
...
    def runSimulation(context: PacketContext): SimulationResult =   // (2)
        new Coordinator(context).simulate()  // ここで実際のシミュレーションが始まる

./midolman/src/main/scala/org/midonet/midolman/simulation/Coordinator.scala

    /**
     * Simulate the packet moving through the virtual topology. The packet
     * begins its journey through the virtual topology in one of these ways:
     * 1) it ingresses an exterior port of a virtual device (in which case the
     * packet arrives via the datapath switch from an entity outside the
     * virtual topology).
     * 2) it egresses an interior port of a virtual device (in which case the
     * packet was generated by that virtual device).
     *
     * In case 1, the match object for the packet was computed by the
     * FlowController and must contain an inPortID. If a wildcard flow is
     * eventually installed in the FlowController, the match will be a subset
     * of the match originally provided to the simulation. Note that in this
     * case the generatedPacketEgressPort argument should be null and will be
     * ignored.
     *
     * In case 2, the match object for the packet was computed by the Device
     * that emitted the packet and must not contain an inPortID. If the packet
     * is not dropped, it will eventually result in a packet being emitted
     * from one or more of the datapath ports. However, a flow is never
     * installed as a result of such a simulation. Note that in this case the
     * generatedPacketEgressPort argument must not be null.
     *
     * When the future returned by this method completes all the actions
     * resulting from the simulation (install flow and/or execute packet)
     * have been completed.
     *
     * The resulting future is never in a failed state.
     */
    def simulate(): SimulationResult = {  // シミュレーションの開始
        log.debug("Simulating a packet")
        context.cookieOrEgressPort match {
            case Left(_) => // This is a packet from the datapath
                val inPortId = context.inputPort
                packetIngressesPort(inPortId, getPortGroups = true)
            case Right(egressID) =>
                packetEgressesPort(egressID) // 仮想デバイスが生成したパケットの処理(上のコメントのcase 2。egressIDはパケットを送出する仮想ポート)
        }
    }
...
    /**
     * Simulate the packet egressing a virtual port. This is NOT intended
     * for flooding bridges
     */
    private def packetEgressesPort(portID: UUID): SimulationResult = { // ここから先はまた別途・・・。
        val port = tryAsk[Port](portID)
        context.addFlowTag(port.deviceTag)
...