作ってわかる! はじめてのgRPC

02 RPC の具現化である gRPC

RPC(Remote Procedure Call)とは

package main

func main() {
	res := hello("hsaki")
	fmt.Println(res)
}

func hello(name string) string {
	return fmt.Sprintf("Hello, %s!", name)
}

「main 関数の中から hello という Procedure(手続き)を Call(呼び出し)している」と表現できる。
この例であげた Procedure Call は「main 関数と呼び出し Procedure である hello 関数が、同じローカル上にある」パターンです。
これに対して Remote Procedure Call は、呼び出し元(=main 関数)と呼び出される Procedure(=hello 関数)が別の場所・別のサーバー上にあるパターンのことを指している。

gRPC とは

RPC のやり方でサービス間通信を行うために、様々なプロコトルの中の一つ、Google が開発・提案した RPC のプロトコルが gRPC。

gRPC が用いる技術

  • HTTP/2
  • Protocal Buffers

通信方式 - HTTP/2

gRPC では、HTTP/2 の POST リクエストとそのレスポンスを使っている。

POST /path/to/hello

Request Body
200 OK

Response Body

シリアライズ方式 - protocol Buffers

リクエストボディ、レスポンスボディはそのままプレーンテキストで記述されるのではなく、Protocol Buffers というシリアライズ方式を用いてバイナリに変換したもの。

03 Proto ファイルで Procedure を定義する

proto ファイルの記述方式

hello.proto
// protoのバージョンの宣言
syntax = "proto3";

// protoファイルから自動生成させるGoのコードの置き先
// (詳細は4章にて)
option go_package = "pkg/grpc";

// packageの宣言
package myapp;

// サービスの定義
service GreetingService {
	// サービスが持つメソッドの定義
	rpc Hello (HelloRequest) returns (HelloResponse);
}

// 型の定義
message HelloRequest {
	string name = 1;
}

message HelloResponse {
	string message = 1;
}

Protocol Buffer Language のバージョン指定

proto ファイルを記述する Protocol Buffer Language には、proto2proto3の2種類のバージョンがある。

package の宣言

proto ファイルでは「他の proto ファイルで定義された型を使って記述する」ということもできるようになっており、その際に「パッケージ名.型名」という形で他の proto ファイル内の型を参照することになる。

proto ファイル内で定義できる型

Well Known Type

Protocol Buffers に組み込みで用意されている型以外にも、Google が定義してパッケージとして公開した便利な型の集合「Well Known Types」がある。

https://developers.google.com/protocol-buffers/docs/reference/google.protobufhttps://developers.google.com/protocol-buffers/docs/reference/google.protobuf

時刻を表すTimestamp型や、引数・戻り値なしを表現するためのEmpty型のような便利な型がgoogle.protobufというパッケージに多数定義されている。

// Timestamp型を使ってMyMessage型を定義した例

// Timestamp型を記述しているprotoファイルをimport
import "google/protobuf/timestamp.proto";

message MyMessage {
	string message = 1;
	// パッケージ名"google.protobuf" + 型名"Timestamp"で記述
	google.protobuf.Timestamp create_time = 2;
}

04 proto ファイルからコードを自動生成する

前準備

依存パッケージのインストール

$ brew install protobuf

proto ファイルからコードを自動生成させるために、protocコマンドを使用。

Go のパッケージをインストール。

  • google.golang.org/grpc: Go で gRPC を扱うためのパッケージ
  • google.golang.org/grpc/cmd/protoc-gen-go-grpc: protoc コマンドが Go のコードを生成するのに利用

コード生成

protoc コマンドでコードを生成する

./src
├─ api
│   └─ hello.proto # protoファイル
├─ pkg
│   └─ grpc # ここにコードを自動生成させる
├─ go.mod
└─ go.sum
$ cd api
$ protoc --go_out=../pkg/grpc --go_out=paths=source_relative \
				--go-grpc_out=../pkg/grpc --go-grpc_opt=paths=source_relative \
				hello.proto

以下のファイルが生成される。

  • hello.pb.go:proto ファイルから自動生成されたリクエスト/レスポンス型を定義した部分のコード
  • hello_grpc.pb.go:proto ファイルから自動生成されたサービス部分のコード

コード自動生成の仕様

proto ファイルの記述がどのような Go のコードに変換されるのかについて

05 gRPC サーバーを動かしてみよう

gRPC サーバーの実装

サーバーを起動する部分のコードを書く

サーバー側の実装
cmd/server/main.go
package main

import (
	// (一部抜粋)
+	hellopb "mygrpc/pkg/grpc"
)

