龙空技术网

java版gRPC实战之四:客户端流

程序员欣宸 150

前言:

如今我们对“java客户端和服务端”大致比较注意,你们都想要学习一些“java客户端和服务端”的相关知识。那么小编也在网摘上网罗了一些对于“java客户端和服务端””的相关知识,希望同学们能喜欢,我们一起来学习一下吧!

欢迎访问我的GitHub

本篇概览本文是《java版gRPC实战》系列的第四篇,前文掌握了服务端流,适合从服务端获取大量数据的场景,今天的目标是掌握客户端流类型的服务,包括服务提供方和使用方两侧的开发;先来看看官方资料对客户端流式RPC的介绍:客户端写入一个消息序列并将其发送到服务器,同样也是使用流。一旦客户端完成写入消息,它等待服务器完成读取返回它的响应;本文由以下几部分组成:提前小结几个重要的知识点,稍后开发过程中要重点关注这几个地方;在proto文件中定义客户端流类型的gRPC接口,再通过proto生成java代码;开发服务端应用;开发客户端应用;验证;提前小结

为了突出重点,这里将几个关键的知识点提前给出:

客户端流的特点,是请求方以流的形式提交数据到响应方;一次RPC请求中,请求方可以通过流的方式源源不断地提交数据,直到调用了StreamObserver的onCompleted方法,才算提交数据完成;平时咱们调用方法时,方法内部用到的数据是通过入参传进来的,但这里不一样,客户端要传给服务端的数据和gRPC方法的入参没有关系,而是和方法的返回对象有关(执行返回对象的onNext方法可以将数据传给服务端);客户端在A线程上传完数据后,服务端的响应是在另一个线程B执行的,因此,如果A线程拿到服务端响应,就要B线程的异步响应方法执行完毕,等待的方法有多种,我用的是CountDownLatch;在服务端,开发者要编写的代码和以往web开发不同,不是将数据处理好返回,而是返回一个StreamObserver实例给上层框架,由框架负责处理的逻辑,开发者专注开发StreamObserver的实现即可,例如重写onNext方法,客户端通过流每上传一笔数据,onNext方法都会被外层框架执行一次;如果您用的是IDEA,记得勾选下图红框中的选框,否则运行应用的时候可能遇到lombok相关的问题:上面提到的这些,会在接下来的开发过程中充分体现出来;源码下载本篇实战中的完整源码可在GitHub下载到,地址和链接信息如下表所示():

名称

链接

备注

项目主页

该项目在GitHub上的主页

git仓库地址(https)

该项目源码的仓库地址,https协议

git仓库地址(ssh)

git@github.com:zq2599/blog_demos.git

该项目源码的仓库地址,ssh协议

这个git项目中有多个文件夹,《java版gRPC实战》系列的源码在grpc-tutorials文件夹下,如下图红框所示:grpc-tutorials文件夹下有多个目录,本篇文章对应的服务端代码在client-stream-server-side目录下,客户端代码在client-stream-client-side目录下,如下图:在proto文件中定义客户端流类型的gRPC接口首先要做的就是定义gRPC接口,打开mall.proto,在里面新增方法和相关的数据结构,需要重点关注的是AddToCart方法的入参ProductOrder前面添加了stream修饰,代表该方法是客户端流类型:

