龙空技术网

Building a WebSocket Chat Room with Spring Boot and Netty

Javaspring架构师

Preface

Built on Spring Boot, this project uses Netty to manage long-lived connections and the WebSocket protocol to provide a real-time chat room.

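Project demo

Unified login path for the project:

Usernames are generated randomly. When a user goes offline, an asynchronous method is called to write the buffered chat data to the database. On login, the chat history is displayed.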

GitHub

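Project name: InChat

Project URL: ...

Project description: built on Netty 4 and Spring Boot. It offers a WebSocket chat room (text and images), API calls that send messages over Netty long-lived connections (online count, user list), IoT support via the MQTT protocol, TCP/IP communication with microcontrollers, and asynchronous storage of chat data.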

Code walkthrough

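Random name utility class

    import java.util.Random;

    public class RandomNameUtil {
        private static final Random ran = new Random();
        // Number of characters in the CJK range U+4E00..U+9FA5
        private static final int delta = 0x9fa5 - 0x4e00 + 1;

        // Returns one random Chinese character, used as the user's name
        public static char getName() {
            return (char) (0x4e00 + ran.nextInt(delta));
        }
    }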
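The range above holds roughly 21,000 characters, so two users can occasionally receive the same one-character name. The following is a minimal sketch, not part of the original project, that draws several characters from the same range to make duplicates less likely. The class name `RandomNames` and the method `randomName` are hypothetical.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper, not part of InChat: builds a name of `length`
// random characters from the same CJK range used by RandomNameUtil.
class RandomNames {
    private static final int FIRST = 0x4e00;
    private static final int LAST = 0x9fa5;

    static String randomName(int length) {
        if (length <= 0) {
            throw new IllegalArgumentException("length must be positive");
        }
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            // nextInt(origin, bound) excludes the bound, hence LAST + 1
            sb.append((char) ThreadLocalRandom.current().nextInt(FIRST, LAST + 1));
        }
        return sb.toString();
    }
}
```

Calling `RandomNames.randomName(3)` yields a three-character name; `ThreadLocalRandom` is used here only to avoid contention on a shared `Random` when many connections open at once.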

yml configuration file

    spring:
      datasource:
        driver-class-name: com.mysql.jdbc.Driver
        username: root
        password: root
        url: jdbc:mysql://localhost:3306/nettychat?characterEncoding=utf-8&useSSL=false
      jpa:
        show-sql: true
    netty:
      port: 8090        # listening port
      bossThread: 2     # number of boss threads
      workerThread: 2   # number of worker threads
      keepalive: true   # keep connections alive
      backlog: 100

Database setup

    SET FOREIGN_KEY_CHECKS=0;

    -- ----------------------------
    -- Table structure for user_msg
    -- ----------------------------
    DROP TABLE IF EXISTS `user_msg`;
    CREATE TABLE `user_msg` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `name` varchar(255) DEFAULT NULL,
      `msg` varchar(255) DEFAULT NULL,
      `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
      `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8mb4;

    -- ----------------------------
    -- Records of user_msg
    -- ----------------------------
    INSERT INTO `user_msg` VALUES ('1', '亪', '今天不开心', '2018-08-14 14:26:02', '2018-08-14 14:26:02');
    INSERT INTO `user_msg` VALUES ('2', '祐', '不错呀', '2018-08-14 15:09:40', '2018-08-14 15:09:40');
    INSERT INTO `user_msg` VALUES ('3', '搈', '开心 开心', '2018-08-14 15:09:40', '2018-08-14 15:09:40');
    INSERT INTO `user_msg` VALUES ('4', '兇', '可以的,后面再做个深入一点的', '2018-08-14 15:18:35', '2018-08-14 15:18:35');
    INSERT INTO `user_msg` VALUES ('5', '倎', '开源这个项目', '2018-08-14 15:18:35', '2018-08-14 15:18:35');
    INSERT INTO `user_msg` VALUES ('6', '蝡', '1-someting', '2018-08-14 15:24:28', '2018-08-14 15:24:28');
    INSERT INTO `user_msg` VALUES ('7', '弔', '不行呀', '2018-08-14 15:24:29', '2018-08-14 15:24:29');
    INSERT INTO `user_msg` VALUES ('8', '習', '可以的', '2018-08-14 15:26:03', '2018-08-14 15:26:03');
    INSERT INTO `user_msg` VALUES ('9', '蔫', '开源这个项目', '2018-08-14 15:26:03', '2018-08-14 15:26:03');

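Data object and JPA DAO

    import java.io.Serializable;
    import java.util.Date;
    import javax.persistence.Entity;
    import javax.persistence.GeneratedValue;
    import javax.persistence.GenerationType;
    import javax.persistence.Id;
    import lombok.Data;
    import org.hibernate.annotations.DynamicUpdate;

    @Data
    @Entity
    @DynamicUpdate
    public class UserMsg implements Serializable {
        private static final long serialVersionUID = 4133316147283239759L;

        @Id
        @GeneratedValue(strategy = GenerationType.IDENTITY)
        private Integer id;
        private String name;
        private String msg;
        private Date createTime;
        private Date updateTime;
    }

    import org.springframework.data.jpa.repository.JpaRepository;

    public interface UserMsgRepository extends JpaRepository<UserMsg, Integer> {
        // No custom methods are needed here; the built-in JPA methods are enough
    }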

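Simulated NoSQL environment

I did not set up a virtual machine environment, so the store is simulated locally in memory.

Storing the username against the connection's random ID

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import org.springframework.stereotype.Component;

    @Component
    public class LikeRedisTemplate {
        // In-memory stand-in for Redis: channel ID -> username
        private final Map<Object, Object> redisMap = new ConcurrentHashMap<>();

        public void save(Object id, Object name) {
            redisMap.put(id, name);
        }

        public void delete(Object id) {
            redisMap.remove(id);
        }

        public Object get(Object id) {
            return redisMap.get(id);
        }
    }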
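Because the store works with plain `Object` values, callers wrap lookups in `String.valueOf(...)`, which turns a missing entry into the literal string "null". A minimal sketch of a typed alternative follows; it is not part of the original project, and the name `InMemoryStore` is hypothetical.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical typed variant of the in-memory stand-in for Redis.
class InMemoryStore<K, V> {
    private final ConcurrentMap<K, V> map = new ConcurrentHashMap<>();

    void save(K id, V value) {
        map.put(id, value); // ConcurrentHashMap rejects null keys and values
    }

    void delete(K id) {
        map.remove(id);
    }

    // An empty Optional signals "no such id" instead of a null reference
    Optional<V> get(K id) {
        return Optional.ofNullable(map.get(id));
    }

    int size() {
        return map.size();
    }
}
```

With this shape a caller can write `store.get(id).orElse("unknown")` and decide explicitly what an unknown connection should be called.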

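Temporary storage for chat content

    import java.util.LinkedHashSet;
    import java.util.Set;
    import org.springframework.stereotype.Component;

    @Component
    public class LikeSomeCacheTemplate {
        // Guarded by "this": Netty worker threads write while the async task reads
        private final Set<UserMsg> someCache = new LinkedHashSet<>();

        public synchronized void save(Object user, Object msg) {
            UserMsg userMsg = new UserMsg();
            userMsg.setName(String.valueOf(user));
            userMsg.setMsg(String.valueOf(msg));
            someCache.add(userMsg);
        }

        // Returns a snapshot so the caller can iterate while new messages arrive
        public synchronized Set<UserMsg> cloneCacheMap() {
            return new LinkedHashSet<>(someCache);
        }

        public synchronized void clearCacheMap() {
            someCache.clear();
        }
    }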
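Two properties of this cache are worth keeping in mind. Reading and clearing are separate calls, so a message that arrives between them is cleared without ever being read. And because `UserMsg` gets its `equals` from Lombok's `@Data`, the set keeps only one copy when the same user sends the same text twice. The sketch below is one possible alternative, not the project's implementation: a queue that keeps duplicates and hands out each message exactly once. The name `MessageBuffer` is hypothetical, and plain strings stand in for `UserMsg`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical buffer, not part of InChat; strings stand in for UserMsg.
class MessageBuffer {
    private final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();

    void add(String message) {
        queue.add(message);
    }

    // Removes and returns everything buffered so far, in arrival order.
    // Messages added while draining stay in the queue or join this batch;
    // none are dropped.
    List<String> drain() {
        List<String> batch = new ArrayList<>();
        String next;
        while ((next = queue.poll()) != null) {
            batch.add(next);
        }
        return batch;
    }
}
```

The design choice here is to merge "read" and "clear" into a single `drain()` step, so there is no window in which a message can be cleared unread.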

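Asynchronous task handling

    import java.util.Set;
    import java.util.concurrent.Future;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.scheduling.annotation.AsyncResult;
    import org.springframework.stereotype.Component;

    @Component
    public class MsgAsyncTesk {

        @Autowired
        private LikeSomeCacheTemplate cacheTemplate;

        @Autowired
        private UserMsgRepository userMsgRepository;

        @Async
        public Future<Boolean> saveChatMsgTask() throws Exception {
            // System.out.println("Starting async task");
            Set<UserMsg> set = cacheTemplate.cloneCacheMap();
            for (UserMsg item : set) {
                // Persist each buffered user message
                userMsgRepository.save(item);
            }
            // Clear the temporary cache
            cacheTemplate.clearCacheMap();
            return new AsyncResult<>(true);
        }
    }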
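The task above follows a common pattern: hand a batch of messages to a background thread and return a future that reports the outcome. The same idea can be sketched with only the JDK, as below. This is an illustration rather than the project's code; `AsyncFlusher` is a hypothetical name, and an in-memory list stands in for the database table.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical JDK-only counterpart to the @Async task above.
class AsyncFlusher {
    // Stand-in for the user_msg table
    private final List<String> stored = new CopyOnWriteArrayList<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    // Writes the batch on the background thread and completes with
    // the number of messages written.
    CompletableFuture<Integer> flush(List<String> batch) {
        return CompletableFuture.supplyAsync(() -> {
            stored.addAll(batch);
            return batch.size();
        }, executor);
    }

    List<String> stored() {
        return stored;
    }

    void shutdown() {
        executor.shutdown();
    }
}
```

A caller that does not need the result can ignore the returned future, which is what the chat handler does when a user disconnects; a caller that does can use `join()` or attach a callback.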

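Netty core

Configuration class

    import lombok.Data;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.stereotype.Component;

    // Binds the "netty" section of the yml file
    @Data
    @Component
    @ConfigurationProperties(prefix = "netty")
    public class NettyAccountConfig {
        private int port;
        private int bossThread;
        private int workerThread;
        private boolean keepalive;
        private int backlog;
    }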

Core message handler class

    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.group.ChannelGroup;
    import io.netty.channel.group.DefaultChannelGroup;
    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
    import io.netty.util.concurrent.GlobalEventExecutor;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.stereotype.Component;

    @Component
    @Qualifier("textWebSocketFrameHandler")
    @ChannelHandler.Sharable
    public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

        public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

        @Autowired
        private LikeRedisTemplate redisTemplate;
        @Autowired
        private LikeSomeCacheTemplate cacheTemplate;
        @Autowired
        private MsgAsyncTesk msgAsyncTesk;

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
            Channel incoming = ctx.channel();
            String uName = String.valueOf(redisTemplate.get(incoming.id()));
            // Buffer the message once, then broadcast it to every channel
            System.out.println("Storing data: " + uName + "-" + msg.text());
            cacheTemplate.save(uName, msg.text());
            for (Channel channel : channels) {
                if (channel != incoming) {
                    channel.writeAndFlush(new TextWebSocketFrame("[" + uName + "]" + msg.text()));
                } else {
                    channel.writeAndFlush(new TextWebSocketFrame("[you]" + msg.text()));
                }
            }
        }

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            System.out.println(ctx.channel().remoteAddress());
            // Picks a random username; any other naming scheme would work here
            String uName = String.valueOf(RandomNameUtil.getName());
            // A new user has connected
            Channel incoming = ctx.channel();
            for (Channel channel : channels) {
                channel.writeAndFlush(new TextWebSocketFrame("[New user] - " + uName + " joined"));
            }
            redisTemplate.save(incoming.id(), uName); // store the user
            channels.add(ctx.channel());
        }

        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            Channel incoming = ctx.channel();
            String uName = String.valueOf(redisTemplate.get(incoming.id()));
            // The user has left
            for (Channel channel : channels) {
                channel.writeAndFlush(new TextWebSocketFrame("[User] - " + uName + " left"));
            }
            redisTemplate.delete(incoming.id()); // remove the user
            channels.remove(ctx.channel());
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            Channel incoming = ctx.channel();
            System.out.println("User: " + redisTemplate.get(incoming.id()) + " online");
        }

        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            Channel incoming = ctx.channel();
            System.out.println("User: " + redisTemplate.get(incoming.id()) + " offline");
            // Persist the buffered chat messages asynchronously
            msgAsyncTesk.saveChatMsgTask();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            Channel incoming = ctx.channel();
            System.out.println("User: " + redisTemplate.get(incoming.id()) + " error");
            cause.printStackTrace();
            ctx.close();
        }
    }
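The broadcast rule in `channelRead0` is simple: the sender sees the message prefixed with "[you]", and everyone else sees it prefixed with the sender's name in brackets. The sketch below isolates that rule from Netty so it can be checked on its own. It is an illustration only; `BroadcastFormatter` is a hypothetical name, and string IDs stand in for channels.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Netty-free restatement of the prefix rule in channelRead0.
class BroadcastFormatter {
    // Returns recipient id -> the text that recipient should be sent.
    static Map<String, String> format(List<String> recipients, String senderId,
                                      String senderName, String text) {
        Map<String, String> out = new LinkedHashMap<>();
        for (String id : recipients) {
            String prefix = id.equals(senderId) ? "[you]" : "[" + senderName + "]";
            out.put(id, prefix + text);
        }
        return out;
    }
}
```

For recipients `c1` and `c2` with sender `c1` named `Li`, the text `hello` becomes `[you]hello` for `c1` and `[Li]hello` for `c2`.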

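Defining the Initializer

    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
    import io.netty.handler.stream.ChunkedWriteHandler;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.stereotype.Component;

    @Component
    @Qualifier("somethingChannelInitializer")
    public class NettyWebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {

        @Autowired
        private TextWebSocketFrameHandler textWebSocketFrameHandler;

        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new HttpServerCodec());              // HTTP encoding and decoding
            pipeline.addLast(new HttpObjectAggregator(65536));    // aggregate HTTP parts into full messages
            pipeline.addLast(new ChunkedWriteHandler());          // support for writing large data streams
            pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); // WebSocket handshake on /ws
            // Do not use "new" here; otherwise Spring cannot inject dependencies into the handler
            pipeline.addLast(textWebSocketFrameHandler);
        }
    }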

Creating the basic Netty components at startup

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import java.net.InetSocketAddress;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    // @Configuration (rather than @Component) makes the bossGroup() and workerGroup()
    // calls inside bootstrap() return the singleton beans instead of new instances
    @Configuration
    public class NettyConfig {

        @Autowired
        private NettyAccountConfig nettyAccountConfig;

        @Autowired
        @Qualifier("somethingChannelInitializer")
        private NettyWebSocketChannelInitializer nettyWebSocketChannelInitializer;

        @Bean(name = "bossGroup", destroyMethod = "shutdownGracefully")
        public NioEventLoopGroup bossGroup() {
            return new NioEventLoopGroup(nettyAccountConfig.getBossThread());
        }

        @Bean(name = "workerGroup", destroyMethod = "shutdownGracefully")
        public NioEventLoopGroup workerGroup() {
            return new NioEventLoopGroup(nettyAccountConfig.getWorkerThread());
        }

        @Bean(name = "tcpSocketAddress")
        public InetSocketAddress tcpPost() {
            return new InetSocketAddress(nettyAccountConfig.getPort());
        }

        @Bean(name = "serverBootstrap")
        public ServerBootstrap bootstrap() {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup(), workerGroup())
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.DEBUG))
                    .childHandler(nettyWebSocketChannelInitializer)
                    // SO_BACKLOG applies to the listening server socket
                    .option(ChannelOption.SO_BACKLOG, nettyAccountConfig.getBacklog())
                    // SO_KEEPALIVE applies to each accepted connection, so it is a child option
                    .childOption(ChannelOption.SO_KEEPALIVE, nettyAccountConfig.isKeepalive());
            return b;
        }
    }

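Server startup helper class

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import java.net.InetSocketAddress;
    import javax.annotation.PreDestroy;
    import lombok.Data;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.stereotype.Component;

    @Data
    @Component
    public class TCPServer {

        @Autowired
        @Qualifier("serverBootstrap")
        private ServerBootstrap serverBootstrap;

        @Autowired
        @Qualifier("tcpSocketAddress")
        private InetSocketAddress tcpPort;

        private Channel serverChannel;

        public void start() throws Exception {
            // Keep a reference to the bound channel before blocking,
            // so that stop() can close it during shutdown
            serverChannel = serverBootstrap.bind(tcpPort).sync().channel();
            // Block the calling thread until the server channel is closed
            serverChannel.closeFuture().sync();
        }

        @PreDestroy
        public void stop() throws Exception {
            // A server channel has no parent, so only the channel itself is closed
            if (serverChannel != null) {
                serverChannel.close();
            }
        }
    }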

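Starting the project

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.scheduling.annotation.EnableAsync;

    @SpringBootApplication
    @EnableAsync // enables @Async tasks; @EnableScheduling alone does not activate @Async
    public class NettychatApplication {

        public static void main(String[] args) throws Exception {
            ConfigurableApplicationContext context =
                    SpringApplication.run(NettychatApplication.class, args);
            // Fetch the TCPServer bean from the context
            TCPServer tcpServer = context.getBean(TCPServer.class);
            // Start the WebSocket server (blocks until the server channel closes)
            tcpServer.start();
        }
    }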
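Once the application is running, the chat room can be exercised without a browser. The following is a minimal sketch of a test client based on the JDK's built-in WebSocket client (Java 11 or later); it is not part of the original project. The class name `ChatClient` is hypothetical, and the endpoint `ws://localhost:8090/ws` is an assumption derived from the port in the yml file and the "/ws" path in the initializer.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical smoke-test client, not part of InChat (requires Java 11+).
class ChatClient implements WebSocket.Listener {
    private final StringBuilder partial = new StringBuilder();
    private final List<String> received = new CopyOnWriteArrayList<>();

    // Collects one text fragment; a message is complete when `last` is true.
    void accept(CharSequence data, boolean last) {
        partial.append(data);
        if (last) {
            received.add(partial.toString());
            partial.setLength(0);
        }
    }

    @Override
    public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
        accept(data, last);
        webSocket.request(1); // ask for the next message
        return null;          // null means processing finished on return
    }

    List<String> received() {
        return received;
    }

    // Connects to the endpoint assumed from the article's settings
    // (port 8090 in the yml file, path "/ws" in the initializer).
    static WebSocket connect(ChatClient listener) {
        return HttpClient.newHttpClient()
                .newWebSocketBuilder()
                .buildAsync(URI.create("ws://localhost:8090/ws"), listener)
                .join();
    }
}
```

With the server running, `ChatClient.connect(client).sendText("hello", true)` sends one message; given the handler shown earlier, the sender should then find a line starting with "[you]" in `client.received()`.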