func main() {
	// 1. 8080番portのLisnterを作成
	port := 8080
	listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
	if err != nil {
		panic(err)
	}

	// 2. gRPCサーバーを作成
	s := grpc.NewServer()

	// 3. gRPCサーバーにGreetingServiceを登録
	hellopb.RegisterGreetingServiceServer(s, NewMyServer())

	// 4. 作成したgRPCサーバーを、8080番ポートで稼働させる
	go func() {
		log.Printf("start gRPC server port: %v", port)
		s.Serve(listener)
	}()

	// 5.Ctrl+Cが入力されたらGraceful shutdownされるようにする
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, os.Interrupt)
	<-quit
	log.Println("stopping gRPC server...")
	s.GracefulStop()
}

// 自作サービス構造体のコンストラクタを定義
func NewMyServer() *myServer {
	return &myServer{}
}

func (s *myServer) Hello(ctx context.Context, req *hellopb.HelloRequest) (*hellopb.HelloResponse, error) {
	// リクエストからnameフィールドを取り出して
	// "Hello, [名前]!"というレスポンスを返す
	return &hellopb.HelloResponse{
		Message: fmt.Sprintf("Hello, %s!", req.GetName()),
	}, nil
}

サービスの実態を作成

pkg/grpc/hello_grpc.pb.go
// RegisterGreetingServiceServer関数の定義
// -> 第二引数はGreetingServiceServerインターフェース型
func RegisterGreetingServiceServer(s grpc.ServiceRegistrar, srv GreetingServiceServer)

// GreetingServiceServerインターフェース型の定義
type GreetingServiceServer interface {
	// Helloメソッドを持つ
	Hello(context.Context, *HelloRequest) (*HelloResponse, error)
	mustEmbedUnimplementedGreetingServiceServer()
}

自作サービス構造体の定義

/cmd/server/main.go
type myServer struct {
	hellopb.UnimplementedGreetingServiceServer
}

サーバーを起動して動作確認をしてみよう

gRPCurl のインストール

curlコマンドのようにターミナル上で gRPC のリクエストを送ることができる。

$ brew install grpcurl

サーバーリフクレクションの設定

gRPCurl を使うためには、リクエストを送る gRPC サーバーに「サーバーリフレクション」という設定がなされていることが前提。

cmd/server/main.go
import (
	// (一部抜粋)
+	"google.golang.org/grpc/reflection"
)

func main() {
	// (略)
	hellopb.RegisterGreetingServiceServer(s, [サーバーに登録するサービス])

+	// 4. サーバーリフレクションの設定
+	reflection.Register(s)
	// (略)
}

サーバーリフレクションとは?

シリアライズ・デシリアライズを行うためには、proto ファイルによって書かれた「シリアライズのルール」を知る必要がある。
gRPC クライアントは、サーバーファイルと同じく protoc コマンドから自動生成されたコードを使用して作るため、その「シリアライズのルール」が既に組み込まれているのですが、gRPCurl コマンドは違う。
元から proto ファイルによるメッセージ型の定義を知らない gRPCurl コマンドは、代わりに「gRPC サーバーそのものから、proto ファイルの情報を取得する」ことで「シリアライズのルール」を知り通信する。
そしてその「gRPC サーバーそのものから、proto ファイルの情報を取得する」ための機能がサーバーリフレクション。

動作確認

サーバー内に実装されているサービス一覧の確認

$ grpcurl -plaintext localhost:8080 list
grpc.reflection.v1alpha.ServerReflection
myapp.GreetingService

あるサービスのメソッド一覧の確認

$ grpcurl -plaintext localhost:8080 list myapp.GreetingService
myapp.GreetingService.Hello

メソッドの呼び出し

$ grpcurl -plaintext -d '{"name": "hsaki"}' localhost:8080 myapp.GreetingService.Hello
{
  "message": "Hello, hsaki!"
}

06 gRPC クライアントを動かしてみよう

クライアント側の実装
cmd/client/main.go
import (
	// (一部抜粋)
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	hellopb "mygrpc/pkg/grpc"
)

var (
	scanner *bufio.Scanner
	client  hellopb.GreetingServiceClient
)

