Raft(三):日志复制

日志复制

具体的日志复制算法,在 Raft 论文中说的非常详细:Raft paper(第5节、5.3节)。

在 LAB 2B 的实现中需要施加选举限制:Raft paper(第5.4.1节)。

需要注意的是,Leader 只能提交当前任期内的日志:Raft paper(第5.4.2节)。

日志加速回退

Raft paper(第5节)中提到,如果 Leader 向一个 Follower 追加日志失败,就表明 Leader 和 Follower 的日志有冲突,则 Leader 会回退一个 Log,之后再次向跟随者追加日志。

假设有这样一个场景,我们有5个服务器,有1个 Leader,这个 Leader 和另一个 Follower 困在一个网络分区中。这个 Leader 一直向它唯一的 Follower 发送 AppendEntries,因为没有过半服务器,所以没有一条 Log 会 commit。在另一个有多数服务器的网络分区中,系统会选出新的 Leader 并继续运行。旧的 Leader 和它的 Follower 可能会记录无限多的旧的任期的未 commit 的 Log。当旧的 Leader 和它的 Follower 重新加入到集群中时,这些 Log 需要被删除并覆盖。

如果 Leader 每次只能在附加日志RPC 失败后回退一个 Log 然后重试的话,可能会耗费大量的时间。因此,需要某些方法来加速日志的回退。

6.824 的课堂上,Robert 教授给出了一个方法:让 Leader 可以每次回退一整个任期的 Log,而不是只回退一个 Log。

我们将可能的场景分成3类,在这里假设我们只有一个 Leader(S2)和一个 Follower(S1),S2 要发送一条任期号为6的 AppendEntries 消息给 S1:

(1)场景一

S1没有任期6的 Log,因此我们需要回退一整个任期的 Log。

(2)场景二

S1 收到了任期4的旧 Leader 的多条 Log,但是作为新 Leader,S2 只收到了一条任期4的 Log。所以这里,我们需要覆盖 S1 中有关旧 Leader 的 Log。

(3)场景三

S1 与 S2 的 Log 不冲突,但是 S1 缺失了部分 S2 中的Log,这里我们回退掉所有空缺的 Log。

可以让 Follower 在回复 Leader 的 AppendEntries 消息中,携带3个额外的变量,来加速日志的恢复:

  • XTerm:Follower 与 Leader 冲突的 Log 对应的任期号。如果 Follower 在 PrevLogIndex 位置的任期号与 Leader 不匹配,它会拒绝 Leader 的 AppendEntries 消息,并将自己的任期号放在 XTerm 中。如果 Follower 在对应位置没有 Log,那么这里会返回 -1。

  • XIndex:Folower 中任期号为 XTerm 的第一条 Log 的槽位号。

  • XLen:如果 Follower 在 PrevLogIndex 位置没有 Log,那么 XTerm 会返回-1,XLen 表示空白的 Log 槽位数。

Leader 收到错误返回时,就可以根据上述信息加速日志回退:

  • 场景一:S1 会返回 XTerm=5,XIndex=2。S2 发现自己没有任期5的日志,它会将 S1 的 nextIndex 设置为 XIndex,也就是S1中,任期5的第一条Log对应的槽位号。

  • 场景二:S1 会返回 XTerm=4,XIndex=1。S2 发现自己有任期4的日志,它会将 S1 的 nextIndex 设置为 XIndex+1。

  • 场景三:S1 会返回 XTerm=-1,XLen=2。这表示 S1 中日志太短了,在冲突的位置没有 Log 条目,Leader 应该回退到 Follower 最后一条 Log 条目的下一条,也就是说 S2 会将 S1 的 nextIndex 设置为 nextIndex-XLen。

LAB 2B 中遇到的问题

最后说一说,在实现 6.824 的 LAB 2B 的过程中遇到的一些 Bug。

Bug 1

