RocketMQ在开启Dledger时,使用DLedgerCommitLog,其他情况使用的是CommitLog来管理消息的存储。在Dledger模式下,消息写入时Leader节点还需要将消息转发给Follower节点,有过半的节点响应成功,消息才算写入成功。

Leader消息写入

Dledger下有DLedgerMemoryStore(基于内存存储)和DLedgerMmapFileStore(基于Mmap文件映射)两种方式写入,接下来以DLedgerMmapFileStore为例,看下消息的写入过程。
Leader节点在写入前会为消息构建DLedgerEntry对象,之后本地写入以及转发给Follower节点都会使用这个对象:

  1. 进行Leader节点校验和磁盘已满校验;

  2. 在DLedgerEntry对象中设置消息的index(为每条消息进行了编号),值为ledgerEndIndex + 1,ledgerEndIndex初始值为-1,新增一条消息ledgerEndIndex的值也会增1,ledgerEndIndex是随着消息的增加而递增的,写入成功之后会更新ledgerEndIndex的值,ledgerEndIndex记录最后一条成功写入消息的index:

  3. 将消息内容写入CommitLog文件;

  4. 更新MemberState中记录的LedgerEndIndex和LedgerEndTerm的值;

等待Follower节点响应

  • pendingAppendResponsesByTerm:key为Term的值,value是一个ConcurrentHashMap,这个ConcurrentHashMap中的KEY为消息的index(每条消息的编号,从0开始,后面会提到),ConcurrentMap的KEY为消息的index,value为此条消息转发给Follower节点的异步响应对象AppendEntryResponse:

在消息写入Leader节点之后,Leader节点需要向Follwer节点转发日志,这个过程是异步处理的,会开启一个线程来进行消息转发,所以这里会先为每个请求创建异步响应对象,主要处理逻辑如下:

  1. 如果集群中只有一个节点,设置处理状态为完成并返回响应即可;
  2. 如果集群中有多个节点,由于日志转发是异步进行的,所以会先创建响应对象AppendFuture,并将创建的对象加入到pendingAppendResponsesByTerm中,pendingAppendResponsesByTerm的数据就是在这里加入的,之后有另外一个线程会处理消息转发,当消息转发成功之后会从这里取出响应对象,并将其处理状态置为完成;

Leader与Follower的日志复制

Leader消息转发与Follower的处理是单独开启线程异步进行的,主要有以下几个线程:

  1. EntryDispatcher(运行于Leader节点):用于Leader节点向Follwer节点转发日志,Leader节点会为每个Follower节点创建一个EntryDispatcher转发器,一个EntryDispatcher负责一个节点的日志转发,多个节点之间是并行处理的;
  2. EntryHandler(运行于Follower节点):用于Follower节点处理Leader节点发送的日志;
  3. QuorumAckChecker(运行于Leader节点):用于Leader节点等待Follower节点同步;

EntryDispatcher(Leader日志转发)

EntryDispatcher中会启动一个线程,用于向Follower转发日志,处理逻辑如下:

  1. 校验节点的角色是否是Leader节点;
  2. 对消息的转发类型进行判断,有以下两种状态:
    • APPEND:消息追加,用于向Follower转发消息;
    • COMPARE:消息对比,一般出现在数据不一致的情况下,需要与Follower节点的日志进行对比;

EntryDispatcher中会记录向当前的Term与Leader ID,处于以下三种条件之一,会认为集群可能发送了变化,数据处于不一致的状态,此时会将推送类型更改为COMPARE:
(1)EntryDispatcher记录的Term与MemberState中记录的不一致;
(2)EntryDispatcher记录的LeaderId为空;
(3)EntryDispatcher记录的LeaderId与MemberState中记录的不一致;

Append

  • committedIndex:得到了集群中大多数节点的响应的消息的index,记为committedIndex,committedIndex之前的消息表示都已提交,可以被消费者消费;