func main() {
	fmt.Println("start gRPC Client.")

	// 1. 標準入力から文字列を受け取るスキャナを用意
	scanner = bufio.NewScanner(os.Stdin)

	// 2. gRPCサーバーとのコネクションを確立
	address := "localhost:8080"
	conn, err := grpc.Dial(
		address,

		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithBlock(),
	)
	if err != nil {
		log.Fatal("Connection failed.")
		return
	}
	defer conn.Close()

	// 3. gRPCクライアントを生成
	client = hellopb.NewGreetingServiceClient(conn)

	for {
		fmt.Println("1: send Request")
		fmt.Println("2: exit")
		fmt.Print("please enter >")

		scanner.Scan()
		in := scanner.Text()

		switch in {
		case "1":
			Hello()

		case "2":
			fmt.Println("bye.")
			goto M
		}
	}
}

func Hello() {
	fmt.Println("Please enter your name.")
	scanner.Scan()
	name := scanner.Text()

	req := &hellopb.HelloRequest{
		Name: name,
	}
	res, err := client.Hello(context.Background(), req)
	if err != nil {
		fmt.Println(err)
	} else {
		fmt.Println(res.GetMessage())
	}
}

07 gRPC で実現できるストリーミング処理

gRPC で可能な通信方式

  • Unary RPC
  • Server Streaming RPC
  • Client Streaming RPC
  • Bidirectional Streaming RPC

Unary RPC

「1 リクエスト-1 レスポンス」の通信方法。

Server Streaming RPC

クライアントから送られた 1 回のリクエストに対して、サーバーからのレスポンスが複数返ってくる通信方式.
例えば、「サーバー側からプッシュ通知を受け取る」場面など。

Client Streaming RPC

クライアントから複数回リクエストを送信し、サーバーがそれに対してレスポンスを 1 回返す通信方式。
例えば、クライアント側から複数回に分けてデータをアップロードして、すべて受け取った段階でサーバーが一回だけ OK と返すような用途など。

Bidirectional Streaming RPC

WebSocket のようなサーバー・クライアントともに任意のタイミングでリクエスト・レスポンスを送ることができる通信方式。
例えば、ping-pong のような 1 リクエスト-1 レスポンスをすべて受け取るまでレスポンスは返さないということも可能。

gRPC のストリーミングを支える技術

柔軟なストリーミング通信ができるのは、gRPC が HTTP/2 のプロトコル上で実現されているから。

HTTP/2 の「フレーム」

HTTP/2 では、送受信するデータをフレームという単位に分割して扱っている。

フレームのタイプ

全部で 10 種類のフレームタイプが定義されているが、その中で特によく使われるのが以下の 2 種類。

  • DATA フレーム: リクエスト/レスポンスボディを送信するフレーム
  • HEADERS フレーム: リクエスト/レスポンスヘッダーを送信するフレーム

この 2 つのフレームを使って、以下のようにクライアントとサーバーが複数回に分けてデータを送信する。

  1. 最初に HEADER フレームを送る
  2. リクエストボディを複数個の DATA フレームに分けて送信する
  3. (レスポンスの場合) gRPC ステータスを含んだ最後の  HEADER フレームを送る

フレームのフラグ

このように、HTTP/2 では 1 つの送受信データを複数個のフレームに分割してやり取りする。
そのため、最後のフレームであることを、どこかのタイミングで知らせてあげる必要がある。

gRPC のストリーミングでは、送信する最後のフレームのフラグフィールドに END_STREAM フラグをつけることで、もう送るデータがないことを相手に知らせる。

08 サーバーストリーミングの実装

proto ファイルでの定義

api/hello.proto
service GreetingService {
	// サービスが持つメソッドの定義
	rpc Hello (HelloRequest) returns (HelloResponse);
+	// サーバーストリーミングRPC
+	rpc HelloServerStream (HelloRequest) returns (stream HelloResponse);
}

サーバーサイドの実装

自動生成されたコード

pkg/grpc/hello_grpc.pb.go
type GreetingServiceServer interface {
	// サービスが持つメソッドの定義
	Hello(context.Context, *HelloRequest) (*HelloResponse, error)
+	// サーバーストリーミングRPC
+	HelloServerStream(*HelloRequest, GreetingService_HelloServerStreamServer) error
	mustEmbedUnimplementedGreetingServiceServer()
}

// 自動生成された、サーバーストリーミングのためのインターフェース(for サーバー)
+type GreetingService_HelloServerStreamServer interface {
+	Send(*HelloResponse) error
+	grpc.ServerStream
+}

サーバーサイドのビジネスロジックを実装する

cmd/server/main.go
func (s *myServer) HelloServerStream(req *hellopb.HelloRequest, stream hellopb.GreetingService_HelloServerStreamServer) error {
	resCount := 5
	for i := 0; i < resCount; i++ {
		if err := stream.Send(&hellopb.HelloResponse{
			Message: fmt.Sprintf("[%d] Hello, %s!", i, req.GetName()),
		}); err != nil {
			return err
		}
		time.Sleep(time.Second * 1)
	}
	return nil
}