在运行 LAB 2B 的测试后,第一个没有通过的是测试 TestRPCBytes2B。错误日志如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
S0 starts running                                                                                                                        
S1 starts running
S2 starts running
S2 starts election
S2 becomes Candidate
S2 change term from 0 to 1
S2 votes for S2
S0 receives VoteRequestRpc from S2
S0 becomes Follower
S0 changes term from 1 to 1
S0 votes for S2
S2 becomes Leader
S1 receives VoteRequestRpc from S2
S1 becomes Follower
S1 changes term from 1 to 1
S1 votes for S2
S2 sends hearbeat
S2 becomes Leader
S2 sends hearbeat

在错误日志中,S2 连续两次成为 Leader,导致 S2 中运行了两个 heartbeat ticker,因此 S2 就会发送比正常的 Leader 的两倍字节数,所以无法通过 TestRPCBytes2B 测试。

根据错误原因,可以定位错误是在请求投票的实现中,代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func (rf *Raft) startElection() {
// 省略。。。。

receivedVotes := 1
for i := range rf.peers {
if i == rf.me {
continue
}
go func(peerId int) {
rf.mu.Lock()
            // 已经不是候选者了,直接返回
if rf.role != CANDIDATE {
rf.mu.Unlock()
return
}
args := &RequestVoteArgs{
Term: rf.currentTerm,
CandidateId: rf.me,
LastLogIndex: rf.lastLogIndex,
LastLogTerm: rf.lastLogTerm,
}
reply := &RequestVoteReply{}
rf.mu.Unlock()

if !rf.sendRequestVote(peerId, args, reply) {
return
}

rf.mu.Lock()
if reply.Term > rf.currentTerm {
rf.convertToFollower(reply.Term)
rf.mu.Unlock()
return
}
if !reply.VoteGranted {
rf.mu.Unlock()
return
}
receivedVotes++
if receivedVotes*2 > len(rf.peers) {
rf.convertToLeader()
}
rf.mu.Unlock()
}(i)
}

// 省略。。。。
}

我们把 Go 程中锁的使用分成两部分,第一部分:第10行(上锁)——第23行(解锁),第二部分:第29行(上锁)——第43行(解锁)。

假设所有的 Go 程都执行完第一部分后,再执行第二部分。这样所有的 Go 程都能发送请求投票RPC,因为此时当前的节点一定是跟随者。这样,当执行第二部分时,在得到的投票数达到半数服务器数量后,每多收到一票,就会调用 rf.convertToLeader() 转化为跟随者,从而导致上述的错误。

解决方法很简单,只需要在第二部分的开头增加一个对当前节点是否还是候选者的判断即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
func (rf *Raft) startElection() {
// 省略。。。。

receivedVotes := 1
for i := range rf.peers {
if i == rf.me {
continue
}
go func(peerId int) {
rf.mu.Lock()
            // 已经不是候选者了,直接返回
if rf.role != CANDIDATE {
rf.mu.Unlock()
return
}
args := &RequestVoteArgs{
Term: rf.currentTerm,
CandidateId: rf.me,
LastLogIndex: rf.lastLogIndex,
LastLogTerm: rf.lastLogTerm,
}
reply := &RequestVoteReply{}
rf.mu.Unlock()

if !rf.sendRequestVote(peerId, args, reply) {
return
}

rf.mu.Lock()
// 已经不是候选者了,直接返回
if rf.role != CANDIDATE {
rf.mu.Unlock()
return
}
if reply.Term > rf.currentTerm {
rf.convertToFollower(reply.Term)
rf.mu.Unlock()
return
}
if !reply.VoteGranted {
rf.mu.Unlock()
return
}
receivedVotes++
if receivedVotes*2 > len(rf.peers) {
rf.convertToLeader()
}
rf.mu.Unlock()
}(i)
}

// 省略。。。。
}

Bug 2

遇到的第二个 Bug 是由于使用负数索引导致的运行时崩溃,报错如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
Test (2A): initial election ...
... Passed -- 3.1 3 56 16914 0
Test (2A): election after network failure ...
2022/07/30 18:00:11 0 1 2 0 0 0
panic: runtime error: index out of range [-1]

