めもめも

このブログに記載の内容は個人の見解であり、必ずしも所属組織の立場、戦略、意見を代表するものではありません。

Building microservices with go-micro on GCP(パート2)

「パート1」はこちらです。

enakai00.hatenablog.com

GKE, Cloud Pub/Sub 対応サービスの作成

パート1で作成したサンプルサービスを GKE と Cloud Pub/Sub に対応するように修正します。

修正後の main.go は次のようになります。

package main

import (
        "fmt"
        "sample/handler"

        "github.com/micro/go-micro/v2"
        "github.com/micro/go-micro/v2/broker"
        log "github.com/micro/go-micro/v2/logger"

        sample "sample/proto/sample"

        "github.com/micro/go-plugins/broker/googlepubsub/v2"
        _ "github.com/micro/go-plugins/registry/kubernetes/v2"
)

var topic = "com.example.sample-topic"

func sub(brk broker.Broker) {
        // subscribe a topic with queue specified (default: autoack)
        _, err := brk.Subscribe(topic, func(p broker.Event) error {
                fmt.Println("[sub] received message:", string(p.Message().Body), "header", p.Message().Header)
                return nil
        }, broker.Queue(topic))
        if err != nil {
                fmt.Println(err)
        }
}

func main() {
        // New Service
        service := micro.NewService(
                micro.Name("com.example.service.sample"),
                micro.Version("latest"),
        )

        // Initialise service
        service.Init(micro.AfterStart(func() error {
                // Project ID should be specified with env: GOOGLEPUBSUB_PROJECT_ID
                brk := googlepubsub.NewBroker()
                if err := brk.Connect(); err != nil {
                        log.Fatalf("Broker Connect error: %v", err)
                }
                go sub(brk)
                return nil
        }))
        service.Init()

        // Register Handler
        sample.RegisterSampleHandler(service.Server(), new(handler.Sample))

        // Run service
        if err := service.Run(); err != nil {
                log.Fatal(err)
        }
}

import 文の最後の2つのモジュールが、Cloud Pub/Sub と GKE に対応するプラグインモジュールです。gRPC の同期 API サービスは、自動生成されたサンプルのままで、ここでは、Cloud Pub/Sub からメッセージを受け取って、その内容を表示するハンドラーを追加しています。関数 sub() がその部分になります。サブスクライブする Topic は、変数 topic で指定しています。

ここでは、これをビルドして、Docker イメージを作成しますが、少しだけ Known issue のワークアラウンドが必要です。まず、protobuf モジュールのバージョンを少し落とします。

go mod edit -require=google.golang.org/protobuf@v1.23.0

次にデフォルトで用意された Dockerfile を次のように修正します。(2行目の RUN apk add コマンドを追加します。)

Dockerfile

FROM alpine
RUN apk add --no-cache libc6-compat
ADD sample-service /sample-service
ENTRYPOINT [ "/sample-service" ]

これで OK です。make コマンドでビルドとイメージ作成を行います。

make build
make docker

次のようにイメージができています。

$ docker images
REPOSITORY          TAG                 IMAGE ID            CREATED             SIZE
sample-service      latest              7fd56f73c983        4 seconds ago       50.8MB
alpine              latest              a24bb4013296        2 months ago        5.57MB

GKE のクラスターから pull できるように、GCP 上のプライベートレジストリに push しておきます。

gcloud auth configure-docker
docker tag sample-service:latest gcr.io/go-micro-test/sample-service:latest
docker push gcr.io/go-micro-test/sample-service:latest

クライアントの作成

出来上がったイメージをデプロイするまえに、このサービスにアクセスするクライアントのイメージも作っておきます。まずは、以前と同様にテンプレートを生成します。

cd $GOPATH/src
micro new --namespace=com.example --gopath=false client
cd client

proto ファイルはサーバー側と同じものを使う必要があるので、コピーしてきます。

rm -rf proto
cp -a ../sample/proto ./

main.go を次の内容に書き換えます。

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/micro/go-micro/v2"
    "github.com/micro/go-micro/v2/broker"
    log "github.com/micro/go-micro/v2/logger"

    sample "client/proto/sample"

    "github.com/micro/go-plugins/broker/googlepubsub/v2"
    _ "github.com/micro/go-plugins/registry/kubernetes/v2"
)

var topic = "com.example.sample-topic"

func pub(brk broker.Broker) {
    i := 0
    for range time.Tick(time.Second * 5) {
        // build a message
        msg := &broker.Message{
            Header: map[string]string{
                "id": fmt.Sprintf("%d", i),
            },
            Body: []byte(fmt.Sprintf("%d: %s", i, time.Now().String())),
        }
        // publish it
        if err := brk.Publish(topic, msg); err != nil {
            log.Fatalf("[pub] failed: %v", err)
        } else {
            fmt.Println("[pub] pubbed message:", string(msg.Body))
        }
        i++
    }
}