Append状态下,Leader节点将消息转发给Follower节点进行同步,Leader方的发送逻辑如下:

  1. 校验推送类型是否是APPEND,如果不是终止处理;

  2. writeIndex为待转发消息的Index,默认值为-1,判断是否大于LedgerEndIndex,如果大于会发送COMMIT请求到Follower节点,通知Follower节点更新committedIndex(后面再说);

这里可以看出转发日志的时候也使用了一个计数器writeIndex来记录待转发消息的index,每次根据writeIndex的值从日志中取出消息进行转发,转发成后更新writeIndex的值(自增)指向下一条数据。

  1. 向Follower发送消息转发请求并处理请求响应结果;

  2. 更新writeIndex的值,做自增操作指向下一条待转发的消息index;

消息转发请求发送与响应结果处理

pendingMap:pendingMap是一个ConcurrentMap,KEY为消息的INDEX,value为该条消息向Follwer节点转发的时间,在发送消息转发请求后会加入到pendingMap中,请求响应成功之后会从pendingMap移除;

peerWaterMarksByTerm:记录了每个Follower节点的消息复制进度,KEY为当前的Term值,VALUE是一个ConcurrentMap,ConcurrentMap中的KEY为Follower节点的ID(peerId),VALUE为该节点已经同步完毕的最新的那条消息的index;

请求发送

  1. 根据消息的index从日志获取消息内容;
  2. 构建日志转发请求,在请求中设置了消息、当前Term、Leader节点的commitIndex(最后一条得到集群中大多数节点响应的消息index)等信息
  3. 发送请求给Follower节点;
  4. 将本条消息对应的index加入到pendingMap中记录消息的发送时间(key为消息的index,value为当前时间);

响应处理

  1. 如果响应状态为SUCCESS, 表示节点写入成功,然后做如下处理:
    (1)从pendingMap中移除本条消息index的信息;
    (2)更新当前这个Follower节点的复制进度,也就是peerWaterMarksByTerm中的值;
    (3)唤醒QuorumAckChecker线程,主要是为了统计是否有过半的节点写入成功(后面再说);
  2. 如果Follower响应状态为INCONSISTENT_STATE,表示Follower节点数据出现了不一致的情况,Leader节点会将状态改为COMPARE;

这里再区分一下pendingAppendResponsesByTerm和peerWaterMarksByTerm:
pendingAppendResponsesByTerm:记录的是转发给Follower节点的消息复制请求的异步响应对象AppendEntryResponse,因为要等待集群中大多数节点的响应,所以使用了异步处理,消息写入成功之后会将处理状态置为完成。
peerWaterMarksByTerm:记录的是每个Follower节点的消息复制进度,保存的是每个节点最后一条成功写入的消息的index。

Compare

compareIndex:需要比较的那条消息的index,初始值为-1,每次更改状态为COMPARE时都会重置为-1;
truncateIndex:要删除消息的index,在数据不一致时,会发送请求通知Follower节点将数据不一致的那条消息删除;

当出现数据不一致的情况时,日志转发状态会被置为Compare,然后Leader节点会发送Compare请求,通知Follower节点进行消息对比,找到数据不一致的那条消息的index。
判断数据不一致的条件如下,满足其中之一就会被认定数据不一致:
(1)Leader节点在调用checkAndFreshState检查的时候,发现当前Term与memberState记录的不一致或者LeaderId为空或者LeaderId与memberState记录的LeaderId不一致;
(2)Follower节点在处理消息APPEND请求在进行校验的时候(Follower节点请求校验链接),发现数据出现了不一致,会在请求的响应中设置不一致的状态INCONSISTENT_STATE,通知Leader节点;

Leader节点发送Compare请求

