news 2026/6/10 16:00:04

easychat项目复盘---聊天部分netty篇

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
easychat项目复盘---聊天部分netty篇

前言:实时聊天需要让客户端与服务端有长链接

如图使用netty可以保证服务器和各个客户端保证持续链接实现实时聊天

如下我将展示一个示例代码,根据示例代码逐步讲解netty:
代码:

@Component public class NettyWebSocketStarter implements Runnable { private static final Logger logger = LoggerFactory.getLogger(NettyWebSocketStarter.class); @Resource private AppConfig appConfig; @Resource private HandlerWebSocket handlerWebSocket; /** * boss线程组,用于处理连接 */ private EventLoopGroup bossGroup = new NioEventLoopGroup(1); /** * work线程组,用于处理消息 */ private EventLoopGroup workerGroup = new NioEventLoopGroup(); /** * 资源关闭——在容器销毁时关闭 */ @PreDestroy public void close() { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } @Override public void run() { try { //创建服务端启动助手 ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup); serverBootstrap.channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.DEBUG)) .childHandler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel channel) { ChannelPipeline pipeline = channel.pipeline(); //设置几个重要的处理器 // 对http协议的支持,使用http的编码器,解码器 pipeline.addLast(new HttpServerCodec()); //聚合解码 httpRequest/htppContent/lastHttpContent到fullHttpRequest //保证接收的http请求的完整性 pipeline.addLast(new HttpObjectAggregator(64 * 1024)); //心跳 long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit // readerIdleTime 读超时事件 即测试段一定事件内未接收到被测试段消息 // writerIdleTime 为写超时时间 即测试端一定时间内想被测试端发送消息 //allIdleTime 所有类型的超时时间 pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS)); pipeline.addLast(new HandlerHeartBeat()); //将http协议升级为ws协议,对websocket支持 pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 64 * 1024, true, true, 10000L)); pipeline.addLast(handlerWebSocket); } }); //启动 ChannelFuture channelFuture = serverBootstrap.bind(appConfig.getWsPort()).sync(); logger.info("Netty服务端启动成功,端口:{}", appConfig.getWsPort()); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }

netty有三大内容:事件循环组,handler(处理器),pipeline(管道)

事件循环组的含义就是无限循环去不断的处理io事件,不断循环就达到了实时的目的,因为他永不停歇,只要一发送信息就会处理,另一方就可以接受到信息


boss事件循环组就像老板一样,只负责招投资,见别的老板所以他只负责链接

woker就处理老板接受后的客户,客服的要求,要工人干什么就去干什么,解决其他的所有事情

以下就是woker事件循环组中的情况简略图

pipeline管道中有数个handler,每个handler均为双向联通的链表,pipeline就是信息传递的通道,handler就是具体的处理事情的处理器

所以代码书写先绑定group(事件循环组)与channle(通道)然后创建pipeline在里面添加handler

handler:
HttpServerCodec() new HttpObjectAggregator(64 * 1024)
new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS)
new HandlerHeartBeat()
new WebSocketServerProtocolHandler("/ws", null, true, 64 * 1024, true, true, 10000L)
以上除了handlerHeartBeat(自定义心跳处理) 其他均为常见handler 这里不做过多描写,看者可以ai查看用途 无需记忆只要有印象就可以

心跳处理器:
代码:

public class HandlerHeartBeat extends ChannelDuplexHandler { private static final Logger logger = LoggerFactory.getLogger(HandlerHeartBeat.class); @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent e = (IdleStateEvent) evt; if (e.state() == IdleState.READER_IDLE) { Attribute<String> attribute = ctx.channel().attr(AttributeKey.valueOf(ctx.channel().id().toString())); String userId = attribute.get(); logger.info("用户{}没有发送心跳断开连接", userId); ctx.close(); } else if (e.state() == IdleState.WRITER_IDLE) { ctx.writeAndFlush("heart"); } } } }