Sendメソッドを何度も実行することで何度もクライアントにレスポンスを返すことが可能。
return分で nil or error を返すことでストリームを終わらせることができる。

クライアントコードの実装

自動生成されたコード

pkg/grpc/hello_grpc.pb.go
type GreetingServiceClient interface {
	// サービスが持つメソッドの定義
	Hello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloResponse, error)
+	// サーバーストリーミングRPC
+	HelloServerStream(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (GreetingService_HelloServerStreamClient, error)
}

// 自動生成された、サーバーストリーミングのためのインターフェース(for クライアント)
type GreetingService_HelloServerStreamClient interface {
	Recv() (*HelloResponse, error)
	grpc.ClientStream
}

クライアントの実装

+func HelloServerStream() {
+	fmt.Println("Please enter your name.")
+	scanner.Scan()
+	name := scanner.Text()
+
+	req := &hellopb.HelloRequest{
+		Name: name,
+	}
+	stream, err := client.HelloServerStream(context.Background(), req)
+	if err != nil {
+		fmt.Println(err)
+		return
+	}
+
+	for {
+		res, err := stream.Recv()
+		if errors.Is(err, io.EOF) {
+			fmt.Println("all the responses have already received.")
+			break
+		}
+
+		if err != nil {
+			fmt.Println(err)
+		}
+		fmt.Println(res)
+	}
+}

Recv メソッドでレスポンスを受け取るとき、これ以上受け取るレスポンスがないという状態なら、第一戻り値は nil、第二戻り値の err には io.EOF が格納される。

var EOF = errors.New("EOF")

09 クライアントストリーミングの実装

proto ファイルでの定義

api/hello.proto
service GreetingService {
	// サービスが持つメソッドの定義
	rpc Hello (HelloRequest) returns (HelloResponse);
	// サーバーストリーミングRPC
	rpc HelloServerStream (HelloRequest) returns (stream HelloResponse);
+	// クライアントストリーミングRPC
+	rpc HelloClientStream (stream HelloRequest) returns (HelloResponse);
}

サーバーサイドの実装

自動生成されたコード
pkg/grpc/hello_grpc.pb.go
type GreetingServiceServer interface {
	// サービスが持つメソッドの定義
	Hello(context.Context, *HelloRequest) (*HelloResponse, error)
	// サーバーストリーミングRPC
	HelloServerStream(*HelloRequest, GreetingService_HelloServerStreamServer) error
+	// クライアントストリーミングRPC
+	HelloClientStream(GreetingService_HelloClientStreamServer) error
	mustEmbedUnimplementedGreetingServiceServer()
}

// 自動生成された、クライアントストリーミングのためのインターフェース(for サーバー)
type GreetingService_HelloClientStreamServer interface {
	SendAndClose(*HelloResponse) error
	Recv() (*HelloRequest, error)
	grpc.ServerStream
}
ビジネスロジックを実装
cmd/server/main.go
func (s *myServer) HelloClientStream(stream hellopb.GreetingService_HelloClientStreamServer) error {
	nameList := make([]string, 0)
	for {
		req, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			message := fmt.Sprintf("Hello, %v!", nameList)
			return stream.SendAndClose(&hellopb.HelloResponse{
				Message: message,
			})
		}
		if err != nil {
			return err
		}
		nameList = append(nameList, req.GetName())
	}
}

クライアントコードの実装

自動生成されたコード
pkg/grpc/hello_grpc.pb.go
type GreetingServiceClient interface {
	// サービスが持つメソッドの定義
	Hello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloResponse, error)
	// サーバーストリーミングRPC
	HelloServerStream(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (GreetingService_HelloServerStreamClient, error)
+	// クライアントストリーミングRPC
+	HelloClientStream(ctx context.Context, opts ...grpc.CallOption) (GreetingService_HelloClientStreamClient, error)
}

// 自動生成された、クライアントストリーミングのためのインターフェース(for クライアント)
type GreetingService_HelloClientStreamClient interface {
	Send(*HelloRequest) error
	CloseAndRecv() (*HelloResponse, error)
	grpc.ClientStream
}
クライアントの実装
cmd/client/main.go
+func HelloClientStream() {
+	stream, err := client.HelloClientStream(context.Background())
+	if err != nil {
+		fmt.Println(err)
+		return
+	}
+
+	sendCount := 5
+	fmt.Printf("Please enter %d names.\n", sendCount)
+	for i := 0; i < sendCount; i++ {
+		scanner.Scan()
+		name := scanner.Text()
+
+		if err := stream.Send(&hellopb.HelloRequest{
+			Name: name,
+		}); err != nil {
+			fmt.Println(err)
+			return
+		}
+	}
+
+	res, err := stream.CloseAndRecv()
+	if err != nil {
+		fmt.Println(err)
+	} else {
+		fmt.Println(res.GetMessage())
+	}
+}

