めもめも

このブログに記載の内容は個人の見解であり、必ずしも所属組織の立場、戦略、意見を代表するものではありません。

「ITエンジニアのための強化学習理論入門」(初版1刷)正誤表

誤植の訂正

p.193 (下から6行目の末尾)

誤:極端に小さな値( 10^{-10}
正:極端に小さな値( -10^{10}

コードの訂正

GitHub 上のコードも修正してあります。

p.213 (SARSA の実装コード:[MSA-05])

誤:

16:      a_new = agent.policy[s]

正:

16:      a_new = agent.policy[s_new]

※ この修正に伴い、コードの実行結果が少し変化するため、関連する本文の内容を次のように変更させていただきます。

p.214([MSA-09] の出力結果とその直後の一文)

[変更前]

############
#++++++++++#
#         +#
#######   +#
#         +#
#  ++++++++#
#  + #######
#  +       #
#  +++++++G#
############

 ここでは、ε=0 を指定して、ランダムな行動は混ぜないようにしていますが、最短の経路を学ぶことはできていないようです。先に説明したように、SARSAでは、 ランダムな行動が混ざったデータを本来の行動ポリシーに基づいたデータとみなして 学習します。そのため、学習結果にこのような変動が発生します。

[変更後]

############
#+++++++   #
#      +   #
#######+   #
#    +++   #
#   ++     #
#   +#######
#   ++     #
#    +++++G#
############

 この例では、うまく最短の経路が学習できているようです。ただし、先に説明したように、SARSAでは、ランダムな行動が混ざったデータを本来の行動ポリシーに基づいたデータとみなして学習するため、もう少し複雑な問題になると、最短経路の学習に失敗する可能性が大きくなります(章末の演習問題2を参照)。

p.216(図4.13)

[変更後]

#----------#
#S        G#
#   ####   #
#   ####   #
#          #
#          #
#          #
############

※ SARSA が最短経路の学習に失敗しやすくなるように、迷路の形を変更しました。

p.218(図4.17:SARSAによる学習結果)

[変更後]

#----------#
#+        G#
#+  ####  +#
#+  ####+++#
#++++++++  #
#          #
#          #
############

Building microservices with go-micro on GCP(パート3)

パート2はこちらです。

enakai00.hatenablog.com

パート2では、Hello World! 的なサンプルを GKE にデプロイしましたが、ここでは、もう少し本番に近い実装を試してみます。

ショッピングカートからの発注処理

次のような一連の処理を saga パターンで実装します。

・商品購入サービス(purchase-service):クライアントは、ショピングカートに商品をつめて、購入リクエストを出します。すると、「未承認状態」の配送チケットの情報を含むイベントが発行されます。(配送チケットの情報は、ローカルのDBにも保存)

・在庫管理サービス(stock-service):未承認状態の配送チケットを受け取ると、チケットに記載された商品の在庫にリザーブをかけて、「在庫確保状態」の配送チケットをイベントとして発行します。

・決済サービス(payment-service):「在庫確保状態」のイベントを受け取ると、チケットに記載された商品の合計金額の引き落とし処理を行い、「支払完了」の配送チケットをイベントして発行します。

・商品購入サービス(purchase-service):「支払完了」のイベントを受け取ると、「承認済み」の配送チケットをイベントとして発行します。

・配送サービス(delivery-service):「承認済み」の配送チケットを受け取ると、実際の配送の手配を開始します。

※ 商品購入サービスは、周りのサービスが発行したイベントにより配送チケットの状態変化を認識した際は、ローカルDBの情報を適宜更新します。

上記は正常に配送まで進んだ場合の流れですが、異常系を含めた各サービスの処理内容は次のようにまとめられます。

すべてを実装することも可能ですが、ここでは説明のために、次の3つのコンポーネントのみを実装します。

・商品購入サービス(purchase-service)

・在庫管理サービス(stock-service)

・API ゲートウェイ(api-gateway-service)

API ゲートウェイは、商品購入サービスの gRPC の API を REST API に変換してクラスターの外部に公開します。

事前準備

今回使用するコードは、事前に Github にアップロードしてあります。パート2で用意したクラスターに続けてデプロイしていきます。先にデプロイした Pod は削除しておいてください。開発用ワークステーションで次のコマンドを実行して、コードを取得します。

go get -d github.com/enakai00/go-micro-gcp-example
cd $GOPATH/src/github.com/enakai00/go-micro-gcp-example

以下のサブディレクトリーに各サービスのコードがあります。

・purchase:購買サービス
・api-gateway:購買サービスの API ゲートウェイ
・stock:在庫管理サービス
・config :Kubernetes にデプロイするための yaml ファイル群

また各サービスが使用するデータベースとして、Cloud Datastore を使用します。Cloud Console から「Datastore モード」を選択しておいてください。

商品購入サービスの作成

サブディレクトリー purchase 以下のコードをみます。

まず、proto/purchase/purchase.proto を見ると、次の API が定義されています。

	rpc CreateCart(CreateCartRequest) returns (CreateCartResponse) {} // ショッピングカートを作成する
	rpc GetCart(GetCartRequest) returns (GetCartResponse) {} // ショピングカートの状態を取得する
	rpc AddItem(AddItemRequest) returns (AddItemResponse) {} // ショッピングカートに商品を追加する
	rpc GetCartContents(GetCartContentsRequest) returns (GetCartContentsResponse) {} // ショッピングカート内の商品を取得する
	rpc CloseCart(CloseCartRequest) returns (CloseCartResponse) {} // ショピングカートを閉じる(購入する商品を確定する)
	rpc Checkout(CheckoutRequest) returns (CheckoutResponse) {} // チェックアウトする(購入リクエストを出す)
	rpc GetOrderTicket(GetOrderTicketRequest) returns (GetOrderTicketResponse) {} // 購入リクエストの状態を取得する

※ デモ用なのでリクエスト処理中のエラーハンドリングは適当です。(I/O 関係のエラーは、クライアントにエラーを返さずに、その場で log.Fatal() で終了します。)

これらのハンドラーは、main.go で登録されており、ハンドラーのエントリーポイントは handler/grpc_handler.go にあります。いずれも Datastore の情報を操作することが主な処理で、実際の内容は、handler/operations.go にあります。

この中で、特にトランザクションを用いている部分を説明します。1つ目はカートの状態を「open(商品追加可能)」から「closed(商品追加不可能)」に変更する部分です。「open」以外の状態からは「close」に遷移しないことを保証するために(そのような要件がある前提)、現在の状態の確認と closed への変更を1つのトランザクションとして実行しています。

func CloseCart(cartid string) *purchase.Cart {
	cartStructWithoutTx, ok := getCartStruct(cartid)
	if !ok {
		return &purchase.Cart{}
	}
	_, err := client.RunInTransaction(context.Background(),
		func(tx *datastore.Transaction) error {
			var cartStruct ds.Cart
			err := tx.Get(cartStructWithoutTx.Key, &cartStruct)
			if err != nil {
				return err
			}
			if cartStruct.Status != "open" {
				return nil // cannot close
			}
			cartStruct.Status = "closed"
			_, err = tx.Put(cartStruct.Key, &cartStruct)
			if err != nil {
				return err
			}
			return nil
		})
	if err != nil {
		log.Fatalf("Error stroing data: %v", err)
	}
	return GetCart(cartid)
}


同様に、カートに商品を追加する際は、カートの状態が「open」であることを保証するために、「open」であることの確認と、商品の追加を1つのトランザクションにしています。(handler/operations.go

・・・というあたりは、microservice とは直接関係ない感じですが、saga パターンで重要になるのが、イベント発行の部分です。今回は、チェックアウト処理の際に、「配送チケットをDBに保存する処理と、配送チケット情報のイベントを Publish する処理」をアトミックに実施するためにトランザクションを使用しています。(handler/operations.go)具体的には、カートの状態が「close」であることの確認、配送チケットのDB保存、イベントメッセージの配信用DB保存、の3つの処理を1つのトランザクションで実施しています。その後、配信用DBに保存されたメッセージを Publish する処理を別途実施しています。(events/utils.go)(ここでは、簡単のために同じコードから Publish 処理をしていますが、本来は、別のタスクがバックグラウンドで定期的にバッチ処理するものと考えてください。)

func PublishEvents() int {
	query := datastore.NewQuery(eventPublishTable).Filter("sent =", false)
	it := client.Run(context.Background(), query)

	i := 0
	for {
		var eventEntity EventEntity
		_, err := it.Next(&eventEntity)
		if err == iterator.Done {
			break
		}

		msg := &broker.Message{
			Header: map[string]string{
				"eventid": eventEntity.Eventid,
				"type":    eventEntity.Type,
			},
			Body: eventEntity.EventData,
		}
		err = brk.Publish(publishTopic, msg)
		if err != nil {
			log.Fatalf("Error publishing event: %v", err)
		}

		eventEntity.Sent = true
		_, err = client.Put(context.Background(), eventEntity.Key, &eventEntity)
		if err != nil {
			log.Fatalf("Error stroing data: %v", err)
		}
		i += 1
	}
	return i
}

また、このサービスは、他のサービスが発行するイベントを処理するハンドラーも持ちます。main.goで events パッケージをインポートしたタイミングで、(パッケージのinit() 関数により)指定 Topic へのサブスクライブとイベントハンドラーの登録が行われます。

var (
	publishTopic      = os.Getenv("EVENT_PUBLISH_TOPIC")
	subscribeTopics   = os.Getenv("EVENT_SUBSCRIBE_TOPICS")
	projectID         = os.Getenv("GOOGLE_CLOUD_PROJECT")
	eventPublishTable = "PurchasePublishEvent"
	eventRecordTable  = "PurchaseReceivedEvent"
	client, _         = datastore.NewClient(context.Background(), projectID)
	brk               broker.Broker
)

func init() {
	brk = googlepubsub.NewBroker(googlepubsub.ProjectID(projectID))
	err := brk.Connect()
	if err != nil {
		log.Fatalf("Broker Connect error: %v", err)
	}

	for _, subscribeTopic := range strings.Split(subscribeTopics, ",") {
		_, err := brk.Subscribe(subscribeTopic, eventHandler,
			broker.Queue("purchase-service"),
			broker.DisableAutoAck())
		if err != nil {
			log.Fatalf("Broker Subscribe error: %v", err)
		}
	}
}

イベントを Publish するトピック、サブスクライブするトピック(, 区切りで複数指定可能)は、環境変数で指定されています。なお、broker.Queue("purchase-service") の部分は、サブスクリプション名を指定しています。同一サービスを複数起動した場合は、同じサブスクリプションを共有するので、メッセージの分配が行われます。(逆に言うと、異なるサービスが同一のサブスクリプション名を使用すると、一方のサービスのみにメッセージが配信されてしまうので注意してください。)また、autoack を disable している点にも注意してください。イベントの処理が終わってから Ack することで、イベントの処理中に障害停止した場合でもイベントが失われません。

イベントハンドラーのエントリーポイント(events/handler.go)は、次のようになります。

func eventHandler(p broker.Event) error {
	header := p.Message().Header
	eventid := header["eventid"]
	eventType := header["type"]

	duplicate, err := isDuplicated(eventid)
	if err != nil {
		log.Fatalf("Error checking duplicated event: %v", err)
	}
	if duplicate {
		p.Ack()
		return nil
	}

	switch eventType {
	case "purchase.OrderTicket":
		var purchaseOrderTicket purchase.OrderTicket
		err := json.Unmarshal(p.Message().Body, &purchaseOrderTicket)
		if err != nil {
			log.Fatalf("Error unmarshalling eventt: %v", err)
		}
		log.Infof("Handle event purchase.OrderTicket: %v", purchaseOrderTicket)
		err = handlePurchaseOrderTicket(purchaseOrderTicket)
		if err != nil {
			log.Warnf("Failed to handle purchaseOrderTicke: %v", err)
		} else {
			recordEvent(eventid)
			p.Ack()
		}
	default:
		log.Infof("Unknown event type: %s", eventType)
		recordEvent(eventid)
		p.Ack()
	}
	return nil
}

イベント本体は json にシリアライズされているので、ヘッダー(header["type"])を見て、書き戻す構造体を判別しています。また、ユニークなイベント ID もヘッダー(header["eventid"])に含めてあり、処理済みのイベント ID を DB に記録することで重複排除を行っています。今回は、イベントの内容に応じてDBの更新も入るので、トランザクションを用いてDBの更新と新しいイベントの発行をアトミックに行っています。(イベントの発行は、一度 DB に書き出すという前述の作戦を用います。)

それでは、実際にサービスをデプロイします。Makefile の先頭にある PRJECT_ID を実際のプロジェクトIDに変更します。

PROJECT_ID=go-micro-test

この後は、make コマンドで Docker イメージの作成とプライベートレジストリへの Push まで自動で行えます。

make build
make docker

deployment の定義は、config 以下の deployment/purchase-service.yml です。次のように環境変数で、Project ID に加えて、Pub/Sub のトピックを指定しています。

          - name: GOOGLE_CLOUD_PROJECT
            value: "go-micro-test"
          - name: EVENT_PUBLISH_TOPIC
            value: "com.example.purchase"
          - name: EVENT_SUBSCRIBE_TOPICS
            value: "com.example.stock"

Cloud Console から com.example.purchase と com.example.stock の2つの topic を作成しておき、次のコマンドでクラスターにデプロイします。

cd $GOPATH/src/github.com/enakai00/go-micro-gcp-example
kubectl create -f config/deployment/purchase-service.yml 


なお。。。。本質ではありませんが、go.mod の以下のエントリーに注意してください。

replace google.golang.org/grpc => github.com/enakai00/grpc-go v1.99.0

パート1で、etcd と grpc のモジュールの非互換依存問題のために、grpc モジュールのバージョンを下げる必要があると言いましたが、実際にこれを下げてしまうと、datastore クライアントが動かなくなります(datastore クライアントは新しい grpc モジュールが必要なため)。仕方がないので、新しい grpc モジュールに、etcd との組み合わせ問題を回避する monkey patch を当てたものを自分の github で公開しておきました。上記は、これを使用するための指定になります。

在庫管理サービスの作成

こちらは、gRPC の API は省略して、イベントハンドラーだけを実装しています。状態が「未承認(pre-approved)」の配送チケットを受診すると、在庫をリザーブしたのちに状態を変更した配送チケットをイベント送信します。ここでは、在庫リザーブ処理は省略して、デフォルトで在庫確保状態(reserved)に変更します。(ただし、商品名が「yellow」のものがあった場合は「確保失敗(reserve-failed)」、あるいは、「red」のものがあった場合は(支払い処理をスキップして)「支払済(paid)」に変更します。)

サブディレクトリー stock 以下の Makefile の先頭(プロジェクトID)を先と同様に修正して、次の手順でビルドとデプロイを行います。

cd $GOPATH/src/github.com/enakai00/go-micro-gcp-example/stock
make build
make docker
cd $GOPATH/src/github.com/enakai00/go-micro-gcp-example
kubectl create -f config/deployment/stock-service.yml 

API ゲートウェイの作成

これは、Web API フレームワークの echo と組み合わせて作成しています。REST で受けたものをそのまま、gRPC の API に投げなおします。gRPC の API からは、proto ファイルから生成された構造体で結果が返るので、そのまま(echo の機能で json にシリアライズして)クライアントに返します。

サブディレクトリー stock 以下の Makefile の先頭(プロジェクトID)を先と同様に修正して、次の手順でビルドとデプロイを行います。

cd $GOPATH/src/github.com/enakai00/go-micro-gcp-example/api-gateway
make build
make docker
cd $GOPATH/src/github.com/enakai00/go-micro-gcp-example
kubectl create -f config/deployment/api-gateway-service.yml 

また、このサービスは外部に公開する必要があるので、service も定義します。

kubectl create -f config/service/api-gateway-service.yml 

ここでは簡易的に NodePort を使用しているので、サービスポートと Node の External IP を確認して、ファイアウォールを開きます。

$ kubectl get svc
NAME                  TYPE        CLUSTER-IP    EXTERNAL-IP   PORT(S)        AGE
api-gateway-service   NodePort    10.48.3.164   <none>        80:31022/TCP   102s
kubernetes            ClusterIP   10.48.0.1     <none>        443/TCP        2d8h

$ kubectl get nodes --output wide
NAME                                       STATUS   ROLES    AGE    VERSION          INTERNAL-IP   EXTERNAL-IP     OS-IMAGE           
                  KERNEL-VERSION   CONTAINER-RUNTIME
gke-cluster-2-default-pool-1f2a5d4a-16cz   Ready    <none>   2d8h   v1.15.12-gke.2   10.128.0.8    34.66.xxx.xxx    Container-Optimized
 OS from Google   4.19.112+        docker://19.3.1
...

この例であれば、ポートは 31022、External IP は 34.66.xxx.xxx になるので、31022 に対するアクセスを許可します。

$ gcloud compute firewall-rules create test-node-port --allow tcp:31022

curl コマンドで指定の IP:Port が反応することを確認します。

$ curl http://34.66.xxx.xxx:31022
api-gateway

実際にリクエストする様子は次のようになります。(jq コマンドは事前に「sudo apt install jq」で入れておいてください。)

$ curl -s -XPOST -H 'Content-Type: application/json' -d '{"cartid": "1234567"}' http://34.66.xxx.xxx:31022/purchase/api/create_cart | jq .
{
  "cart": {
    "cartid": "1234567",
    "status": "open"
  }
}

$ curl -s -XPOST -H 'Content-Type: application/json' -d '{"cartid": "1234567", "itemid": "blue", "count": 3}' http://34.66.xxx.xxx:31022/purchase/api/add_item | jq .
{
  "cart_items": [
    {
      "itemid": "blue",
      "count": 3
    }
  ]
}

$ curl -s -XPOST -H 'Content-Type: application/json' -d '{"cartid": "1234567", "itemid": "red", "count": 2}' http://34.66.xxx.xxx:31022/purchase/api/add_item | jq .
{
  "cart_items": [
    {
      "itemid": "blue",
      "count": 3
    },
    {
      "itemid": "red",
      "count": 2
    }
  ]
}

$ curl -s -XPOST -H 'Content-Type: application/json' -d '{"cartid": "1234567"}' http://34.66.xxx.xxx:31022/purchase/api/close_cart | jq .
{
  "cart": {
    "cartid": "1234567",
    "status": "closed"
  }
}

$ curl -s -XPOST -H 'Content-Type: application/json' -d '{"cartid": "1234567"}' http://34.66.xxx.xxx:31022/purchase/api/checkout | jq .
{
  "order_ticket": {
    "orderid": "a352044c-1939-4ff6-b22a-793a0dfe7d03",
    "status": "pre-approved",
    "cart_items": [
      {
        "itemid": "blue",
        "count": 3
      },
      {
        "itemid": "red",
        "count": 2
      }
    ]
  }
}

ここまでカートを作成して、商品を追加して、カートをクローズして、チェックアウトする、という流れです。チェックアウト直後の order_ticket は、"status": "pre-approved" である点に注意してください。

この後、裏側では、イベントのやり取りで処理が進んでいます。今回の場合、商品に "red" が含まれているので、在庫管理サービスは、チケットを「支払済(paid)」に変更します。このイベントを受けた購買サービスは、チケットを「承認済み(approved)」に変更して、DBのデータをアップデートします。実際に、チケットの状態を確認すると、"status": "approved" になっていることがわかります。

$ curl -s -XPOST -H 'Content-Type: application/json' -d '{"orderid": "a352044c-1939-4ff6-b22a-793a0dfe7d03"}' http://34.66.xxx.xxx:31022/purchase/api/get_order_ticket | jq .
{
  "order_ticket": {
    "orderid": "a352044c-1939-4ff6-b22a-793a0dfe7d03",
    "status": "approved",
    "cart_items": [
      {
        "itemid": "blue",
        "count": 3
      },
      {
        "itemid": "red",
        "count": 2
      }
    ]
  }
}

Building microservices with go-micro on GCP(パート2)

「パート1」はこちらです。

enakai00.hatenablog.com

GKE, Cloud Pub/Sub 対応サービスの作成

パート1で作成したサンプルサービスを GKE と Cloud Pub/Sub に対応するように修正します。

修正後の main.go は次のようになります。

package main

import (
        "fmt"
        "sample/handler"

        "github.com/micro/go-micro/v2"
        "github.com/micro/go-micro/v2/broker"
        log "github.com/micro/go-micro/v2/logger"

        sample "sample/proto/sample"

        "github.com/micro/go-plugins/broker/googlepubsub/v2"
        _ "github.com/micro/go-plugins/registry/kubernetes/v2"
)

var topic = "com.example.sample-topic"

func sub(brk broker.Broker) {
        // subscribe a topic with queue specified (default: autoack)
        _, err := brk.Subscribe(topic, func(p broker.Event) error {
                fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header)
                return nil
        }, broker.Queue(topic))
        if err != nil {
                fmt.Println(err)
        }
}

func main() {
        // New Service
        service := micro.NewService(
                micro.Name("com.example.service.sample"),
                micro.Version("latest"),
        )

        // Initialise service
        service.Init(micro.AfterStart(func() error {
                // Project ID should be specified with env: GOOGLEPUBSUB_PROJECT_ID
                brk := googlepubsub.NewBroker()
                if err := brk.Connect(); err != nil {
                        log.Fatalf("Broker Connect error: %v", err)
                }
                go sub(brk)
                return nil
        }))
        service.Init()

        // Register Handler
        sample.RegisterSampleHandler(service.Server(), new(handler.Sample))

        // Run service
        if err := service.Run(); err != nil {
                log.Fatal(err)
        }
}

import 文の最後の2つのモジュールが、Cloud Pub/Sub と GKE に対応するプラグインモジュールです。gRPC の同期 API サービスは、自動生成されたサンプルのままで、ここでは、Cloud Pub/Sub からメッセージを受け取って、その内容を表示するハンドラーを追加しています。関数 sub() がその部分になります。サブスクライブする Topic は、変数 topic で指定しています。

ここでは、これをビルドして、Docker イメージを作成しますが、少しだけ Known issue のワークアラウンドが必要です。まず、protobuf モジュールのバージョンを少し落とします。

go mod edit -require=google.golang.org/protobuf@v1.23.0

次にデフォルトで用意された Dockerfile を次のように修正します。(2行目の RUN apk add コマンドを追加します。)

Dockerfile

FROM alpine
RUN apk add --no-cache libc6-compat
ADD sample-service /sample-service
ENTRYPOINT [ "/sample-service" ]

これで OK です。make コマンドでビルドとイメージ作成を行います。

make build
make docker

次のようにイメージができています。

$ docker images
REPOSITORY          TAG                 IMAGE ID            CREATED             SIZE
sample-service      latest              7fd56f73c983        4 seconds ago       50.8MB
alpine              latest              a24bb4013296        2 months ago        5.57MB

GKE のクラスターから pull できるように、GCP 上のプライベートレジストリに push しておきます。

gcloud auth configure-docker
docker tag sample-service:latest gcr.io/go-micro-test/sample-service:latest
docker push gcr.io/go-micro-test/sample-service:latest

クライアントの作成

出来上がったイメージをデプロイするまえに、このサービスにアクセスするクライアントのイメージも作っておきます。まずは、以前と同様にテンプレートを生成します。

cd $GOPATH/src
micro new --namespace=com.example --gopath=false client
cd client

proto ファイルはサーバー側と同じものを使う必要があるので、コピーしてきます。

rm -rf proto
cp -a ../sample/proto ./

main.go を次の内容に書き換えます。

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/micro/go-micro/v2"
    "github.com/micro/go-micro/v2/broker"
    log "github.com/micro/go-micro/v2/logger"

    sample "client/proto/sample"

    "github.com/micro/go-plugins/broker/googlepubsub/v2"
    _ "github.com/micro/go-plugins/registry/kubernetes/v2"
)

var topic = "com.example.sample-topic"

func pub(brk broker.Broker) {
    i := 0
    for range time.Tick(time.Second * 5) {
        // build a message
        msg := &broker.Message{
            Header: map[string]string{
                "id": fmt.Sprintf("%d", i),
            },
            Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())),
        }
        // publish it
        if err := brk.Publish(topic, msg); err != nil {
            log.Fatalf("[pub] failed: %v", err)
        } else {
            fmt.Println("[pub] pubbed message:", string(msg.Body))
        }
        i++
    }
}

