MIT-6.5840-Raft
Raft论文笔记
从中央集权(单master)到自由民主的伟大过渡!(虽然选举后仍然是中央集权,而且集权程度比GFS还严重😂😂😂
本来是想说写一写Raft论文的解读的,但是很遗憾我在这上面花了太久的时间——快一个月,所以就没有时间写论文解读了
所以这里分享自己在论文上做的一些笔记,因为我自己是一边读一边记的,所以应该有些地方的理解是不对的,仅供参考
Lab3实现思路
Raft虽然宣称是一个understandable的算法,论文阅读起来idea确实很简单,无非就是投票嘛,但实际实现起来却要面对无限的细节问题,实现起来极其麻烦,再次验证了Talk is cheap的不变真理
虽然从开始到第一次通过所有的测试只用了大概一个星期,但是在多次测试下证明那个实现是有问题的,有着很高的错误率
后面又花了一个星期才修完了所有的问题,这个过程非常痛苦,因为像Raft这样的并发程序几乎不可能使用调试器来进行调试,只能检查运行日志来排错。
会浪费一个星期在纠错上还有一个原因是测试的耗时比想象中多,我的实现在优化后虽然无法达到guidence里推荐的的360s内完成,但总体耗时也在380s左右,但仍然不是一个小数字,如果你想要测试上千上万次的话。虽然MIT的助教也提供了一个不错的并行化测试脚本,但是一次开启的测试数量也不能太多,太多会导致过多的时间花费在上下文切换中,导致一些对运行时间有严格限制的测试失败(3B,3C)。在我的机器上(WSL2,11th Gen Intel(R) Core(TM) i7-11600H @ 2.90GHz),单次最多并行运行80个测试。运行1000次测试要花费一个小时左右,而我们学校晚上还会断电,后续虽然把测试移动到云服务器上,但是从阿里云白嫖的2核服务器只能并行30个测试,要两天才能搞定10000次测试。
之所以要这么多测试是因为有些bug的出现是十分罕见的,有些要在三四千次的测试中才会出现一两次,不过好在因为有比较良好的运行记录,能很快定位到问题所在。
在第一次通过所有测试之后其实已经写了一篇实现思路记录,没有想到经过一个星期的修正后新的实现已经和第一次通过所有测试时相去甚远,所以只能重新写作一篇来记录一下正确的实现方案。(在新写作到这里时云服务器上的测试已经进行了5000次而无错误,但愿能完成10000次测试)
—-更新—-
成功通过了10000次测试,平均用时386.68s,和hint推荐的360s多了20秒,不过其实可以通过将心跳时间,选举超时时间等改低来加速(代价是容易在RPC count测试中失败),如果用一个非常激进的数值可以把用时降低到340s左右。所以这个结果我还是比较满意的
—-再次更新—-
在做lab4的时候遇到了一些奇怪的现象,一路排查下来发现居然是Raft的实现有一个小bug导致了在一些特殊情况下leader不能及时commit
在修复了这个问题之后,平均用时提升到了340s左右,总算满足了推荐的时间
—-又一次更新—-
好吧,不得不承认lab4暴露出了很多潜在的问题,这让我不得不修改了Raft的很多代码。所以最新版本的Raft实现可能和文章里描述的有些出入,而我后面补写的内容也基本是想到哪写到哪,所以看起来会有一些自相矛盾的地方。凑合着看吧
为了完成lab4,主要的修改如下:(之前完全没有的机制)
-
加入了空日志的支持,这使得我不得不引入externalIndex和internalIndex来区分上层状态机看到的下标和Raft之间传递的下标。同时也修改了InstallSnapshot的一些实现
-
因为有了空日志支持,所以现在在leader成功当选时会按照论文的描述添加一条空日志来立刻启动commit,这让Raft的运行速度大幅提高了
-
引入了通过无锁队列实现的apply,主要是为了修正可能出现的循环等待问题,这个问题在lab3的测试中不怎么明显
-
每一条日志都会在创建时记录自己的下标在日志中,这使得我们可以直接使用log[0].Index作为snapshotIndex
-
使用条件变量来代替原先的睡眠轮询,测量结果显示快了150%左右,能够很好地通过lab4的speed test
主要的变化都是apply和snapshot相关的,日志同步机制倒是一直没有太大变化。
碎碎念到此结束,接下来回顾下实现思路,就按照lab的各阶段设置来讲吧
Part 3A: leader election
在动手之前一定要将论文的Figure2彻底理解,事实证明很多错误都源自于没有熟悉这张图
在实现的时候就直接一把大锁保平安了,虽然这样做效率很差,但比起协调各种锁来说还是可以接受的
超时计时器
首先要实现超时计时器来启动选举流程,即Raft.ticker
超时计时器定期检查距上一次心跳时间是否超过了选举超时时间,如果超过了则启动选举,没有超过就按照论文描述的睡眠一个随机的时间。
为了方便配置,我将这些涉及到时间的参数都设置为一个全局常量来便于调整
package raft
import "time"
const (
TM_HeartBeatInterval time.Duration = 100 * time.Millisecond // heart beat interval
TM_ElectionTimeout time.Duration = 200 * time.Millisecond // election timeout
TM_RandomWaitingTime time.Duration = 300 * time.Millisecond // randomization, wait for 0~TM_RandomWaitingTime
MAX_RETRY_TIMES int = 3 // Max times a server can try during sending a RPC
)
如上图所示,我的心跳周期为100ms,选举超时为300ms,而随机等待的时间在0~300ms之间
超时计时器的代码使用类似下面的方式
for !rf.killed() {
rf.mu.Lock()
// Heart beat timeout
if rf.state != RS_Leader && time.Since(rf.lastHeartBeat) > TM_ElectionTimeout {
//election
}
rf.mu.Unlock()
// pause for a random amount of time
// milliseconds.
ms := (rand.Int63() % int64(TM_RandomWaitingTime))
time.Sleep(time.Duration(ms))
}
启动选举
我选择将选举在一个新的线程中进行,因为这样更方便失败时取消选举
取消选举的方式最开始的实现是在选举线程中进行各种状态检查,但后面我意识到在选举线程之外进行检查,然后使用一些手段通知选举线程取消的做法会更好一点
通知取消的方法我选择使用cancelToken来实现,虽然也可以用context包来做,但是使用原子变量来实现cancelToken会更轻量一点,因为我只有一个目的
实现的部分代码如下,后续还有很多地方我会用到相同的机制来实现任务的取消
cancelToken := int32(0)
rf.lastHeartBeat = time.Now() // reset election timer
go rf.election(&cancelToken)
for {
rf.mu.Lock()
if time.Since(rf.lastHeartBeat) > TM_ElectionTimeout || atomic.LoadInt32(&cancelToken) == 1 ||
rf.state != RS_Candiate || rf.killed() {
atomic.StoreInt32(&cancelToken, 1)
break
}
rf.mu.Unlock()
time.Sleep(30 * time.Millisecond)
}
在Raft.election
中,就可以简单地通过原子操作读取cancelToken的值来确定是否要退出选举,在效率上也比加锁再检查要好一点
选举
到了紧张刺激的选举环节
按照Figure 2的指导来就行
首先先自增任期号,然后将Raft.votedFor
设置为自己的编号,接着向其他的服务器发送RequestVote RPC,最后根据返回情况来确定是否能够成为Leader即可
逻辑上就这么多要做的,实际的复杂度都在接收到RequestVote RPC的处理上,不过有几个小细节可以说一下
RequestVote的发送应该要是多线程的,通过启动协程来发送请求,防止某一个线路阻塞而系统阻塞。测试框架似乎是会随机地让一些RPC无法正常送达,所以要有重试机制。但是切记不能无限重试(虽然论文中说是infinitely),这样会导致系统创建了太多的线程,导致某些烦人的超时问题。我的做法是设置固定的重试次数,设置为3
在统计票数时我直接使用一个计数器,然后在RPC返回确认投票的信息后原子地加1。这样的做法根据论文的说法其实是错误的。因为单纯的计数损失了具体是哪台机器投票的信息,这在后续如果要实现机器的增减时会有很严重的问题,不过好在整个lab3不要求实现集群配置的更改,所以在这里使用单纯的计数是没有问题的,不过还是要提一嘴这是有问题的做法,从长期来看的话
最后别忘了加上使用cancelToken来确定选举是否要取消的检查
处理RequestVote
接着才是这一部分最为重要的地方
不过还是按照Figure 2的指导来做即可
首先进行检查,看是否发送者的任期号比接收者的任期号低,如果低的话直接返回拒绝的回复
接着检查发送者的任期号是否高于接收者的任期号,如果高于则说明当前接收者的状态是过时的,是应该重置的。这时接收者将自己的任期号更新至与发送者相同,然后将Raft.votedFor
指向nil,同时将身份转向follower
if args.Term < rf.currentTerm {
...
} else if args.Term > rf.currentTerm {
rf.currentTerm = args.Term
rf.votedFor = nil
rf.state = RS_Follower
rf.persist()
log.Printf("[%v] receive a RequestVote with higher term, change its term to %v\n", rf.me, rf.currentTerm)
}
一个很难理解的地方可能是为什么将Raft.votedFor
设置为nil而不是直接投票给发送者。这是因为接收到来自更高任期的RequestVote RPC并不意味着发送者真的是up-to-date的,一台被网络分区的机器可能会自己启动多轮选举,然后将任期号提升到一个非常高的数字,在它重新加入集群时,其向其他服务器发送的RequestVote RPC就有着很高的任期号但很显然大家都不该为它投票,因为它的日志不是up-to-date的
还有一个问题是为什么发送者和接收者任期号相同时不需要重置Raft.votedFor
,这是因为我们希望一台机器在一个任期内只进行一次投票,所以Raft.votedFor
是和Raft.currentTerm
严格绑定的,只有在任期号更新是我们才会重置Raft.votedFor
(启动选举时其实也是这样,先自增并设nil,然后再投票给自己,只是逻辑上等价于自增然后设置votedFor为自己)。换句话说只要Raft.votedFor
从nil更变为其他值,在该任期内它就不该再被更改
再接下来就是检查日志是否发送方的日志是否up-to-date,这点和Figure 2说的一样做即可
isUpToDate := (args.LastLogTerm > lastLogTerm) || (args.LastLogTerm == lastLogTerm && args.LastLogIndex >= lastLogIndex)
只要Raft.votedFor
为nil或者等于接收者的ID(为了修正返回消息丢失的情况),并且满足up-to-date的条件,就可以把票投给接收者,返回确认的回复,同时刷新心跳计时,因为这是一个有效的响应
成为Leader后
这节有点涉及到后面几个部分的内容,因为我将心跳包发送和同步日志合并在了一起
在成为leader后,立刻启动一个协程,称为Raft.serveAsLeader()
,顾名思义就是启动leader相关的工作
它的代码其实很短,因为具体的工作分散到了另外的线程上
func (rf *Raft) heartBeat(cancelToken *int32) {
for {
if atomic.LoadInt32(cancelToken) == 1 {
return
}
rf.cond.Signal()
time.Sleep(TM_HeartBeatInterval)
}
}
func (rf *Raft) serveAsLeader(term int) {
cancelToken := int32(0)
go rf.commitCheck(&cancelToken)
go rf.heartBeat(&cancelToken)
for {
rf.mu.Lock()
rf.cond.Wait()
if atomic.LoadInt32(&cancelToken) == 1 || rf.killed() || rf.state != RS_Leader || rf.currentTerm != term {
atomic.StoreInt32(&cancelToken, 1)
log.Printf("[%v] lose it's power in term %v or dead, cancel all leader tasks", rf.me, term)
rf.mu.Unlock()
return
}
go rf.syncEntries(&cancelToken)
rf.mu.Unlock()
}
}
可以看到我使用了和前面讨论过的选举一样的设计方案,通过cancelToken来取消所有的相关工作,然后在主线程内定期检查状态的变化来确定是否要退出
不过有一点不同,我使用了条件变量(绑定到大锁上)和一个单独的心跳产生线程来实现定期的检查和发送AppendEntries RPC(Raft.syncEntries()
就是用于同步日志的,理想状态下,通过该函数能让其他机器与leader的日志完全一致,不过对于现在这部分来说,因为还没有日志被添加,所以发送的总是空的RPC,也就是心跳包)
这样做的目的是为了后续的实现日志追加的时候,能够通过触发条件变量的Signal操作立刻启动一次同步而无需等待下一次心跳的产生,这能大幅提高系统效率
至于AppendEntries RPC的接收方一侧的做法,这里就不写了,因为在同步状态时大体上是和RequestVote RPC的接收方一样
还有个小细节就是要让日志初始时有一条空日志,因为Raft为了实现方便,下标总是从1开始。初始时有一条空日志可以保证在开始时计算是否up-to-date的时候有日志可以参考
Part 3B: log
Raft的主要工作部分,主要是针对AppendEntries RPC的实现。并不轻松的一部分
日志同步
前面提过,心跳发送和日志同步是在同一个函数Raft.syncEntries
中实现的,区别就在于创建要发送的AppendEntriesArgs中的Entries条目是否为空
参照Figure 2,leader需要维护所有其他机器的nextIndex和matchIndex
nextIndex用于描述leader认为同步日志需要发送给该机器的日志条目应该从哪个下标开始。每次发送的AppendEntriesArgs中的Entries就等于leader的日志从nextIndex开始到末尾的子集
当leader当选后,会将认为所有的机器的nextIndex都是”last log index + 1”,此时如果没有新增日志,根据go创建子切片的规则,创建出来的就是空集,即心跳包。如果新增了日志,要发送的就是leader认为该机器同步要用的条目
和发送RequestVote RPC的处理一样,我们并行地发送请求,然后根据返回的结果进行不同的处理
注意在创建参数时,Entries项应该是对日志的深拷贝,因为发送RPC的期间是解锁的,如果只是简单的切片会导致发送RPC时和其他线程出现数据竞争
处理AppendEntries RPC
当一台服务器接收到AppendEntries RPC时,和任何RPC请求一样,首先检查任期号的大小,对任期号小于自己的请求返回拒绝和自己的任期号,这时不需要更新心跳时间,因为这不是一个有效请求
如果任期号大于等于自己,则认为这是一个有效的请求,将自身的任期号更新至与请求包含的任期号相同,如果有任期号有变化则还需要将Raft.votedFor
置为nil,同时将角色转变为follwer,然后更新心跳时间,因为这是一个有效的请求
更新完状态后就是重要的日志添加了
按照Figure 2,只接受PrevLogIndex
和PrevLogTerm
都相同的请求,否则就返回错误(其实还有一种特殊情况,用于快速修复日志的)
一旦确定要接受该请求,则将日志追加到本地的日志中,然后根据参数中的commitIndex
更新自身的commitIndex
,按照Figure 2,新的commitIndex
应该是min(leaderCommit, index of last new entry)
无论是成功还是失败都将自身状态返回给leader,以此让leader根据这些信息确定下一个nextIndex
处理AppendEntries的回复
leader需要根据follower的返回情况来确定该follower接下来的nextIndex
还是根据Figure 2的指导,行为如下
-
如果返回结果表示AppendEntries失败了,同时返回的任期号比自身还高,说明当前的状态是过时的,直接退出所有的leader任务(通过设置cancelToken为1来实现)
-
如果返回结果表示AppendEntries成功了,那么要就需要将nextIndex更新到上一次发送的最后一个条目的下一个条目的下标。同时更新matchIndex为上一次发送的最后一个条目,表示,让commit检查能够察觉到日志的同步
-
如果返回结果表示AppendEntries失败了,那么就说明现有的nextIndex不是该follower期待的next,需要将nextIndex减小,然后重试。关于nextIndex的减小,最简单的思路就是单次减小1,然后重试直到follower接收成功。但很明显这不是一个好的做法,具体的做法后面会详细介绍
在最开始的实现中,重试通过使用Raft.cond.Signal()
来立刻启动一次重试,但是后续的实践证明这样做会导致连环创建线程,导致同一时刻存在过多的线程,导致一些对时间要求很高的测试失败。所以为了修正这个问题,重试只能通过下一次心跳或者下一次日志追加来触发,效率上虽然低了一点,但是还是可取的
快速修复日志
在一些情况下,网络分区或其他原因会导致一些follower与leader的日志相差很多(测试框架中有的会相差200多条),如果只是简单的单次减小1直到follower接受会很费时。所以我们需要一个能够快速同步日志的方法,在尽可能少的RPC的情况下完成日志同步
论文第七页末尾到第八页开始的部分介绍了基本做法–>在AppendEntriesReply中的添加一些信息,以此让leader能更好地判断要减少nextIndex到什么地方。
论文中说希望follower能自行判断冲突的日志,然后告诉leader,让leader将nextIndex设置为冲突的开始的地方。但说实话我没有想清楚这要怎么实现
还有一个普遍的做法是用二分查找,即先尝试最开始的条目和最后的条目,然后逐渐缩小范围直到锁定第一条冲突的日志。但这样的做法感觉会让leader长期处于Raft.syncEntries
中(猜测,不负责),所以我也没有采用
最后我用的是一个很简单粗暴的方法,直接让follower返回上一次成功apply的条目的下标Raft.lastApplied
,根据日志一致性原则,只要是成功apply的条目,在所有的机器上,直到该条目为止的所有条目都认为是一致的。所以将这个位置作为修复的起点我认为是可行的,虽然这样这样会导致一些有效的日志被丢弃,但是毕竟简单实现才是我关心的地方
下面要介绍的才是真正的日志同步leader端处理follower回复的做法
我认为存在对于follower回复的返回值,存在五种需要处理的情况
以及它们可能的产生原因如下
-
接收者回复了一个更高的任期号
如果发送者本身就是一个被网络分区了的leader,当它再次加入集群时,就会遇见这种情况
-
接收者回复了相同的
prevLogTerm
,更小的prevLogIndex
如果接收者被网络分区了,长期没有与当前的leader同步,当它再次加入集群时,如果没有进行过新的选举,就会遇见这种情况
-
接收者回复了更低的
prevLogIndex
,发送者在这个index上有与回复的prevLogTerm
相同的任期号如果接收者被网络分区了,期间又进行了选举导致当前的任期号过时,新的leader在别的任期中选出,新的leader在发送请求时就会出现这种情况
-
接收者回复了更低的
prevLogIndex
,发送者在这个index上有与回复的prevLogTerm
不相同的任期号当一个leader被分区,但是仍然在向日志中添加内容,再次加入集群中就会出现这种情况
比如下面的示例中,S0是原本的leader,成功在任期2完成了#1和#2日志的添加并且成功commit,然后leader网络分区,新的leader在任期3,4的S1产生。这时S0又进行了追加,新的leader也进行了追加。当S0加入集群时,S1向S0发起日志同步,args.PrevLogIndex = 4,args.PrevLogTerm = 3,reply.PrevLogIndex = 3, args.PrevLogTerm = 2
这时就不能直接将nextIndex调整为4,因为S0上的条目和leader是不一致的,是需要删除的
S0: 2 2 2 S1: 2 2 3 3 4 4 S2: 2 2 3 3 4
-
接收者回复了一个比发送者认为的
prevLogIndex
还高的prevLogIndex
当一个leader被分区后,持续添加大量日志,再次加入选举了新leader的集群就会出现该情况
比如下面的示例中,S0是开始的leader,在任期2内成功commit了#1,随后其被网络分区,新的leader从S1中产生,当其重新加入集群中,S1向S0发起日志同步,args.PrevLogIndex = 1,args.PrevLogTerm = 2,reply.PrevLogIndex = 6,args.PrevLogTerm = 2
S0: 2 2 2 2 2 2 S1: 2 3 3 S2: 2 3
以上的例子都只是最可能造成对应回复情况的原因,实际上还会有其他原因,实践证明以上的几种判断足以覆盖所有的可能
虽然看上去有五种情况很吓人,但归根到底只有三种:
-
任期问题(0)
这种情况只能取消掉发送者的所有leader职能,因为发送者是处于一个过时的状态
-
日志只是缺失了,而不是冲突(1,2)
这种情况只需要将nextIndex调整到返回的prevLogIndex+1即可,因为接收者的日志只是缺失了而没有产生冲突
-
日志确实冲突了(3,4)
这种情况就要按前面所说的将nextIndex设置为返回的lastApplied+1,因为只有lastApplied和它前面的日志是可信的
在接收端要对这样的AppendEntries作特殊处理,我设置的条件是如果发送端发送的参数里的prevLogIndex小于接收者的最后一条日志的下标并且等于接收者的lastApplied
(len(rf.log) - 1 > args.PrevLogIndex && args.PrevLogIndex == rf.lastApplied)
,就认为这是一个修复包,强制接收该修复包,用参数中的Entries替换lastApplied之后的所有条目同时接收端还要做一个前缀检查,因为测试框架中有一类测试会随机延迟RPC的抵达时间。所以一个接收端可能会收到一个过时的修复包,当接收端在进行完修复后又追加了一条日志,这时如果收到了一个过时的修复包,会导致新追加的日志丢失了,进而导致系统的行为变得很奇怪。所以接收端在应用一个修复包之前要先检查是否是现有日志的前缀,如果是则拒绝接受该包,但是仍然返回True,因为事实上还是认同了该次修改,因为matchIndex是只增的,即使返回True也不会造成问题
日志提交和应用
检查一个日志是否提交和是否应用我分散到了两个线程上,由Raft.commitCheck()
和Raft.applyEntries()
来负责
如前面的Raft.serveAsLeader()
所展示的那样,Raft.commitCheck()
是独属于leader的线程,只能由leader启动,同时接收和Raft.heartBeat()
相同的cancelToken用于退出
Raft.commitCheck()
做的事情就是找到一个最大的N,使得一半以上的服务器(包括leader自身)的matchIndex大于等于这个N。出于简单考虑就直接用暴力搜索了,找到这样的N后更新leader的commitIndex,就代表下标为commitIndex以及该下标之前的所有日志都是已commit的,在这之后无论leader怎么变化都能在leader上找到commitIndex之前的条目(暂时不讨论snapshot),因此也就可以安全地应用到状态机上
Raft.applyEntries()
在任何一台机器启动时开启,它周期地检查Raft.lastApplied + 1
是否小于等于Raft.commitIndex
,如果小于就将对应的条目提交给一个队列,然后由另一个独立的线程检查队列是否为空,如果不为空则提交队列中的内容,这个真正提交条目的线程是不会抢占Raft.mu
锁的
之所以要将条目先提交给一个不会抢占Raft.mu
的队列,主要和后续的Snapshot机制有关。
上层的测试框架在调用Raft.Snapshot()
时,直到该函数return前不会接受Raft发来的apply message。测试框架使用的代码类似于如下的:
for msg := range rf.ApplyChan {
if ... { // some conditions
rf.Snapshot(...) // Call Snapshot()
}
... // other code...
}
如果直接在带锁Raft.mu
的环境下发送apply message到上层测试框架,因为go的channel在被接受之前会阻塞线程,如果这时上层测试框架调用了Raft.Snapshot()
,该函数需要获得Raft.mu
完成工作后才能返回,此时该机器就会陷入死锁的状态。
当然也可能会想到在无锁环境下发送apply message(事实上我之前就是这么做的),这样做会在特殊情况下造成apply out of order的问题。比如使用这样的代码:
for ... {
rf.mu.Lock()
if OK_TO_APPLY { // is ok to apply
applyMsg := ApplyMessage{
... // create apply message
}
rf.lastApplied = ... // update rf.lastApplied
rf.mu.Unlock()
rf.applyCh <- applyMsg
rf.mu.Lock()
}
rf.mu.Unlock()
... // sleep or other action
}
如果在内层循环的rf.mu.Unlock()
调用之后,没有立刻执行下一行代码,而是调度到了处理来自leader的InstallSnapshot请求的InstallSnapshot()
函数中。该函数会向上层应用发送snapshot包,上层应用根据该包调整状态机的并且调整下一个applyMsg的预期Index。此时系统重新调度回apply线程中,继续向上层发送了原来准备发送的条目,此时和上层应用预期的Index对不上,导致了apply out of order问题。
空日志
在Paper的P13有这么一段话:
The Leader Completeness Property guarantees that a leader has all committed entries, but at the start of its term, it may not know which those are. To find out, it needs to commit an entry from its term. Raft handles this by having each leader commit a blank no-op entry into the log at the start of its term.
该机制就算没有也不影响我通过lab3的测试,但是这个机制的缺失让我在lab4吃了一个大苦头
苦头来自于lab4中client与server通信,server只有在接收到Raft的apply message之后才会返回给client
问题在于我期待leader只commit它任期内添加的log。如果一个条目在添加后,leader下台,然后新的leader继续从该机器中产生。因为没有实现空日志机制,所以在leader以新的任期开始时,是无法commit它之前添加的日志的。
为了修复这个问题,我尝试将commit的限制去掉,但在以下的情况下会出现问题
Server
A [-----Term 1----]
B [-----Term 4-------]
C [-----Term 4-------]
D [-----Term 5-----------]
E [-----Term 4-------]
如上,假设B,C,E的日志由B在任期4添加,然后B在任期6完成了和C,E的日志同步,满足了commit要求,于是commit这些日志,然后apply它们。问题在于如果随后D重新加入集群,发起一个选举,它以更高的日志任期号赢得了选举(follower会向有更高任期日志的机器投票)。所以D赢得了选举,然后同步日志,commit,apply,然后apply err。
所以实现空日志条目是必须的
在leader当选后,立刻向日志中添加一个类型为LT_Noop
的日志。该日志正常参与日志同步,但是在apply时会被忽略。
下一个非常烦人问题就是,上层的测试框架认为日志应该是从1开始严格按照以1递增的顺序进行提交的。即上层测试框架接受apply message时希望是从一开始连续的。而空日志的存在使得apply时不能简单地以当前日志在raft本地日志中的下标提交
解决方案也很简单,当上下层进行下标交流的时候我们进行一次转换即可。比如将上层测试框架认为的下标称为externalIndex,Raft内部的下标成为internalIndex。
两者可以通过简单地遍历日志来转化,比如说Raft需要定位到externIndex为3的日志,就可以遍历日志,记录类型不为LT_Noop
的日志的个数,当记录到第3个时就可以确定要使用的就是该日志。同理,需要将internalIndex转换为externalIndex,就可以从头开始便利,记录过程中类型部位LT_Noop
的日志的个数。
如果没有snapshot的话这个机制的实现其实非常简单,但如果引入了snapshot,就无法再从头开始遍历所有的日志。所以需要记录一些额外的信息来补完这个机制。具体的实现个人感觉做法有些dirty,就不写了。
Part 3C: persistence
奖励关,虽然难度标注是hard,但实际上只要按照Figure 2在所有应该persist的状态改变时进行存储即可,编码和解码只要参考注释的做法就行。
只有一个小细节要注意
对于Raft.votedFor
,因为其是一个指针,在未投票的时候要置为nil,所以在保存时要注意为nil时就转换为-1进行存储,解码时只需要对-1的情况特殊处理就行(更好的做法是不使用指针,直接用-1代表未投票)
之所以我做起来这么简单,大概是因为我在Part 3B就已经实现了日志的快速同步,所以这里要做的工作就只是在涉及状态变化的时候进行持久化
Part 3D: snapshot
非常困难的一部分,事实证明我一周的修复工作修复的漏洞大部分都是由snapshot机制引起的。要正确地实现snapshot并不是一件简单的事情
下标转换
这部分其实是因为早期的设计没有考虑到删除日志压缩导致的。因为snapshot会删除不需要的日志,这时全局范围内下标相同的日志在不同的机器上的实际下标会不同。
如果在最开始就将一台机器的实际日志下标作为一个单独的变量来存储,就可以免去很多麻烦。
我这里的做法是在全局下标和本地下标之间做一个转换(全局下标指一个条目在系统范围上的下标,本地下标指一台机器访问日志实际使用的下标)
snapshot机制要持久化地记录当前的snapshotIndex。如果没有还没有snapshot,则该属性数值为0。如果snapshot记录到某个条目为止(包括该条目),则将snapshotIndex更新到该条目的下标。这时全局下标和本地下标就存在这样的转换关系本地下标 = 全局下标 - snapshotIndex
(更新:后面我将rf.snapshotIndex和log[0].Index合并了,每一条日志都记录自己的内部下标,刚好本地日志的下标就是snapshotIndex)
因此定义以下三个函数完成转换
func (rf *Raft) localIndex(globalIndex int) int {
return globalIndex - rf.snapshotIndex
}
func (rf *Raft) globalIndex(localIndex int) int {
return localIndex + rf.snapshotIndex
}
func (rf *Raft) globalLogLen() int {
return len(rf.log) + rf.snapshotIndex
}
所有的下标都以全局下标存储,在访问日志数组时使用将其转换为本地下标
snapshot的创建和从snapshot恢复
创建snapshot
测试框架会调用Raft.Snapshot()
来指示创建snapshot,测试框架提供该snapshot最后的index和snapshot本身。
要调整的属性是Raft.snapshotIndex
和Raft.snapshotTerm
,前者就等于给出的index,后者等于该index对应的日志的任期号。然后缩小日志到index,让index对应的日志成为日志数组编号为0的项,同时将其设置为空日志
为了实现更好的支持,Raft.Snapshot
应该要主动拒绝上层框架给出的index小于Raft.snapshotIndex
的请求,这样能避免出现切片的index out of range问题
最后就是持久化存储snapshot了,使用框架提供的功能,同时不要忘记把Raft.snapshotIndex
和Raft.snapshotTerm
也持久化存储,因为之后从snapshot恢复还需要它们
从snapshot恢复
最容易出问题的地方就是要注意Raft.Make()
函数。
测试框架要求该函数快速返回,只有在返回之后测试框架才会开始接收apply message,而apply channel如果没有接收数据就会一直阻塞。所以如果你在Raft.Make()
中直接使用channel发送apply message会导致整个系统锁死
正确的做法是使用协程启动apply,这样才能避免锁死。
类似的代码结构如下
func Make(...) *Raft {
...
rf.readPersist()
...
go func () {
rf.applyCh <- {Snapshot Apply Message}
...
start other tasks
e.g.
go applyEntries()
...
}
return rf
}
在发送完成后也要更新rf.commitIndex
和rf.lastApplied
为存储的rf.snapshotIndex
,别忘了也为本地日志的第0项日志设置新的任期号
InstallSnapshot
leader发送InstallSnapshot的条件是当它要发送的nextIndex对应的条目已经因为snapshot而删除,即nextIndex小于等于snapshotIndex时。
这时leader就需要通过InstallSnapshot RPC来使follower的状态和它同步
基本行为参照论文的Figure 13
为了支持空日志,InstallSnapshot的发送者还要告诉接收者当前snapshot的开始的internalIndex,这样接收者才能根据这个信息创建出本地日志中的首条日志。因此实际上我修改了RPC args的参数
区别在于该lab不要求我们实现snapshot的分段传输,也就是说可以不管除了第一条以外的前五条。
按照第六条,如果日志长度大于要应用的snapshot并且剩余的都是有效日志,那么就保留这些日志否则就直接删除这些日志。
然后要将snapshot通过channel发送给上层状态机,注意发送时不该加锁,不然有可能因为channel阻塞而导致系统锁死。
InstallSnapshot RPC的接收方也要更新Raft.lastApplied
和Raft.commitIndex
。因为一旦接收了Raft的snapshot apply message,测试代码也会认为接下来要apply的下标也从接收的snapshot开始。
完成lab4后更新:
好吧,我不得不承认把InstallSnapshot的发送给状态机的过程想的太简单了。这样简单地不加锁就发送的方案实际上非常蠢,害我足足花费了一天的时间来定位这个错误的实现。
错误的根源在于如何避免apply out of order问题,如果不加锁就发送snapshot给上层应用,接下来apply线程又给上层发送了残存在队列里的applyMsg,这就会造成apply out of order问题
如果直接带着大锁进行apply snapshot,就会有很高的概率陷入循环等待。考虑这样一个情况:
- Raft收到InstallSnapshot RPC, 获取的
rf.mu
锁,随后向管道发送snapshot(未解锁),直到上层接收前不会解锁 - 上层应用同时受到一个客户端请求,获取了
kv.mu
锁, 上层应用处理客户端请求时调用Raft.Start
,该函数尝试获取rf.mu
,但该锁被1获取 - 上层应用的apply线程在处理上一个applyMsg,尝试获取
kv.mu
锁,但该锁被2获取,在处理完该applyMsg之前不会接收snapshot
这就导致了一个循环等待 1 -> 3 -> 2 -> 1
事实上这也是为什么我们要引入额外的无锁的队列来进行apply的根本原因。
正确的做法应该是先清空队列,然后将snapshot也加入到无锁队列中,让其走完整的正常的apply流程
后记
从开始读Raft论文开始到现在花了快一个月在这上面
毫无疑问这门课的lab是我尝试过的公开课里面最难的一个
不光是要自己阅读论文,还要写一些之前很少接触的并行编程。跟这些相比学go反而是最轻松的一部分
为了排查一些错误花了很多时间,测试虽然可以并行运行但还是很慢。我其实有点怀疑自己为了修这些不怎么重要的bug花这么多时间是不是在逃避后面的lab(虽然肯定不能简单的说它们不重要)
总的来说不是一次很开心的lab经历,因为对着一大堆日志debug真的不好玩。。。