めもめも

このブログに記載の内容は個人の見解であり、必ずしも所属組織の立場、戦略、意見を代表するものではありません。

Building microservices with go-micro on GCP(パート1)

go-micro について

github.com

go-micro は、microservice 用のサーバーを構築するための Go 言語のフレームワークです。gRPC による(同期型)API サーバー、および、Cloud Pub/Sub などのメッセージブローカーから受信したイベントを処理する(非同期型)API サーバーを簡単に作成することができます。ここでは、GCP の以下のサービスと組み合わせて、go-micro を利用する手順を紹介します。

Google Kubernetes Engine (GKE) : go-micro で作ったサービスを GKE の Pod としてデプロイします。

Cloud Pub/Sub : メッセージブローカーとして Cloud Pub/Sub を使用します。

go-micro はさまざまな機能のバックエンドをプラグインで選択できるようになっており、ここでは、Service Registory 機能として、Pod の Annotation を使用する Kubernetes Registry Plugin と、メッセージブローカーとして Cloud Pub/Sub を使用する googlepubsub Plugin を組み合わせて利用します。

事前準備

まずは、GCP のプロジェクトを作成します。ここでは、プロジェクト ID は「go-micro-test」とします。次に GKE で Kubernetes のクラスターを作成しますが、Pod から Cloud Pub/Sub などの外部サービスを利用できるよう、Access scopes には「Allow full access to all Cloud APIs」を指定してください。

この後の作業は、Cloud Shell からも行うことができますが、ここでは、GCE の VM を使って、開発用サーバーを用意しておくことにします。(Cloud Shell を使う場合は、ここは飛ばして、次のセクションに進んでください。)はじめに、適当なサイズの VM インスタンスを起動します。OS はデフォルトの Debian です。Kubernetes と同様に、Access scopes には「Allow full access to all Cloud APIs」を指定してください。

インスタンスが起動したら、SSH でログインして、Protocol buffer のコンパイラと Go 言語の開発環境を用意します。

sudo apt update
sudo apt -y upgrade
sudo apt install -y build-essential git protobuf-compiler
wget https://golang.org/dl/go1.14.6.linux-amd64.tar.gz
tar -xvzf go1.14.6.linux-amd64.tar.gz
sudo mv go /usr/local

cat <<'EOF' >>~/.bashrc
export GOROOT=/usr/local/go
export GOPATH=~/go
export PATH=$GOPATH/bin:$GOROOT/bin:$PATH
EOF

コンテナイメージをビルドするために Docker をインストールします。

sudo apt install -y apt-transport-https ca-certificates curl software-properties-common gnupg2
curl -fsSL https://download.docker.com/linux/debian/gpg | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/debian $(lsb_release -cs) stable"
sudo apt update
sudo apt install -y docker-ce
sudo usermod -aG docker $USER

ここで一度、サーバーを再起動しておきます。

sudo reboot

再起動後に、再度、SSH でログインして、環境が用意できたことを確認します。

$ go version
go version go1.14.6 linux/amd64

$ protoc --version
libprotoc 3.0.0

$ docker version
Client: Docker Engine - Community
 Version:           19.03.12
 API version:       1.40
 Go version:        go1.13.10
 Git commit:        48a66213fe
 Built:             Mon Jun 22 15:45:52 2020
 OS/Arch:           linux/amd64
 Experimental:      false
Server: Docker Engine - Community
 Engine:
  Version:          19.03.12
  API version:      1.40 (minimum version 1.12)
  Go version:       go1.13.10
  Git commit:       48a66213fe
  Built:            Mon Jun 22 15:44:23 2020
  OS/Arch:          linux/amd64
  Experimental:     false
 containerd:
  Version:          1.2.13
  GitCommit:        7ad184331fa3e55e52b890ea95e65ba581ae3429
 runc:
  Version:          1.0.0-rc10
  GitCommit:        dc9208a3303feef5b3839f4323d9beb36df0a9dd
 docker-init:
  Version:          0.18.0
  GitCommit:        fec3683

