文章目录

  • 五、IM开发核心之构建TCP网关(上)
    • 1、编写LimServer
    • 2、编写LimWebSocketServer
    • 3、使用snakeyaml动态配置文件
    • 4、大白话讲通信协议—详解主流通讯协议
      • 4.1、文本协议
      • 4.2、二进制协议
      • 4.3、xml协议
      • 4.4、可以落地使用的协议
    • 5、私有协议编解码—设计篇
    • 6、私有协议编解码—实现篇
      • 6.1、LimServer的编解码器
      • 6.2、LimWebSocketServer的编解码器
  • 六、IM开发核心之建构TCP网关(下)
    • 1、登录消息—保存用户NioSocketChannel
    • 2、分布式缓存中间件—Redisson快速入门操作
    • 3、用户登录网关层—保存用户Session
    • 4、用户退出网关层—离线删除用户Session
    • 5、服务端心跳检测
    • 6、RabbitMQ的安装、发布订阅、路由模式详解
    • 7、TCP接入RabbitMQ、打通和逻辑层的交互
    • 8、分布式TCP服务注册中心的选型
    • 9、TCP服务注册—Zookeeper注册TCP服务
    • 10、服务改造-TCP服务分布式改造
      • 10.1、 **广播模式**
      • 10.2、 **一致性Hash**
      • 10.3、 **构建路由层**
    • 11、即时通讯系统支持多端登录模式—应对多端登录的场景

项目源代码

五、IM开发核心之构建TCP网关(上)

1、编写LimServer

==LimServer ==