心跳检测全过程讲解:
new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS) 当事件经过此处理器的时候,若客户端断开此处理器会向下传递IdleState.READER_IDLE事件 new HandlerHeartBeat()处理器检测事件,若断开就书写日志记录心跳断开,并且关闭通道上下文

再次回归到事件循环组,事件循环组会不断循环解决io问题,如果事件循环组没有结束,idlestatehandler会一直接收到消息,一但idlestatehandler接受不到则代表事件循环组已经结束,意味着客户端与服务器断开,则需要一个处理器去关闭通道,所以handlerheartbeat应运而生

HandlerWebSocket讲解:
代码:

@ChannelHandler.Sharable @Component("handlerWebSocket") public class HandlerWebSocket extends SimpleChannelInboundHandler<TextWebSocketFrame> { private static final Logger logger = LoggerFactory.getLogger(HandlerWebSocket.class); @Resource private ChannelContextUtils channelContextUtils; @Resource private RedisComponet redisComponet; /** * 当通道就绪后会调用此方法,通常我们会在这里做一些初始化操作 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // Channel channel = ctx.channel(); logger.info("有新的连接加入。。。"); } /** * 当通道不再活跃时(连接关闭)会调用此方法,我们可以在这里做一些清理工作 * * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { logger.info("有连接已经断开。。。"); channelContextUtils.removeContext(ctx.channel()); } /** * 读就绪事件 当有消息可读时会调用此方法,我们可以在这里读取消息并处理。 * * @param ctx * @param textWebSocketFrame * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception { //接收心跳 Channel channel = ctx.channel(); Attribute<String> attribute = channel.attr(AttributeKey.valueOf(channel.id().toString())); String userId = attribute.get(); redisComponet.saveUserHeartBeat(userId); } //用于处理用户自定义的事件 当有用户事件触发时会调用此方法,例如连接超时,异常等。 @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) { WebSocketServerProtocolHandler.HandshakeComplete complete = (WebSocketServerProtocolHandler.HandshakeComplete) evt; String url = complete.requestUri(); String token = getToken(url); if (token == null) { ctx.channel().close(); return; } TokenUserInfoDto tokenUserInfoDto = redisComponet.getTokenUserInfoDto(token); if (null == tokenUserInfoDto) { ctx.channel().close(); return; } /** * 用户加入 */ channelContextUtils.addContext(tokenUserInfoDto.getUserId(), ctx.channel()); } } private String getToken(String url) { if (StringTools.isEmpty(url) || url.indexOf("?") == -1) { return null; } String[] queryParams = url.split("\\?"); if (queryParams.length < 2) { return url; } String[] params = queryParams[1].split("="); if (params.length != 2) { return url; } return params[1]; } }

此HandlerWebSocket部分方法与心跳处理有重复之处,可以重构一下,主要内容为userEventTriggered,此方法主要为验证token,token对则加入频道,如果错则向上报错

下图为私聊简略图,用户1发送消息,服务器会将此消息发送给此频道中除了用户1外的所有人,群组消息同理

ChannelContextUtil:
代码:

