めもめも

このブログに記載の内容は個人の見解であり、必ずしも所属組織の立場、戦略、意見を代表するものではありません。

ADK で A2A リモートエージェントとセッション内容を共有する方法

前提

A2A プロトコルでは、呼び出し側のエージェントと呼び出される側のエージェントでセッション情報(これまでのユーザー、および、エージェント群の会話の内容)を共有する仕組みが規定されておらず、必要に応じてユーザーが作り込む必要があります。

正確に言うと、A2A クライアントは、リクエストメッセージに「context id」を指定することで、「どのセッションに関するリクエストなのか」を示すことができますが、この ID で指定されるセッションの中身そのものを共有する仕組みが用意されていません。

一案としては、セッションデータベース的なものを外部に用意して、呼び出し側と呼び出される側でセッション内容を共有する方法が考えられますが、(A2A プロトコルの標準には含まれていない)この仕組みそのものを双方で合意する必要があり、リモートエージェントとの間にアーキテクチャー的な密結合が生まれてしまいます。(やればできるけど、A2A の設計思想的にどうなのよ、という話になります。)

セッションの内容を毎回全部送る例

zenn.dev

上記の記事では、この問題に対する一つの対応方法として、「これまでのセッションの内容を全部まとめてリモートエージェントに送りつける」という実装を行なっています。

記事内の「リモートエージェントに送信するリクエストの内容」で説明しているように、呼び出し側の ADK エージェントは、自身のセッションマネージャーからこれまでのイベント(会話履歴)を全部取り出して、JSONにシリアライズしたものをリモートエージェントへのテキストメッセージとして送信します。

一方、これを受け取ったリモートエージェントは、こちらも ADK で実装しており、毎回、新しいセッションを作成して、JSONで受け取ったすべてのイベントをリストアした上で、リクエストを処理します。

この方法であれば、context id でセッションを特定する必要すらなく、完全に疎結合(idempotent)な実装が実現できます。

・・・とはいえ、セッションが長くなると、毎回、すべてのセッションを送りつけてリストアさせるオーバーヘッドが気になりはじめるやも知れません。

両側で個別にセッションを保存する方法

もうちょっと洗練された方法としては、次のような案が考えられます。

  • 呼び出し側と呼び出される側が個別にセッションマネージャーを用意して、セッション情報を保存して、context id でセッションを特定する。

それぞれの側では、セッションマネージャーがアサインした session id でセッションが特定されますが、両側で session id が一致する保証がないので、共通の context id からローカルの session id にマッピングする辞書を用意します。

  • リモートエージェントを呼び出す際は、「前回呼び出した時点から、今までに発生した新しいイベント群」を context id とあわせて送信する。

  • 呼び出された側は、context id でローカルのセッションを特定して、受け取った新規差分イベントを認識した上で、リクエストを処理する。新規差分イベントと処理結果は、ローカルのセッションに追記する。

これであれば、「context id でセッションを認識する」という A2A 標準の考え方に従いつつ、A2A で規定されていないセッション保存方法を ADK 標準のセッションマネージャーでまかなうという、ほどほどの折衷案になりそうです。

と思っていたら!

最近、ADK や Agent Engine が native で A2A をサポートするように開発が進んでいますが、実装内容を見ていると、どうやら上記の「両側で個別にセッションを保存する方法」が標準機能として実装されつつあるようです。

zenn.dev

上記の記事では、ADK と Agent Engine の標準機能だけでシンプルに A2A 連携を実装する例が紹介されています。

  • A2A サーバー側では、ADK が標準提供する A2aAgentExecutor で A2A サーバー側の機能を実装する。
# A2A Agent Executorを生成するビルダー関数
def agent_executor_builder():
    return A2aAgentExecutor(
        runner=create_runner,
    )
  • A2A クライアント側では、ADK が標準提供する RemoteA2aAgent でリモートエージェントを登録して、サブエージェントとして利用する。
