龙空技术网

Netty 使用 Google 的 ProtoBuf 来传输数据

一个程序员小哥哥 537

前言:

现时姐妹们对“proto生成java文件”可能比较关怀,小伙伴们都需要学习一些“proto生成java文件”的相关文章。那么小编同时在网络上网罗了一些关于“proto生成java文件””的相关内容,希望小伙伴们能喜欢,大家快快来了解一下吧!

使用 Google 的 ProtoBuf 来传输数据

ProtoBuf 是 google 的一个文件传输的协议。与平台语言无关。

编写 proto 文件 (用来生成 Java 文件 pojo 的)

syntax = "proto3"; // 表示协议的版本option java_outer_classname = "StudentPojo"; // 类名同时也是文件名字// ProtoBuf 是以 message 来管理数据的message Student {// 会在 java_outer_classname 的类中生成的内部类,他是真正的 传输的 pojo 对象  int32 id = 1; //  int32 => proto 类型,对应 Java 的 int 类型。(Student 内有一个属性 id,类型为 int32, 1 代表属性序号,并不是值)  string name = 2;}

生成 Java 的实体类 protoc.exe --java_out=. Student.proto,执行这个命令以后就会生成一个指定的 Java 文件。然后把这个文件 copy 到自己的项目的工作路径。

使用 Netty 来实现 ProtoBuf 的数据传输

引入 maven 的依赖

<!-- protoBuf --><dependency>    <groupId>com.google.protobuf</groupId>    <artifactId>protobuf-java</artifactId>    <version>3.21.5</version></dependency>
添加 ProtoBuf 处理器到 server 和 client
pipeline.addLast(new ProtobufEncoder()); // ProtoBuf 的编码器pipeline.addLast(new ProtobufDecoder(StudentPojo.Student.getDefaultInstance())); // ProtoBuf 的解码器
发送消息进行通讯
StudentPojo.Student student = StudentPojo.Student.newBuilder().setId(4).setName("孙悟空").build();log.info("发送的数据 => {}", student);ctx.writeAndFlush(student);

这样就是 netty 使用 ProtoBuf 的关键代码。

完整代码服务端

package com.netty.codec;import com.utils.LoggerUtils;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.protobuf.ProtobufDecoder;import io.netty.handler.codec.protobuf.ProtobufEncoder;import org.slf4j.Logger;public class GoogleProtobufCodecServer {    /**     * 初始化服务     */    public void init() throws InterruptedException {        EventLoopGroup boosGroup = new NioEventLoopGroup();        EventLoopGroup workGroup = new NioEventLoopGroup();        try {            ServerBootstrap serverBootstrap = new ServerBootstrap();            ChannelFuture channelFuture = serverBootstrap                    .group(boosGroup, workGroup)                    .channel(NioServerSocketChannel.class)                    .childHandler(new ChannelInitializer<SocketChannel>() {                        @Override                        protected void initChannel(SocketChannel ch) {                            // 添加处理器                            ChannelPipeline pipeline = ch.pipeline();                            // ProtoBuf 编解码器                            pipeline.addLast(new ProtobufEncoder());                            pipeline.addLast(new ProtobufDecoder(StudentPojo.Student.getDefaultInstance()));                            // 自定义处理器                            pipeline.addLast(new ProtoBufHandler());                        }                    })                    // 绑定端口                    .bind(6666).sync();            channelFuture.channel().closeFuture().sync();        } finally {            boosGroup.shutdownGracefully();            workGroup.shutdownGracefully();        }    }    /**     * 自定义处理器     *     * @author L     */    private static class ProtoBufHandler extends SimpleChannelInboundHandler<StudentPojo.Student> {        Logger log = LoggerUtils.getLogger(ProtoBufHandler.class);        /**         * 通道初始化完成以后         */        @Override        public void channelActive(ChannelHandlerContext ctx) {            StudentPojo.Student student = StudentPojo.Student.newBuilder().setId(4).setName("孙悟空").build();            log.info("发送的数据 => {}", student);            ctx.writeAndFlush(student);        }        /**         * 接收到消息以后         *         * @param ctx the {@link ChannelHandlerContext} which this {@link SimpleChannelInboundHandler}         *            belongs to         * @param msg the message to handle         */        @Override        protected void channelRead0(ChannelHandlerContext ctx, StudentPojo.Student msg) {            log.info("客户端发送的数据 => id={},name={}", msg.getId(), msg.getId());        }    }    /**     * 代码允许     */    public static void main(String[] args) throws InterruptedException {        new GoogleProtobufCodecServer().init();    }}
客户端
package com.netty.codec;import io.netty.bootstrap.Bootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.protobuf.ProtobufDecoder;import io.netty.handler.codec.protobuf.ProtobufEncoder;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class GoogleProtobufCodecClient {    Logger log = LoggerFactory.getLogger(GoogleProtobufCodecClient.class);    public void init() throws InterruptedException {        EventLoopGroup clientGroup = new NioEventLoopGroup();        try {            // 创建一个 bootstrap 而不是 serverBootstrap            Bootstrap bootstrap = new Bootstrap();            // 设置相关管参数            bootstrap                    // 设置线程组                    .group(clientGroup)                    // 设置客户端通道的实现类(反射)                    .channel(NioSocketChannel.class)                    // 设置处理器                    .handler(new ChannelInitializer<NioSocketChannel>() {                        @Override                        protected void initChannel(NioSocketChannel nioSocketChannel) {                            // 添加自己的 handler 处理器                            ChannelPipeline pipeline = nioSocketChannel.pipeline();                            pipeline.addLast(new ProtobufEncoder());                            pipeline.addLast(new ProtobufDecoder(StudentPojo.Student.getDefaultInstance()));                            pipeline.addLast(new ChannelInboundHandlerAdapter() {                                @Override                                public void channelRead(ChannelHandlerContext ctx, Object msg) {                                    log.info("服务端消息 => {}", msg);                                }                            });                        }                    });            log.info("客户端准备 OK");            // 启动客户端去链接服务端,涉及到 netty 的异步模型            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();            // 给通道关闭进行监听            channelFuture.channel().closeFuture().sync();        } finally {            clientGroup.shutdownGracefully();        }    }    public static void main(String[] args) throws InterruptedException {        new GoogleProtobufCodecClient().init();    }}

标签: #proto生成java文件