龙空技术网

Netty-模拟Redis的客户端

stonevV9S 59

前言:

此时同学们对“netty判断客户端”可能比较着重,同学们都想要学习一些“netty判断客户端”的相关内容。那么小编同时在网摘上收集了一些有关“netty判断客户端””的相关文章,希望朋友们能喜欢,各位老铁们一起来学习一下吧!

因为redis是部署在服务器上的 我们只需要模拟客户端发送请求即可所以只需要编写客户端的代码就可以了,编写前我们需要知道 redis的请求规范,Redis的通信是需要遵循RESP协议的

例子:

set hello 123*3\r\n $3 \r\n set \r\n hello \r\n 123

建立 channel 通道后 发送命令给服务端 此时是写数据【输出】 在出战handler里增加逻辑 当接受响应后 此时数据需要【输入】 存入栈 handler 中 在入栈handler 里增加逻辑

Netty中 Redis 命令对应的类型

单行错误整形数字批量回复多个批量回复客户端

我们只需要编写客户端。请求搭建redis的服务器接受和传输数据即可就可以了

客户端还是常见的写法

public class RedisClient {    private static final String HOST = "xxxxxxxxx";    private static final int PORT = 6379;    public static void main(String[] args) {        NioEventLoopGroup bossGroup = new NioEventLoopGroup();        Bootstrap bootstrap = new Bootstrap();        bootstrap.group(bossGroup)                .channel(NioSocketChannel.class)                .handler(new ChannelInitializer<SocketChannel>() {                    @Override                    protected void initChannel(SocketChannel socketChannel) throws Exception {                        //  调用netty 提供的支持netty 协议的编解码器                        //    RedisBulkStringAggregator 和 RedisarrayAggregator 是协议中两种特殊格式的聚合器                        ChannelPipeline pipeline = socketChannel.pipeline();                        pipeline.addLast(new RedisDecoder());                        pipeline.addLast(new RedisBulkStringAggregator());                        pipeline.addLast(new RedisArrayAggregator());                        pipeline.addLast(new RedisEncoder());                        // 我们自己的处理器                        pipeline.addLast(new RedisClientHandler());                    }                });        try {            ChannelFuture syncFuture = bootstrap.connect(HOST, PORT).sync();            System.out.println("enter redis commands");            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));            for (; ; ) {                String input = bufferedReader.readLine();                //退出任务                if ("quit".equals(input)) {                    syncFuture.channel().close().sync();                    break;                }                syncFuture.channel().writeAndFlush(input).sync();            }            syncFuture.channel().closeFuture().sync();        } catch (Exception e) {            e.printStackTrace();        } finally {            bossGroup.shutdownGracefully();        }    }}
处理器

因为要读取和写入 所以我们需要集成混合处理器类 : ChannelDuplexHandler

我们需要判断五种请求类型 所以先封装一个类别输出的方法

RedisMessage Netty提供的redis的信息对象,以下五种类型都来自于它

    private void printRedisResponse(RedisMessage msg) {        if (msg instanceof SimpleStringRedisMessage) {            SimpleStringRedisMessage tmpMsg = (SimpleStringRedisMessage) msg;            System.out.println(tmpMsg.content());        } else if (msg instanceof ErrorRedisMessage) {            ErrorRedisMessage tmpMsg = (ErrorRedisMessage) msg;            System.out.println(tmpMsg.content());        } else if (msg instanceof IntegerRedisMessage) {            IntegerRedisMessage tmpMsg = (IntegerRedisMessage) msg;            System.out.println(tmpMsg.value());        } else if (msg instanceof FullBulkStringRedisMessage) {            FullBulkStringRedisMessage tmpMsg = (FullBulkStringRedisMessage) msg;            if (tmpMsg.isNull()) {                return;            }            System.out.println(tmpMsg.content().toString(CharsetUtil.UTF_8));        } else if (msg instanceof ArrayRedisMessage) {            ArrayRedisMessage tmpMsg = (ArrayRedisMessage) msg;            for (RedisMessage child : tmpMsg.children()) {                printRedisResponse(child);            }        }        // 如果都是不是应该抛出异常    }

写命令

    // 写    // 出栈handler 的常用方法    //需要处理redis 命令    @Override    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {        // 键盘传入的字符串 如 : keys *        String commandStr = (String) msg;        // 通过空格来分离出命令        String[] commandArr = commandStr.split("\\s+");        // 接受redis 命令的列表  RedisMessag保存命令的内容        List<RedisMessage> redisMessageList = new ArrayList<>(commandArr.length);        for (String cmdStr : commandArr) {            FullBulkStringRedisMessage message = new FullBulkStringRedisMessage(                    ByteBufUtil.writeUtf8(ctx.alloc(), cmdStr));            redisMessageList.add(message);        }        //将分离的命令 合并成一个完整的命令        RedisMessage request = new ArrayRedisMessage(redisMessageList);        //写入通道        ctx.write(request, promise);    }

读取命令

// 读取@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {    // 此时接受的时候这个值 就是我们存储redismessage 的对象    RedisMessage redisMessage = (RedisMessage) msg;    // 返回的结果是不同类型 分开进行处理    printRedisResponse(redisMessage);    //    内存管理使用了 引用计数的方式 所以我们需要释放资源    ReferenceCountUtil.release(redisMessage);}

标签: #netty判断客户端