10 双方向ストリーミングの実装

proto ファイルでの定義

api/hello.proto
service GreetingService {
	// サービスが持つメソッドの定義
	rpc Hello (HelloRequest) returns (HelloResponse);
	// サーバーストリーミングRPC
	rpc HelloServerStream (HelloRequest) returns (stream HelloResponse);
	// クライアントストリーミングRPC
	rpc HelloClientStream (stream HelloRequest) returns (HelloResponse);
+	// 双方向ストリーミングRPC
+	rpc HelloBiStreams (stream HelloRequest) returns (stream HelloResponse);
}

サーバーサイドの実装

自動生成されたコード
pkg/grpc/hello_grpc.pb.go
type GreetingServiceServer interface {
	// サービスが持つメソッドの定義
	Hello(context.Context, *HelloRequest) (*HelloResponse, error)
	// サーバーストリーミングRPC
	HelloServerStream(*HelloRequest, GreetingService_HelloServerStreamServer) error
	// クライアントストリーミングRPC
	HelloClientStream(GreetingService_HelloClientStreamServer) error
+	// 双方向ストリーミングRPC
+	HelloBiStreams(GreetingService_HelloBiStreamsServer) error
	mustEmbedUnimplementedGreetingServiceServer()
}

type GreetingService_HelloBiStreamsServer interface {
	Send(*HelloResponse) error
	Recv() (*HelloRequest, error)
	grpc.ServerStream
}
ビジネスロジックを実装

「一つリクエストを受信するごとに、それに対するレスポンスを一つ返す」というロジックを実装。

cmd/server/main.go
func (s *myServer) HelloBiStreams(stream hellopb.GreetingService_HelloBiStreamsServer) error {
	for {
		req, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			return err
		}
		message := fmt.Sprintf("Hello, %v!", req.GetName())
		if err := stream.Send(&hellopb.HelloResponse{
			Message: message,
		}); err != nil {
			return err
		}
	}
}

クライアントコードの実装

自動生成されたコード
pkg/grpc/hello_grpc.pb.go
type GreetingServiceClient interface {
	// サービスが持つメソッドの定義
	Hello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloResponse, error)
	// サーバーストリーミングRPC
	HelloServerStream(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (GreetingService_HelloServerStreamClient, error)
	// クライアントストリーミングRPC
	HelloClientStream(ctx context.Context, opts ...grpc.CallOption) (GreetingService_HelloClientStreamClient, error)
+	// 双方向ストリーミングRPC
+	HelloBiStreams(ctx context.Context, opts ...grpc.CallOption) (GreetingService_HelloBiStreamsClient, error)
}

type GreetingService_HelloBiStreamsClient interface {
	Send(*HelloRequest) error
	Recv() (*HelloResponse, error)
	grpc.ClientStream
}
クライアントの実装
cmd/client/main.go
+func HelloBiStreams() {
+	stream, err := client.HelloBiStreams(context.Background())
+	if err != nil {
+		fmt.Println(err)
+		return
+	}
+
+	sendNum := 5
+	fmt.Printf("Please enter %d names.\n", sendNum)
+
+	var sendEnd, recvEnd bool
+	sendCount := 0
+	for !(sendEnd && recvEnd) {
+		// 送信処理
+		if !sendEnd {
+			scanner.Scan()
+			name := scanner.Text()
+
+			sendCount++
+			if err := stream.Send(&hellopb.HelloRequest{
+				Name: name,
+			}); err != nil {
+				fmt.Println(err)
+				sendEnd = true
+			}
+
+			if sendCount == sendNum {
+				sendEnd = true
+				if err := stream.CloseSend(); err != nil {
+					fmt.Println(err)
+				}
+			}
+		}
+
+		// 受信処理
+		if !recvEnd {
+			if res, err := stream.Recv(); err != nil {
+				if !errors.Is(err, io.EOF) {
+					fmt.Println(err)
+				}
+				recvEnd = true
+			} else {
+				fmt.Println(res.GetMessage())
+			}
+		}
+	}
+}

11 gRPC におけるステータスコード

