package main
func main() {
res := hello("hsaki")
fmt.Println(res)
}
func hello(name string) string {
return fmt.Sprintf("Hello, %s!", name)
}
「main 関数の中から hello という Procedure(手続き)を Call(呼び出し)している」と表現できる。
この例であげた Procedure Call は「main 関数と呼び出し Procedure である hello 関数が、同じローカル上にある」パターンです。
これに対して Remote Procedure Call は、呼び出し元(=main 関数)と呼び出される Procedure(=hello 関数)が別の場所・別のサーバー上にあるパターンのことを指している。
RPC のやり方でサービス間通信を行うために、様々なプロコトルの中の一つ、Google が開発・提案した RPC のプロトコルが gRPC。
gRPC では、HTTP/2 の POST リクエストとそのレスポンスを使っている。
POST /path/to/hello
Request Body
200 OK
Response Body
リクエストボディ、レスポンスボディはそのままプレーンテキストで記述されるのではなく、Protocol Buffers というシリアライズ方式を用いてバイナリに変換したもの。
// protoのバージョンの宣言
syntax = "proto3";
// protoファイルから自動生成させるGoのコードの置き先
// (詳細は4章にて)
option go_package = "pkg/grpc";
// packageの宣言
package myapp;
// サービスの定義
service GreetingService {
// サービスが持つメソッドの定義
rpc Hello (HelloRequest) returns (HelloResponse);
}
// 型の定義
message HelloRequest {
string name = 1;
}
message HelloResponse {
string message = 1;
}
proto ファイルを記述する Protocol Buffer Language には、proto2
とproto3
の2種類のバージョンがある。
proto ファイルでは「他の proto ファイルで定義された型を使って記述する」ということもできるようになっており、その際に「パッケージ名.型名」という形で他の proto ファイル内の型を参照することになる。
Protocol Buffers に組み込みで用意されている型以外にも、Google が定義してパッケージとして公開した便利な型の集合「Well Known Types」がある。
https://developers.google.com/protocol-buffers/docs/reference/google.protobuf
時刻を表すTimestamp
型や、引数・戻り値なしを表現するためのEmpty
型のような便利な型がgoogle.protobuf
というパッケージに多数定義されている。
// Timestamp型を使ってMyMessage型を定義した例
// Timestamp型を記述しているprotoファイルをimport
import "google/protobuf/timestamp.proto";
message MyMessage {
string message = 1;
// パッケージ名"google.protobuf" + 型名"Timestamp"で記述
google.protobuf.Timestamp create_time = 2;
}
$ brew install protobuf
proto ファイルからコードを自動生成させるために、protoc
コマンドを使用。
Go のパッケージをインストール。
google.golang.org/grpc
: Go で gRPC を扱うためのパッケージgoogle.golang.org/grpc/cmd/protoc-gen-go-grpc
: protoc コマンドが Go のコードを生成するのに利用./src
├─ api
│ └─ hello.proto # protoファイル
├─ pkg
│ └─ grpc # ここにコードを自動生成させる
├─ go.mod
└─ go.sum
$ cd api
$ protoc --go_out=../pkg/grpc --go_out=paths=source_relative \
--go-grpc_out=../pkg/grpc --go-grpc_opt=paths=source_relative \
hello.proto
以下のファイルが生成される。
hello.pb.go
:proto ファイルから自動生成されたリクエスト/レスポンス型を定義した部分のコードhello_grpc.pb.go
:proto ファイルから自動生成されたサービス部分のコードproto ファイルの記述がどのような Go のコードに変換されるのかについて
package main
import (
// (一部抜粋)
+ hellopb "mygrpc/pkg/grpc"
)
func main() {
// 1. 8080番portのLisnterを作成
port := 8080
listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
panic(err)
}
// 2. gRPCサーバーを作成
s := grpc.NewServer()
// 3. gRPCサーバーにGreetingServiceを登録
hellopb.RegisterGreetingServiceServer(s, NewMyServer())
// 4. 作成したgRPCサーバーを、8080番ポートで稼働させる
go func() {
log.Printf("start gRPC server port: %v", port)
s.Serve(listener)
}()
// 5.Ctrl+Cが入力されたらGraceful shutdownされるようにする
quit := make(chan os.Signal, 1)
signal.Notify(quit, os.Interrupt)
<-quit
log.Println("stopping gRPC server...")
s.GracefulStop()
}
// 自作サービス構造体のコンストラクタを定義
func NewMyServer() *myServer {
return &myServer{}
}
func (s *myServer) Hello(ctx context.Context, req *hellopb.HelloRequest) (*hellopb.HelloResponse, error) {
// リクエストからnameフィールドを取り出して
// "Hello, [名前]!"というレスポンスを返す
return &hellopb.HelloResponse{
Message: fmt.Sprintf("Hello, %s!", req.GetName()),
}, nil
}
// RegisterGreetingServiceServer関数の定義
// -> 第二引数はGreetingServiceServerインターフェース型
func RegisterGreetingServiceServer(s grpc.ServiceRegistrar, srv GreetingServiceServer)
// GreetingServiceServerインターフェース型の定義
type GreetingServiceServer interface {
// Helloメソッドを持つ
Hello(context.Context, *HelloRequest) (*HelloResponse, error)
mustEmbedUnimplementedGreetingServiceServer()
}
type myServer struct {
hellopb.UnimplementedGreetingServiceServer
}
curl
コマンドのようにターミナル上で gRPC のリクエストを送ることができる。
$ brew install grpcurl
gRPCurl を使うためには、リクエストを送る gRPC サーバーに「サーバーリフレクション」という設定がなされていることが前提。
import (
// (一部抜粋)
+ "google.golang.org/grpc/reflection"
)
func main() {
// (略)
hellopb.RegisterGreetingServiceServer(s, [サーバーに登録するサービス])
+ // 4. サーバーリフレクションの設定
+ reflection.Register(s)
// (略)
}
シリアライズ・デシリアライズを行うためには、proto ファイルによって書かれた「シリアライズのルール」を知る必要がある。
gRPC クライアントは、サーバーファイルと同じく protoc コマンドから自動生成されたコードを使用して作るため、その「シリアライズのルール」が既に組み込まれているのですが、gRPCurl コマンドは違う。
元から proto ファイルによるメッセージ型の定義を知らない gRPCurl コマンドは、代わりに「gRPC サーバーそのものから、proto ファイルの情報を取得する」ことで「シリアライズのルール」を知り通信する。
そしてその「gRPC サーバーそのものから、proto ファイルの情報を取得する」ための機能がサーバーリフレクション。
$ grpcurl -plaintext localhost:8080 list
grpc.reflection.v1alpha.ServerReflection
myapp.GreetingService
$ grpcurl -plaintext localhost:8080 list myapp.GreetingService
myapp.GreetingService.Hello
$ grpcurl -plaintext -d '{"name": "hsaki"}' localhost:8080 myapp.GreetingService.Hello
{
"message": "Hello, hsaki!"
}
import (
// (一部抜粋)
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
hellopb "mygrpc/pkg/grpc"
)
var (
scanner *bufio.Scanner
client hellopb.GreetingServiceClient
)
func main() {
fmt.Println("start gRPC Client.")
// 1. 標準入力から文字列を受け取るスキャナを用意
scanner = bufio.NewScanner(os.Stdin)
// 2. gRPCサーバーとのコネクションを確立
address := "localhost:8080"
conn, err := grpc.Dial(
address,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)
if err != nil {
log.Fatal("Connection failed.")
return
}
defer conn.Close()
// 3. gRPCクライアントを生成
client = hellopb.NewGreetingServiceClient(conn)
for {
fmt.Println("1: send Request")
fmt.Println("2: exit")
fmt.Print("please enter >")
scanner.Scan()
in := scanner.Text()
switch in {
case "1":
Hello()
case "2":
fmt.Println("bye.")
goto M
}
}
}
func Hello() {
fmt.Println("Please enter your name.")
scanner.Scan()
name := scanner.Text()
req := &hellopb.HelloRequest{
Name: name,
}
res, err := client.Hello(context.Background(), req)
if err != nil {
fmt.Println(err)
} else {
fmt.Println(res.GetMessage())
}
}
「1 リクエスト-1 レスポンス」の通信方法。
クライアントから送られた 1 回のリクエストに対して、サーバーからのレスポンスが複数返ってくる通信方式.
例えば、「サーバー側からプッシュ通知を受け取る」場面など。
クライアントから複数回リクエストを送信し、サーバーがそれに対してレスポンスを 1 回返す通信方式。
例えば、クライアント側から複数回に分けてデータをアップロードして、すべて受け取った段階でサーバーが一回だけ OK と返すような用途など。
WebSocket のようなサーバー・クライアントともに任意のタイミングでリクエスト・レスポンスを送ることができる通信方式。
例えば、ping-pong のような 1 リクエスト-1 レスポンスをすべて受け取るまでレスポンスは返さないということも可能。
柔軟なストリーミング通信ができるのは、gRPC が HTTP/2 のプロトコル上で実現されているから。
HTTP/2 では、送受信するデータをフレームという単位に分割して扱っている。
全部で 10 種類のフレームタイプが定義されているが、その中で特によく使われるのが以下の 2 種類。
この 2 つのフレームを使って、以下のようにクライアントとサーバーが複数回に分けてデータを送信する。
このように、HTTP/2 では 1 つの送受信データを複数個のフレームに分割してやり取りする。
そのため、最後のフレームであることを、どこかのタイミングで知らせてあげる必要がある。
gRPC のストリーミングでは、送信する最後のフレームのフラグフィールドに END_STREAM
フラグをつけることで、もう送るデータがないことを相手に知らせる。
service GreetingService {
// サービスが持つメソッドの定義
rpc Hello (HelloRequest) returns (HelloResponse);
+ // サーバーストリーミングRPC
+ rpc HelloServerStream (HelloRequest) returns (stream HelloResponse);
}
type GreetingServiceServer interface {
// サービスが持つメソッドの定義
Hello(context.Context, *HelloRequest) (*HelloResponse, error)
+ // サーバーストリーミングRPC
+ HelloServerStream(*HelloRequest, GreetingService_HelloServerStreamServer) error
mustEmbedUnimplementedGreetingServiceServer()
}
// 自動生成された、サーバーストリーミングのためのインターフェース(for サーバー)
+type GreetingService_HelloServerStreamServer interface {
+ Send(*HelloResponse) error
+ grpc.ServerStream
+}
func (s *myServer) HelloServerStream(req *hellopb.HelloRequest, stream hellopb.GreetingService_HelloServerStreamServer) error {
resCount := 5
for i := 0; i < resCount; i++ {
if err := stream.Send(&hellopb.HelloResponse{
Message: fmt.Sprintf("[%d] Hello, %s!", i, req.GetName()),
}); err != nil {
return err
}
time.Sleep(time.Second * 1)
}
return nil
}
Send
メソッドを何度も実行することで何度もクライアントにレスポンスを返すことが可能。
return
分で nil
or error を返すことでストリームを終わらせることができる。
type GreetingServiceClient interface {
// サービスが持つメソッドの定義
Hello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloResponse, error)
+ // サーバーストリーミングRPC
+ HelloServerStream(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (GreetingService_HelloServerStreamClient, error)
}
// 自動生成された、サーバーストリーミングのためのインターフェース(for クライアント)
type GreetingService_HelloServerStreamClient interface {
Recv() (*HelloResponse, error)
grpc.ClientStream
}
+func HelloServerStream() {
+ fmt.Println("Please enter your name.")
+ scanner.Scan()
+ name := scanner.Text()
+
+ req := &hellopb.HelloRequest{
+ Name: name,
+ }
+ stream, err := client.HelloServerStream(context.Background(), req)
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+
+ for {
+ res, err := stream.Recv()
+ if errors.Is(err, io.EOF) {
+ fmt.Println("all the responses have already received.")
+ break
+ }
+
+ if err != nil {
+ fmt.Println(err)
+ }
+ fmt.Println(res)
+ }
+}
Recv
メソッドでレスポンスを受け取るとき、これ以上受け取るレスポンスがないという状態なら、第一戻り値は nil
、第二戻り値の err
には io.EOF
が格納される。
var EOF = errors.New("EOF")
service GreetingService {
// サービスが持つメソッドの定義
rpc Hello (HelloRequest) returns (HelloResponse);
// サーバーストリーミングRPC
rpc HelloServerStream (HelloRequest) returns (stream HelloResponse);
+ // クライアントストリーミングRPC
+ rpc HelloClientStream (stream HelloRequest) returns (HelloResponse);
}
type GreetingServiceServer interface {
// サービスが持つメソッドの定義
Hello(context.Context, *HelloRequest) (*HelloResponse, error)
// サーバーストリーミングRPC
HelloServerStream(*HelloRequest, GreetingService_HelloServerStreamServer) error
+ // クライアントストリーミングRPC
+ HelloClientStream(GreetingService_HelloClientStreamServer) error
mustEmbedUnimplementedGreetingServiceServer()
}
// 自動生成された、クライアントストリーミングのためのインターフェース(for サーバー)
type GreetingService_HelloClientStreamServer interface {
SendAndClose(*HelloResponse) error
Recv() (*HelloRequest, error)
grpc.ServerStream
}
func (s *myServer) HelloClientStream(stream hellopb.GreetingService_HelloClientStreamServer) error {
nameList := make([]string, 0)
for {
req, err := stream.Recv()
if errors.Is(err, io.EOF) {
message := fmt.Sprintf("Hello, %v!", nameList)
return stream.SendAndClose(&hellopb.HelloResponse{
Message: message,
})
}
if err != nil {
return err
}
nameList = append(nameList, req.GetName())
}
}
type GreetingServiceClient interface {
// サービスが持つメソッドの定義
Hello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloResponse, error)
// サーバーストリーミングRPC
HelloServerStream(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (GreetingService_HelloServerStreamClient, error)
+ // クライアントストリーミングRPC
+ HelloClientStream(ctx context.Context, opts ...grpc.CallOption) (GreetingService_HelloClientStreamClient, error)
}
// 自動生成された、クライアントストリーミングのためのインターフェース(for クライアント)
type GreetingService_HelloClientStreamClient interface {
Send(*HelloRequest) error
CloseAndRecv() (*HelloResponse, error)
grpc.ClientStream
}
+func HelloClientStream() {
+ stream, err := client.HelloClientStream(context.Background())
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+
+ sendCount := 5
+ fmt.Printf("Please enter %d names.\n", sendCount)
+ for i := 0; i < sendCount; i++ {
+ scanner.Scan()
+ name := scanner.Text()
+
+ if err := stream.Send(&hellopb.HelloRequest{
+ Name: name,
+ }); err != nil {
+ fmt.Println(err)
+ return
+ }
+ }
+
+ res, err := stream.CloseAndRecv()
+ if err != nil {
+ fmt.Println(err)
+ } else {
+ fmt.Println(res.GetMessage())
+ }
+}
service GreetingService {
// サービスが持つメソッドの定義
rpc Hello (HelloRequest) returns (HelloResponse);
// サーバーストリーミングRPC
rpc HelloServerStream (HelloRequest) returns (stream HelloResponse);
// クライアントストリーミングRPC
rpc HelloClientStream (stream HelloRequest) returns (HelloResponse);
+ // 双方向ストリーミングRPC
+ rpc HelloBiStreams (stream HelloRequest) returns (stream HelloResponse);
}
type GreetingServiceServer interface {
// サービスが持つメソッドの定義
Hello(context.Context, *HelloRequest) (*HelloResponse, error)
// サーバーストリーミングRPC
HelloServerStream(*HelloRequest, GreetingService_HelloServerStreamServer) error
// クライアントストリーミングRPC
HelloClientStream(GreetingService_HelloClientStreamServer) error
+ // 双方向ストリーミングRPC
+ HelloBiStreams(GreetingService_HelloBiStreamsServer) error
mustEmbedUnimplementedGreetingServiceServer()
}
type GreetingService_HelloBiStreamsServer interface {
Send(*HelloResponse) error
Recv() (*HelloRequest, error)
grpc.ServerStream
}
「一つリクエストを受信するごとに、それに対するレスポンスを一つ返す」というロジックを実装。
func (s *myServer) HelloBiStreams(stream hellopb.GreetingService_HelloBiStreamsServer) error {
for {
req, err := stream.Recv()
if errors.Is(err, io.EOF) {
return nil
}
if err != nil {
return err
}
message := fmt.Sprintf("Hello, %v!", req.GetName())
if err := stream.Send(&hellopb.HelloResponse{
Message: message,
}); err != nil {
return err
}
}
}
type GreetingServiceClient interface {
// サービスが持つメソッドの定義
Hello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloResponse, error)
// サーバーストリーミングRPC
HelloServerStream(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (GreetingService_HelloServerStreamClient, error)
// クライアントストリーミングRPC
HelloClientStream(ctx context.Context, opts ...grpc.CallOption) (GreetingService_HelloClientStreamClient, error)
+ // 双方向ストリーミングRPC
+ HelloBiStreams(ctx context.Context, opts ...grpc.CallOption) (GreetingService_HelloBiStreamsClient, error)
}
type GreetingService_HelloBiStreamsClient interface {
Send(*HelloRequest) error
Recv() (*HelloResponse, error)
grpc.ClientStream
}
+func HelloBiStreams() {
+ stream, err := client.HelloBiStreams(context.Background())
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+
+ sendNum := 5
+ fmt.Printf("Please enter %d names.\n", sendNum)
+
+ var sendEnd, recvEnd bool
+ sendCount := 0
+ for !(sendEnd && recvEnd) {
+ // 送信処理
+ if !sendEnd {
+ scanner.Scan()
+ name := scanner.Text()
+
+ sendCount++
+ if err := stream.Send(&hellopb.HelloRequest{
+ Name: name,
+ }); err != nil {
+ fmt.Println(err)
+ sendEnd = true
+ }
+
+ if sendCount == sendNum {
+ sendEnd = true
+ if err := stream.CloseSend(); err != nil {
+ fmt.Println(err)
+ }
+ }
+ }
+
+ // 受信処理
+ if !recvEnd {
+ if res, err := stream.Recv(); err != nil {
+ if !errors.Is(err, io.EOF) {
+ fmt.Println(err)
+ }
+ recvEnd = true
+ } else {
+ fmt.Println(res.GetMessage())
+ }
+ }
+ }
+}
gRPC の場合は、「メソッドの呼び出しに成功した場合には、中で何が怒ろうとも HTTP レスポンスステータスコードは 200 OK
を返す」ように固定されている。
その代わり、gRPC は「メソッド内の処理が正しく実行されたのか」「エラーが起きたとしたらどのようなエラーなのか」を表現するために独自のステータスコードを用意している。
HTTP/2 上では、レスポンスヘッダ内のフィールドに格納することで伝達している。
status = 200
content-type = application/grpc+proto
grpc-status = 4 # これ
grpc-message = timeout # これ
gRPC が「メソッドを呼び出し、戻り値を受け取る」ことに関心をおいている。それゆえに「gRPC が HTTP/2 の上に実装されている」ということを意識しないくて言いように設計されているから。
そんため、「呼び出されたメソッドが正しく処理を実行したか」を知るために HTTP のステータスコードを見に行くというのは gRPC-like ではない。
https://grpc.io/docs/guides/error/#error-status-codes
func (s *myServer) Hello(ctx context.Context, req *hellopb.HelloRequest) (*hellopb.HelloResponse, error) {
err := status.Error(codes.Unknown, "unknown error occurred")
return nil, err
}
google.golang.org/grpc/status
パッケージには、クライアントメソッドから受け取ったエラーから、gRPC のステータスコード・メッセージを復元するための FromError
関数を持っています。
import (
+ "google.golang.org/grpc/status"
)
func Hello() {
res, err := client.Hello(context.Background(), req)
if err != nil {
- fmt.Println(err)
+ if stat, ok := status.FromError(err); ok {
+ fmt.Printf("code: %s\n", stat.Code())
+ fmt.Printf("message: %s\n", stat.Message())
+ } else {
+ fmt.Println(err)
+ }
} else {
fmt.Println(res.GetMessage())
}
}
details
フィールド発生したエラーの情報をメッセージの一文字列でしか伝えられないのは不便。
スタックトレースのように詳細な情報を付け加える手段として details
フィールドというものも設定することができる.
details
フィールドをステータスに付与するためのメソッドとして、WithDetails
メソッドがある。
func (s *Status) WithDetails(details ...proto.Message) (*Status, error)
func (s *myServer) Hello(ctx context.Context, req *hellopb.HelloRequest) (*hellopb.HelloResponse, error) {
// (何か処理をしてエラーが発生した)
- err := status.Error(codes.Unknown, "unknown error occurred")
+ stat := status.New(codes.Unknown, "unknown error occurred")
+ stat, _ = stat.WithDetails([スタックトレースにするProtobufのメッセージ])
+ err := stat.Err()
return &hellopb.HelloResponse{/*(略)*/}, err
}
WithDetails
メソッドに渡すメッセージ型は、Protobuf 由来の構造体であれば何でも OK 。
gRPC 公式として推奨しているのは、Google が公開している Standard Set of Error Message Types を使うこと。
https://github.com/googleapis/googleapis/blob/master/google/rpc/error_details.proto
message RetryInfo {
// Clients should wait at least this long between retrying the same request.
google.protobuf.Duration retry_delay = 1;
}
message DebugInfo {
// The stack trace entries indicating where the error occurred.
repeated string stack_entries = 1;
// Additional debugging information provided by the server.
string detail = 2;
}
この proto ファイルに定義されている型を Go のコードの中で使うためには、本来ならば protoc コマンドでコードを生成させそれをインポートして使う必要がある。
しかし、例の proto ファイルから自動生成されたコードが、errdetails
パッケージとして既に公開されている。
https://pkg.go.dev/google.golang.org/genproto/googleapis/rpc/errdetails
$ go get -u google.golang.org/genproto/googleapis/rpc/errdetails
import(
// (一部抜粋)
+ "google.golang.org/genproto/googleapis/rpc/errdetails"
)
func (s *myServer) Hello(ctx context.Context, req *hellopb.HelloRequest) (*hellopb.HelloResponse, error) {
// (何か処理をしてエラーが発生した)
stat := status.New(codes.Unknown, "unknown error occurred")
- stat, _ = stat.WithDetails([スタックトレースにするProtobufのメッセージ])
+ stat, _ = stat.WithDetails(&errdetails.DebugInfo{
+ Detail: "detail reason of err",
+ })
err := stat.Err()
return &hellopb.HelloResponse{/*(略)*/}, err
}
import (
// (一部抜粋)
+ _ "google.golang.org/genproto/googleapis/rpc/errdetails"
)
func Hello() {
// (一部抜粋)
res, err := client.Hello(context.Background(), req)
if err != nil {
if stat, ok := status.FromError(err); ok {
fmt.Printf("code: %s\n", stat.Code())
fmt.Printf("message: %s\n", stat.Message())
+ fmt.Printf("details: %s\n", stat.Details())
} else {
fmt.Println(err)
}
} else {
fmt.Println(res.GetMessage())
}
}
gRPC では、ハンドラ処理の前後に追加処理を挟むミドルウェアのことをインターセプタと呼ぶ。
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
ref. https://pkg.go.dev/google.golang.org/grpc#UnaryServerInterceptor
func myUnaryServerInterceptor1(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
log.Println("[pre] my unary server interceptor 1: ", info.FullMethod) // ハンドラの前に割り込ませる前処理
res, err := handler(ctx, req) // 本来の処理
log.Println("[post] my unary server interceptor 1: ", m) // ハンドラの後に割り込ませる後処理
return res, err
}
func main() {
// (一部抜粋)
s := grpc.NewServer(
grpc.UnaryInterceptor(myUnaryServerInterceptor1),
)
}
type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error
ref. https://pkg.go.dev/google.golang.org/grpc#StreamServerInterceptor
func myStreamServerInterceptor1(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// ストリームがopenされたときに行われる前処理
log.Println("[pre stream] my stream server interceptor 1: ", info.FullMethod)
err := handler(srv, &myServerStreamWrapper1{ss}) // 本来のストリーム処理
// ストリームがcloseされるときに行われる後処理
log.Println("[post stream] my stream server interceptor 1: ")
return err
}
type myServerStreamWrapper1 struct {
grpc.ServerStream
}
func (s *myServerStreamWrapper1) RecvMsg(m interface{}) error {
// ストリームから、リクエストを受信
err := s.ServerStream.RecvMsg(m)
// 受信したリクエストを、ハンドラで処理する前に差し込む前処理
if !errors.Is(err, io.EOF) {
log.Println("[pre message] my stream server interceptor 1: ", m)
}
return err
}
func (s *myServerStreamWrapper1) SendMsg(m interface{}) error {
// ハンドラで作成したレスポンスを、ストリームから返信する直前に差し込む後処理
log.Println("[post message] my stream server interceptor 1: ", m)
return s.ServerStream.SendMsg(m)
}
ストリーミング処理の場合、リクエスト・レスポンスの送受信は以下のようなステップで実行される。
そのため、単純に前処理・後処理といっても「ストリーム open/close ときの処理」なのか「ストリームから実際にデータを送受信するときの処理」なのかという選択肢が生まれる。
func myStreamServerInterceptor1(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// 前処理をここに書く
err := handler(srv, &myServerStreamWrapper1{ss}) // 本来のストリーム処理
// 後処理をここに書く
return err
}
type ServerStream interface {
// (一部抜粋)
RecvMsg(m interface{}) error
SendMsg(m interface{}) error
}
grpc.ServerStream
インターフェース型を満たす独自構造体を作成RecvMsg
・SendMsg
メソッドを、自分がやりたい処理を入れ込む形でオーバーライドfunc main() {
// (一部抜粋)
s := grpc.NewServer(
grpc.StreamInterceptor(myStreamServerInterceptor1),
)
}
func main() {
s := grpc.NewServer(
- grpc.UnaryInterceptor(myUnaryServerInterceptor1),
+ grpc.ChainUnaryInterceptor(
+ myUnaryServerInterceptor1,
+ myUnaryServerInterceptor2,
+ ),
)
}
func main() {
s := grpc.NewServer(
- grpc.StreamInterceptor(myStreamServerInterceptor1),
+ grpc.ChainStreamInterceptor(
+ myStreamServerInterceptor1,
+ myStreamServerInterceptor2,
+ ),
)
}
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
ref. https://pkg.go.dev/google.golang.org/grpc#UnaryClientInterceptor
func myUnaryClientInteceptor1(ctx context.Context, method string, req, res interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
fmt.Println("[pre] my unary client interceptor 1", method, req) // リクエスト送信前に割り込ませる前処理
err := invoker(ctx, method, req, res, cc, opts...) // 本来のリクエスト
fmt.Println("[post] my unary client interceptor 1", res) // リクエスト送信後に割り込ませる後処理
return err
}
func main() {
// (一部抜粋)
conn, err := grpc.Dial(
address,
grpc.WithUnaryInterceptor(myUnaryClientInteceptor1),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)
}
type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)
ref. https://pkg.go.dev/google.golang.org/grpc#StreamClientInterceptor
func myStreamClientInteceptor1(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
// ストリームがopenされる前に行われる前処理
log.Println("[pre] my stream client interceptor 1", method)
// ストリームを生成 -> 返り値として返す
// このストリームを用いて、クライアントは送受信処理を行う
stream, err := streamer(ctx, desc, cc, method, opts...)
return &myClientStreamWrapper1{stream}, err
}
type myClientStreamWrapper1 struct {
grpc.ClientStream
}
func (s *myClientStreamWrapper1) SendMsg(m interface{}) error {
// リクエスト送信前に割り込ませる処理
log.Println("[pre message] my stream client interceptor 1: ", m)
// リクエスト送信
return s.ClientStream.SendMsg(m)
}
func (s *myClientStreamWrapper1) RecvMsg(m interface{}) error {
err := s.ClientStream.RecvMsg(m) // レスポンス受信処理
// レスポンス受信後に割り込ませる処理
if !errors.Is(err, io.EOF) {
log.Println("[post message] my stream client interceptor 1: ", m)
}
return err
}
func (s *myClientStreamWrapper1) CloseSend() error {
err := s.ClientStream.CloseSend() // ストリームをclose
// ストリームがcloseされた後に行われる後処理
log.Println("[post] my stream client interceptor 1")
return err
}
type ClientStream interface {
// (一部抜粋)
SendMsg(m interface{}) error
RecvMsg(m interface{}) error
CloseSend() error
}
func main() {
conn, err := grpc.Dial(
address,
grpc.WithStreamInterceptor(myStreamClientInteceptor1),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)
}
func main() {
conn, err := grpc.Dial(
address,
- grpc.WithUnaryInterceptor(myUnaryClientInteceptor1),
+ grpc.WithChainUnaryInterceptor(
+ myUnaryClientInteceptor1,
+ myUnaryClientInteceptor2,
+ ),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)
}
func main() {
conn, err := grpc.Dial(
address,
- grpc.WithStreamInterceptor(myStreamClientInteceptor1),
+ grpc.WithChainStreamInterceptor(
+ myStreamClientInteceptor1,
+ myStreamClientInteceptor2,
+ ),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)
}
通常の HTTP 通信の場合には、これらの情報をヘッダーフィールドに入れてやりとりしていましたが、gRPC ではメタデータというものを介して行う。