func request(sampleClient sample.SampleService) {
    i := 0
    for range time.Tick(time.Second * 5) {
        //invoke sample service method
        resp, err := sampleClient.Call(context.TODO(),
            &sample.Request{Name: fmt.Sprintf(" world! : %d", i)})
        if err != nil {
            fmt.Printf("request failed: %v\n", err)
        } else {
            fmt.Println(resp.Msg)
        }
        i++
    }
}

func main() {
    // New Service
    service := micro.NewService(
        micro.Name("com.example.service.sample.client"), //name the client service
    )
    // Initialise service
    service.Init()
    brk := googlepubsub.NewBroker() // Project ID: GOOGLEPUBSUB_PROJECT_ID
    if err := brk.Connect(); err != nil {
        log.Fatalf("Broker Connect error: %v", err)
    }
    go pub(brk)
    sampleClient := sample.NewSampleService("com.example.service.sample", service.Client())
    go request(sampleClient)
    select {} // block forever
}

これは、5秒ごとに、gRPC の API リクエスト送信、および、イベントメッセージの発行を繰り返します。

Makefile 内の proto ファイルの指定を修正します。

.PHONY: proto
proto:
    
        protoc --proto_path=. --micro_out=${MODIFY}:. --go_out=${MODIFY}:. proto/sample/sample.proto

先ほどと同じく、Know issue のワークアラウンドを入れます。

go mod edit -require=google.golang.org/protobuf@v1.23.0

Dockerfile

FROM alpine
RUN apk add --no-cache libc6-compat
ADD client-service /client-service
ENTRYPOINT [ "/client-service" ]

これで準備完了です。イメージをビルドして、プライベートレジストリに push しておきます。

make build
make docker
docker tag client-service:latest gcr.io/go-micro-test/client-service:latest
docker push gcr.io/go-micro-test/client-service:latest

クライアントのデプロイ

いよいよ出来上がったイメージを GKE クラスターにデプロイします。

Cloud Shell 以外の開発サーバーを使用している場合は、kubectl コマンドをインストールしたのちに、gcloud コマンドの実行アカウントを Project owner の個人アカウントに切り替えておきます。(この後の RBAC の設定には owner 権限が必要になるため。)

sudo apt-get install kubectl
gcloud auth login

クラスタにアクセスするためのクレデンシャルを取得します。

gcloud container clusters get-credentials cluster-1 --zone us-central1-c --project go-micro-test

Service Registory 機能のプラグインが Pod のメタデータを操作できるように、RBAC で権限を付与します。次の yaml ファイルを用意して、リソースを定義します。

rbac.yml

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: micro-registry
rules:
- apiGroups:
  - ""
  resources:
  - pods
  verbs:
  - list
  - patch
  - watch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: micro-registry
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: micro-registry
subjects:
- kind: ServiceAccount
  name: default
  namespace: default
kubectl create -f rbac.yml 


まずはじめに、クライアントの Pod をデプロイします。次の yaml ファイルを利用します。環境変数 GOOGLEPUBSUB_PROJECT_ID は、Cloud Pub/Sub 用のプラグインが使用します。MICRO_REGISTRY は、Service Discovery に Kubernetes 用のプラグインを使用するための指定です。

client-service.yml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: client-service
spec:
  replicas: 1
  selector:
    matchLabels:
      app: client-service
  template:
    metadata:
      labels:
        app: client-service
    spec:
        containers:
        - name: client-service
          image: gcr.io/go-micro-test/client-service:latest
          imagePullPolicy: Always
          env:
          - name: GOOGLEPUBSUB_PROJECT_ID
            value: "go-micro-test"
          - name: MICRO_REGISTRY
            value: "kubernetes"
$ kubectl create -f client-service.yml 
deployment.apps/client-service created

$ kubectl get pods
NAME                              READY   STATUS    RESTARTS   AGE
client-service-58fc4b64f8-zm6kg   1/1     Running   0          3s

$ kubectl logs client-service-58fc4b64f8-zm6kg
request failed: {"id":"go.micro.client","code":500,"detail":"service com.example.service.sample: not found","status":"Internal Server Error"}
[pub] pubbed message: 0: 2020-08-06 22:28:03.664818352 +0000 UTC m=+5.032804253
[pub] pubbed message: 1: 2020-08-06 22:28:08.664661524 +0000 UTC m=+10.032647430
request failed: {"id":"go.micro.client","code":500,"detail":"service com.example.service.sample: not found","status":"Internal Server Error"}
request failed: {"id":"go.micro.client","code":500,"detail":"service com.example.service.sample: not found","status":"Internal Server Error"}
[pub] pubbed message: 2: 2020-08-06 22:28:13.664699377 +0000 UTC m=+15.032685317
request failed: {"id":"go.micro.client","code":500,"detail":"service com.example.service.sample: not found","status":"Internal Server Error"}
[pub] pubbed message: 3: 2020-08-06 22:28:18.664639436 +0000 UTC m=+20.032625348
[pub] pubbed message: 4: 2020-08-06 22:28:23.664834003 +0000 UTC m=+25.032819937
request failed: {"id":"go.micro.client","code":500,"detail":"service com.example.service.sample: not found","status":"Internal Server Error"}