HTTP のレスポンスステータスコードとの違い

gRPC の場合は、「メソッドの呼び出しに成功した場合には、中で何が怒ろうとも HTTP レスポンスステータスコードは 200 OK を返す」ように固定されている。

その代わり、gRPC は「メソッド内の処理が正しく実行されたのか」「エラーが起きたとしたらどのようなエラーなのか」を表現するために独自のステータスコードを用意している。
HTTP/2 上では、レスポンスヘッダ内のフィールドに格納することで伝達している。

status = 200
content-type = application/grpc+proto
grpc-status = 4 # これ
grpc-message = timeout # これ

なぜ HTTP ステータスコードでエラーを表現しないのか

gRPC が「メソッドを呼び出し、戻り値を受け取る」ことに関心をおいている。それゆえに「gRPC が  HTTP/2 の上に実装されている」ということを意識しないくて言いように設計されているから。
そんため、「呼び出されたメソッドが正しく処理を実行したか」を知るために HTTP のステータスコードを見に行くというのは gRPC-like ではない。

gRPC エラーコード一覧

https://grpc.io/docs/guides/error/#error-status-codeshttps://grpc.io/docs/guides/error/#error-status-codes

Standard Error Model の実装

サーバサイドの実装

func (s *myServer) Hello(ctx context.Context, req *hellopb.HelloRequest) (*hellopb.HelloResponse, error) {
	err := status.Error(codes.Unknown, "unknown error occurred")
	return nil, err
}

クライアント側の実装

google.golang.org/grpc/statusパッケージには、クライアントメソッドから受け取ったエラーから、gRPC のステータスコード・メッセージを復元するための FromError 関数を持っています。

import (
+	"google.golang.org/grpc/status"
)

func Hello() {
	res, err := client.Hello(context.Background(), req)
	if err != nil {
-		fmt.Println(err)
+		if stat, ok := status.FromError(err); ok {
+			fmt.Printf("code: %s\n", stat.Code())
+			fmt.Printf("message: %s\n", stat.Message())
+		} else {
+			fmt.Println(err)
+		}
	} else {
		fmt.Println(res.GetMessage())
	}
}

Richer Error Model の実装

gRPC ステータスの details フィールド

発生したエラーの情報をメッセージの一文字列でしか伝えられないのは不便。
スタックトレースのように詳細な情報を付け加える手段として details フィールドというものも設定することができる.

details フィールドをステータスに付与するためのメソッドとして、WithDetails メソッドがある。

func (s *Status) WithDetails(details ...proto.Message) (*Status, error)

サーバーサイドの実装

func (s *myServer) Hello(ctx context.Context, req *hellopb.HelloRequest) (*hellopb.HelloResponse, error) {
	// (何か処理をしてエラーが発生した)
-	err := status.Error(codes.Unknown, "unknown error occurred")
+	stat := status.New(codes.Unknown, "unknown error occurred")
+	stat, _ = stat.WithDetails([スタックトレースにするProtobufのメッセージ])
+	err := stat.Err()

	return &hellopb.HelloResponse{/*(略)*/}, err
}

WithDetailsメソッドに渡すメッセージ型は、Protobuf 由来の構造体であれば何でも OK 。
gRPC 公式として推奨しているのは、Google が公開している Standard Set of Error Message Types を使うこと。

https://github.com/googleapis/googleapis/blob/master/google/rpc/error_details.protohttps://github.com/googleapis/googleapis/blob/master/google/rpc/error_details.proto

google/epc/error_details.proto
message RetryInfo {
  // Clients should wait at least this long between retrying the same request.
  google.protobuf.Duration retry_delay = 1;
}

message DebugInfo {
  // The stack trace entries indicating where the error occurred.
  repeated string stack_entries = 1;

  // Additional debugging information provided by the server.
  string detail = 2;
}

この proto ファイルに定義されている型を Go のコードの中で使うためには、本来ならば protoc コマンドでコードを生成させそれをインポートして使う必要がある。
しかし、例の proto ファイルから自動生成されたコードが、errdetails パッケージとして既に公開されている。

https://pkg.go.dev/google.golang.org/genproto/googleapis/rpc/errdetailshttps://pkg.go.dev/google.golang.org/genproto/googleapis/rpc/errdetails

$ go get -u google.golang.org/genproto/googleapis/rpc/errdetails
cmd/server/main.go
import(
	// (一部抜粋)
+	"google.golang.org/genproto/googleapis/rpc/errdetails"
)