// gRPC服务,这是个在线商城的购物车服务service CartService {    // 客户端流式:添加多个商品到购物车    rpc AddToCart (stream ProductOrder) returns (AddCartReply) {}}// 提交购物车时的产品信息message ProductOrder {    // 商品ID    int32 productId = 1;    // 商品数量    int32 number = 2;}// 提交购物车返回结果的数据结构message AddCartReply {    // 返回码    int32 code = 1;    // 描述信息    string message = 2;}
双击下图红框中的task即可生成java代码:生成下图红框中的文件:接下来开发服务端;开发服务端应用在父工程grpc-turtorials下面新建名为client-stream-server-side的模块,其build.gradle内容如下:
// 使用springboot插件plugins {    id 'org.springframework.boot'}dependencies {    implementation 'org.projectlombok:lombok'    implementation 'org.springframework.boot:spring-boot-starter'    // 作为gRPC服务提供方,需要用到此库    implementation 'net.devh:grpc-server-spring-boot-starter'    // 依赖自动生成源码的工程    implementation project(':grpc-lib')    // annotationProcessor不会传递,使用了lombok生成代码的模块,需要自己声明annotationProcessor    annotationProcessor 'org.projectlombok:lombok'}
配置文件application.yml:
spring:  application:    name: client-stream-server-side# gRPC有关的配置,这里只需要配置服务端口号grpc:  server:    port: 9900
启动类ClientStreamServerSideApplication.java的代码就不贴了,普通的springboot启动类而已;重点是提供grpc服务的GrpcServerService.java,请结合前面小结的第五点来阅读代码,咱们要做的就是给上层框架返回一个匿名类,至于里面的onNext、onCompleted方法何时被调用是上层框架决定的,另外还准备了成员变量totalCount,这样就可以记录总数了:
package com.bolingcavalry.grpctutorials;import com.bolingcavalry.grpctutorials.lib.AddCartReply;import com.bolingcavalry.grpctutorials.lib.CartServiceGrpc;import com.bolingcavalry.grpctutorials.lib.ProductOrder;import io.grpc.stub.StreamObserver;import lombok.extern.slf4j.Slf4j;import net.devh.boot.grpc.server.service.GrpcService;@GrpcService@Slf4jpublic class GrpcServerService extends CartServiceGrpc.CartServiceImplBase {    @Override    public StreamObserver<ProductOrder> addToCart(StreamObserver<AddCartReply> responseObserver) {        // 返回匿名类,给上层框架使用        return new StreamObserver<ProductOrder>() {            // 记录处理产品的总量            private int totalCount = 0;            @Override            public void onNext(ProductOrder value) {                log.info("正在处理商品[{}],数量为[{}]",                        value.getProductId(),                        value.getNumber());                // 增加总量                totalCount += value.getNumber();            }            @Override            public void onError(Throwable t) {                log.error("添加购物车异常", t);            }            @Override            public void onCompleted() {                log.info("添加购物车完成,共计[{}]件商品", totalCount);                responseObserver.onNext(AddCartReply.newBuilder()                                                    .setCode(10000)                                                    .setMessage(String.format("添加购物车完成,共计[%d]件商品", totalCount))                                                    .build());                responseObserver.onCompleted();            }        };    }}
开发客户端应用在父工程grpc-turtorials下面新建名为client-stream-server-side的模块,其build.gradle内容如下:
plugins {    id 'org.springframework.boot'}dependencies {    implementation 'org.projectlombok:lombok'    implementation 'org.springframework.boot:spring-boot-starter'    implementation 'org.springframework.boot:spring-boot-starter-web'    implementation 'net.devh:grpc-client-spring-boot-starter'    implementation project(':grpc-lib')}
配置文件application.yml,设置自己的web端口号和服务端地址:
server:  port: 8082spring:  application:    name: client-stream-client-sidegrpc:  client:    # gRPC配置的名字,GrpcClient注解会用到    client-stream-server-side:      # gRPC服务端地址      address: 'static://127.0.0.1:9900'      enableKeepAlive: true      keepAliveWithoutCalls: true      negotiationType: plaintext
启动类ClientStreamClientSideApplication.java的代码就不贴了,普通的springboot启动类而已;正常情况下我们都是用StreamObserver处理服务端响应,这里由于是异步响应,需要额外的方法从StreamObserver中取出业务数据,于是定一个新接口,继承自StreamObserver,新增getExtra方法可以返回String对象,详细的用法稍后会看到:
package com.bolingcavalry.grpctutorials;import io.grpc.stub.StreamObserver;public interface ExtendResponseObserver<T> extends StreamObserver<T> {    String getExtra();}
重头戏来了,看看如何远程调用客户端流类型的gRPC接口,前面小结提到的2、3、4点都会涉及到,代码中已经添加详细注释:
package com.bolingcavalry.grpctutorials;import com.bolingcavalry.grpctutorials.lib.AddCartReply;import com.bolingcavalry.grpctutorials.lib.CartServiceGrpc;import com.bolingcavalry.grpctutorials.lib.ProductOrder;import io.grpc.stub.StreamObserver;import lombok.Getter;import lombok.extern.slf4j.Slf4j;import net.devh.boot.grpc.client.inject.GrpcClient;import org.springframework.stereotype.Service;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;@Service@Slf4jpublic class GrpcClientService {    @GrpcClient("client-stream-server-side")    private CartServiceGrpc.CartServiceStub cartServiceStub;    public String addToCart(int count) {                CountDownLatch countDownLatch = new CountDownLatch(1);                // responseObserver的onNext和onCompleted会在另一个线程中被执行,        // ExtendResponseObserver继承自StreamObserver        ExtendResponseObserver<AddCartReply> responseObserver = new ExtendResponseObserver<AddCartReply>() {            String extraStr;            @Override            public String getExtra() {                return extraStr;            }            private int code;            private String message;            @Override            public void onNext(AddCartReply value) {                log.info("on next");                code = value.getCode();                message = value.getMessage();            }            @Override            public void onError(Throwable t) {                log.error("gRPC request error", t);                extraStr = "gRPC error, " + t.getMessage();                countDownLatch.countDown();            }            @Override            public void onCompleted() {                log.info("on complete");                extraStr = String.format("返回码[%d],返回信息:%s" , code, message);                countDownLatch.countDown();            }        };                // 远程调用,此时数据还没有给到服务端        StreamObserver<ProductOrder> requestObserver = cartServiceStub.addToCart(responseObserver);                for(int i=0; i<count; i++) {            // 发送一笔数据到服务端            requestObserver.onNext(build(101 + i, 1 + i));        }        // 客户端告诉服务端:数据已经发完了        requestObserver.onCompleted();        try {            // 开始等待,如果服务端处理完成,那么responseObserver的onCompleted方法会在另一个线程被执行,            // 那里会执行countDownLatch的countDown方法,一但countDown被执行,下面的await就执行完毕了,            // await的超时时间设置为2秒            countDownLatch.await(2, TimeUnit.SECONDS);        } catch (InterruptedException e) {            log.error("countDownLatch await error", e);        }        log.info("service finish");        // 服务端返回的内容被放置在requestObserver中,从getExtra方法可以取得        return responseObserver.getExtra();    }    /**     * 创建ProductOrder对象     * @param productId     * @param num     * @return     */    private static ProductOrder build(int productId, int num) {        return ProductOrder.newBuilder().setProductId(productId).setNumber(num).build();    }}
最后做个web接口,可以通过web请求验证远程调用:
package com.bolingcavalry.grpctutorials;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;import java.util.List;@RestControllerpublic class GrpcClientController {    @Autowired    private GrpcClientService grpcClientService;    @RequestMapping("/")    public String printMessage(@RequestParam(defaultValue = "1") int count) {        return grpcClientService.addToCart(count);    }}
编码完成,开始验证;验证启动服务端ClientStreamServerSideApplication:启动客户端ClientStreamClientSideApplication:浏览器输入,响应如下,可见远程调用gRPC服务成功:下面是服务端日志,可见逐一处理了客户端的每一笔数据:下面是客户端日志,可见由于CountDownLatch的作用,发起gRPC请求的线程一直等待responseObserver.onCompleted在另一个线程被执行完后,才会继续执行:至此,客户端流类型的gRPC服务及其客户端开发就完成了,这种异步操作与咱们平时开发同步类型的web接口还是有差别的,希望本文能给您带来一些参考,下一篇咱们实战最后一种类型:双向流式;欢迎关注我的公众号:程序员欣宸

标签: #java客户端和服务端