go-micro のインストールと動作確認

必須ではありませんが、開発中に便利なので、goimports コマンドを入れておきます。

go get golang.org/x/tools/cmd/goimports

go-micro の実行に必要なモジュールをインストールします。

go get github.com/golang/protobuf/{proto,protoc-gen-go}
GO111MODULE=on go get github.com/micro/micro/v2
GO111MODULE=on go get github.com/micro/protoc-gen-micro/v2

サンプルサービスを生成して、go-micro が正しくインストールされていることを確認します。--namespace オプションは、サービス名を World Wide でユニークにするために、FQDN 逆順命名規約で Namespace を指定します。

cd $GOPATH/src
micro new --namespace=com.example --gopath=false sample
cd sample

$GOPATH/src/sample 以下に次のようなテンプレートが生成されています。

$ tree .
.
├── Dockerfile
├── generate.go
├── go.mod
├── handler
│   └── sample.go
├── main.go
├── Makefile
├── plugin.go
├── proto
│   └── sample
│       └── sample.proto
├── README.md
└── subscriber
    └── sample.go

この時、デフォルトで用意された go.mod の内容に注意してください。

go.mod

module sample

go 1.13

// This can be removed once etcd becomes go gettable, version 3.4 and 3.5 is not,
// see https://github.com/etcd-io/etcd/issues/11154 and https://github.com/etcd-io/etcd/issues/11931.
replace google.golang.org/grpc => google.golang.org/grpc v1.26.0

etcd と grpc のモジュールの非互換依存問題があるため、それを回避するための replace エントリが記載されています。自動生成されるテンプレートを使わないと、こういった点にはまる事があるので注意してください。

また、Makefile も用意されているので、次のコマンドでバイナリーのビルドができます。(この時、proto ファイルのコンパイルも行われます。)

make build

出来上がったバイナリーは、ローカルで実行することもできます。

$ ./sample-service 
2020-08-06 11:50:28  file=v2@v2.9.1/service.go:200 level=info Starting [service] com.example.service.sample
2020-08-06 11:50:28  file=grpc/grpc.go:864 level=info Server [grpc] Listening on [::]:39755
2020-08-06 11:50:28  file=grpc/grpc.go:881 level=info Broker [http] Connected to 127.0.0.1:39849
2020-08-06 11:50:28  file=grpc/grpc.go:697 level=info Registry [mdns] Registering node: com.example.service.sample-a764a88f-35ea-4bc5-a603-a5a1e133fff2
2020-08-06 11:50:28  file=grpc/grpc.go:730 level=info Subscribing to topic: com.example.service.sample

gRPC の Listen ポートはランダムに割り当てられますが、Service Discovery 機能があるので気にしなくても構いません。(デフォルトでは、mDNS による discovery 機能が提供されます。)

別のターミナルから SSH ログインして、micro コマンドでサービスの状態を確認してみます。

$ micro list services
com.example.service.sample
micro.http.broker

$ micro get service com.example.service.sample
Endpoint: Sample.Call
Request: {
        message_state MessageState {
                no_unkeyed_literals NoUnkeyedLiterals
                do_not_compare DoNotCompare
                do_not_copy DoNotCopy
                message_info MessageInfo
        }
        int32 int32
        unknown_fields []uint8
        name string
}
Response: {
        message_state MessageState {
                no_unkeyed_literals NoUnkeyedLiterals
                do_not_compare DoNotCompare
                do_not_copy DoNotCopy
                message_info MessageInfo
        }
        int32 int32
        unknown_fields []uint8
        msg string
}
...

Sample.Call というエンドポイントの API があることがわかります。次のように、サービス名とエンドポイント名を指定して、gRPC のリクエストを投げることもできます。

$ micro call com.example.service.sample Sample.Call '{"name": "world!"}'
{
        "msg": "Hello world!"
}

proto ファイルが読める方であれば、proto/sample/sample.proto を見ると API の仕様がわかります。