func (s *myServer) Hello(ctx context.Context, req *hellopb.HelloRequest) (*hellopb.HelloResponse, error) {
	// (何か処理をしてエラーが発生した)
	stat := status.New(codes.Unknown, "unknown error occurred")
-	stat, _ = stat.WithDetails([スタックトレースにするProtobufのメッセージ])
+	stat, _ = stat.WithDetails(&errdetails.DebugInfo{
+		Detail: "detail reason of err",
+	})
	err := stat.Err()

	return &hellopb.HelloResponse{/*(略)*/}, err
}

クライアントサイドの実装

cmd/client/main.go
import (
	// (一部抜粋)
+	_ "google.golang.org/genproto/googleapis/rpc/errdetails"
)

func Hello() {
	// (一部抜粋)
	res, err := client.Hello(context.Background(), req)
	if err != nil {
		if stat, ok := status.FromError(err); ok {
			fmt.Printf("code: %s\n", stat.Code())
			fmt.Printf("message: %s\n", stat.Message())
+			fmt.Printf("details: %s\n", stat.Details())
		} else {
			fmt.Println(err)
		}
	} else {
		fmt.Println(res.GetMessage())
	}
}

12 インターセプタの導入 - サーバーサイド編

gRPC では、ハンドラ処理の前後に追加処理を挟むミドルウェアのことをインターセプタと呼ぶ。

Unary RPC のインターセプタ

Unary Interceptor の形

type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

ref. https://pkg.go.dev/google.golang.org/grpc#UnaryServerInterceptor

自作 Unary Interceptor の実装

func myUnaryServerInterceptor1(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	log.Println("[pre] my unary server interceptor 1: ", info.FullMethod) // ハンドラの前に割り込ませる前処理
	res, err := handler(ctx, req) // 本来の処理
	log.Println("[post] my unary server interceptor 1: ", m) // ハンドラの後に割り込ませる後処理
	return res, err
}

インターセプタの導入

cmd/server/main.go
func main() {
	// (一部抜粋)
	s := grpc.NewServer(
		grpc.UnaryInterceptor(myUnaryServerInterceptor1),
	)
}

Stream RPC のインターセプタ

Stream Interceptor の形

type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error

ref. https://pkg.go.dev/google.golang.org/grpc#StreamServerInterceptor

自作 Stream Interceptor の実装

func myStreamServerInterceptor1(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	// ストリームがopenされたときに行われる前処理
	log.Println("[pre stream] my stream server interceptor 1: ", info.FullMethod)

	err := handler(srv, &myServerStreamWrapper1{ss}) // 本来のストリーム処理

	// ストリームがcloseされるときに行われる後処理
	log.Println("[post stream] my stream server interceptor 1: ")
	return err
}

type myServerStreamWrapper1 struct {
	grpc.ServerStream
}

func (s *myServerStreamWrapper1) RecvMsg(m interface{}) error {
	// ストリームから、リクエストを受信
	err := s.ServerStream.RecvMsg(m)
	// 受信したリクエストを、ハンドラで処理する前に差し込む前処理
	if !errors.Is(err, io.EOF) {
		log.Println("[pre message] my stream server interceptor 1: ", m)
	}
	return err
}

func (s *myServerStreamWrapper1) SendMsg(m interface{}) error {
	// ハンドラで作成したレスポンスを、ストリームから返信する直前に差し込む後処理
	log.Println("[post message] my stream server interceptor 1: ", m)
	return s.ServerStream.SendMsg(m)
}

ストリーミング RPC の流れ

ストリーミング処理の場合、リクエスト・レスポンスの送受信は以下のようなステップで実行される。

  1. ストリームを open する
  2. 以下を繰り返す
    1. ストリームからリクエストを受信する
    2. ハンドラ内で、リクエストに対するレスポンスを生成する
    3. ストリームを通じて、レスポンスを送信する
  3. ストリームを close する

そのため、単純に前処理・後処理といっても「ストリーム open/close ときの処理」なのか「ストリームから実際にデータを送受信するときの処理」なのかという選択肢が生まれる。

ストリーム open/close に着目した前処理・後処理

func myStreamServerInterceptor1(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	// 前処理をここに書く

	err := handler(srv, &myServerStreamWrapper1{ss}) // 本来のストリーム処理

	// 後処理をここに書く

	return err
}

メッセージの送受信に着目した前処理・後処理

type ServerStream interface {
	// (一部抜粋)
	RecvMsg(m interface{}) error
	SendMsg(m interface{}) error
}
  1. grpc.ServerStreamインターフェース型を満たす独自構造体を作成
  2. 独自構造体のRecvMsgSendMsgメソッドを、自分がやりたい処理を入れ込む形でオーバーライド