ログを見ると分かるように、定期的に API リクエストとメッセージ送信を行っています。(今はまだサーバーがないので、API リクエストは失敗しています。)

また、このタイミングで、Cloud Pub/Sub の topic が自動で作成されます。

$ gcloud pubsub topics list
...
name: projects/go-micro-test/topics/com.example.sample-topic

サービスのデプロイ

続いて、サービスをデプロイします。次の yaml ファイルを利用します。ここでは、環境変数 MICRO_SERVER_ADDRESS で Listening port を 8080 に固定しています。

sample-service.yml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: sample-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: sample-service
  template:
    metadata:
      labels:
        app: sample-service
    spec:
        containers:
        - name: sample-service
          image: gcr.io/go-micro-test/sample-service:latest
          imagePullPolicy: Always
          env:
          - name: GOOGLEPUBSUB_PROJECT_ID
            value: "go-micro-test"
          - name: MICRO_REGISTRY
            value: "kubernetes"
          - name: MICRO_SERVER_ADDRESS
            value: "0.0.0.0:8080"
          ports:
          - containerPort: 8080
            name: service-port

また、負荷分散機能を確認するために「replicas: 3」を指定しています。Cloud Pub/Sub のメッセージについては、各サーバープロセスが Subscription を共有することで、Cloud Pub/Sub 側の機能によるメッセージ配信の分散が行われます。gRPC リクエストについては、go-micro の機能によりクライアント側で分散します。(Service Discovery で発見したサーバーに対して、ランダムハッシュによる送信先の分散を行います。)

$ kubectl create -f sample-service.yml 
deployment.apps/sample-service created

$ kubectl get pods
NAME                              READY   STATUS    RESTARTS   AGE
client-service-58fc4b64f8-zm6kg   1/1     Running   0          19m
sample-service-7b48ccdc49-4zk8w   1/1     Running   0          18m
sample-service-7b48ccdc49-rwkd9   1/1     Running   0          18m
sample-service-7b48ccdc49-z5drz   1/1     Running   0          18m

クライアントのログを確認すると、gRPC リクエストに対する応答が得られています。

$ kubectl logs client-service-58fc4b64f8-zm6kg | tail -6
Hello  world! : 24
[pub] pubbed message: 24: 2020-08-06 22:30:03.664689398 +0000 UTC m=+125.032675329
Hello  world! : 25
[pub] pubbed message: 25: 2020-08-06 22:30:08.664661393 +0000 UTC m=+130.032647340
Hello  world! : 26
[pub] pubbed message: 26: 2020-08-06 22:30:13.664647293 +0000 UTC m=+135.032633230

サーバー側のログを確認すると負荷分散の様子がわかります。

$ kubectl logs sample-service-7b48ccdc49-4zk8w | tail -10
[sub] received message: 90: 2020-08-06 22:35:33.664661081 +0000 UTC m=+455.032647033 header map[id:90]
[sub] received message: 93: 2020-08-06 22:35:48.664685902 +0000 UTC m=+470.032671836 header map[id:93]
2020-08-06 22:36:03  file=handler/sample.go:15 level=info Received Sample.Call request
[sub] received message: 96: 2020-08-06 22:36:03.664726242 +0000 UTC m=+485.032712129 header map[id:96]
2020-08-06 22:36:08  file=handler/sample.go:15 level=info Received Sample.Call request
2020-08-06 22:36:13  file=handler/sample.go:15 level=info Received Sample.Call request
2020-08-06 22:36:18  file=handler/sample.go:15 level=info Received Sample.Call request
[sub] received message: 99: 2020-08-06 22:36:18.664686962 +0000 UTC m=+500.032672850 header map[id:99]
[sub] received message: 102: 2020-08-06 22:36:33.664669789 +0000 UTC m=+515.032655723 header map[id:102]
2020-08-06 22:36:38  file=handler/sample.go:15 level=info Received Sample.Call request

Cloud Pub/Sub のメッセージについては、ラウンドロビンで、正確に 3 回ごとに 1 回だけメッセージを受信しています。gRPC リクエストについては、ランダムに分散されています(仮にラウンドロビンであれば、15秒ごとに受信するはずです)。

本番利用における注意

下記のエントリーの「おまけ:Saga パターンの耐障害性について」を参照

enakai00.hatenablog.com


次のパート3では、より実践的なサービスを作ります。

enakai00.hatenablog.com