goroutine 230 [running]:
6.824/raft.(*Raft).sendHearbeat.func1(0xc0000c81e0, 0x1)
/mnt/d/code/projects/distributed-system/6.824/src/raft/raft.go:613 +0x66f
created by 6.824/raft.(*Raft).sendHearbeat
/mnt/d/code/projects/distributed-system/6.824/src/raft/raft.go:571 +0x64
exit status 2
FAIL 6.824/raft 3.157s

索引越界具体发生在代码的613行:

1
if reply.XTerm == rf.log[reply.XIndex-1].Term

通过日志可以知道,发生错误时心跳 RPC 得到的返回消息是:

1
2
3
4
5
6
7
Reply {
Term: 0
Success: false
XTerm: 0
XIndex: 0
XLen: 0
}

根据 Term=0 和 Success=false 可以看出跟随者似乎没有执行任何对心跳消息的处理就直接返回了。可以推断出,跟随者应该是已经”死掉“了,因此才没有执行对心跳消息的处理。

解决方法是,在发送心跳的函数中增加对跟随者节点状态的判断,代码如下:

1
2
3
4
if reply.Term < rf.currentTerm {
rf.mu.Unlock()
return
}

只要 reply.Term 小于 Leader 的任期号,就可以知道跟随者没有对 Leader 发送的心跳进行处理,那么跟随者自然就已经”死掉了“。

Bug 3

第三个遇到的问题是 TestRejoin2B 测试失败,失败的日志如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
S0 sends hearbeat                                                                                                                                                           
S0 sends hearbeat to 2: &{Term:3
LeaderId:0 PrevLogIndex:3
PrevLogTerm:3 Entries:[]
LeaderCommit:3}
S0 sends hearbeat to 1: &{Term:3
LeaderId:0 PrevLogIndex:2
PrevLogTerm:2 Entries:[{Command:104
Term:3 CommandIndex:3}]
LeaderCommit:3}
S2 receives AppendEntriesRpc from
S0
S2 appends logs: []
S2 changes commitIndex from 2 to
3
S2 reply AppendEntriesRpc from
S0: &{Term:3 Success:true XTerm:0
XIndex:0 XLen:0}
S2 applies msg{Command:
%!s(int=104), Index: 3}
S2 change lastApplied from 2 to 3
S1 appends a log: {Command:105
Term:2 CommandIndex:3}
S1 sends hearbeat
S1 sends hearbeat to 2: &{Term:2
LeaderId:1 PrevLogIndex:1
PrevLogTerm:1
Entries:[{Command:103 Term:2
CommandIndex:2} {Command:105
Term:2 CommandIndex:3}]
LeaderCommit:2}
S2 receives AppendEntriesRpc from
S1
S2 reply AppendEntriesRpc from
S1: &Reply{Term: 3, Success:
false}
S1 becomes Follower
S1 changes term from 3 to 3
S1 sends hearbeat to 0: &{Term:3
LeaderId:1 PrevLogIndex:2
PrevLogTerm:2
Entries:[{Command:105 Term:2
CommandIndex:3}] LeaderCommit:2}
S0 receives AppendEntriesRpc from
S1
S0 becomes Follower
S0 appends logs: [{Command:105
Term:2 CommandIndex:3}]
S0 reply AppendEntriesRpc from S1:
&{Term:3 Success:true XTerm:0
XIndex:0 XLen:0}

在上面的日志中,S0 是 Leader 并且和 S2 可以正常通信,S1 被困在一个单独的网络分区中,并且自认为自己是 Leader。

当 S1 恢复正常后,S1 向 S2 发送 AppendEntries,然后收到了拒绝回复,并根据回复中的任期号重置自己的任期号,将自己转化为跟随者。到这里一切都还正常,但是接着 S1 继续向 S0 发送了 AppendEntries,导致 S0 退化成了跟随者,并且被追加了一条来自跟随者的日志,从而导致错误。

错误原因很明显,S1 在发送 AppendEntries 之前没有判断自己是否还是 Leader,因此在 S1 转变为跟随者之后,S1 仍然向 S0 发送了RPC消息。

那么,只需要在发送 AppendEntries 之前加一个小的条件判断即可:

1
2
3
4
5
// 已经不是领导人了,直接返回
if rf.role != LEADER {
rf.mu.Unlock()
return
}

Bug 4

