めもめも

このブログに記載の内容は個人の見解であり、必ずしも所属組織の立場、戦略、意見を代表するものではありません。

Choreography-based saga をローカルで実験するためのフレームワーク

github.com

Choreography-based saga とは?

マイクロサービスの環境で、複数のサービスが連携するトランザクションを実行する手法にSaga パターンがあります。

一般にトランザクションに求められる特性として ACID がありますが、RDB を用いたトランザクションでは、特に Atomicity が容易に実現できます。つまり、ある処理を途中まで実行した段階で、「やっぱりやーめた!」となったときに、「ロールバック」のおまじないを唱えるだけで、それまでの処理をなかった事にできます。

一方、マイクロサービスの場合は、サービスごとに個別にDBを持っているので、(DBのトランザクション機能を用いて)複数のサービスにまたがる処理を「やっぱりやーめた!」と言って元にもどすのは困難です。

そこで、Saga パターンでは、トランザクションに含まれる一連の処理をワークフローのように定義しておき、「やっぱりやーめた」となった時は、それまでに行った処理をキャンセルするフローを実行して、論理的に元の状態に戻すという作戦を取ります。(ワークフローに含まれる個々の「タスク」は1つのサービスに閉じているので、タスク単位での通常のトランザクションは普通に実装可能です。)

さらに、このワークフローを実行する方式として、「Orchestration-based saga」と「Choreography-based saga」があります。

「Orchestration-based saga」では、「ワークフローマネージャー」に相当する機能をどこかのサービスに実装して、そこから各サービスを呼び出してワークフローを実行していきます。ただし、この方式の場合、ワークフローマネージャーとそこから操作されるサービス群の依存関係が強くなる(各サービスの自律性が損なわれる)というデメリットがあります。

そこで、ワークフローを管理するコントローラーを立てずに、各サービスが自立分散的に動作することで、ワークフローを実現しようというアイデアが、「Choreography-based saga」です。

簡単に言うと、各サービスは自分が管理するオブジェクトに変更があると、その事実を「イベント」として発行します。その他のサービスは、このイベントを見て、次に自分がやるべき作業を考えて、それを実行したのちに、その結果をまたイベントとして発行します。このようにして、イベントベースの連携処理がパタパタと繋がっていって、結果的にワークフローが実行されるというものです。(現実世界の事務作業もこんな感じですよね・・・)

具体的な実装としては、Cloud Pub/Sub のようなメッセージキューを用いて、サービスごとのイベントキューを定義しておき、各サービスは、自分のキューにイベントを吐き出していきます。このイベントを誰が受け取るかは気にしない点に注意してください。一方、他のサービスのイベントをトリガーとするサービスは、興味のあるサービスのキューにサブスクライブしておいて、イベントを受け取ります。

なんだかよくわからないという方のために、簡単な例を示します。


これは、Microservices Patterns という書籍にある例を少し簡単化したもので、フードデリバリーのサービスを題材にしたもので、次のようなワークフローになります。

1. クライアントからの発注イベントが ClientEventQueue に入る。
2. OrderService は発注情報(Status = "pending")を生成してDBに保存すると、その情報を示すイベントが OrderEventQueue に入る。
3. KitchenService は発注内容を見て、配送チケット情報(Status = "pending")をDBに保存すると、その情報を示すイベントが KitchenEventQueue に入る。
4. ConsumerService は、配送チケット情報を見て、発注された食事が発注者の Preference (ベジタリアン、アレルギー食品など)に合っている事をチェックする。チェック結果を示すイベントが ConsumerEventQueue に入る。
5. KitchenService は、チェック結果を見て問題なければ、配送チケット情報を更新(Status = "approved")する。その情報を示すイベントが KitchenEventQueue に入る。
6. OrderService は、配送チケット情報を見て、Status = "approved" であれば、発注情報を更新(Status = "approved")して、その情報を示すイベントが OrderEventQueue に入る。

ここでは、正常系のみを示していますが、たとえば、ConsumerService の発注確認結果が NG の場合は、そのあとは、発注を取り消すフローが走ります。