public class LimServer {// 日志类private final static Logger logger = LoggerFactory.getLogger(LimServer.class);// 端口号private int port;// 端口号和两个Group的值都是从配置文件中取出来的EventLoopGroup mainGroup;EventLoopGroup subGroup;ServerBootstrap server;public LimServer(Integer port){this.port = port;// 两个GroupmainGroup = new NioEventLoopGroup();subGroup = new NioEventLoopGroup();// serverserver = new ServerBootstrap();server.group(mainGroup, subGroup).channel(NioServerSocketChannel.class)// 服务端可连接队列大小.option(ChannelOption.SO_BACKLOG, 10240)// 参数表示允许重复使用本地地址和端口.option(ChannelOption.SO_REUSEADDR, true)// 是否禁用Nagle算法 简单点说是否批量发送数据 true关闭 false开启。 开启的话可以减少一定的网络开销,但影响消息实时性.childOption(ChannelOption.TCP_NODELAY, true)// 保活开关2h没有数据服务端会发送心跳包.childOption(ChannelOption.SO_KEEPALIVE, true).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {}});}server.bind(port);}

Starter

public class Starter {public static void main(String[] args) throws FileNotFoundException { new LimServer(9000);}}

简单编写完这两部分后,用网络调试助手连接一下本机的9000端口,没有报错就是连接成功了

2、编写LimWebSocketServer

LimWebSocketServer

public class LimWebSocketServer {private final static Logger logger = LoggerFactory.getLogger(LimWebSocketServer.class);int port;EventLoopGroup mainGroup;EventLoopGroup subGroup;ServerBootstrap server;public LimWebSocketServer(int port) {this.port= port;mainGroup = new NioEventLoopGroup();subGroup = new NioEventLoopGroup();server = new ServerBootstrap();server.group(mainGroup, subGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 10240) // 服务端可连接队列大小.option(ChannelOption.SO_REUSEADDR, true) // 参数表示允许重复使用本地地址和端口.childOption(ChannelOption.TCP_NODELAY, true) // 是否禁用Nagle算法 简单点说是否批量发送数据 true关闭 false开启。 开启的话可以减少一定的网络开销,但影响消息实时性.childOption(ChannelOption.SO_KEEPALIVE, true) // 保活开关2h没有数据服务端会发送心跳包.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// websocket 基于http协议,所以要有http编解码器pipeline.addLast("http-codec", new HttpServerCodec());// 对写大数据流的支持pipeline.addLast("http-chunked", new ChunkedWriteHandler());// 几乎在netty中的编程,都会使用到此hanlerpipeline.addLast("aggregator", new HttpObjectAggregator(65535));/** * websocket 服务器处理的协议,用于指定给客户端连接访问的路由 : /ws * 本handler会帮你处理一些繁重的复杂的事 * 会帮你处理握手动作: handshaking(close, ping, pong) ping + pong = 心跳 * 对于websocket来讲,都是以frames进行传输的,不同的数据类型对应的frames也不同 */pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));}});}server.bind(port);logger.info("web start");}

Starter

public class Starter {public static void main(String[] args) throws FileNotFoundException { new LimServer(9000); new LimWebSocketServer(19000);}}

然后在启动,使用web.html验证

3、使用snakeyaml动态配置文件

<dependency><groupId>org.yaml</groupId><artifactId>snakeyaml</artifactId></dependency>
@Datapublic class BootstrapConfig {private TcpConfig lim;@Datapublic static class TcpConfig{// tcp 绑定的端口号private Integer tcpPort;// webSocket 绑定的端口号private Integer webSocketPort;// boss线程 默认=1private Integer bossThreadSize;//work线程private Integer workThreadSize;// 心跳超时时间单位msprivate Long heartBeatTime;// 登录模式private Integer loginModel;// redis配置文件private RedisConfig redis;/** * rabbitmq配置 */private Rabbitmq rabbitmq;/** * zk配置 */private ZkConfig zkConfig;/** * brokerId */private Integer brokerId;private String logicUrl;}}

将需要的配置文件中的数据做一个实体类,用于后面的接,收然后改造一下

LimServer

public class LimServer {// 日志类private final static Logger logger = LoggerFactory.getLogger(LimServer.class);// 端口号private int port;// 端口号和两个Group的值都是从配置文件中取出来的BootstrapConfig.TcpConfig config;EventLoopGroup mainGroup;EventLoopGroup subGroup;ServerBootstrap server;public LimServer(BootstrapConfig.TcpConfig config){this.config = config;// 两个GroupmainGroup = new NioEventLoopGroup(config.getBossThreadSize());subGroup = new NioEventLoopGroup(config.getWorkThreadSize());// serverserver = new ServerBootstrap();server.group(mainGroup, subGroup).channel(NioServerSocketChannel.class)// 服务端可连接队列大小.option(ChannelOption.SO_BACKLOG, 10240)// 参数表示允许重复使用本地地址和端口.option(ChannelOption.SO_REUSEADDR, true)// 是否禁用Nagle算法 简单点说是否批量发送数据 true关闭 false开启。 开启的话可以减少一定的网络开销,但影响消息实时性.childOption(ChannelOption.TCP_NODELAY, true)// 保活开关2h没有数据服务端会发送心跳包.childOption(ChannelOption.SO_KEEPALIVE, true).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {}});}public void start(){this.server.bind(config.getTcpPort());}}

LimWebSocket

public class LimWebSocketServer {private final static Logger logger = LoggerFactory.getLogger(LimWebSocketServer.class);BootstrapConfig.TcpConfig config;EventLoopGroup mainGroup;EventLoopGroup subGroup;ServerBootstrap server;public LimWebSocketServer(BootstrapConfig.TcpConfig config) {this.config = config;mainGroup = new NioEventLoopGroup();subGroup = new NioEventLoopGroup();server = new ServerBootstrap();server.group(mainGroup, subGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 10240) // 服务端可连接队列大小.option(ChannelOption.SO_REUSEADDR, true) // 参数表示允许重复使用本地地址和端口.childOption(ChannelOption.TCP_NODELAY, true) // 是否禁用Nagle算法 简单点说是否批量发送数据 true关闭 false开启。 开启的话可以减少一定的网络开销,但影响消息实时性.childOption(ChannelOption.SO_KEEPALIVE, true) // 保活开关2h没有数据服务端会发送心跳包.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();// websocket 基于http协议,所以要有http编解码器pipeline.addLast("http-codec", new HttpServerCodec());// 对写大数据流的支持pipeline.addLast("http-chunked", new ChunkedWriteHandler());// 几乎在netty中的编程,都会使用到此hanlerpipeline.addLast("aggregator", new HttpObjectAggregator(65535));/** * websocket 服务器处理的协议,用于指定给客户端连接访问的路由 : /ws * 本handler会帮你处理一些繁重的复杂的事 * 会帮你处理握手动作: handshaking(close, ping, pong) ping + pong = 心跳 * 对于websocket来讲,都是以frames进行传输的,不同的数据类型对应的frames也不同 */ pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));}});}public void start(){this.server.bind(this.config.getWebSocketPort());}}
lim:tcpPort: 9000webSocketPort: 19000bossThreadSize: 1workThreadSize: 8heartBeatTime: 3000 # 心跳超时时间,单位 msbrokerId: 1000loginModel: 3logicUrl: http://127.0.0.1:8000/v1

这两个将端口号和两个Group的大小都使用了配置文件动态配置

Starter

public class Starter {public static void main(String[] args) throws FileNotFoundException {if(args.length > 0){start(args[0]);}}private static void start(String path) throws FileNotFoundException {try {// 加载yml文件Yaml yaml = new Yaml();InputStream fileInputStream = new FileInputStream(path);// 搞一个实体BootstrapConfig bootstrapConfig = yaml.loadAs(fileInputStream, BootstrapConfig.class);// 启动new LimServer(bootstrapConfig.getLim()).start();new LimWebSocketServer(bootstrapConfig.getLim()).start();} catch (Exception e) {e.printStackTrace();System.exit(500);}}}


这样所以的配置文件,就可以通过修改yaml文件,然后对BootstrapConfig实体类修改,最后在Starter中配置一下即可

4、大白话讲通信协议—详解主流通讯协议

4.1、文本协议

