パート2はこちらです。
enakai00.hatenablog.com
パート2では、Hello World! 的なサンプルを GKE にデプロイしましたが、ここでは、もう少し本番に近い実装を試してみます。
ショッピングカートからの発注処理
次のような一連の処理を saga パターンで実装します。
・商品購入サービス(purchase-service):クライアントは、ショピングカートに商品をつめて、購入リクエストを出します。すると、「未承認状態」の配送チケットの情報を含むイベントが発行されます。(配送チケットの情報は、ローカルのDBにも保存)
・在庫管理サービス(stock-service):未承認状態の配送チケットを受け取ると、チケットに記載された商品の在庫にリザーブをかけて、「在庫確保状態」の配送チケットをイベントとして発行します。
・決済サービス(payment-service):「在庫確保状態」のイベントを受け取ると、チケットに記載された商品の合計金額の引き落とし処理を行い、「支払完了」の配送チケットをイベントして発行します。
・商品購入サービス(purchase-service):「支払完了」のイベントを受け取ると、「承認済み」の配送チケットをイベントとして発行します。
・配送サービス(delivery-service):「承認済み」の配送チケットを受け取ると、実際の配送の手配を開始します。
※ 商品購入サービスは、周りのサービスが発行したイベントにより配送チケットの状態変化を認識した際は、ローカルDBの情報を適宜更新します。
上記は正常に配送まで進んだ場合の流れですが、異常系を含めた各サービスの処理内容は次のようにまとめられます。
すべてを実装することも可能ですが、ここでは説明のために、次の3つのコンポーネントのみを実装します。
・商品購入サービス(purchase-service)
・在庫管理サービス(stock-service)
・API ゲートウェイ(api-gateway-service)
API ゲートウェイは、商品購入サービスの gRPC の API を REST API に変換してクラスターの外部に公開します。
事前準備
今回使用するコードは、事前に Github にアップロードしてあります。パート2で用意したクラスターに続けてデプロイしていきます。先にデプロイした Pod は削除しておいてください。開発用ワークステーションで次のコマンドを実行して、コードを取得します。
go get -d github.com/enakai00/go-micro-gcp-example
cd $GOPATH/src/github.com/enakai00/go-micro-gcp-example
以下のサブディレクトリーに各サービスのコードがあります。
・purchase:購買サービス
・api-gateway:購買サービスの API ゲートウェイ
・stock:在庫管理サービス
・config :Kubernetes にデプロイするための yaml ファイル群
また各サービスが使用するデータベースとして、Cloud Datastore を使用します。Cloud Console から「Datastore モード」を選択しておいてください。
商品購入サービスの作成
サブディレクトリー purchase 以下のコードをみます。
まず、proto/purchase/purchase.proto を見ると、次の API が定義されています。
rpc CreateCart(CreateCartRequest) returns (CreateCartResponse) {}
rpc GetCart(GetCartRequest) returns (GetCartResponse) {}
rpc AddItem(AddItemRequest) returns (AddItemResponse) {}
rpc GetCartContents(GetCartContentsRequest) returns (GetCartContentsResponse) {}
rpc CloseCart(CloseCartRequest) returns (CloseCartResponse) {}
rpc Checkout(CheckoutRequest) returns (CheckoutResponse) {}
rpc GetOrderTicket(GetOrderTicketRequest) returns (GetOrderTicketResponse) {}
※ デモ用なのでリクエスト処理中のエラーハンドリングは適当です。(I/O 関係のエラーは、クライアントにエラーを返さずに、その場で log.Fatal() で終了します。)
これらのハンドラーは、main.go で登録されており、ハンドラーのエントリーポイントは handler/grpc_handler.go にあります。いずれも Datastore の情報を操作することが主な処理で、実際の内容は、handler/operations.go にあります。
この中で、特にトランザクションを用いている部分を説明します。1つ目はカートの状態を「open(商品追加可能)」から「closed(商品追加不可能)」に変更する部分です。「open」以外の状態からは「close」に遷移しないことを保証するために(そのような要件がある前提)、現在の状態の確認と closed への変更を1つのトランザクションとして実行しています。
func CloseCart(cartid string) *purchase.Cart {
cartStructWithoutTx, ok := getCartStruct(cartid)
if !ok {
return &purchase.Cart{}
}
_, err := client.RunInTransaction(context.Background(),
func(tx *datastore.Transaction) error {
var cartStruct ds.Cart
err := tx.Get(cartStructWithoutTx.Key, &cartStruct)
if err != nil {
return err
}
if cartStruct.Status != "open" {
return nil
}
cartStruct.Status = "closed"
_, err = tx.Put(cartStruct.Key, &cartStruct)
if err != nil {
return err
}
return nil
})
if err != nil {
log.Fatalf("Error stroing data: %v", err)
}
return GetCart(cartid)
}
同様に、カートに商品を追加する際は、カートの状態が「open」であることを保証するために、「open」であることの確認と、商品の追加を1つのトランザクションにしています。(handler/operations.go)
・・・というあたりは、microservice とは直接関係ない感じですが、saga パターンで重要になるのが、イベント発行の部分です。今回は、チェックアウト処理の際に、「配送チケットをDBに保存する処理と、配送チケット情報のイベントを Publish する処理」をアトミックに実施するためにトランザクションを使用しています。(handler/operations.go)具体的には、カートの状態が「close」であることの確認、配送チケットのDB保存、イベントメッセージの配信用DB保存、の3つの処理を1つのトランザクションで実施しています。その後、配信用DBに保存されたメッセージを Publish する処理を別途実施しています。(events/utils.go)(ここでは、簡単のために同じコードから Publish 処理をしていますが、本来は、別のタスクがバックグラウンドで定期的にバッチ処理するものと考えてください。)
func PublishEvents() int {
query := datastore.NewQuery(eventPublishTable).Filter("sent =", false)
it := client.Run(context.Background(), query)
i := 0
for {
var eventEntity EventEntity
_, err := it.Next(&eventEntity)
if err == iterator.Done {
break
}
msg := &broker.Message{
Header: map[string]string{
"eventid": eventEntity.Eventid,
"type": eventEntity.Type,
},
Body: eventEntity.EventData,
}
err = brk.Publish(publishTopic, msg)
if err != nil {
log.Fatalf("Error publishing event: %v", err)
}
eventEntity.Sent = true
_, err = client.Put(context.Background(), eventEntity.Key, &eventEntity)
if err != nil {
log.Fatalf("Error stroing data: %v", err)
}
i += 1
}
return i
}
また、このサービスは、他のサービスが発行するイベントを処理するハンドラーも持ちます。main.goで events パッケージをインポートしたタイミングで、(パッケージのinit() 関数により)指定 Topic へのサブスクライブとイベントハンドラーの登録が行われます。
var (
publishTopic = os.Getenv("EVENT_PUBLISH_TOPIC")
subscribeTopics = os.Getenv("EVENT_SUBSCRIBE_TOPICS")
projectID = os.Getenv("GOOGLE_CLOUD_PROJECT")
eventPublishTable = "PurchasePublishEvent"
eventRecordTable = "PurchaseReceivedEvent"
client, _ = datastore.NewClient(context.Background(), projectID)
brk broker.Broker
)
func init() {
brk = googlepubsub.NewBroker(googlepubsub.ProjectID(projectID))
err := brk.Connect()
if err != nil {
log.Fatalf("Broker Connect error: %v", err)
}
for _, subscribeTopic := range strings.Split(subscribeTopics, ",") {
_, err := brk.Subscribe(subscribeTopic, eventHandler,
broker.Queue("purchase-service"),
broker.DisableAutoAck())
if err != nil {
log.Fatalf("Broker Subscribe error: %v", err)
}
}
}
イベントを Publish するトピック、サブスクライブするトピック(, 区切りで複数指定可能)は、環境変数で指定されています。なお、broker.Queue("purchase-service") の部分は、サブスクリプション名を指定しています。同一サービスを複数起動した場合は、同じサブスクリプションを共有するので、メッセージの分配が行われます。(逆に言うと、異なるサービスが同一のサブスクリプション名を使用すると、一方のサービスのみにメッセージが配信されてしまうので注意してください。)また、autoack を disable している点にも注意してください。イベントの処理が終わってから Ack することで、イベントの処理中に障害停止した場合でもイベントが失われません。
イベントハンドラーのエントリーポイント(events/handler.go)は、次のようになります。
func eventHandler(p broker.Event) error {
header := p.Message().Header
eventid := header["eventid"]
eventType := header["type"]
duplicate, err := isDuplicated(eventid)
if err != nil {
log.Fatalf("Error checking duplicated event: %v", err)
}
if duplicate {
p.Ack()
return nil
}
switch eventType {
case "purchase.OrderTicket":
var purchaseOrderTicket purchase.OrderTicket
err := json.Unmarshal(p.Message().Body, &purchaseOrderTicket)
if err != nil {
log.Fatalf("Error unmarshalling eventt: %v", err)
}
log.Infof("Handle event purchase.OrderTicket: %v", purchaseOrderTicket)
err = handlePurchaseOrderTicket(purchaseOrderTicket)
if err != nil {
log.Warnf("Failed to handle purchaseOrderTicke: %v", err)
} else {
recordEvent(eventid)
p.Ack()
}
default:
log.Infof("Unknown event type: %s", eventType)
recordEvent(eventid)
p.Ack()
}
return nil
}
イベント本体は json にシリアライズされているので、ヘッダー(header["type"])を見て、書き戻す構造体を判別しています。また、ユニークなイベント ID もヘッダー(header["eventid"])に含めてあり、処理済みのイベント ID を DB に記録することで重複排除を行っています。今回は、イベントの内容に応じてDBの更新も入るので、トランザクションを用いてDBの更新と新しいイベントの発行をアトミックに行っています。(イベントの発行は、一度 DB に書き出すという前述の作戦を用います。)
それでは、実際にサービスをデプロイします。Makefile の先頭にある PRJECT_ID を実際のプロジェクトIDに変更します。
PROJECT_ID=go-micro-test
この後は、make コマンドで Docker イメージの作成とプライベートレジストリへの Push まで自動で行えます。
make build
make docker
deployment の定義は、config 以下の deployment/purchase-service.yml です。次のように環境変数で、Project ID に加えて、Pub/Sub のトピックを指定しています。
- name: GOOGLE_CLOUD_PROJECT
value: "go-micro-test"
- name: EVENT_PUBLISH_TOPIC
value: "com.example.purchase"
- name: EVENT_SUBSCRIBE_TOPICS
value: "com.example.stock"
Cloud Console から com.example.purchase と com.example.stock の2つの topic を作成しておき、次のコマンドでクラスターにデプロイします。
cd $GOPATH/src/github.com/enakai00/go-micro-gcp-example
kubectl create -f config/deployment/purchase-service.yml
なお。。。。本質ではありませんが、go.mod の以下のエントリーに注意してください。
replace google.golang.org/grpc => github.com/enakai00/grpc-go v1.99.0
パート1で、etcd と grpc のモジュールの非互換依存問題のために、grpc モジュールのバージョンを下げる必要があると言いましたが、実際にこれを下げてしまうと、datastore クライアントが動かなくなります(datastore クライアントは新しい grpc モジュールが必要なため)。仕方がないので、新しい grpc モジュールに、etcd との組み合わせ問題を回避する monkey patch を当てたものを自分の github で公開しておきました。上記は、これを使用するための指定になります。
在庫管理サービスの作成
こちらは、gRPC の API は省略して、イベントハンドラーだけを実装しています。状態が「未承認(pre-approved)」の配送チケットを受診すると、在庫をリザーブしたのちに状態を変更した配送チケットをイベント送信します。ここでは、在庫リザーブ処理は省略して、デフォルトで在庫確保状態(reserved)に変更します。(ただし、商品名が「yellow」のものがあった場合は「確保失敗(reserve-failed)」、あるいは、「red」のものがあった場合は(支払い処理をスキップして)「支払済(paid)」に変更します。)
サブディレクトリー stock 以下の Makefile の先頭(プロジェクトID)を先と同様に修正して、次の手順でビルドとデプロイを行います。
cd $GOPATH/src/github.com/enakai00/go-micro-gcp-example/stock
make build
make docker
cd $GOPATH/src/github.com/enakai00/go-micro-gcp-example
kubectl create -f config/deployment/stock-service.yml
API ゲートウェイの作成
これは、Web API フレームワークの echo と組み合わせて作成しています。REST で受けたものをそのまま、gRPC の API に投げなおします。gRPC の API からは、proto ファイルから生成された構造体で結果が返るので、そのまま(echo の機能で json にシリアライズして)クライアントに返します。
サブディレクトリー stock 以下の Makefile の先頭(プロジェクトID)を先と同様に修正して、次の手順でビルドとデプロイを行います。
cd $GOPATH/src/github.com/enakai00/go-micro-gcp-example/api-gateway
make build
make docker
cd $GOPATH/src/github.com/enakai00/go-micro-gcp-example
kubectl create -f config/deployment/api-gateway-service.yml
また、このサービスは外部に公開する必要があるので、service も定義します。
kubectl create -f config/service/api-gateway-service.yml
ここでは簡易的に NodePort を使用しているので、サービスポートと Node の External IP を確認して、ファイアウォールを開きます。
$ kubectl get svc
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
api-gateway-service NodePort 10.48.3.164 <none> 80:31022/TCP 102s
kubernetes ClusterIP 10.48.0.1 <none> 443/TCP 2d8h
$ kubectl get nodes --output wide
NAME STATUS ROLES AGE VERSION INTERNAL-IP EXTERNAL-IP OS-IMAGE
KERNEL-VERSION CONTAINER-RUNTIME
gke-cluster-2-default-pool-1f2a5d4a-16cz Ready <none> 2d8h v1.15.12-gke.2 10.128.0.8 34.66.xxx.xxx Container-Optimized
OS from Google 4.19.112+ docker://19.3.1
...
この例であれば、ポートは 31022、External IP は 34.66.xxx.xxx になるので、31022 に対するアクセスを許可します。
$ gcloud compute firewall-rules create test-node-port --allow tcp:31022
curl コマンドで指定の IP:Port が反応することを確認します。
$ curl http://34.66.xxx.xxx:31022
api-gateway
実際にリクエストする様子は次のようになります。(jq コマンドは事前に「sudo apt install jq」で入れておいてください。)
$ curl -s -XPOST -H 'Content-Type: application/json' -d '{"cartid": "1234567"}' http://34.66.xxx.xxx:31022/purchase/api/create_cart | jq .
{
"cart": {
"cartid": "1234567",
"status": "open"
}
}
$ curl -s -XPOST -H 'Content-Type: application/json' -d '{"cartid": "1234567", "itemid": "blue", "count": 3}' http://34.66.xxx.xxx:31022/purchase/api/add_item | jq .
{
"cart_items": [
{
"itemid": "blue",
"count": 3
}
]
}
$ curl -s -XPOST -H 'Content-Type: application/json' -d '{"cartid": "1234567", "itemid": "red", "count": 2}' http://34.66.xxx.xxx:31022/purchase/api/add_item | jq .
{
"cart_items": [
{
"itemid": "blue",
"count": 3
},
{
"itemid": "red",
"count": 2
}
]
}
$ curl -s -XPOST -H 'Content-Type: application/json' -d '{"cartid": "1234567"}' http://34.66.xxx.xxx:31022/purchase/api/close_cart | jq .
{
"cart": {
"cartid": "1234567",
"status": "closed"
}
}
$ curl -s -XPOST -H 'Content-Type: application/json' -d '{"cartid": "1234567"}' http://34.66.xxx.xxx:31022/purchase/api/checkout | jq .
{
"order_ticket": {
"orderid": "a352044c-1939-4ff6-b22a-793a0dfe7d03",
"status": "pre-approved",
"cart_items": [
{
"itemid": "blue",
"count": 3
},
{
"itemid": "red",
"count": 2
}
]
}
}
ここまでカートを作成して、商品を追加して、カートをクローズして、チェックアウトする、という流れです。チェックアウト直後の order_ticket は、"status": "pre-approved" である点に注意してください。
この後、裏側では、イベントのやり取りで処理が進んでいます。今回の場合、商品に "red" が含まれているので、在庫管理サービスは、チケットを「支払済(paid)」に変更します。このイベントを受けた購買サービスは、チケットを「承認済み(approved)」に変更して、DBのデータをアップデートします。実際に、チケットの状態を確認すると、"status": "approved" になっていることがわかります。
$ curl -s -XPOST -H 'Content-Type: application/json' -d '{"orderid": "a352044c-1939-4ff6-b22a-793a0dfe7d03"}' http://34.66.xxx.xxx:31022/purchase/api/get_order_ticket | jq .
{
"order_ticket": {
"orderid": "a352044c-1939-4ff6-b22a-793a0dfe7d03",
"status": "approved",
"cart_items": [
{
"itemid": "blue",
"count": 3
},
{
"itemid": "red",
"count": 2
}
]
}
}