*grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
clientStream, err := streamer(ctx, desc, cc, method, opts...)
if err != nil {
return nil, err
}
return newWrappedStream(clientStream), err
}
return grpc.WithStreamInterceptor(clientInterceptor)
}
func unaryRpc(conn *grpc.ClientConn) {
client := helloservice.NewHelloServiceClient(conn)
reply, err := client.Hello(context.Background(), &helloservice.String{Value: "hello"})
if err != nil {
log.Fatal(err)
}
log.Println("unaryRpc recv: ", reply.Value)
}
func streamRpc(conn *grpc.ClientConn) {
client := helloservice.NewHelloServiceClient(conn)
stream, err := client.Channel(context.Background())
if err != nil {
log.Fatal(err)
}
go func() {
for {
if err := stream.Send(&helloservice.String{Value: "hi"}); err != nil {
log.Fatal(err)
}
time.Sleep(time.Second)
}
}()
for {
recv, err := stream.Recv()
if err != nil {
if err == io.EOF {
break
}
log.Fatal(err)
}
fmt.Println("streamRpc recv: ", recv.Value)
}
}
可以结合我之前的文章,把本期的代码加进去运行调试
(搭建简单grpc服务可以参考这篇文章:https://waterflow.link/articles/1665674508275)
运行效果如下:
go run helloclient/main.go
invoker request time duration: 1
2022/10/14 23:17:35 unaryRpc recv: hello:hello
2022/10/14 23:17:35 ====== [Client Stream Interceptor] Receive a message (Type: *helloservice.String) at 2022-10-14T23:17:35+08:00
2022/10/14 23:17:35 ====== [Client Stream Interceptor] Send a message (Type: *helloservice.String) at 2022-10-14T23:17:35+08:00
2022/10/14 23:17:36 ====== [Client Stream Interceptor] Send a message (Type: *helloservice.String) at 2022-10-14T23:17:36+08:00
streamRpc recv: hello:hi
2022/10/14 23:17:36 ====== [Client Stream Interceptor] Receive a message (Type: *helloservice.String) at 2022-10-14T23:17:36+08:00
2022/10/14 23:17:37 ====== [Client Stream Interceptor] Send a message (Type: *helloservice.String) at 2022-10-14T23:17:37+08:00
streamRpc recv: hello:hi
2022/10/14 23:17:37 ====== [Client Stream Interceptor] Receive a message (Type: *helloservice.String) at 2022-10-14T23:17:37+08:00
2022/10/14 23:17:38 ====== [Client Stream Interceptor] Send a message (Type: *helloservice.String) at 2022-10-14T23:17:38+08:00
streamRpc recv: hello:hi
2022/10/14 23:17:38 ====== [Client Stream Interceptor] Receive a message (Type: *helloservice.String) at 2022-10-14T23:17:38+08:00
2、在 gRPC 客户端中编写拦截器
和 gRPC 客户端应用程序一样,gRPC 服务器应用程序提供两种类型的拦截器:
- UnaryServerInterceptor:提供了一个钩子来拦截服务器上一元 RPC 的执行。
- StreamServerInterceptor:提供了一个钩子来拦截服务器上流式 RPC 的执行。
1、UnaryServerInterceptor
为了创建 UnaryServerInterceptor,可以通过提供 UnaryServerInterceptor 函数值作为参数调用 UnaryInterceptor 函数,该参数返回为服务器设置 UnaryServerInterceptor 的 grpc.ServerOption 值。
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
if o.unaryInt != nil {
panic("The unary server interceptor was already set and may not be reset.")
}
o.unaryInt = i
})
}
然后使用返回的 grpc.ServerOption 值作为参数提供给 grpc.NewServer 函数以注册为 UnaryServerInterceptor。
UnaryServerInterceptor 函数的定义如下:
func(ctx context.Context, req interface{}, info *grpc.UnaryServer