ワークフローのプロトタイプ実装

で・・・・、実際にこのようなワークフローを設計しだすと、机上の設計だけでは不安になるので、プロトタイプを実装して実験したくなります。この際、実際に Pub/Sub のメッセージキューを用意して、複数のマイクロサービスをデプロイして・・・・というのは大変なので、もっと簡易的にローカルでシミュレーションしたくなります。

そこで、Golang の Channel を用いて、複数の Goroutine からアクセスできる擬似メッセージキューを実装してみました。これを使うと、1つのマイクロサービスを1つの Goroutine として起動して、上述のようなワークフローが意図通りに流れるかをローカルで確認することができます。(擬似メッセージキューを本物の Pub/Sub に差し替えられるように、アダプター的に作ってあります。)

コードの説明は面倒なので割愛して、main だけお見せするとこんな感じです。。。

func main() {
	// Create event queues
	clientEventQueue := localQueue.Create("ClientEvents") // interface eventQueue.Queue
	orderEventQueue := localQueue.Create("OrderEvents")
	consumerEventQueue := localQueue.Create("ConsumerEvents")
	kitchenEventQueue := localQueue.Create("KitchenEvents")

	// Start handlers
	orderService.StartHandlers(orderEventQueue, clientEventQueue, kitchenEventQueue)
	kitchenService.StartHandlers(kitchenEventQueue, orderEventQueue, consumerEventQueue)
	consumerService.StartHandlers(consumerEventQueue, kitchenEventQueue)

	// Start saga with an OrderRequest event
	orderRequest := events.OrderRequest{
		ConsumerID:   "consumer001",
		RestaurantID: "restaurant001",
		FoodID:       "food001",
	}
	jsonBytes, _ := json.Marshal(orderRequest)
	eventQueue.Send("OrderRequest", jsonBytes, clientEventQueue)

	// Oberve OrderEvent queue
	orderEventSubscription := orderEventQueue.Subscribe("Client:OrderEvent")
	for {
		orderEventSubscription.Receive()
	}
}

最初に、event queues を作っているところを localQueue パッケージから、他のパッケージに差し替える事で、本物の Pub/Sub に差し替えられる想定です。

これを実行すると、イベントの送受信のログがこんな感じで流れます。