@Component("channelContextUtils") public class ChannelContextUtils { private static final Logger logger = LoggerFactory.getLogger(ChannelContextUtils.class); @Resource private RedisComponet redisComponet; public static final ConcurrentMap<String, Channel> USER_CONTEXT_MAP = new ConcurrentHashMap(); public static final ConcurrentMap<String, ChannelGroup> GROUP_CONTEXT_MAP = new ConcurrentHashMap(); @Resource private ChatSessionUserMapper<ChatSessionUser, ChatSessionUserQuery> chatSessionUserMapper; @Resource private ChatMessageMapper<ChatMessage, ChatMessageQuery> chatMessageMapper; @Resource private UserInfoMapper<UserInfo, UserInfoQuery> userInfoMapper; @Resource private UserContactMapper<UserContact, UserContactQuery> userContactMapper; @Resource private UserContactApplyMapper<UserContactApply, UserContactApplyQuery> userContactApplyMapper; /** * 加入通道 * * @param userId * @param channel */ public void addContext(String userId, Channel channel) { try { String channelId = channel.id().toString(); AttributeKey attributeKey = null; if (!AttributeKey.exists(channelId)) { attributeKey = AttributeKey.newInstance(channel.id().toString()); } else { attributeKey = AttributeKey.valueOf(channel.id().toString()); } channel.attr(attributeKey).set(userId); List<String> contactList = redisComponet.getUserContactList(userId); for (String groupId : contactList) { if (groupId.startsWith(UserContactTypeEnum.GROUP.getPrefix())) { add2Group(groupId, channel); } } USER_CONTEXT_MAP.put(userId, channel); redisComponet.saveUserHeartBeat(userId); //更新用户最后连接时间 UserInfo updateInfo = new UserInfo(); updateInfo.setLastLoginTime(new Date()); userInfoMapper.updateByUserId(updateInfo, userId); //给用户发送一些消息 //获取用户最后离线时间 UserInfo userInfo = userInfoMapper.selectByUserId(userId); Long sourceLastOffTime = userInfo.getLastOffTime(); //这里避免毫秒时间差,所以减去1秒的时间 //如果时间太久,只取最近三天的消息数 Long lastOffTime = sourceLastOffTime; if (sourceLastOffTime != null && System.currentTimeMillis() - Constants.MILLISECOND_3DAYS_AGO > sourceLastOffTime) { lastOffTime = System.currentTimeMillis() - Constants.MILLISECOND_3DAYS_AGO; } /** * 1、查询会话信息 查询用户所有会话,避免换设备会话不同步 */ ChatSessionUserQuery sessionUserQuery = new ChatSessionUserQuery(); sessionUserQuery.setUserId(userId); sessionUserQuery.setOrderBy("last_receive_time desc"); List<ChatSessionUser> chatSessionList = chatSessionUserMapper.selectList(sessionUserQuery); WsInitData wsInitData = new WsInitData(); wsInitData.setChatSessionList(chatSessionList); /** * 2、查询聊天消息 */ //查询用户的联系人 UserContactQuery contactQuery = new UserContactQuery(); contactQuery.setContactType(UserContactTypeEnum.GROUP.getType()); contactQuery.setUserId(userId); List<UserContact> groupContactList = userContactMapper.selectList(contactQuery); List<String> groupIdList = groupContactList.stream().map(item -> item.getContactId()).collect(Collectors.toList()); //将自己也加进去 groupIdList.add(userId); ChatMessageQuery messageQuery = new ChatMessageQuery(); messageQuery.setContactIdList(groupIdList); messageQuery.setLastReceiveTime(lastOffTime); List<ChatMessage> chatMessageList = chatMessageMapper.selectList(messageQuery); wsInitData.setChatMessageList(chatMessageList); /** * 3、查询好友申请 */ UserContactApplyQuery applyQuery = new UserContactApplyQuery(); applyQuery.setReceiveUserId(userId); applyQuery.setLastApplyTimestamp(sourceLastOffTime); applyQuery.setStatus(UserContactApplyStatusEnum.INIT.getStatus()); Integer applyCount = userContactApplyMapper.selectCount(applyQuery); wsInitData.setApplyCount(applyCount); //发送消息 MessageSendDto messageSendDto = new MessageSendDto(); messageSendDto.setMessageType(MessageTypeEnum.INIT.getType()); messageSendDto.setContactId(userId); messageSendDto.setExtendData(wsInitData); sendMsg(messageSendDto, userId); } catch (Exception e) { logger.error("初始化链接失败", e); } } /** * 删除通道连接异常 * * @param channel */ public void removeContext(Channel channel) { Attribute<String> attribute = channel.attr(AttributeKey.valueOf(channel.id().toString())); String userId = attribute.get(); if (!StringTools.isEmpty(userId)) { USER_CONTEXT_MAP.remove(userId); } redisComponet.removeUserHeartBeat(userId); //更新用户最后断线时间 UserInfo userInfo = new UserInfo(); userInfo.setLastOffTime(System.currentTimeMillis()); userInfoMapper.updateByUserId(userInfo, userId); } public void closeContext(String userId) { if (StringTools.isEmpty(userId)) { return; } redisComponet.cleanUserTokenByUserId(userId); Channel channel = USER_CONTEXT_MAP.get(userId); USER_CONTEXT_MAP.remove(userId); if (channel != null) { channel.close(); } } public void sendMessage(MessageSendDto messageSendDto) { UserContactTypeEnum contactTypeEnum = UserContactTypeEnum.getByPrefix(messageSendDto.getContactId()); switch (contactTypeEnum) { case USER: send2User(messageSendDto); break; case GROUP: sendMsg2Group(messageSendDto); } } /** * 发送消息给用户 */ private void send2User(MessageSendDto messageSendDto) { String contactId = messageSendDto.getContactId(); sendMsg(messageSendDto, contactId); //强制下线 if (MessageTypeEnum.FORCE_OFF_LINE.getType().equals(messageSendDto.getMessageType())) { closeContext(contactId); } } /** * 发送消息到组 */ private void sendMsg2Group(MessageSendDto messageSendDto) { if (messageSendDto.getContactId() == null) { return; } ChannelGroup group = GROUP_CONTEXT_MAP.get(messageSendDto.getContactId()); if (group == null) { return; } group.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(messageSendDto))); //移除群聊 MessageTypeEnum messageTypeEnum = MessageTypeEnum.getByType(messageSendDto.getMessageType()); if (MessageTypeEnum.LEAVE_GROUP == messageTypeEnum || MessageTypeEnum.REMOVE_GROUP == messageTypeEnum) { String userId = (String) messageSendDto.getExtendData(); redisComponet.removeUserContact(userId, messageSendDto.getContactId()); Channel channel = USER_CONTEXT_MAP.get(userId); if (channel == null) { return; } group.remove(channel); } if (MessageTypeEnum.DISSOLUTION_GROUP == messageTypeEnum) { GROUP_CONTEXT_MAP.remove(messageSendDto.getContactId()); group.close(); } } private static void sendMsg(MessageSendDto messageSendDto, String reciveId) { if (reciveId == null) { return; } Channel sendChannel = USER_CONTEXT_MAP.get(reciveId); if (sendChannel == null) { return; } //相当于客户而言,联系人就是发送人,所以这里转换一下再发送,好友打招呼信息发送给自己需要特殊处理 if (MessageTypeEnum.ADD_FRIEND_SELF.getType().equals(messageSendDto.getMessageType())) { UserInfo userInfo = (UserInfo) messageSendDto.getExtendData(); messageSendDto.setMessageType(MessageTypeEnum.ADD_FRIEND.getType()); messageSendDto.setContactId(userInfo.getUserId()); messageSendDto.setContactName(userInfo.getNickName()); messageSendDto.setExtendData(null); } else { messageSendDto.setContactId(messageSendDto.getSendUserId()); messageSendDto.setContactName(messageSendDto.getSendUserNickName()); } sendChannel.writeAndFlush(new TextWebSocketFrame(JsonUtils.convertObj2Json(messageSendDto))); } private void add2Group(String groupId, Channel context) { ChannelGroup group = GROUP_CONTEXT_MAP.get(groupId); if (group == null) { group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); GROUP_CONTEXT_MAP.put(groupId, group); } if (context == null) { return; } group.add(context); } public void addUser2Group(String userId, String groupId) { Channel channel = USER_CONTEXT_MAP.get(userId); add2Group(groupId, channel); } }

这一段代码并不难理解,主要实现频道的加入与退出,以及消息的分发

实现消息分发的方法:

有两种较为熟知的方法,第一种为redis实现,第二种为rabbitmq

这里展示redis实现

代码实现:

@Component("messageHandler") public class MessageHandler<T> { private static final Logger logger = LoggerFactory.getLogger(MessageHandler.class); private static final String MESSAGE_TOPIC = "message.topic"; @Resource private RedissonClient redissonClient; @Resource private ChannelContextUtils channelContextUtils; @PostConstruct public void lisMessage() { RTopic rTopic = redissonClient.getTopic(MESSAGE_TOPIC); rTopic.addListener(MessageSendDto.class, (MessageSendDto, sendDto) -> { logger.info("收到广播消息:{}", JSON.toJSONString(sendDto)); channelContextUtils.sendMessage(sendDto); }); } public void sendMessage(MessageSendDto sendDto) { RTopic rTopic = redissonClient.getTopic(MESSAGE_TOPIC); rTopic.publish(sendDto); } }
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/10 11:23:05

聊聊 C 里的进制转换、移位操作与算术转换

前言 学 C 语言时&#xff0c;总绕不开 “进制怎么转”“移位操作符怎么用”“表达式为啥这么算” 这些问题 —— 它们不算多高深&#xff0c;但都是写代码、调 Bug 的基础。 比如写个简单的位运算&#xff0c;若搞不清二进制和十进制的转换逻辑&#xff0c;很容易算错结果&a…

作者头像 李华
网站建设 2026/6/10 1:51:59

Excalidraw镜像具备弹性伸缩能力,资源利用率更高

Excalidraw 镜像的弹性伸缩实践&#xff1a;如何实现高效资源利用 在现代远程协作日益频繁的背景下&#xff0c;可视化工具早已不再是“锦上添花”&#xff0c;而是团队运转的核心组件。工程师画架构图、产品经理做原型推演、设计师进行流程梳理——这些场景几乎都绕不开一个轻…

作者头像 李华
网站建设 2026/6/9 23:24:42

Excalidraw能否用于航空航天系统设计?高可靠性验证中

Excalidraw 在航空航天系统设计中的应用潜力与边界 在某次小型卫星姿态控制系统的联合评审会上&#xff0c;来自北京的结构工程师拖动着一个手绘风格的矩形框&#xff0c;实时标注“星敏感器安装位置需避开热变形区”&#xff0c;而远在慕尼黑的飞控团队立即在其旁边添加了红色…

作者头像 李华
网站建设 2026/6/8 18:46:09

4、Windows系统文件与网络操作全指南

Windows系统文件与网络操作全指南 在Windows系统中,我们经常需要对各种文件、文件夹进行操作,同时也会涉及到网络连接等相关设置。下面将详细介绍一些常见的操作方法。 1. 访问和操作“我的视频”文件夹 在Windows XP系统中,若要访问“我的视频”文件夹,可以通过以下方式…

作者头像 李华
网站建设 2026/6/10 13:20:50

8、Windows XP 使用指南:窗口、文件管理与媒体播放

Windows XP 使用指南:窗口、文件管理与媒体播放 1. 窗口操作基础 在使用电脑时,窗口操作是基础且常用的技能。当你需要让某个窗口保持打开状态(特别是当它在后台运行打印、计算等进程),但暂时又不会直接使用其功能时,可以将该窗口最小化。而当你在做其他事情的同时,还…

作者头像 李华
网站建设 2026/6/9 22:59:23

Excalidraw新增最近编辑者标记,协作责任明确

Excalidraw 新增最近编辑者标记&#xff0c;协作责任明确 在远程协作日益成为常态的今天&#xff0c;一个看似微小的设计改动&#xff0c;往往能带来巨大的效率提升。比如&#xff1a;你正在和团队共同绘制一张复杂的系统架构图&#xff0c;突然发现某个关键模块的位置被移动了…

作者头像 李华