func request(sampleClient sample.SampleService) {
    i := 0
    for range time.Tick(time.Second * 5) {
        //invoke sample service method
        resp, err := sampleClient.Call(context.TODO(),
            &sample.Request{Name: fmt.Sprintf(" world! : %d", i)})
        if err != nil {
            fmt.Printf("request failed: %v\n", err)
        } else {
            fmt.Println(resp.Msg)
        }
        i++
    }
}

func main() {
    // New Service
    service := micro.NewService(
        micro.Name("com.example.service.sample.client"), //name the client service
    )
    // Initialise service
    service.Init()
    brk := googlepubsub.NewBroker() // Project ID: GOOGLEPUBSUB_PROJECT_ID
    if err := brk.Connect(); err != nil {
        log.Fatalf("Broker Connect error: %v", err)
    }
    go pub(brk)
    sampleClient := sample.NewSampleService("com.example.service.sample", service.Client())
    go request(sampleClient)
    select {} // block forever
}

これは、5秒ごとに、gRPC の API リクエスト送信、および、イベントメッセージの発行を繰り返します。

Makefile 内の proto ファイルの指定を修正します。

.PHONY: proto
proto:
    
        protoc --proto_path=. --micro_out=${MODIFY}:. --go_out=${MODIFY}:. proto/sample/sample.proto

