目录

一、问题引出

二、架构图

三、实现方式


一、问题引出

在IM分布式系统的构建中遇到的问题:

Netty服务器通过客户端的连接信息来生成对应的Channel(可以理解为长连接的用户信息),Netty服务器通过Channel来进行消息转发。于是,提出初始构想:通过Redis来序列化Channel,再通过Netty服务器去获取Redis上的Channel,最后转发。但这个构思是错误的,因为Channel是硬件的连接信息,并不能被序列化。

最终构思解决Channel共享的方案有两个:

(1)GateWay网关来自定义负载均衡,当接收到Websocket消息时直接根据用户id进行路由,该方式完美兼容原始功能,原始功能采用Netty来开发Websocket,实现难度低,开发成本低。

(2)采用Netty高性能框架开发Websocket,通过MQ消息队列进行广播来实现Channel的共享,实现难度不大,开发成本较高。

二、架构图

最终,我选择第二种解决方案,IM系统架构如下:

三、实现方式

首先,我先搭建一个支持简单聊天的Netty-Websocket聊天服务器,之后,我先构建一个消息聊天对象如下:

package com.dragonwu.im.domain.dto;import com.dragonwu.common.basic.constant.Constants;import com.dragonwu.common.security.basic.domain.emums.LoginType;import com.dragonwu.im.domain.enums.FromUserType;import lombok.Getter;import lombok.Setter;import lombok.ToString;import org.apache.commons.lang.StringUtils;import org.springframework.data.annotation.CreatedDate;import org.springframework.data.annotation.Id;import org.springframework.data.mongodb.core.index.Indexed;import org.springframework.data.mongodb.core.mapping.Document;import java.io.Serializable;import java.time.LocalDateTime;import java.util.List;import java.util.Objects;/** * @author Dragon Wu * @since 2023/2/27 13:05 * 消息对象 */@Getter@Setter@ToString@Document("im_message") //集合名// {"msg":"你的消息","loginType":"你的类型","userId":"你的id","to":"接收者","group":"群接受对象","isCustomerService":"是否为客服","isVisitor":"是否为游客",// "isConnect":"是否为连接信息"}public class IMessage implements Serializable {@Id //存入mongo里的idprivate String id;//登录类型private String loginType;//用户id@Indexedprivate String userId;//发送时间private LocalDateTime sendTime;//发送人idprivate String to;//群发列表private List group;//发送者是否为客服private Boolean isCustomerService;//发送者是否为游客private Boolean isVisitor;//发送的消息private String msg;//是否为第一次连接信号private Boolean isConnect;@CreatedDate //创建时默认创建该时间字段private LocalDateTime createTime;/* 判断消息格式是否正确 */public boolean isMsgOK() {if (Objects.isNull(isVisitor)) {isVisitor = false;}if (Objects.isNull(to)) {to = Constants.EMPTY_STR;}if (Objects.isNull(isConnect)) {isConnect = false;}try {if ((!StringUtils.isEmpty(userId)) && (!StringUtils.isEmpty(msg))) {return ((!StringUtils.isEmpty(to)) || ((group != null) && (!group.isEmpty())) || isVisitor);}} catch (NullPointerException ignored) {}return false;}//获取发送者的类型public FromUserType getFromUserType() {LoginType exists = LoginType.isExists(loginType);if (Objects.isNull(isVisitor)) {isVisitor = false;}if (Objects.isNull(isCustomerService)) {isCustomerService = false;}if (Objects.isNull(exists) && isVisitor) {return FromUserType.VISITOR_TYPE;} else if ((exists == LoginType.USER_TYPE) && isCustomerService) {return FromUserType.CUSTOMER_SERVICE;} else if (exists == LoginType.USER_TYPE) {return FromUserType.USER_TYPE;} else if (exists == LoginType.CUSTOMER_TYPE) {return FromUserType.CUSTOMER_TYPE;}return null;}public void setNowAsSendTime() {sendTime = LocalDateTime.now();}public boolean isGroupChat() {if (group != null) {return StringUtils.isEmpty(to) && (!group.isEmpty()) && (!isVisitor);}return false;}}

有了这样的对象以后,我便可对发送过来的消息进行序列化与反序列化获取数据,通过消息对象中的数据是否正确与是否认证来决定消息的转发。

每个用户第一次发送isConnect型号时将其注册到Redis中,Key为用户名唯一,Value为ChannelId的asShort值。当用户在不同Netty服务器上时(此时发送与接收者都在线),我会先让服务器去Redis获取对应用户名的ChannelId,先在本地服务器中查找,若查询到该ChannelId的Channel则直接转发,否则为不在同一个Netty服务器上,发送Channel寻找的信号到MQ进行广播,其他服务器获取到广播后查询直接是否有该ChannelId的Channel,若有则转发;离线消息的话,直接以Zset的形式加入Redis即可,当用户上线时再拉取数据。最后,无论哪种情况发送的消息,都会被MQ进行集群负载均衡来存储到数据库中。

以上,为个人本次实践的总结,希望对遇到相似问题的开发者,有所帮助,有问题可联系共同探讨!