2020/08/03 09:44:26 Send event to ClientEvents: {"consumerID":"consumer001","restaurantID":"restaurant001","foodID":"food001"}
2020/08/03 09:44:26 Receive event by OrderService:ClientEvent: {"consumerID":"consumer001","restaurantID":"restaurant001","foodID":"food001"}
2020/08/03 09:44:26 Send event to OrderEvents: {"orderID":"bdc75f46-6bf1-4940-954c-a6fe89061d3c","orderStatus":"pending","consumerID":"consumer001","restaurantID":"restaurant001","foodID":"food001"}
2020/08/03 09:44:26 Receive event by Client:OrderEvent: {"orderID":"bdc75f46-6bf1-4940-954c-a6fe89061d3c","orderStatus":"pending","consumerID":"consumer001","restaurantID":"restaurant001","foodID":"food001"}
2020/08/03 09:44:26 Receive event by KitchenService:OrderEvent: {"orderID":"bdc75f46-6bf1-4940-954c-a6fe89061d3c","orderStatus":"pending","consumerID":"consumer001","restaurantID":"restaurant001","foodID":"food001"}
2020/08/03 09:44:26 Send event to KitchenEvents: {"orderID":"bdc75f46-6bf1-4940-954c-a6fe89061d3c","consumerID":"consumer001","status":"pending","restaurantID":"restaurant001","FoodID":"food001","FoodType":"vegan"}
2020/08/03 09:44:26 Receive event by ConsumerService:KitchenEvent: {"orderID":"bdc75f46-6bf1-4940-954c-a6fe89061d3c","consumerID":"consumer001","status":"pending","restaurantID":"restaurant001","FoodID":"food001","FoodType":"vegan"}
2020/08/03 09:44:26 Receive event by OrderService:KitchenEvent: {"orderID":"bdc75f46-6bf1-4940-954c-a6fe89061d3c","consumerID":"consumer001","status":"pending","restaurantID":"restaurant001","FoodID":"food001","FoodType":"vegan"}
2020/08/03 09:44:26 Send event to ConsumerEvents: {"orderID":"bdc75f46-6bf1-4940-954c-a6fe89061d3c","ConsumerID":"consumer001","status":"verified"}
2020/08/03 09:44:26 Receive event by KitchenService:ConsumerEvent: {"orderID":"bdc75f46-6bf1-4940-954c-a6fe89061d3c","ConsumerID":"consumer001","status":"verified"}
2020/08/03 09:44:26 Send event to KitchenEvents: {"orderID":"bdc75f46-6bf1-4940-954c-a6fe89061d3c","consumerID":"consumer001","status":"approved","restaurantID":"restaurant001","FoodID":"food001","FoodType":"vegan"}
2020/08/03 09:44:26 Receive event by ConsumerService:KitchenEvent: {"orderID":"bdc75f46-6bf1-4940-954c-a6fe89061d3c","consumerID":"consumer001","status":"approved","restaurantID":"restaurant001","FoodID":"food001","FoodType":"vegan"}
2020/08/03 09:44:26 Receive event by OrderService:KitchenEvent: {"orderID":"bdc75f46-6bf1-4940-954c-a6fe89061d3c","consumerID":"consumer001","status":"approved","restaurantID":"restaurant001","FoodID":"food001","FoodType":"vegan"}
2020/08/03 09:44:26 Send event to OrderEvents: {"orderID":"bdc75f46-6bf1-4940-954c-a6fe89061d3c","orderStatus":"approved","consumerID":"consumer001","restaurantID":"restaurant001","foodID":"food001"}
2020/08/03 09:44:26 Receive event by Client:OrderEvent: {"orderID":"bdc75f46-6bf1-4940-954c-a6fe89061d3c","orderStatus":"approved","consumerID":"consumer001","restaurantID":"restaurant001","foodID":"food001"}
2020/08/03 09:44:26 Receive event by KitchenService:OrderEvent: {"orderID":"bdc75f46-6bf1-4940-954c-a6fe89061d3c","orderStatus":"approved","consumerID":"consumer001","restaurantID":"restaurant001","foodID":"food001"}

おまけ:Saga パターンの耐障害性について

次のような考慮をした実装が必要です。

・各 Event Handler は Idempotent に作ります(メッセージキューは、イベントの重複配信があるので。)

・サービスが1つのイベントを発行する際は、「オブジェクトの更新+イベント発行+トリガーイベントのID記録」をアトミックに実行します。(たとえば、オブジェクトを更新したあと、イベント発行の前に障害停止して、イベントが消失すると取り返しがつかないので。)トリガーイベントのID記録は、同じトリガーイベントが重複配信された際に、これを無視するために利用します。

・トリガーイベントに対するメッセージキューへの Ack は、イベント発行後に行います。(Ack の前に障害停止すると、再起動時に同じイベント処理が走りますが、Event Handler が Idempotent であれば問題ありません。)

・キューに入ったイベントがあやまって消失しない事は、キューの耐障害性で保証します。

今回のサンプルであれば、次のような流れになります。
orderService/eventHandler.go

			// Begin Transaction
			orderDatabase[orderStatus.OrderID] = orderStatus // store order in DB
			jsonBytes, _ := json.Marshal(orderStatus)
			eventQueue.Send("OrderStatus", jsonBytes, orderEventQueue)
			// End Transaction
			// event.Ack()

「DBのアップデートとイベントの発行を1つのトランザクションに入れる」という方法は、実は工夫が必要です。たとえば、「DBをアップデートして、イベントを配信用DBに保存する」というDBに閉じた処理をトランザクションで実施しておき、そのあと、配信用DBに保存されたイベントを送信する処理を別タスクとして実装します。