先ほどと同じく、Know issue のワークアラウンドを入れます。

go mod edit -require=google.golang.org/protobuf@v1.23.0

Dockerfile

FROM alpine
RUN apk add --no-cache libc6-compat
ADD client-service /client-service
ENTRYPOINT [ "/client-service" ]

これで準備完了です。イメージをビルドして、プライベートレジストリに push しておきます。

make build
make docker
docker tag client-service:latest gcr.io/go-micro-test/client-service:latest
docker push gcr.io/go-micro-test/client-service:latest

クライアントのデプロイ

いよいよ出来上がったイメージを GKE クラスターにデプロイします。

Cloud Shell 以外の開発サーバーを使用している場合は、kubectl コマンドをインストールしたのちに、gcloud コマンドの実行アカウントを Project owner の個人アカウントに切り替えておきます。(この後の RBAC の設定には owner 権限が必要になるため。)

sudo apt-get install kubectl
gcloud auth login

クラスタにアクセスするためのクレデンシャルを取得します。

gcloud container clusters get-credentials cluster-1 --zone us-central1-c --project go-micro-test

Service Registory 機能のプラグインが Pod のメタデータを操作できるように、RBAC で権限を付与します。次の yaml ファイルを用意して、リソースを定義します。

rbac.yml

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: micro-registry
rules:
- apiGroups:
  - ""
  resources:
  - pods
  verbs:
  - list
  - patch
  - watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: micro-registry
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: micro-registry
subjects:
- kind: ServiceAccount
  name: default
  namespace: default