  • 贴近人类书面表达的协议,如http协议
  • 特点:
    • 可读性好,便于调试
    • 扩展性也好(通过key:value扩展)
    • 解析效率一般

4.2、二进制协议

  • 一段的传输内容里面,其中的固定的一位或者几位表示固定的意思(就和我们上面netty中解决半包、黏包用的私有协议差不多),如ip协议
  • 特点:
    • 可读性差,难于调试
    • 扩展性不好(设计的好可以规避)
    • 解析效率高

4.3、xml协议

  • 特点
    • 标准协议,可以跨域互通
    • xml的优点,可读性好,扩展性好
    • 解析代价高
    • 有效数据传输率低(有大量的标签)

4.4、可以落地使用的协议

xmpp协议

  • 优点:基于xml协议,容易理解,使用广泛,易于扩展
  • 缺点:流量大,在移动端很耗电,交互过程复杂

mqtt协议

  • 优点:适配多平台,相比xmpp,数据包更小
  • 缺点:协议简单,公有协议无法自定义一些数据格式

私有协议(基于二进制协议)

  • 优点:随心所欲,定制化较强,流量小
  • 缺点:工作量巨大,扩展性差,需要考虑全面

5、私有协议编解码—设计篇

6、私有协议编解码—实现篇

6.1、LimServer的编解码器

ByteBufToMessageUtils

public class ByteBufToMessageUtils {public static Message transition(ByteBuf in){/** 获取command*/int command = in.readInt();/** 获取version*/int version = in.readInt();/** 获取clientType*/int clientType = in.readInt();/** 获取messageType*/int messageType = in.readInt();/** 获取appId*/int appId = in.readInt();/** 获取imeiLength*/int imeiLength = in.readInt();/** 获取bodyLen*/int bodyLen = in.readInt();if(in.readableBytes() < bodyLen + imeiLength){in.resetReaderIndex();return null;}byte [] imeiData = new byte[imeiLength];in.readBytes(imeiData);String imei = new String(imeiData);byte [] bodyData = new byte[bodyLen];in.readBytes(bodyData);MessageHeader messageHeader = new MessageHeader();messageHeader.setAppId(appId);messageHeader.setClientType(clientType);messageHeader.setCommand(command);messageHeader.setLength(bodyLen);messageHeader.setVersion(version);messageHeader.setMessageType(messageType);messageHeader.setImei(imei);Message message = new Message();message.setMessageHeader(messageHeader);if(messageType == 0x0){String body = new String(bodyData);JSONObject parse = (JSONObject) JSONObject.parse(body);message.setMessagePack(parse);}in.markReaderIndex();return message;}}

MessageDecoder(解码)

public class MessageDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx,ByteBuf in, List<Object> out) throws Exception {//请求头(指令// 版本// clientType// 消息解析类型// appId// imei长度// bodylen)+ imei号 + 请求体if(in.readableBytes() < 28){return;}Message message = ByteBufToMessageUtils.transition(in);if(message == null){return;}out.add(message);}}

MessageEncoder(编码)

public class MessageEncoder extends MessageToByteEncoder {@Overrideprotected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {if(msg instanceof MessagePack){MessagePack msgBody = (MessagePack) msg;String s = JSONObject.toJSONString(msgBody.getData());byte[] bytes = s.getBytes();out.writeInt(msgBody.getCommand());out.writeInt(bytes.length);out.writeBytes(bytes);}}}

接受消息的实体类

@Datapublic class Message {// 请求头private MessageHeader messageHeader;// 请求体private Object messagePack;@Overridepublic String toString() {return "Message{" +"messageHeader=" + messageHeader +", messagePack=" + messagePack +'}';}}
@Datapublic class MessageHeader {//消息操作指令 十六进制 一个消息的开始通常以0x开头//4字节private Integer command;//4字节 版本号private Integer version;//4字节 端类型private Integer clientType;/** * 应用ID *///4字节 appIdprivate Integer appId;/** * 数据解析类型 和具体业务无关,后续根据解析类型解析data数据 0x0:Json,0x1:ProtoBuf,0x2:Xml,默认:0x0 *///4字节 解析类型private Integer messageType = 0x0;//4字节 imel长度private Integer imeiLength;//4字节 包体长度private int length;//imei号private String imei;}
@Datapublic class MessagePack<T> implements Serializable {private String userId;private Integer appId;/** * 接收方 */private String toId;/** * 客户端标识 */private int clientType;/** * 消息ID */private String messageId;/** * 客户端设备唯一标识 */private String imei;private Integer command;/** * 业务数据对象,如果是聊天消息则不需要解析直接透传 */private T data;///** 用户签名*///private String userSign;}

加到这里面

6.2、LimWebSocketServer的编解码器

WebSocketMessageDecoder

public class WebSocketMessageDecoder extends MessageToMessageDecoder<BinaryWebSocketFrame> {@Overrideprotected void decode(ChannelHandlerContext ctx, BinaryWebSocketFrame msg, List<Object> out) throws Exception {ByteBuf content = msg.content();if (content.readableBytes() < 28) {return;}Message message = ByteBufToMessageUtils.transition(content);if(message == null){return;}out.add(message);}}

WebSocketMessageEncoder

public class WebSocketMessageEncoder extends MessageToMessageEncoder<MessagePack> {private static Logger log = LoggerFactory.getLogger(WebSocketMessageEncoder.class);@Overrideprotected void encode(ChannelHandlerContext ctx, MessagePack msg, List<Object> out){try {String s = JSONObject.toJSONString(msg);ByteBuf byteBuf = Unpooled.directBuffer(8+s.length());byte[] bytes = s.getBytes();byteBuf.writeInt(msg.getCommand());byteBuf.writeInt(bytes.length);byteBuf.writeBytes(bytes);out.add(new BinaryWebSocketFrame(byteBuf));}catch (Exception e){e.printStackTrace();}}}

加到这里面

这样一来,我们为LImServer和LImWebSocketServer提供了编解码器,这样我们的客户端只要按照我们的协议发送数据,我们就会拿到争取的数据,我们也可以将信息进行编码,发送给客户端,客户端也要遵守我们的编码规则,也就可以正常的拿到服务端发送给客户端的数据

六、IM开发核心之建构TCP网关(下)

1、登录消息—保存用户NioSocketChannel

这里也就是创建了一个Handler然后通过解析message中的command的命令,对应做出登录的逻辑,通过将每个用户登录进来的channel保存起来,维护每一个channel

2、分布式缓存中间件—Redisson快速入门操作

Redisson操作快速入门

3、用户登录网关层—保存用户Session

先考虑用什么Redis的数据结构,因为这个应用那个会支出多端登录,所以使用HashMap的数据结构,就可以使用一个key来存储多个端的session,这样比String类型更好

这里使用的是Redisson,所以要搞一些配置属性,修改BootStrap和Yaml文件,然后再创建Redis的管理类,最后将配置好的Redis放在Starter中去启动,在Handler中将设置好的UserSession保存到map中去。

4、用户退出网关层—离线删除用户Session

逻辑