これで、基本的な動作確認ができました。先ほど起動したサービスは、Ctrl+C で停止しておきます。

次のパートでは、GKE 上にサービスをデプロイします。

enakai00.hatenablog.com

おまけ

proto ファイルの読み方がわからない、という方にはこちらの入門書がお勧めです。

www.amazon.co.jp

Choreography-based saga をローカルで実験するためのフレームワーク

github.com

Choreography-based saga とは?

マイクロサービスの環境で、複数のサービスが連携するトランザクションを実行する手法にSaga パターンがあります。

一般にトランザクションに求められる特性として ACID がありますが、RDB を用いたトランザクションでは、特に Atomicity が容易に実現できます。つまり、ある処理を途中まで実行した段階で、「やっぱりやーめた!」となったときに、「ロールバック」のおまじないを唱えるだけで、それまでの処理をなかった事にできます。

一方、マイクロサービスの場合は、サービスごとに個別にDBを持っているので、(DBのトランザクション機能を用いて)複数のサービスにまたがる処理を「やっぱりやーめた!」と言って元にもどすのは困難です。

そこで、Saga パターンでは、トランザクションに含まれる一連の処理をワークフローのように定義しておき、「やっぱりやーめた」となった時は、それまでに行った処理をキャンセルするフローを実行して、論理的に元の状態に戻すという作戦を取ります。(ワークフローに含まれる個々の「タスク」は1つのサービスに閉じているので、タスク単位での通常のトランザクションは普通に実装可能です。)

さらに、このワークフローを実行する方式として、「Orchestration-based saga」と「Choreography-based saga」があります。

「Orchestration-based saga」では、「ワークフローマネージャー」に相当する機能をどこかのサービスに実装して、そこから各サービスを呼び出してワークフローを実行していきます。ただし、この方式の場合、ワークフローマネージャーとそこから操作されるサービス群の依存関係が強くなる(各サービスの自律性が損なわれる)というデメリットがあります。

そこで、ワークフローを管理するコントローラーを立てずに、各サービスが自立分散的に動作することで、ワークフローを実現しようというアイデアが、「Choreography-based saga」です。

簡単に言うと、各サービスは自分が管理するオブジェクトに変更があると、その事実を「イベント」として発行します。その他のサービスは、このイベントを見て、次に自分がやるべき作業を考えて、それを実行したのちに、その結果をまたイベントとして発行します。このようにして、イベントベースの連携処理がパタパタと繋がっていって、結果的にワークフローが実行されるというものです。(現実世界の事務作業もこんな感じですよね・・・)

具体的な実装としては、Cloud Pub/Sub のようなメッセージキューを用いて、サービスごとのイベントキューを定義しておき、各サービスは、自分のキューにイベントを吐き出していきます。このイベントを誰が受け取るかは気にしない点に注意してください。一方、他のサービスのイベントをトリガーとするサービスは、興味のあるサービスのキューにサブスクライブしておいて、イベントを受け取ります。

なんだかよくわからないという方のために、簡単な例を示します。


これは、Microservices Patterns という書籍にある例を少し簡単化したもので、フードデリバリーのサービスを題材にしたもので、次のようなワークフローになります。

1. クライアントからの発注イベントが ClientEventQueue に入る。
2. OrderService は発注情報(Status = "pending")を生成してDBに保存すると、その情報を示すイベントが OrderEventQueue に入る。
3. KitchenService は発注内容を見て、配送チケット情報(Status = "pending")をDBに保存すると、その情報を示すイベントが KitchenEventQueue に入る。
4. ConsumerService は、配送チケット情報を見て、発注された食事が発注者の Preference (ベジタリアン、アレルギー食品など)に合っている事をチェックする。チェック結果を示すイベントが ConsumerEventQueue に入る。
5. KitchenService は、チェック結果を見て問題なければ、配送チケット情報を更新(Status = "approved")する。その情報を示すイベントが KitchenEventQueue に入る。
6. OrderService は、配送チケット情報を見て、Status = "approved" であれば、発注情報を更新(Status = "approved")して、その情報を示すイベントが OrderEventQueue に入る。

