龙空技术网

netty客户端与多个远程服务器连接

lehoon 448

前言:

而今我们对“java与服务器连接”都比较关心,看官们都想要知道一些“java与服务器连接”的相关知识。那么小编在网摘上收集了一些有关“java与服务器连接”的相关内容,希望大家能喜欢,小伙伴们快快来学习一下吧!

java使用netty实现客户端与多个远程服务器建立连接并接收数据及发送数据。

基于netty-4.1.58开发。

代码部分:FoFParseClientFactory 连接工厂代码

/**
 * Singleton factory that opens Netty client connections to remote mail-parsing servers.
 *
 * <p>All connections share one template {@link Bootstrap} and one {@link EventLoopGroup};
 * each call to {@link #newEmailParseTask} opens a new, independent channel, so the factory
 * can be used to talk to several remote servers at the same time.</p>
 *
 * <p>Copyright: CopyRight (c) 2020-2035</p>
 * <p>Company: lehoon Co. LTD.</p>
 * <p>Author: lehoon</p>
 * <p>Date: 2021/12/3 14:25</p>
 */
public final class FoFParseClientFactory {
    /** Template bootstrap; never connected directly, only cloned per connection. */
    private static final Bootstrap bootstrap = new Bootstrap();
    /** Event loop group shared by every channel created through this factory. */
    private static final EventLoopGroup group = new NioEventLoopGroup();
    // Must be declared after bootstrap/group: the constructor configures both of them.
    private static final FoFParseClientFactory instance = new FoFParseClientFactory();

    private FoFParseClientFactory() {
        // NOTE(review): SO_TIMEOUT is a blocking-socket (OIO) option; NIO channels appear to
        // ignore it and log an "unknown channel option" warning. Confirm whether
        // CONNECT_TIMEOUT_MILLIS (or a read-timeout handler) was the real intent.
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_TIMEOUT, 60)
                .option(ChannelOption.TCP_NODELAY, true);
    }

    /**
     * @return the process-wide factory instance
     */
    public static FoFParseClientFactory getInstance() {
        return instance;
    }

    /**
     * Connects to the given server and blocks until the connection attempt has finished.
     *
     * @param host            remote host name or address
     * @param port            remote TCP port
     * @param userId          user id handed to the channel initializer for this connection
     * @param messageListener callback handed to the channel initializer; may be {@code null}
     * @throws Exception if the liveness probe fails, the connection cannot be established,
     *                   or the calling thread is interrupted while waiting; the underlying
     *                   failure, when there is one, is attached as the cause
     */
    public void newEmailParseTask(String host, int port, String userId, IMessageListener messageListener) throws Exception {
        if (!TcpHelper.isRemoteAlive(host, port)) {
            throw new Exception("邮件解析服务器未启动, 连接失败.");
        }
        try {
            // Clone the template so the per-connection handler is never installed on the
            // shared bootstrap; otherwise two concurrent callers could overwrite each
            // other's initializer between handler() and connect().
            Bootstrap connectionBootstrap = bootstrap.clone()
                    .handler(new FoFNewNioChannelInitializer(userId, messageListener));
            connectionBootstrap.connect(host, port).sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new Exception("与邮件服务器连接失败.", e);
        } catch (Exception e) {
            throw new Exception("与邮件服务器连接失败.", e);
        }
    }
}

FoFNewNioChannelInitializer连接通道初始化代码

/**
 * Channel initializer that assembles the pipeline for one mail-parsing connection.
 *
 * <p>The pipeline consists of a line encoder, UTF-8 string encoder/decoder and a
 * {@link FoFParseResponseHandler} bound to the user id and listener given here.</p>
 *
 * <p>Copyright: CopyRight (c) 2020-2035</p>
 * <p>Company: lehoon Co. LTD.</p>
 * <p>Author: lehoon</p>
 * <p>Date: 2021/12/3 14:35</p>
 */
public class FoFNewNioChannelInitializer extends ChannelInitializer<NioSocketChannel> {
    /** User id passed on to the response handler; may be {@code null}. */
    private final String userId;
    /** Listener passed on to the response handler; may be {@code null}. */
    private final IMessageListener messageListener;

    /** Creates an initializer with neither a user id nor a listener. */
    public FoFNewNioChannelInitializer() {
        this(null, null);
    }

    /**
     * Creates an initializer without a listener.
     *
     * @param userId user id passed on to the response handler
     */
    public FoFNewNioChannelInitializer(String userId) {
        this(userId, null);
    }