  1. 先删除掉Channel
  2. 再删除掉Redis中存储的session

5、服务端心跳检测

和上面那个netty入门时候说的心跳检测差不多,没有读操作或者写操作,或者全操作就会触发userEventTriggered,然后进行一些你规定好的操作,这里我们实现的就是没有操作的每10秒触发一次心跳检测,检测你上次ping的时间和当前时间,如果超过了你规定的超时时间,就认为该用户已经离线了,触发离线逻辑


// 离线public static void offLineUserSession(NioSocketChannel channel){// 删除sessionString userId = (String) channel.attr(AttributeKey.valueOf(Constants.UserId)).get();Integer appId = (Integer) channel.attr(AttributeKey.valueOf(Constants.AppId)).get();Integer clientType = (Integer) channel.attr(AttributeKey.valueOf(Constants.ClientType)).get();String imei = (String) channel.attr(AttributeKey.valueOf(Constants.Imei)).get();SessionScoketHolder.remove(appId, userId, clientType, imei);// 修改redis中的session的ConnectStateRedissonClient redissonClient = RedisManager.getRedissonClient();RMap<String, String> map= redissonClient.getMap(appId + Constants.RedisConstants.UserSessionConstants + userId);// 获取sessionString sessionStr = map.get(clientType.toString() + ":" + imei);if(!StringUtils.isBlank(sessionStr)){// 将session转换为对象UserSession userSession = JSONObject.parseObject(sessionStr, UserSession.class);// 修改连接状态为离线userSession.setConnectState(ImConnectStatusEnum.OFFLINE_STATUS.getCode());// 再写入redis中map.put(clientType.toString() + ":" + imei, JSONObject.toJSONString(userSession));}}

触发离线逻辑,和上面那个登出的区别就是修改Redis中的session状态变成离线,那个是直接删除了

6、RabbitMQ的安装、发布订阅、路由模式详解

安装教程

如果自己有腾讯云、阿里云的虚拟的话,可以直接搞一个docker的RabbitMQ,这样更加方便,教程啥的网上搜一下就好

快速入门

7、TCP接入RabbitMQ、打通和逻辑层的交互

实现一个Mq的工具类

public class MqFactory {// ConnectionFactoryprivate static ConnectionFactory factory = null;// 这里一个存放channel的mapprivate static ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();public static void init(BootstrapConfig.Rabbitmq rabbitmq){// 如果连接为空才进行初始化if(factory == null){factory = new ConnectionFactory();factory.setHost(rabbitmq.getHost());factory.setUsername(rabbitmq.getUserName());factory.setPassword(rabbitmq.getPassword());factory.setPort(rabbitmq.getPort());factory.setVirtualHost(rabbitmq.getVirtualHost());}}// 通过channel名字来获取不同的channelpublic static Channel getChannel(String channelName) throws IOException, TimeoutException {Channel channel = channelMap.get(channelName);if(channel == null){channel = getConnection().createChannel();channelMap.put(channelName, channel);}return channel;}// 获取connectionprivate static Connection getConnection() throws IOException, TimeoutException {Connection connection = factory.newConnection();return connection;}}

创建一个MqReciver类

@Slf4jpublic class MessageReciver {private static String brokerId;public static void startReciverMessage() {try {Channel channel = MqFactory.getChannel(Constants.RabbitConstants.MessageService2Im+ brokerId);// 绑定队列channel.queueDeclare(Constants.RabbitConstants.MessageService2Im + brokerId,true,false, false, null);// 绑定交换机channel.queueBind(Constants.RabbitConstants.MessageService2Im+ brokerId,Constants.RabbitConstants.MessageService2Im,brokerId);channel.basicConsume(Constants.RabbitConstants.MessageService2Im + brokerId, false, new DefaultConsumer(channel){// 获取到rabbitmq中的信息@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try {String msgStr = new String(body);log.info(msgStr);}}});} catch (Exception e) {throw new RuntimeException(e);}}}


这样的话就TCP就可以从RabiitMq中获取到消息了,这样的话,只要逻辑层往对应的交换机中投递消息,TCP就可以收到了,也就是打通了和逻辑层的交互

8、分布式TCP服务注册中心的选型

如果要接入多个服务,单台机器肯定是撑不住这么大的并发的,要考虑分布式,这就不得不考虑服务发现的问题,所以要引入服务注册来解决这个问题

现在的开发,都会把服务做拆分,就会引出网关和逻辑服务直接发现的问题,这里有一种土办法,把服务的ip地址配置在网关中,A服务有这几个ip,B服务有这几个ip,如果采用这种方案,我们在增加或者删除一台服务器的时候,所有的网关服务都要手动的去修改配置,去重启,这样不太长久,不现实

如果是http服务器,我们可以通过反向代理,把请求转发给可用的服务,但是这个方案在即时通讯系统中是行不通的,因为它是有状态服务,和普通的http服务不一样,有状态服务会保留用户活跃的信息,比如我们客户和A服务器建立了连接Channel,用户的信息保存在了A服务器里面,可以互通数据,当我们用B服务器,用户和B服务器之间没有交互数据的,我们拿不到Channel里面的信息

CAP理论