在COMPARE状态下,向Follower节点发送比较请求的处理逻辑如下:

  1. 校验当前状态是否是COMPARE或者TRUNCATE请求,如果不是终止处理;
  2. 设置compareIndex的值:
    (1)如果compareIndex值为-1(初始值),获取LedgerEndIndex值作为compareIndex的值进行更新,从最近的那条消息开始比较;
    (2)如果compareIndex的值大于LedgerEndIndex(超过最大值)或者小于LedgerBeginIndex(低于最小值),说明值不合法,同样从最近的那条消息也就是LedgerEndIndex的位置开始比较;
  3. 根据compareIndex的值获取对应的消息内容,然后构建COMPARE请求;
  4. 向Follower节点发送COMPARE请求;
  5. 等待COMPARE请求返回响应;

Leader节点对Compare请求的响应结果处理

在Follower节点的请求响应中,会返回Follower节点最后成功写入的消息的index设置在endIndex变量中,第一条写入的消息设置在beginIndex变量中,Leader节点拿compareIndex值与其进行对比,处理逻辑如下:

  1. 请求响应码为SUCCESS:
    (1)如果compareIndex与Follower返回请求中的EndIndex相等,表示没有数据不一致的情况,将状态更改为APPEND
    (1)其他情况,将truncateIndex的值置为compareIndex;

  2. 如果endIndex小于当前节点的ledgerBeginIndex,或者beginIndex大于ledgerEndIndex,也就是follower与leader的index不相交时, 将truncateIndex设置为Leader的BeginIndex,也就通知Follower节点从Leader节点第一条消息那个位置往后删除;

  3. compareIndex比Follower的BeginIndex小,将truncateIndex设置为Leader的BeginIndex,同样通知Follower节点从Leader节点第一条消息那个位置往后删除;

  4. 其他情况,说明还未找到不一致的消息位置,将compareIndex的值减一,从上一条消息开始继续对比;

  5. 如果truncateIndex的值不为-1,调用doTruncate方法进入消息删除的处理逻辑;

在doTruncate方法中,会构建TRUNCATE请求设置truncateIndex(要删除的消息的index),发送给Follower节点,通知Follower节点从数据不一致的那条消息开始删除,如果Follower删除成功,Leader节点会将状态改为APPEND,并更新节点的复制进度为出现数据不一致的那条消息的index,同时也更新了writeIndex,下次从writeIndex处重新给Follower节点发送APPEND请求进行消息写入。

总结
在Leader节点判断数据不一致时,会向Follower节点发送COMPARE请求,请求中会携带要比较那条日志的index,通知Follower节点对此条消息进行对比,并返回对比的结果,如果Follower节点发现数据并没有不一致,那么Leader节点收到响应后就更改为APPEND状态,继续日志转发;

如果Follower节点认为数据不一致,会返回BeginIndex和EndIndex,Leader节点会拿compareIndex的值进行对比,如果不在以上情况内,compareIndex的值减一,从上一条消息开始继续对比,直到找到数据不一致的那条消息的index。

EntryHandler(Follower节点)

EntryHandler用于Follower节点处理Leader发送的消息请求,主要有四种请求类型,分别为Append、Compare、Truncate和Commit类型。

APPEND请求处理

APPEND请求处理逻辑如下:
(1)计算writeIndex的值,Follower的LedgerEndIndex记录了最后一条成功写入消息的index,对其 + 1表示下一条待写入消息的index,也就是writeIndex的值;
(2)从请求中获取消息内容,将消息写入CommitLog文件;
(3)上面说过Leader节点发送Append请求时,也会将记录的commitIndex设置到请求中,这里会从中取出commitIndex更新到Follower本地,后面讲QuorumAckChecker时候会提到;

Compare请求处理

COMPARE的请求处理逻辑如下,compareIndex为需要比较的index,处理逻辑如下:
(1)校验请求中的类型是否是COMPARE;
(2)根据compareIndex的值从本地获取消息内容;
(3)将上一步获取到的消息内容,与请求中携带的消息内容做对比,如果一致进入下一步,如果不一致会进入异常处理;
(4)构建响应体,响应状态为SUCCESS,并在响应体中设置当前节点同步的消息的BeginIndex和EndIndex;
异常处理:会返回INCONSISTENT_STATE状态,表示数据不一致,响应体中也会设置当前节点同步的消息的BeginIndex和EndIndex;