ここでは、正常系のみを示していますが、たとえば、ConsumerService の発注確認結果が NG の場合は、そのあとは、発注を取り消すフローが走ります。

ワークフローのプロトタイプ実装

で・・・・、実際にこのようなワークフローを設計しだすと、机上の設計だけでは不安になるので、プロトタイプを実装して実験したくなります。この際、実際に Pub/Sub のメッセージキューを用意して、複数のマイクロサービスをデプロイして・・・・というのは大変なので、もっと簡易的にローカルでシミュレーションしたくなります。

そこで、Golang の Channel を用いて、複数の Goroutine からアクセスできる擬似メッセージキューを実装してみました。これを使うと、1つのマイクロサービスを1つの Goroutine として起動して、上述のようなワークフローが意図通りに流れるかをローカルで確認することができます。(擬似メッセージキューを本物の Pub/Sub に差し替えられるように、アダプター的に作ってあります。)

コードの説明は面倒なので割愛して、main だけお見せするとこんな感じです。。。

func main() {
	// Create event queues
	clientEventQueue := localQueue.Create("ClientEvents") // interface eventQueue.Queue
	orderEventQueue := localQueue.Create("OrderEvents")
	consumerEventQueue := localQueue.Create("ConsumerEvents")
	kitchenEventQueue := localQueue.Create("KitchenEvents")

	// Start handlers
	orderService.StartHandlers(orderEventQueue, clientEventQueue, kitchenEventQueue)
	kitchenService.StartHandlers(kitchenEventQueue, orderEventQueue, consumerEventQueue)
	consumerService.StartHandlers(consumerEventQueue, kitchenEventQueue)

	// Start saga with an OrderRequest event
	orderRequest := events.OrderRequest{
		ConsumerID:   "consumer001",
		RestaurantID: "restaurant001",
		FoodID:       "food001",
	}
	jsonBytes, _ := json.Marshal(orderRequest)
	eventQueue.Send("OrderRequest", jsonBytes, clientEventQueue)

	// Oberve OrderEvent queue
	orderEventSubscription := orderEventQueue.Subscribe("Client:OrderEvent")
	for {
		orderEventSubscription.Receive()
	}
}

最初に、event queues を作っているところを localQueue パッケージから、他のパッケージに差し替える事で、本物の Pub/Sub に差し替えられる想定です。

これを実行すると、イベントの送受信のログがこんな感じで流れます。

