「パート1」はこちらです。
GKE, Cloud Pub/Sub 対応サービスの作成
パート1で作成したサンプルサービスを GKE と Cloud Pub/Sub に対応するように修正します。
修正後の main.go は次のようになります。
package main import ( "fmt" "sample/handler" "github.com/micro/go-micro/v2" "github.com/micro/go-micro/v2/broker" log "github.com/micro/go-micro/v2/logger" sample "sample/proto/sample" "github.com/micro/go-plugins/broker/googlepubsub/v2" _ "github.com/micro/go-plugins/registry/kubernetes/v2" ) var topic = "com.example.sample-topic" func sub(brk broker.Broker) { // subscribe a topic with queue specified (default: autoack) _, err := brk.Subscribe(topic, func(p broker.Event) error { fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header) return nil }, broker.Queue(topic)) if err != nil { fmt.Println(err) } } func main() { // New Service service := micro.NewService( micro.Name("com.example.service.sample"), micro.Version("latest"), ) // Initialise service service.Init(micro.AfterStart(func() error { // Project ID should be specified with env: GOOGLEPUBSUB_PROJECT_ID brk := googlepubsub.NewBroker() if err := brk.Connect(); err != nil { log.Fatalf("Broker Connect error: %v", err) } go sub(brk) return nil })) service.Init() // Register Handler sample.RegisterSampleHandler(service.Server(), new(handler.Sample)) // Run service if err := service.Run(); err != nil { log.Fatal(err) } }
import 文の最後の2つのモジュールが、Cloud Pub/Sub と GKE に対応するプラグインモジュールです。gRPC の同期 API サービスは、自動生成されたサンプルのままで、ここでは、Cloud Pub/Sub からメッセージを受け取って、その内容を表示するハンドラーを追加しています。関数 sub() がその部分になります。サブスクライブする Topic は、変数 topic で指定しています。
ここでは、これをビルドして、Docker イメージを作成しますが、少しだけ Known issue のワークアラウンドが必要です。まず、protobuf モジュールのバージョンを少し落とします。
go mod edit -require=google.golang.org/protobuf@v1.23.0
次にデフォルトで用意された Dockerfile を次のように修正します。(2行目の RUN apk add コマンドを追加します。)
Dockerfile
FROM alpine RUN apk add --no-cache libc6-compat ADD sample-service /sample-service ENTRYPOINT [ "/sample-service" ]
これで OK です。make コマンドでビルドとイメージ作成を行います。
make build make docker
次のようにイメージができています。
$ docker images REPOSITORY TAG IMAGE ID CREATED SIZE sample-service latest 7fd56f73c983 4 seconds ago 50.8MB alpine latest a24bb4013296 2 months ago 5.57MB
GKE のクラスターから pull できるように、GCP 上のプライベートレジストリに push しておきます。
gcloud auth configure-docker docker tag sample-service:latest gcr.io/go-micro-test/sample-service:latest docker push gcr.io/go-micro-test/sample-service:latest
クライアントの作成
出来上がったイメージをデプロイするまえに、このサービスにアクセスするクライアントのイメージも作っておきます。まずは、以前と同様にテンプレートを生成します。
cd $GOPATH/src micro new --namespace=com.example --gopath=false client cd client
proto ファイルはサーバー側と同じものを使う必要があるので、コピーしてきます。
rm -rf proto cp -a ../sample/proto ./
main.go を次の内容に書き換えます。
package main import ( "context" "fmt" "time" "github.com/micro/go-micro/v2" "github.com/micro/go-micro/v2/broker" log "github.com/micro/go-micro/v2/logger" sample "client/proto/sample" "github.com/micro/go-plugins/broker/googlepubsub/v2" _ "github.com/micro/go-plugins/registry/kubernetes/v2" ) var topic = "com.example.sample-topic" func pub(brk broker.Broker) { i := 0 for range time.Tick(time.Second * 5) { // build a message msg := &broker.Message{ Header: map[string]string{ "id": fmt.Sprintf("%d", i), }, Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())), } // publish it if err := brk.Publish(topic, msg); err != nil { log.Fatalf("[pub] failed: %v", err) } else { fmt.Println("[pub] pubbed message:", string(msg.Body)) } i++ } } func request(sampleClient sample.SampleService) { i := 0 for range time.Tick(time.Second * 5) { //invoke sample service method resp, err := sampleClient.Call(context.TODO(), &sample.Request{Name: fmt.Sprintf(" world! : %d", i)}) if err != nil { fmt.Printf("request failed: %v\n", err) } else { fmt.Println(resp.Msg) } i++ } } func main() { // New Service service := micro.NewService( micro.Name("com.example.service.sample.client"), //name the client service ) // Initialise service service.Init() brk := googlepubsub.NewBroker() // Project ID: GOOGLEPUBSUB_PROJECT_ID if err := brk.Connect(); err != nil { log.Fatalf("Broker Connect error: %v", err) } go pub(brk) sampleClient := sample.NewSampleService("com.example.service.sample", service.Client()) go request(sampleClient) select {} // block forever }
これは、5秒ごとに、gRPC の API リクエスト送信、および、イベントメッセージの発行を繰り返します。
Makefile 内の proto ファイルの指定を修正します。
.PHONY: proto proto: protoc --proto_path=. --micro_out=${MODIFY}:. --go_out=${MODIFY}:. proto/sample/sample.proto
先ほどと同じく、Know issue のワークアラウンドを入れます。
go mod edit -require=google.golang.org/protobuf@v1.23.0
Dockerfile
FROM alpine RUN apk add --no-cache libc6-compat ADD client-service /client-service ENTRYPOINT [ "/client-service" ]
これで準備完了です。イメージをビルドして、プライベートレジストリに push しておきます。
make build make docker docker tag client-service:latest gcr.io/go-micro-test/client-service:latest docker push gcr.io/go-micro-test/client-service:latest
クライアントのデプロイ
いよいよ出来上がったイメージを GKE クラスターにデプロイします。
Cloud Shell 以外の開発サーバーを使用している場合は、kubectl コマンドをインストールしたのちに、gcloud コマンドの実行アカウントを Project owner の個人アカウントに切り替えておきます。(この後の RBAC の設定には owner 権限が必要になるため。)
sudo apt-get install kubectl gcloud auth login
クラスタにアクセスするためのクレデンシャルを取得します。
gcloud container clusters get-credentials cluster-1 --zone us-central1-c --project go-micro-test
Service Registory 機能のプラグインが Pod のメタデータを操作できるように、RBAC で権限を付与します。次の yaml ファイルを用意して、リソースを定義します。
rbac.yml
apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: name: micro-registry rules: - apiGroups: - "" resources: - pods verbs: - list - patch - watch --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: name: micro-registry roleRef: apiGroup: rbac.authorization.k8s.io kind: ClusterRole name: micro-registry subjects: - kind: ServiceAccount name: default namespace: default
kubectl create -f rbac.yml
まずはじめに、クライアントの Pod をデプロイします。次の yaml ファイルを利用します。環境変数 GOOGLEPUBSUB_PROJECT_ID は、Cloud Pub/Sub 用のプラグインが使用します。MICRO_REGISTRY は、Service Discovery に Kubernetes 用のプラグインを使用するための指定です。
client-service.yml
apiVersion: apps/v1 kind: Deployment metadata: name: client-service spec: replicas: 1 selector: matchLabels: app: client-service template: metadata: labels: app: client-service spec: containers: - name: client-service image: gcr.io/go-micro-test/client-service:latest imagePullPolicy: Always env: - name: GOOGLEPUBSUB_PROJECT_ID value: "go-micro-test" - name: MICRO_REGISTRY value: "kubernetes"
$ kubectl create -f client-service.yml deployment.apps/client-service created $ kubectl get pods NAME READY STATUS RESTARTS AGE client-service-58fc4b64f8-zm6kg 1/1 Running 0 3s $ kubectl logs client-service-58fc4b64f8-zm6kg request failed: {"id":"go.micro.client","code":500,"detail":"service com.example.service.sample: not found","status":"Internal Server Error"} [pub] pubbed message: 0: 2020-08-06 22:28:03.664818352 +0000 UTC m=+5.032804253 [pub] pubbed message: 1: 2020-08-06 22:28:08.664661524 +0000 UTC m=+10.032647430 request failed: {"id":"go.micro.client","code":500,"detail":"service com.example.service.sample: not found","status":"Internal Server Error"} request failed: {"id":"go.micro.client","code":500,"detail":"service com.example.service.sample: not found","status":"Internal Server Error"} [pub] pubbed message: 2: 2020-08-06 22:28:13.664699377 +0000 UTC m=+15.032685317 request failed: {"id":"go.micro.client","code":500,"detail":"service com.example.service.sample: not found","status":"Internal Server Error"} [pub] pubbed message: 3: 2020-08-06 22:28:18.664639436 +0000 UTC m=+20.032625348 [pub] pubbed message: 4: 2020-08-06 22:28:23.664834003 +0000 UTC m=+25.032819937 request failed: {"id":"go.micro.client","code":500,"detail":"service com.example.service.sample: not found","status":"Internal Server Error"}
ログを見ると分かるように、定期的に API リクエストとメッセージ送信を行っています。(今はまだサーバーがないので、API リクエストは失敗しています。)
また、このタイミングで、Cloud Pub/Sub の topic が自動で作成されます。
$ gcloud pubsub topics list ... name: projects/go-micro-test/topics/com.example.sample-topic
サービスのデプロイ
続いて、サービスをデプロイします。次の yaml ファイルを利用します。ここでは、環境変数 MICRO_SERVER_ADDRESS で Listening port を 8080 に固定しています。
sample-service.yml
apiVersion: apps/v1 kind: Deployment metadata: name: sample-service spec: replicas: 3 selector: matchLabels: app: sample-service template: metadata: labels: app: sample-service spec: containers: - name: sample-service image: gcr.io/go-micro-test/sample-service:latest imagePullPolicy: Always env: - name: GOOGLEPUBSUB_PROJECT_ID value: "go-micro-test" - name: MICRO_REGISTRY value: "kubernetes" - name: MICRO_SERVER_ADDRESS value: "0.0.0.0:8080" ports: - containerPort: 8080 name: service-port
また、負荷分散機能を確認するために「replicas: 3」を指定しています。Cloud Pub/Sub のメッセージについては、各サーバープロセスが Subscription を共有することで、Cloud Pub/Sub 側の機能によるメッセージ配信の分散が行われます。gRPC リクエストについては、go-micro の機能によりクライアント側で分散します。(Service Discovery で発見したサーバーに対して、ランダムハッシュによる送信先の分散を行います。)
$ kubectl create -f sample-service.yml deployment.apps/sample-service created $ kubectl get pods NAME READY STATUS RESTARTS AGE client-service-58fc4b64f8-zm6kg 1/1 Running 0 19m sample-service-7b48ccdc49-4zk8w 1/1 Running 0 18m sample-service-7b48ccdc49-rwkd9 1/1 Running 0 18m sample-service-7b48ccdc49-z5drz 1/1 Running 0 18m
クライアントのログを確認すると、gRPC リクエストに対する応答が得られています。
$ kubectl logs client-service-58fc4b64f8-zm6kg | tail -6 Hello world! : 24 [pub] pubbed message: 24: 2020-08-06 22:30:03.664689398 +0000 UTC m=+125.032675329 Hello world! : 25 [pub] pubbed message: 25: 2020-08-06 22:30:08.664661393 +0000 UTC m=+130.032647340 Hello world! : 26 [pub] pubbed message: 26: 2020-08-06 22:30:13.664647293 +0000 UTC m=+135.032633230
サーバー側のログを確認すると負荷分散の様子がわかります。
$ kubectl logs sample-service-7b48ccdc49-4zk8w | tail -10 [sub] received message: 90: 2020-08-06 22:35:33.664661081 +0000 UTC m=+455.032647033 header map[id:90] [sub] received message: 93: 2020-08-06 22:35:48.664685902 +0000 UTC m=+470.032671836 header map[id:93] 2020-08-06 22:36:03 file=handler/sample.go:15 level=info Received Sample.Call request [sub] received message: 96: 2020-08-06 22:36:03.664726242 +0000 UTC m=+485.032712129 header map[id:96] 2020-08-06 22:36:08 file=handler/sample.go:15 level=info Received Sample.Call request 2020-08-06 22:36:13 file=handler/sample.go:15 level=info Received Sample.Call request 2020-08-06 22:36:18 file=handler/sample.go:15 level=info Received Sample.Call request [sub] received message: 99: 2020-08-06 22:36:18.664686962 +0000 UTC m=+500.032672850 header map[id:99] [sub] received message: 102: 2020-08-06 22:36:33.664669789 +0000 UTC m=+515.032655723 header map[id:102] 2020-08-06 22:36:38 file=handler/sample.go:15 level=info Received Sample.Call request
Cloud Pub/Sub のメッセージについては、ラウンドロビンで、正確に 3 回ごとに 1 回だけメッセージを受信しています。gRPC リクエストについては、ランダムに分散されています(仮にラウンドロビンであれば、15秒ごとに受信するはずです)。
本番利用における注意
下記のエントリーの「おまけ:Saga パターンの耐障害性について」を参照
次のパート3では、より実践的なサービスを作ります。