Truncate请求处理

Truncate请求用于Follower节点根据Leader节点发送的truncateIndex,将truncateIndex位置后面的消息从本地删除。

COMMIT请求处理

前面讲到Leader节点会向Follower节点发送COMMIT请求,COMMIT请求主要是更新Follower节点本地的committedIndex的值,记录集群中最新的那条获取大多数响应的消息的index,committedIndex之前的消息都已提交,已提交的消息可以被消费者消费,下面讲QuorumAckChecker的时候还会说到。
需要注意,Leader节点除了专门向Follower节点发送COMMIT请求外,Leader节点在发送Append请求时也会设置这个提交点,Follower节点处理APPEND的请求时会顺带更新。

QuorumAckChecker(Leader节点)

QuorumAckChecker用于Leader节点等待Follower节点复制完毕,处理逻辑如下:

  1. 如果pendingAppendResponsesByTerm的个数大于1,对其进行遍历,如果KEY的值与当前Term不一致,说明数据已过期,将过期数据置为完成状态并从pendingAppendResponsesByTerm中移除;

  2. 如果peerWaterMarksByTerm个数大于1,对其进行遍历,同样找出与当前TERM不一致的数据,进行清理;

以上两步主要是为了清理过期的数据。

  1. 从peerWaterMarksByTerm中获取当前Term的数据,里面记录了每个Follower节点的日志复制进度,然后对所有的复制进度进行排序,取出处于中间位置的那个进度值,也就是消息的index值,这里不太好理解,举个例子,假如一个Leader有5个Follower节点,当前Term为1:
{   "1" : { // TERM的值,对应peerWaterMarksByTerm中的Key    "节点1" : "1", // 节点1复制到第1条消息    "节点2" : "1", // 节点2复制到第1条消息    "节点3" : "2", // 节点3复制到第2条消息    "节点4" : "3", // 节点4复制到第3条消息    "节点5" : "3"  // 节点5复制到第3条消息   }}

对所有Follower节点的复制进度倒序排序之后的list如下:

[3, 3, 2, 1, 1]

取5 / 2 的整数部分为2,也就是下标为2处的值,对应节点3的复制进度(消息index为2),记录在quorumIndex变量中,节点4和5对应的消息进度大于消息2的,所以对于消息2,集群已经有三个节点复制成功,满足了集群中大多数节点复制成功的条件。

如果要判断某条消息是否集群中大多数节点已经成功写入,一种常规的处理方法,对每个节点的复制进度进行判断,记录已经复制成功的节点个数,这样需要每次遍历整个节点,效率比较低,所以这里RocketMQ使用了一种更高效的方式来判断某个消息是否获得了集群中大多数节点的响应。

  1. quorumIndex之前的消息都已经获得集群中大多数节点响应,所以此时可以更新提交点,更新当前Leader节点记录的committedIndex的值;

  2. 从pendingAppendResponsesByTerm中移除已经写入成功消息的数据,主要是清理数据;

  3. 处理pendingAppendResponsesByTerm中的超时数据,这一步主要是为了处理超时的数据;

持久化

QuorumAckChecker中可知Leader节点在某个消息的写入得到集群中大多数Follower节点的响应之后,会更新committedIndex的值,上面也提到过,Follower节点在收到Leader节点的APPEND或者Commit请求的时候,也会将请求中设置的Leader节点的committedIndex更新到本地。之后Broker停止或者FLUSH的时候,会将ledgerEndIndex和committedIndex写入到文件(ChecktPoint)进行持久化:

  • ledgerEndIndex:Leader或者Follower节点最后一条成功写入的消息的index;

  • committedIndex:如果某条消息转发给Follower节点之后得到了集群中大多数节点的响应成功,将对应的index记在committedIndex表示该index之前的消息都已提交,已提交的消息可以被消费者消费,Leader节点会将值设置在APPEND请求中发送给Follower节点进行更新或者发送COMMIT请求进行更新;