2020/08/03 09:44:26 Send event to ClientEvents: {"consumerID":"consumer001","restaurantID":"restaurant001","foodID":"food001"}
2020/08/03 09:44:26 Receive event by OrderService:ClientEvent: {"consumerID":"consumer001","restaurantID":"restaurant001","foodID":"food001"}
2020/08/03 09:44:26 Send event to OrderEvents: {"orderID":"bdc75f46-6bf1-4940-954c-a6fe89061d3c","orderStatus":"pending","consumerID":"consumer001","restaurantID":"restaurant001","foodID":"food001"}
2020/08/03 09:44:26 Receive event by Client:OrderEvent: {"orderID":"bdc75f46-6bf1-4940-954c-a6fe89061d3c","orderStatus":"pending","consumerID":"consumer001","restaurantID":"restaurant001","foodID":"food001"}
2020/08/03 09:44:26 Receive event by KitchenService:OrderEvent: {"orderID":"bdc75f46-6bf1-4940-954c-a6fe89061d3c","orderStatus":"pending","consumerID":"consumer001","restaurantID":"restaurant001","foodID":"food001"}
2020/08/03 09:44:26 Send event to KitchenEvents: {"orderID":"bdc75f46-6bf1-4940-954c-a6fe89061d3c","consumerID":"consumer001","status":"pending","restaurantID":"restaurant001","FoodID":"food001","FoodType":"vegan"}
2020/08/03 09:44:26 Receive event by ConsumerService:KitchenEvent: {"orderID":"bdc75f46-6bf1-4940-954c-a6fe89061d3c","consumerID":"consumer001","status":"pending","restaurantID":"restaurant001","FoodID":"food001","FoodType":"vegan"}
2020/08/03 09:44:26 Receive event by OrderService:KitchenEvent: {"orderID":"bdc75f46-6bf1-4940-954c-a6fe89061d3c","consumerID":"consumer001","status":"pending","restaurantID":"restaurant001","FoodID":"food001","FoodType":"vegan"}
2020/08/03 09:44:26 Send event to ConsumerEvents: {"orderID":"bdc75f46-6bf1-4940-954c-a6fe89061d3c","ConsumerID":"consumer001","status":"verified"}
2020/08/03 09:44:26 Receive event by KitchenService:ConsumerEvent: {"orderID":"bdc75f46-6bf1-4940-954c-a6fe89061d3c","ConsumerID":"consumer001","status":"verified"}
2020/08/03 09:44:26 Send event to KitchenEvents: {"orderID":"bdc75f46-6bf1-4940-954c-a6fe89061d3c","consumerID":"consumer001","status":"approved","restaurantID":"restaurant001","FoodID":"food001","FoodType":"vegan"}
2020/08/03 09:44:26 Receive event by ConsumerService:KitchenEvent: {"orderID":"bdc75f46-6bf1-4940-954c-a6fe89061d3c","consumerID":"consumer001","status":"approved","restaurantID":"restaurant001","FoodID":"food001","FoodType":"vegan"}
2020/08/03 09:44:26 Receive event by OrderService:KitchenEvent: {"orderID":"bdc75f46-6bf1-4940-954c-a6fe89061d3c","consumerID":"consumer001","status":"approved","restaurantID":"restaurant001","FoodID":"food001","FoodType":"vegan"}
2020/08/03 09:44:26 Send event to OrderEvents: {"orderID":"bdc75f46-6bf1-4940-954c-a6fe89061d3c","orderStatus":"approved","consumerID":"consumer001","restaurantID":"restaurant001","foodID":"food001"}
2020/08/03 09:44:26 Receive event by Client:OrderEvent: {"orderID":"bdc75f46-6bf1-4940-954c-a6fe89061d3c","orderStatus":"approved","consumerID":"consumer001","restaurantID":"restaurant001","foodID":"food001"}
2020/08/03 09:44:26 Receive event by KitchenService:OrderEvent: {"orderID":"bdc75f46-6bf1-4940-954c-a6fe89061d3c","orderStatus":"approved","consumerID":"consumer001","restaurantID":"restaurant001","foodID":"food001"}

おまけ:Saga パターンの耐障害性について

次のような考慮をした実装が必要です。

・各 Event Handler は Idempotent に作ります(メッセージキューは、イベントの重複配信があるので。)

・サービスが1つのイベントを発行する際は、「オブジェクトの更新+イベント発行+トリガーイベントのID記録」をアトミックに実行します。(たとえば、オブジェクトを更新したあと、イベント発行の前に障害停止して、イベントが消失すると取り返しがつかないので。)トリガーイベントのID記録は、同じトリガーイベントが重複配信された際に、これを無視するために利用します。

・トリガーイベントに対するメッセージキューへの Ack は、イベント発行後に行います。(Ack の前に障害停止すると、再起動時に同じイベント処理が走りますが、Event Handler が Idempotent であれば問題ありません。)

・キューに入ったイベントがあやまって消失しない事は、キューの耐障害性で保証します。

今回のサンプルであれば、次のような流れになります。
orderService/eventHandler.go

			// Begin Transaction
			orderDatabase[orderStatus.OrderID] = orderStatus // store order in DB
			jsonBytes, _ := json.Marshal(orderStatus)
			eventQueue.Send("OrderStatus", jsonBytes, orderEventQueue)
			// End Transaction
			// event.Ack()