最后遇到的 Bug 是没有通过 TestFailNoAgree2B 测试,错误日志如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
S0 starts running                                                                                                                                                           
S1 starts running
S2 starts running
S3 starts running
S4 starts running
S0 starts election
S0 becomes Candidate
S0 change term from 0 to 1
S0 votes for S0
S1 receives VoteRequestRpc from
S0
S1 changes term from 0 to 1
S1 votes for S0
S2 receives VoteRequestRpc from
S0
S2 changes term from 0 to 1
S2 votes for S0
S0 becomes Leader
S0 sends hearbeat
S0 sends hearbeat to S4: &{Term:1
LeaderId:0 PrevLogIndex:0
PrevLogTerm:0 Entries:[]
LeaderCommit:0}
S4 receives AppendEntriesRpc from
S0
S4 changes term from 0 to 1
S4 appends logs: []
S4 reply AppendEntriesRpc from S0:
&{Term:1 Success:true XTerm:0
XIndex:0 XLen:0}
S0 sends hearbeat to S1: &{Term:1
LeaderId:0 PrevLogIndex:0
PrevLogTerm:0 Entries:[]
LeaderCommit:0}
S1 receives AppendEntriesRpc from
S0
S1 appends logs: []
S1 reply AppendEntriesRpc from
S0: &{Term:1 Success:true XTerm:0
XIndex:0 XLen:0}
S0 sends hearbeat to S3: &{Term:1
LeaderId:0 PrevLogIndex:0
PrevLogTerm:0 Entries:[]
LeaderCommit:0}
S3 starts election
S3 becomes Candidate
S3 change term from 0 to 1
S3 votes for S3
S0 appends a log: {Command:10
Term:1 CommandIndex:1}
S0 sends hearbeat
S0 sends hearbeat to S4: &{Term:1
LeaderId:0 PrevLogIndex:0
PrevLogTerm:0 Entries:[{Command:10
Term:1 CommandIndex:1}]
LeaderCommit:0}
S4 receives AppendEntriesRpc from
S0
S4 appends logs: [{Command:10
Term:1 CommandIndex:1}]
S4 reply AppendEntriesRpc from S0:
&{Term:1 Success:true XTerm:0
XIndex:0 XLen:0}
S0 sends hearbeat to S2: &{Term:1
LeaderId:0 PrevLogIndex:0
PrevLogTerm:0 Entries:[]
LeaderCommit:0}
S2 receives AppendEntriesRpc from
S0
S2 appends logs: []
S2 reply AppendEntriesRpc from
S0: &{Term:1 Success:true XTerm:0
XIndex:0 XLen:0}
S0 receives VoteRequestRpc from S3
S1 receives VoteRequestRpc from
S3
S2 receives VoteRequestRpc from
S3
S4 receives VoteRequestRpc from S3
S0 sends hearbeat to S1: &{Term:1
LeaderId:0 PrevLogIndex:0
PrevLogTerm:0 Entries:[{Command:10
Term:1 CommandIndex:1}]
LeaderCommit:0}
S1 receives AppendEntriesRpc from
S0
S1 appends logs: [{Command:10
Term:1 CommandIndex:1}]
S1 reply AppendEntriesRpc from
S0: &{Term:1 Success:true XTerm:0
XIndex:0 XLen:0}
S0 changes commitIndex from 0 to 1
S0 sends hearbeat to S2: &{Term:1
LeaderId:0 PrevLogIndex:0
PrevLogTerm:0 Entries:[{Command:10
Term:1 CommandIndex:1}]
LeaderCommit:1}
S2 receives AppendEntriesRpc from
S0
S2 appends logs: [{Command:10
Term:1 CommandIndex:1}]
S2 changes commitIndex from 0 to
1
S2 reply AppendEntriesRpc from
S0: &{Term:1 Success:true XTerm:0
XIndex:0 XLen:0}
S0 sends hearbeat to S3: &{Term:1
LeaderId:0 PrevLogIndex:0
PrevLogTerm:0 Entries:[{Command:10
Term:1 CommandIndex:1}]
LeaderCommit:1}
S3 receives AppendEntriesRpc from
S0
S3 becomes Follower
S3 appends logs: [{Command:10
Term:1 CommandIndex:1}]
S3 changes commitIndex from 0 to
1
S3 reply AppendEntriesRpc from
S0: &{Term:1 Success:true XTerm:0
XIndex:0 XLen:0}
S3 receives AppendEntriesRpc from
S0
S3 appends logs: []
S3 reply AppendEntriesRpc from
S0: &{Term:1 Success:true XTerm:0
XIndex:0 XLen:0}
S2 applies msg{Command:
%!s(int=10), Index: 1}
S2 change lastApplied from 0 to 1
S0 applies msg{Command:
%!s(int=10), Index: 1}
S0 change lastApplied from 0 to 1



