接着上期讨论的gRPC unary服务我们跟着介绍gRPC streaming,包括: Server-Streaming, Client-Streaming及Bidirectional-Streaming。我们首先在.proto文件里用IDL描述Server-Streaming服务:
/* * responding stream of increment results */ service SumOneToMany { rpc AddOneToMany(SumRequest) returns (stream SumResponse) {} } message SumRequest { int32 toAdd = 1; } message SumResponse { int32 currentResult = 1; }
SumOneToMany服务中AddOneToMany函数接受一个SumRequest然后返回stream SumResponse,就这么简单。经过编译后产生了SumOneToManyGrpc.scala文件,在这个文件里提供了有关RPC操作的api。我们看看protoc把IDL描述的服务函数变成了什么样的scala函数:
def addOneToMany(request: SumRequest, responseObserver: StreamObserver[SumResponse]): Unit
调用scala函数addOneToMany需要传入参数SumRequest和StreamObserver[SumResponse],也就是说用户需要准备这两个入参数。在调用addOneToMany函数时用户事先构建这个StreamObserver传给server,由server把结果通过这个结构传回用户。gRPC是通过StreamObserver类型实例来实现数据streaming的。这个类型的构建例子如下:
val responseObserver = new StreamObserver[SumResponse] { def onError(t: Throwable): Unit = println(s"ON_ERROR: $t") def onCompleted(): Unit = println("ON_COMPLETED") def onNext(value: SumResponse): Unit = println(s"ON_NEXT: Current sum: ${value.currentResult}") }
server端通过onNext把结果不断传回给client端,因为这个responseObserver是在client端构建的。下面是SumManyToMany的实现:
class SumOne2ManyService extends SumOneToManyGrpc.SumOneToMany { override def addOneToMany(request: SumRequest, responseObserver: StreamObserver[SumResponse]): Unit = { val currentSum: AtomicInt = Atomic(0) (1 to request.toAdd).map { _ => responseObserver.onNext(SumResponse().withCurrentResult(currentSum.incrementAndGet())) } Thread.sleep(1000) //delay and then finish
responseObserver.onCompleted() } }
这个addOneToMany服务函数把 1-request.toAdd之间的数字逐个通过responseObserver返还调用方。 在客户端如下调用服务:
// get asyn stub
val client: SumOneToManyGrpc.SumOneToManyStub = SumOneToManyGrpc.stub(channel) // prepare stream observer
val streamObserver = new StreamObserver[SumResponse] { override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}") override def onCompleted(): Unit = println("Done incrementing !!!") override def onNext(value: SumResponse): Unit = println(s"current value: ${value.currentResult}") } // call service with stream observer
client.addOneToMany(SumRequest().withToAdd(6),streamObserver)
Client-Streaming服务的IDL如下:
/* * responding a result from a request of stream of numbers */ service SumManyToOne { rpc AddManyToOne(stream SumRequest ) returns (SumResponse) {} }
传入stream SumRequest, 返回SumResponse。scalaPB自动产生scala代码中的addManyToOne函数款式如下:
def addManyToOne(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest]
调用方提供StreamObserver[SumResponse]用作返回结果,函数返回客方需要的StreamObserver[SumRequest]用以传递request流。注意:虽然在.proto文件中AddManyToOne的返回结果是单个SumResponse,但产生的scala函数则提供了一个StreamObserver[SumResponse]类型,所以需要谨记只能调用一次onNext。下面是这个服务的实现代码:
class Many2OneService extends SumManyToOneGrpc.SumManyToOne { val currentSum: AtomicInt = Atomic(0) override def addManyToOne(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
new StreamObserver[SumRequest] { val currentSum: AtomicInt = Atomic(0) override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}") override def o