# A2Aクライアントエージェント(リモートの時計エージェントを指す)
time_agent = RemoteA2aAgent(
    name="time_agent",
    description="時計エージェント",
    agent_card=f"{a2a_url}/v1/card",
    a2a_client_factory=factory,
)

# オーケストレーションエージェント
root_agent = Agent(
    name="root_agent",
    model="gemini-2.5-flash",
...
    sub_agents=[time_agent],
)

ここまで簡単に A2A が使えていいのか!という気分にもなりますが、この裏側でセッション情報がどのように共有されているのかが気になりますよね。(気になってください。)

まず、呼び出し側の実装を見ると、 RemoteA2aAgent は、_construct_message_parts_from_session() でリモートエージェントに送信するメッセージを構成しています。(コード

      message_parts, context_id = self._construct_message_parts_from_session(
          ctx
      )

で、この関数の実装を見ると・・・

  def _construct_message_parts_from_session(
      self, ctx: InvocationContext
  ) -> tuple[list[A2APart], dict[str, Any], str]:
    """Construct A2A message parts from session events.

    Args:
      ctx: The invocation context

    Returns:
      List of A2A parts extracted from session events, context ID
    """
    message_parts: list[A2APart] = []
    context_id = None
    for event in reversed(ctx.session.events):
      if _is_other_agent_reply(self.name, event):
        event = _present_other_agent_message(event)
      elif event.author == self.name:
        # stop on content generated by current a2a agent given it should already
        # be in remote session
        if event.custom_metadata:
          context_id = (
              event.custom_metadata.get(A2A_METADATA_PREFIX + "context_id")
              if event.custom_metadata
              else None
          )
        break

既存のセッションに含まれるイベントを現在から過去に向かってチェックしていき、「直前に該当のエージェントを呼び出した時点〜現在」の範囲のイベントをメッセージに詰め込んでいます。青字のコメントを見ると「それ以前の内容はリモートセッションに存在する」とあり、まさに前述のアイデアがそのまま実装されています。

また、直前に該当のリモートエージェントを呼び出した際のイベント内の custom_metadata から、このリモートエージェントとのセッション共有に必要な context id を取得しています。

該当のリモートエージェントを初めて呼ぶ際は、context id は None の状態になりますが、その場合は、リモートエージェント側で新しい context id がセットされて帰ってくるので、その情報をここcustom_metadata に記録します。

※ A2A サーバー側で context id を生成する部分の実装は確認できていませんが、A2A プロトコルでそのように規定されているので、そうなっているんでしょう。

a2a-protocol.org

ちなみに、リモートエージェントに送信するメッセージを構成する実装の続きを見ると、過去の複数のイベントに含まれる Part オブジェクトをまとめて message_parts リストに入れて、これをメッセージとして送信しています。

      for part in event.content.parts:

        converted_part = self._genai_part_converter(part)
        if converted_part:
          message_parts.append(converted_part)

「セッションの内容を毎回全部送る例」で説明した方法では、個々のイベントをそのままの形でリモートエージェントに送って、リモートエージェント側でセッションをそのままリストアするという方法でしたが、こちらの場合、リモートエージェントは、「前回呼んだ時から今までの間にこんなことがあったよ」という内容の新規のリクエストを受け取ります。したがって、この内容をリモートエージェント側のセッションにリストアするのではなく、純粋に新規リクエストとして処理すれば、ADK Runner の通常のセッション管理の仕組みを通じて、新規イベントとしてセッションに記録されることになります。

というわけでリモートエージェント側のリクエスト処理は、context id でセッションを特定する以外は、特別なことはありません。実装については、このあたりを参照してください。

実装例

Agent Engine に A2A サーバーをデプロイする例を「セッションの内容を毎回全部送る例」にあわせた愚直な実装と、「と思っていたら!」で説明した、ADK / Agent Engine の標準モジュールを利用する実装で用意してみました。