「DBのアップデートとイベントの発行を1つのトランザクションに入れる」という方法は、実は工夫が必要です。たとえば、「DBをアップデートして、イベントを配信用DBに保存する」というDBに閉じた処理をトランザクションで実施しておき、そのあと、配信用DBに保存されたイベントを送信する処理を別タスクとして実装します。

「ITエンジニアのための強化学習理論入門」が発売されます

www.amazon.co.jp

表題の書籍が技術評論社より発売されることになりました。執筆にご協力いただいた方々には、あらためてお礼を申し上げます。販売開始に先立って、「はじめに」「目次」「図表サンプル」を掲載させていただきますので、先行予約される方の参考にしていただければと思います。

はじめに

 「Q LearningとSARSAの違いを説明してください。」皆さんは、この質問に即答できるでしょうか? 本書を読めば、自信を持って答えられます! —— と、謎の宣伝文句(?)から始まりましたが、少しばかり背景を説明しておきましょう。
 2015年に『ITエンジニアのための機械学習理論入門』(技術評論社)を出版させていただいた後、驚くほどの勢いで機械学習の入門書が書店にあふれるようになりました。そしてまた、回帰モデルによる数値予測、分類モデルによる画像データの識別など、教師データを用いた機械学習モデル、いわゆる「教師あり学習」は、一般企業における活用が進みました。その一方で、エージェントが学習データを収集しながら学習処理を進める「強化学習」の利用は未だ敷居が高く、一般企業における活用は「まだこれから」という状況です。本書では、今後のスキルアップや強化学習の活用に向けた準備をしようと考える ITエンジニアの方々に向けて、強化学習のアルゴリズムを基礎から解説しています。
 動的計画法による厳密解の導出方法から始まり、ニューラルネットワークと強化学習を組み合わせた「DQN(Deep Q Network)」まで、「強化学習がなぜうまくいくのか」という基本原理を解説します。Pythonで実装したコードをGoogle Colaboratoryで実行しながら、それぞれのアルゴリズムがどのように機能するのかを「実感して理解する」ことが本書の一貫したテーマです。既存の機械学習ライブラリをブラックボックスとして用いるのではなく、具体的な動作原理が確認できるように、すべてのアルゴリズムを一から実装しています。「三目並べ」や「あるけあるけゲーム」など、シンプルな題材を用いて、エージェント同士の対戦による相互学習や、実行時の先読みによる性能向上など、より実践的なテクニックにも触れています。
 冒頭の「Q Learning」と「SARSA」は、どちらも強化学習の基礎的なアルゴリズムですが、機械学習の活用が広がるスピードを考えると、近い将来、機械学習に関わるITエンジニアの採用面接では、冒頭のような質問が「あたりまえ」になる日が近いのかも知れません。試験対策が本書の目的ではありませんが、一般的な「教師あり学習」の仕組みを学んだ上で、次のステップとして「強化学習」に取り組みたいと考える皆さんの知的好奇心を満たし、ITエンジニアとしての活動の幅を広げるきっかけが提供できれば、筆者にとってこの上ない喜びです。

目次

第1章 強化学習のゴールと課題
1.1 強化学習の考え方
1.2 実行環境のセットアップ
1.3 バンディットアルゴリズム(基本編)
1.4 バンディットアルゴリズム(応用編)

第2章 環境モデルを用いた強化学習の枠組み
2.1 マルコフ決定過程による環境のモデル化
2.2 エージェントの行動ポリシーと状態価値関数
2.3 動的計画法による状態価値関数の決定

第3章 行動ポリシーの改善アルゴリズム
3.1 ポリシー反復法
3.2 価値反復法
3.3 より実践的な実装例

第4章 サンプリングデータを用いた学習法
4.1 モンテカルロ法
4.2 TD(Temporal-Difference)法

第5章 ニューラルネットワークによる関数近似
5.1 ニューラルネットワークによる状態価値関数の計算
5.2 ニューラルネットワークを用いたQ-Learning

図表サンプル