kubectl create -f rbac.yml 


まずはじめに、クライアントの Pod をデプロイします。次の yaml ファイルを利用します。環境変数 GOOGLEPUBSUB_PROJECT_ID は、Cloud Pub/Sub 用のプラグインが使用します。MICRO_REGISTRY は、Service Discovery に Kubernetes 用のプラグインを使用するための指定です。

client-service.yml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: client-service
spec:
  replicas: 1
  selector:
    matchLabels:
      app: client-service
  template:
    metadata:
      labels:
        app: client-service
    spec:
        containers:
        - name: client-service
          image: gcr.io/go-micro-test/client-service:latest
          imagePullPolicy: Always
          env:
          - name: GOOGLEPUBSUB_PROJECT_ID
            value: "go-micro-test"
          - name: MICRO_REGISTRY
            value: "kubernetes"
$ kubectl create -f client-service.yml 
deployment.apps/client-service created

$ kubectl get pods
NAME                              READY   STATUS    RESTARTS   AGE
client-service-58fc4b64f8-zm6kg   1/1     Running   0          3s

$ kubectl logs client-service-58fc4b64f8-zm6kg
request failed: {"id":"go.micro.client","code":500,"detail":"service com.example.service.sample: not found","status":"Internal Server Error"}
[pub] pubbed message: 0: 2020-08-06 22:28:03.664818352 +0000 UTC m=+5.032804253
[pub] pubbed message: 1: 2020-08-06 22:28:08.664661524 +0000 UTC m=+10.032647430
request failed: {"id":"go.micro.client","code":500,"detail":"service com.example.service.sample: not found","status":"Internal Server Error"}
request failed: {"id":"go.micro.client","code":500,"detail":"service com.example.service.sample: not found","status":"Internal Server Error"}
[pub] pubbed message: 2: 2020-08-06 22:28:13.664699377 +0000 UTC m=+15.032685317
request failed: {"id":"go.micro.client","code":500,"detail":"service com.example.service.sample: not found","status":"Internal Server Error"}
[pub] pubbed message: 3: 2020-08-06 22:28:18.664639436 +0000 UTC m=+20.032625348
[pub] pubbed message: 4: 2020-08-06 22:28:23.664834003 +0000 UTC m=+25.032819937
request failed: {"id":"go.micro.client","code":500,"detail":"service com.example.service.sample: not found","status":"Internal Server Error"}

