为什么要数据分发

微服务中,每个服务都有独立的数据源,这使得数据同步成为难题。

拉模式or推模式?

拉模式存在的问题

  1. 由于网络延迟,拉取的数据不一定是最新的

  2. 如果频繁向另一服务拉取数据,会给服务造成压力,如果拉取频率过低,数据就会同步不及时。

推模式存在的问题

如何保证数据的一致性?或者说如何保证数据分发的事务性?

数据一致性分发

事务消息盒子(事务性发件箱)

本质是利用本地事务的事务性,保证了消息分发的最终一致性。

  1. 在数据库中新开一张发件表(OUTBOX table),用于存放要分发的数据相关信息。

  2. 在往本地表和发件表一起写数据的时候,开启本地事务,如果成功则一起提交,出错则一起回滚。

  3. 实现消息中继器(MessageRelay),定期拉取OUTBOX table中的数据,并发送到对应服务。

  4. 如果成功,则数据分发成功,否则记录重试次数(实现至少发送一次的功能,接收数据的服务可能需要做幂等处理),若重试次数达到阈值,分发失败,需人工干预。

Killbill Common Queue

是对事务性发件箱的开源实现。

上图中灰色框框起来的部分,就是组件对事务性发件箱的核心实现:

  • 事件分发线程(DispatcherThread)会从数据队列(DBQueue)中拿一个事件(Event),并将这个事件写入到事件表中,后续这个事件扔给事件总线(EventBus)处理。

  • 同一节点的事件总线(EventBus)拿到事件分发线程分发的事件,EventBus再将事件分发到对应的Handler,由Handler处理事件。

  • 如果事件处理成功,则标记成功,失败则记录失败次数,累计到阈值标记失败,由人工干预处理

  • 每一个节点事件分发线程都只负责自己节点分发的事件

reaper机制

事件分发线程有多个,假如在运行过程中有事件分发线程挂了,那这个线程中的事件怎么处理呢?

Killbill Common Queue引入了reaper机制:reaper会监控是否有已经写入数据库表但长时间未处理的事件,如果发现了,就讲这个事件收割,后续这个事件将由自己处理。

收割机机制,保证了killbill common queue的高可用性,相当于保证了事务性发件箱中的Message Relay的高可用性。

EventBus(PersistentBus)

EventBus实现了事件性发件箱的MessageRelay功能。

此外,EventBus的机制为事件机制,一开始会在EventBus中注册handler,handler绑定需要处理的事件,当EventBus中收到event时,就会发送给绑定该事件的handler处理。

async-event

是公司的一个组件,使用了hyperf的事件机制,实现了事务性发件箱的功能。

以下为核心功能dispatch实现:

在45-48行中,方法遍历了所有监听器(listener),把监听器名、事件名、事件中的数据存到发件表中。

在try中,调用listener去处理事件,如果处理成功则将发件表中的事件状态标记为完成。

在catch中,如果处理事件出错,就会记录重试次数。

async-event中存在一个定时任务,每十分钟拉取未处理成功的待处理的事件,然后丢给retry方法重试:

retry方法就是将事件进行重试,先反序列化事件,在将事件丢给对应的监听器处理,如果处理完成就标记完成,否则记录重试次数,如果重试次数达到阈值,则标记失败。

CDC-变更数据捕获( Change Data Capture, CDC )

每个数据库在变更数据是都有事务日志或提交日志。启动可以一个服务(Transaction log miner),用来订阅这个日志,当捕获到数据变更时,就将数据变更内容发送给mq(如果异常会重发至成功)。

变更数据捕获常用作于:

  • 数据迁移:常用于数据库备份、容灾等;

  • 数据分发:将一个数据源分发给多个下游,常用于业务解耦、微服务;

  • 数据采集:将分散异构的数据源集成到数据仓库中,消除数据孤岛,便于后续的分析。

相应开源项目:

  • 阿里 Canal:GitHub – alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件

  • Redhat Debezium:GitHub – debezium/debezium: Change data capture for a variety of databases. Please log issues at https://issues.redhat.com/browse/DBZ.

  • Zendesk Maxwell:GitHub – zendesk/maxwell: Maxwell’s daemon, a mysql-to-json kafka producer

  • Airnb SpinalTap:GitHub – airbnb/SpinalTap: Change Data Capture (CDC) service

  • FIink -CDC

内部组件实现:https://git.kkgroup.cn/brd/data-transfer-service

下面是cannal的工作原理

MySQL主备复制原理
  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)

  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)

  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal 工作原理
  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议

  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )

  • canal 解析 binary log 对象(原始为 byte 流)

CQRS-命令查询职责分离

一种设计模式,看了很多资料感觉有点高深了,简单来说就是:

  • 命令:CUD,会改变数据的操作

  • 查询:R,不会改变数据

CQRS将命令和查询划分为两个不同的对象,CQRS使用分离的接口将数据查询操作(Queries)和数据修改操作(Commands)分离开来,这也意味着在查询和更新过程中使用的数据模型也是不一样的。这样读和写逻辑就隔离开来了。

mysql的读写分离是在数据库层进行的,而CQRS也可以理解成一种读写分离,但是读写分离操作是在应用层进行的。

内部实现(es同步组件): https://git.kkgroup.cn/brd/elasticsearch-service

在写数据库时,将数据聚合并同步到es中,在查询聚合数据时到es查询。

要思考的问题

  1. 分发过程中要确保一定发送,如果发送失败就会重试。但由于网络抖动等原因,无法判断是否发送成功,会导致消息可能会发送多次。

  2. 由于会存在消息发送多次的情况,消费端就要做好消息去重或幂等机制

  3. 需要考虑是否有顺序性问题。比如两条消息的消费需要具备顺序性,或使用其他方式规避竟态并发带来的困扰。(没遇到过具体情况)

  4. 业务使用时需要理解最终一致性的最终俩个字,设计上需要容忍获取到中间态的数据。(没遇到过具体情况)

参考资料

简书:KillBill框架介绍

CSDN:如何解决微服务的数据一致性分发问题?

博客园:命令查询职责分离模式CQRS