    /**
     * @param userId          user id passed on to the response handler
     * @param messageListener listener passed on to the response handler
     */
    public FoFNewNioChannelInitializer(String userId, IMessageListener messageListener) {
        this.userId = userId;
        this.messageListener = messageListener;
    }

    /**
     * Installs the codecs and the response handler on a newly created channel.
     *
     * @param ch the channel being initialized
     */
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        // NOTE(review): no inbound frame decoder is installed, and outbound text presumably
        // reaches StringEncoder before LineEncoder - confirm this matches the wire protocol.
        ch.pipeline()
                .addLast(new LineEncoder(LineSeparator.WINDOWS))
                .addLast(new StringEncoder(CharsetUtil.UTF_8))
                .addLast(new StringDecoder(CharsetUtil.UTF_8))
                .addLast(new FoFParseResponseHandler(userId, messageListener));
    }
}
FoFParseResponseHandler数据包解析代码
/** * <p>Title: </p> * <p>Description: </p> * <p>Copyright: CopyRight (c) 2020-2035</p> * <p>Company: lehoon Co. LTD.</p> * <p>Author: lehoon</p> * <p>Date: 2021/12/3 14:47</p> */public class FoFParseResponseHandler extends ChannelInboundHandlerAdapter {    private String userId;    private IMessageListener messageListener;    public FoFParseResponseHandler() {    }    public FoFParseResponseHandler(String userId) {        this.userId = userId;    }    public FoFParseResponseHandler(String userId, IMessageListener messageListener) {        this.userId = userId;        this.messageListener = messageListener;    }    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        if (messageListener != null) messageListener.onConnect(userId);        if (StringUtils.isBlank(userId)) {            ctx.close();        }        String message = MessageHelper.emailParseRequest(userId);        ByteBuf byteBuf = Unpooled.copiedBuffer(message.getBytes());        ctx.writeAndFlush(byteBuf);        if (messageListener != null) messageListener.onWrite(message.getBytes(), message.getBytes().length);    }    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        super.channelRead(ctx, msg);        EmailParseResponse response = MessageHelper.emailParseResponse((String) msg);        if (response == null) {            return;        }        if (messageListener == null) return;        if(messageListener.onRead(userId, response) == MessageCallBackAction.SHUTDOWN) {            ctx.close();        }    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        super.exceptionCaught(ctx, cause);    }    @Override    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {        super.handlerRemoved(ctx);        if (messageListener != null) {            messageListener.onClose(userId);        }    }    public void 
setMessageListener(IMessageListener messageListener) {        this.messageListener = messageListener;    }}
客户端使用代码
public BusinessResult<String> startParseTask() {    String userId = getUserId();    if (StringUtils.isBlank(userId)) {        log.error("获取当前登陆用户失败, 启动邮件解析任务失败.");        return new BusinessResult<String>().fail(BusinessCode.AUTH_USER_NOT_EXIST.getCode(), "当前用户不存在, 发送邮件解析指令失败!");    }    if (StringUtils.isBlank(remoteHost) || remotePort <= 0 || remotePort > 65535) {        log.error("当前没有配置邮件解析服务器远程地址.");        return new BusinessResult<String>().fail(BusinessCode.SERVICE_UNAVAILABLE.getCode(), "当前没有配置邮件解析服务器远程地址!");    }    try {        FoFParseClientFactory.getInstance().newEmailParseTask(remoteHost, remotePort, userId, new IMessageListener() {            @Override            public void onConnect(String userId) {                log.info("与邮件服务器建立连接成功.");            }            @Override            public MessageCallBackAction onRead(String userId, EmailParseResponse response) {                log.info("接收到邮件解析数据:{}, {}", userId, response);                if (response.isDone()) {                    return MessageCallBackAction.SHUTDOWN;                }                return MessageCallBackAction.CONTINUE;            }            @Override            public void onWrite(byte[] data, int length) {                log.info("发送数据到邮件解析服务器成功, {}, {}", length, new String(data));            }            @Override            public void onClose(String userId) {                log.info("与邮件服务器断开连接.");            }        });        //开始标志        log.info("发送邮件解析任务成功.");        return new BusinessResult<String>().success();    } catch (Exception e) {        return new BusinessResult<String>().fail(BusinessCode.SYSTEM_EXCEPTION.getCode(), e.getMessage());    }}

标签: #java与服务器连接