ログを見ると分かるように、定期的に API リクエストとメッセージ送信を行っています。(今はまだサーバーがないので、API リクエストは失敗しています。)

また、このタイミングで、Cloud Pub/Sub の topic が自動で作成されます。

$ gcloud pubsub topics list
...
name: projects/go-micro-test/topics/com.example.sample-topic

サービスのデプロイ

続いて、サービスをデプロイします。次の yaml ファイルを利用します。ここでは、環境変数 MICRO_SERVER_ADDRESS で Listening port を 8080 に固定しています。

sample-service.yml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: sample-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: sample-service
  template:
    metadata:
      labels:
        app: sample-service
    spec:
        containers:
        - name: sample-service
          image: gcr.io/go-micro-test/sample-service:latest
          imagePullPolicy: Always
          env:
          - name: GOOGLEPUBSUB_PROJECT_ID
            value: "go-micro-test"
          - name: MICRO_REGISTRY
            value: "kubernetes"
          - name: MICRO_SERVER_ADDRESS
            value: "0.0.0.0:8080"
          ports:
          - containerPort: 8080
            name: service-port

また、負荷分散機能を確認するために「replicas: 3」を指定しています。Cloud Pub/Sub のメッセージについては、各サーバープロセスが Subscription を共有することで、Cloud Pub/Sub 側の機能によるメッセージ配信の分散が行われます。gRPC リクエストについては、go-micro の機能によりクライアント側で分散します。(Service Discovery で発見したサーバーに対して、ランダムハッシュによる送信先の分散を行います。)