  • 一致性
  • 可用性
  • 分区容错性

主流注册中心

  • Eureka:SpringCloud配套的,但是已经停止维护了,最好不要用
  • Consul:轻量级的注册中心,但是Java领域覆盖面不是很广
  • Kubernetes:配套使用K8S较好
  • Nacos:是首选的注册中心,它既可以支持AP、也可以支持CP
  • Zookeeper:临时节点和watch机制,创建连接会生成节点,节点发生改变会通知,感知力强

本系统采取Zookeeper作为注册中心

9、TCP服务注册—Zookeeper注册TCP服务

安装教程

ZKit

/** * @author li * 直接用来创建Zookeeper目录的 * @data 2023/4/19 * @time 14:37 */public class ZKit {private ZkClient zkClient;public ZKit(ZkClient client){this.zkClient = client;}// im-coreRoot/tcp/ip:portpublic void createRootNode(){boolean exists = zkClient.exists(Constants.ImCoreZkRoot);if(!exists){zkClient.createPersistent(Constants.ImCoreZkRoot);}boolean tcpExists = zkClient.exists(Constants.ImCoreZkRoot +Constants.ImCoreZkRootTcp);if(!tcpExists){zkClient.createPersistent(Constants.ImCoreZkRoot +Constants.ImCoreZkRootTcp);}boolean webExists = zkClient.exists(Constants.ImCoreZkRoot +Constants.ImCoreZkRootWeb);if(!webExists){zkClient.createPersistent(Constants.ImCoreZkRoot +Constants.ImCoreZkRootWeb);}}// ip:portpublic void createNode(String path){if(!zkClient.exists(path)){zkClient.createPersistent(path);}}}

RegistryZk

@Slf4jpublic class RegistryZk implements Runnable{private ZKit zKit;private String ip;private BootstrapConfig.TcpConfig tcpConfig;public RegistryZk(ZKit zKit, String ip, BootstrapConfig.TcpConfig tcpConfig) {this.zKit = zKit;this.ip = ip;this.tcpConfig = tcpConfig;}@Overridepublic void run() {// 注册Zookeeper// 先注册1级目录zKit.createRootNode();// 再注册2级目录String tcpPath = Constants.ImCoreZkRoot + Constants.ImCoreZkRootTcp+ "/" + ip + ":" + this.tcpConfig.getTcpPort();zKit.createNode(tcpPath);log.info("Registry zookeeper tcpPath success, msg=[{}]", tcpPath);String webPath = Constants.ImCoreZkRoot + Constants.ImCoreZkRootWeb+ "/" + ip + ":" + this.tcpConfig.getWebSocketPort();zKit.createNode(webPath);log.info("Registry zookeeper webPath success, msg=[{}]", webPath);}}

Starter

这样在我们启动这个Starter这个服务的时候,连接Zookeeper的客户端就可以查看到你创建的目录了

10、服务改造-TCP服务分布式改造

因为我们要实现的即时通讯系统是有状态的服务,所以我们要考虑的更多

比如每个netty1都维护着对应的用户,netty1维护着u1、u10、u100,netty2维护者u2、u20、u200,当我们u1向u2发送一条消息的时候,netty1中并没有连接netty2的Channel,消息就会丢失,这样目前还是不妥的,所以我们要加以改造

这里提供集中解决方法

10.1、 广播模式


这样实现起来很简单,可以使用RabiitMq,但是容易产生消息风暴,如果要发送100个消息,这样就会变成200个,造成一些浪费、无效的通讯

10.2、 一致性Hash


这种方式的实现方式是,u1这个用户根据一些id等的属性,会在注册的时候去做一个hash运算,直接就给他注册到算好的netty上,比如是u1,根据这个1算出它在netty1上,u2根据2算出它在netty2上,当u1要给u2发消息的时候,就会根据u2计算出u2在哪个netty2中,点对点的给他发送过去,不用发那么多份,但是缺点也很明显,重度的依赖服务的发现的稳定性,要及时的感知到netty2是否存在,netty2下线的时候要及时的通知

10.3、 构建路由层

通过构建路由层,比如说把注册的用户和对应的netty服务ip存储到里面,当u1给u2消息的时候,就会在路由层去寻找,然后发送,可靠性比较高,并且可以用mq解耦,路由层是无状态的可以水平拓展,可以扩展多个,缺点是很复杂,多了一层东西就多了代码,多了一些组件,并且需要独立维护路由层,也会依赖路由层的依赖性和可靠性

11、即时通讯系统支持多端登录模式—应对多端登录的场景


仿腾讯Im即时通讯系统的多端登录模式


做好一些配置的东西

多端登录在有状态的分布式下,推荐使用广播(或者一致性hash)的模式,因为你不知道某个用户到底在几个端登录,这样是最容易的方法了。

UserLoginMessageListener

public class UserLoginMessageListener {private final static Logger logger = LoggerFactory.getLogger(UserLoginMessageListener.class);private Integer loginModel;public UserLoginMessageListener(Integer loginModel){this.loginModel = loginModel;}// 监听用户登录public void listenerUserLogin(){RTopic topic = RedisManager.getRedissonClient().getTopic(Constants.RedisConstants.UserLoginChannel);// 使用Redisson的订阅模式做监听当有用户的某个端登录就会topic.addListener(String.class, new MessageListener<String>() {@Overridepublic void onMessage(CharSequence charSequence, String message) {logger.info("收到用户上线:" + message);UserClientDto userClientDto = JSONObject.parseObject(message, UserClientDto.class);// 获取所有的CHANNELSList<NioSocketChannel> nioSocketChannels= SessionScoketHolder.get(userClientDto.getAppId(), userClientDto.getUserId());for (NioSocketChannel nioSocketChannel : nioSocketChannels) {// 单端登录if(loginModel == DeviceMultiLoginEnum.ONE.getLoginMode()){// 获取clietTypeInteger clientType = (Integer) nioSocketChannel.attr(AttributeKey.valueOf(Constants.ClientType)).get();// 获取imei号String imei = (String)nioSocketChannel.attr(AttributeKey.valueOf(Constants.Imei)).get();if(!(clientType + ":" + imei).equals(userClientDto.getClientType() + ":" + userClientDto.getImei())){// TODO 踢掉客户端// 告诉客户端 其他端登录MessagePack<Object> messagePack = new MessagePack<>();messagePack.setToId((String) nioSocketChannel.attr(AttributeKey.valueOf(Constants.UserId)).get());messagePack.setUserId((String) nioSocketChannel.attr(AttributeKey.valueOf(Constants.UserId)).get());messagePack.setCommand(SystemCommand.MUTUALLOGIN.getCommand());nioSocketChannel.writeAndFlush(messagePack);}}else if(loginModel == DeviceMultiLoginEnum.TWO.getLoginMode()){if(userClientDto.getClientType() == ClientType.WEB.getCode()){continue;}Integer clientType = (Integer) nioSocketChannel.attr(AttributeKey.valueOf(Constants.ClientType)).get();if(clientType == ClientType.WEB.getCode()){continue;}// 获取imei号String imei = (String)nioSocketChannel.attr(AttributeKey.valueOf(Constants.Imei)).get();if(!(clientType + ":" + imei).equals(userClientDto.getClientType() + ":" + userClientDto.getImei())){// TODO 踢掉客户端MessagePack<Object> messagePack = new MessagePack<>();messagePack.setToId((String) nioSocketChannel.attr(AttributeKey.valueOf(Constants.UserId)).get());messagePack.setUserId((String) nioSocketChannel.attr(AttributeKey.valueOf(Constants.UserId)).get());messagePack.setCommand(SystemCommand.MUTUALLOGIN.getCommand());nioSocketChannel.writeAndFlush(messagePack);}}else if(loginModel == DeviceMultiLoginEnum.THREE.getLoginMode()){Integer clientType = (Integer) nioSocketChannel.attr(AttributeKey.valueOf(Constants.ClientType)).get();String imei = (String)nioSocketChannel.attr(AttributeKey.valueOf(Constants.Imei)).get();if(clientType == ClientType.WEB.getCode()){continue;}Boolean isSameClient = false;// 如果新登录的端和旧的端都是手机端,做处理if((clientType == ClientType.IOS.getCode()|| clientType == ClientType.ANDROID.getCode()) &&(userClientDto.getClientType() == ClientType.IOS.getCode()|| userClientDto.getClientType() == ClientType.ANDROID.getCode())){isSameClient = true;}// 如果新登录的端和旧的端都是电脑端,做处理if((clientType == ClientType.MAC.getCode()|| clientType == ClientType.WINDOWS.getCode()) &&(userClientDto.getClientType() == ClientType.MAC.getCode()|| userClientDto.getClientType() == ClientType.WINDOWS.getCode())){isSameClient = true;}if(isSameClient && !(clientType + ":" + imei).equals(userClientDto.getClientType() + ":" + userClientDto.getImei())){// TODO 踢掉客户端MessagePack<Object> messagePack = new MessagePack<>();messagePack.setToId((String) nioSocketChannel.attr(AttributeKey.valueOf(Constants.UserId)).get());messagePack.setUserId((String) nioSocketChannel.attr(AttributeKey.valueOf(Constants.UserId)).get());messagePack.setCommand(SystemCommand.MUTUALLOGIN.getCommand());nioSocketChannel.writeAndFlush(messagePack);}}}}});}}

这东西理解起来有点抽象

因为这是Redis的订阅模式,就要在启动Redis的时候一起启动了