6.824/raft.(*Raft).applyMsgToStateMachine(0xc0000e02d0)

FAIL 6.824/raft 0.767s

在这个场景中,S0 收到了 S1 和 S2 的投票,在第20行成为了 Leader。之后 S0 在43行发送附加日志给 S3,附加日志的请求内容为:

1
2
S0 sends hearbeat to S1: &{Term:1 LeaderId:0 PrevLogIndex:0 
PrevLogTerm:0 Entries:[] LeaderCommit:0}

之后,S0在51行收到了客户端的请求,并添加了一条日志:

1
S0 appends a log: {Command:10 Term:1 CommandIndex:1}    

接着,S0 在第109行将追加的日志发送给 S3:

1
2
3
4
S0 sends hearbeat to S3: &{
Term:1 LeaderId:0 PrevLogIndex:0
PrevLogTerm:0 Entries:[{Command:10 Term:1 CommandIndex:1}]
    LeaderCommit:1}

但是 S3 在第118行首先收到了第二条附加日志RPC:

1
2
3
4
S3 receives AppendEntriesRpc from S0                                                                  
S3 becomes Follower
S3 appends logs: [{Command:10 Term:1 CommandIndex:1}]
S3 changes commitIndex from 0 to 1

S3 在125行才收到第一条附加日志RPC:

1
2
S3 receives AppendEntriesRpc from S0                                                                                                       
S3 appends logs: []

由于RPC请求的乱序到达,S3 首先会收到第二条附加日志RPC,它会追加一条日志。之后才会收到一条附加日志RPC,这这个RPC的 Entries 是空的,这回导致 S3 覆盖掉之前追加的日志。

但是 S3 此时已经将 CommitIndex 修改为1,之后 S3 会应用 ComminIndex 位置的日志到状态机中,但是对应位置的日志是空的,这就导致了错误。

很明显,在收到第一条附加日志RPC后,我们不能覆盖掉之前的追加的日志,因此在追加日志的实现中应该加一些判断:

1
2
3
4
5
6
7
8
logIndex, entriesIndex := args.PrevLogIndex+1, 0
for logIndex <= rf.lastLogIndex && entriesIndex < len(args.Entries) && rf.log[logIndex-1] == args.Entries[entriesIndex] {
logIndex++
entriesIndex++
}
if !(entriesIndex == len(args.Entries) && logIndex <= rf.lastLogIndex && rf.log[logIndex-1].Term == args.Term) {
rf.log = append(rf.log[:logIndex-1], args.Entries[entriesIndex:]...)
}

首先,我们扫描发送过来的日志和本地的日志,找到第一条不匹配的日志的位置X。在之前的实现中,没有第6行的判断,直接执行第7行的追加日志操作,这可能导致已有的日志被覆盖。

如果剩下的需要追加的日志为空,并且X对应的位置存在日志,如果X对应的日志的任期与当前的 Leader 的任期号相等,说明X对应的日志是当前Leader发送过来的,那么就不应该覆盖掉它。

除了上述情况,都可以直接执行 rf.log = append(rf.log[:logIndex-1], args.Entries[entriesIndex:]...) 把剩下的日志复制过去。

在解决完所有 Bug 后并发跑500次测试,全部顺利通过:

参考


Raft(三):日志复制
https://night-cruise.github.io/2022/07/31/Raft-3/
作者
Night Cruise
发布于
2022年7月31日
许可协议