インターセプタの導入

cmd/server/main.go
func main() {
	// (一部抜粋)
	s := grpc.NewServer(
		grpc.StreamInterceptor(myStreamServerInterceptor1),
	)
}

複数個のインターセプタの導入

unary.go
func main() {
	s := grpc.NewServer(
-		grpc.UnaryInterceptor(myUnaryServerInterceptor1),
+		grpc.ChainUnaryInterceptor(
+			myUnaryServerInterceptor1,
+			myUnaryServerInterceptor2,
+		),
	)
}
stream.go
func main() {
	s := grpc.NewServer(
-		grpc.StreamInterceptor(myStreamServerInterceptor1),
+		grpc.ChainStreamInterceptor(
+			myStreamServerInterceptor1,
+			myStreamServerInterceptor2,
+		),
	)
}

13 インターセプタの導入 - クライアントサイド編

Unary RPC のインターセプタ

Unary Interceptor の形

type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error

ref. https://pkg.go.dev/google.golang.org/grpc#UnaryClientInterceptor

自作 Unary Interceptor の実装

func myUnaryClientInteceptor1(ctx context.Context, method string, req, res interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	fmt.Println("[pre] my unary client interceptor 1", method, req) // リクエスト送信前に割り込ませる前処理
	err := invoker(ctx, method, req, res, cc, opts...) // 本来のリクエスト
	fmt.Println("[post] my unary client interceptor 1", res) // リクエスト送信後に割り込ませる後処理
	return err
}

インターセプタの導入

func main() {
	// (一部抜粋)
	conn, err := grpc.Dial(
		address,
		grpc.WithUnaryInterceptor(myUnaryClientInteceptor1),

		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithBlock(),
	)
}

Stream RPC のインターセプタ

Stream Interceptor の形

type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)

ref. https://pkg.go.dev/google.golang.org/grpc#StreamClientInterceptor

自作 Unary Interceptor の実装

func myStreamClientInteceptor1(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
	// ストリームがopenされる前に行われる前処理
	log.Println("[pre] my stream client interceptor 1", method)

	// ストリームを生成 -> 返り値として返す
	// このストリームを用いて、クライアントは送受信処理を行う
	stream, err := streamer(ctx, desc, cc, method, opts...)
	return &myClientStreamWrapper1{stream}, err
}

type myClientStreamWrapper1 struct {
	grpc.ClientStream
}

func (s *myClientStreamWrapper1) SendMsg(m interface{}) error {
	// リクエスト送信前に割り込ませる処理
	log.Println("[pre message] my stream client interceptor 1: ", m)

	// リクエスト送信
	return s.ClientStream.SendMsg(m)
}

func (s *myClientStreamWrapper1) RecvMsg(m interface{}) error {
	err := s.ClientStream.RecvMsg(m) // レスポンス受信処理

	// レスポンス受信後に割り込ませる処理
	if !errors.Is(err, io.EOF) {
		log.Println("[post message] my stream client interceptor 1: ", m)
	}
	return err
}

func (s *myClientStreamWrapper1) CloseSend() error {
	err := s.ClientStream.CloseSend() // ストリームをclose

	// ストリームがcloseされた後に行われる後処理
	log.Println("[post] my stream client interceptor 1")
	return err
}
type ClientStream interface {
	// (一部抜粋)
	SendMsg(m interface{}) error
	RecvMsg(m interface{}) error
	CloseSend() error
}

インターセプタの導入

func main() {
	conn, err := grpc.Dial(
		address,
		grpc.WithStreamInterceptor(myStreamClientInteceptor1),

		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithBlock(),
	)
}

複数個のインターセプターの導入

func main() {
	conn, err := grpc.Dial(
		address,
-		grpc.WithUnaryInterceptor(myUnaryClientInteceptor1),
+		grpc.WithChainUnaryInterceptor(
+			myUnaryClientInteceptor1,
+			myUnaryClientInteceptor2,
+		),

		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithBlock(),
	)
}
func main() {
	conn, err := grpc.Dial(
		address,
-		grpc.WithStreamInterceptor(myStreamClientInteceptor1),
+		grpc.WithChainStreamInterceptor(
+			myStreamClientInteceptor1,
+			myStreamClientInteceptor2,
+		),

		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithBlock(),
	)
}

14 メタデータの送受信

通常の HTTP 通信の場合には、これらの情報をヘッダーフィールドに入れてやりとりしていましたが、gRPC ではメタデータというものを介して行う。