$ kubectl create -f sample-service.yml 
deployment.apps/sample-service created

$ kubectl get pods
NAME                              READY   STATUS    RESTARTS   AGE
client-service-58fc4b64f8-zm6kg   1/1     Running   0          19m
sample-service-7b48ccdc49-4zk8w   1/1     Running   0          18m
sample-service-7b48ccdc49-rwkd9   1/1     Running   0          18m
sample-service-7b48ccdc49-z5drz   1/1     Running   0          18m

クライアントのログを確認すると、gRPC リクエストに対する応答が得られています。

$ kubectl logs client-service-58fc4b64f8-zm6kg | tail -6
Hello  world! : 24
[pub] pubbed message: 24: 2020-08-06 22:30:03.664689398 +0000 UTC m=+125.032675329
Hello  world! : 25
[pub] pubbed message: 25: 2020-08-06 22:30:08.664661393 +0000 UTC m=+130.032647340
Hello  world! : 26
[pub] pubbed message: 26: 2020-08-06 22:30:13.664647293 +0000 UTC m=+135.032633230

サーバー側のログを確認すると負荷分散の様子がわかります。

$ kubectl logs sample-service-7b48ccdc49-4zk8w | tail -10
[sub] received message: 90: 2020-08-06 22:35:33.664661081 +0000 UTC m=+455.032647033 header map[id:90]
[sub] received message: 93: 2020-08-06 22:35:48.664685902 +0000 UTC m=+470.032671836 header map[id:93]
2020-08-06 22:36:03  file=handler/sample.go:15 level=info Received Sample.Call request
[sub] received message: 96: 2020-08-06 22:36:03.664726242 +0000 UTC m=+485.032712129 header map[id:96]
2020-08-06 22:36:08  file=handler/sample.go:15 level=info Received Sample.Call request
2020-08-06 22:36:13  file=handler/sample.go:15 level=info Received Sample.Call request
2020-08-06 22:36:18  file=handler/sample.go:15 level=info Received Sample.Call request
[sub] received message: 99: 2020-08-06 22:36:18.664686962 +0000 UTC m=+500.032672850 header map[id:99]
[sub] received message: 102: 2020-08-06 22:36:33.664669789 +0000 UTC m=+515.032655723 header map[id:102]
2020-08-06 22:36:38  file=handler/sample.go:15 level=info Received Sample.Call request

Cloud Pub/Sub のメッセージについては、ラウンドロビンで、正確に 3 回ごとに 1 回だけメッセージを受信しています。gRPC リクエストについては、ランダムに分散されています(仮にラウンドロビンであれば、15秒ごとに受信するはずです)。

本番利用における注意

下記のエントリーの「おまけ:Saga パターンの耐障害性について」を参照

enakai00.hatenablog.com


次のパート3では、より実践的なサービスを作ります。

enakai00.hatenablog.com