MIT6.5840-2023-Lab2D: Raft-Log Compaction
2023-12-15 17:04:47
前置知识
见上一篇 Lab2A。
实验内容
实现 RAFT,分为四个 part:leader election、log、persistence、log compaction。
实验环境
OS:WSL-Ubuntu-18.04
golang:go1.17.6 linux/amd64
Part 2D: log compaction
重新启动的服务器会遍历完整的 Raft 日志以恢复其状态。然而,对于长期运行的服务来说,永远记住完整的Raft日志是不现实的。这部分 lab 修改 Raft 以实现快照,此时 Raft 将丢弃快照之前的日志条目。
基本按照此图实现:
在单个 InstallSnapshot RPC 中发送整个快照, 不要实现图中的偏移机制来分割快照。
rpc格式
type InstallSnapshotArgs struct {
Term int
LeaderId int
LastIncludedIndex int
LastIncludedTerm int
Data []byte
}
type InstallSnapshotReply struct {
Term int
}
充分利用持久化的 log 日志的第0个条目,用来保存 LastIncludedIndex\LastIncludedTerm 。
需要更改之前所有用到log的地方,因为使用了快照保存前部分的日志,日志索引都通过条目新增的 Index 属性保存。
InstallSnapshot
func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
if args.Term > rf.currentTerm { // all server
rf.state = Follower
rf.currentTerm = args.Term
rf.votedFor = -1
rf.resetTimeout()
rf.persist()
}
if args.Term < rf.currentTerm {
reply.Term = rf.currentTerm
return
}
rf.resetTimeout()
reply.Term = rf.currentTerm
if rf.commitIndex >= args.LastIncludedIndex { // snapshot timeout
return
}
if rf.getLastIndex() <= args.LastIncludedIndex {
rf.log = make([]Entry, 1)
} else {
rf.log = rf.log[args.LastIncludedIndex-rf.getFirstIndex():]
}
rf.log[0].Term = args.LastIncludedTerm
rf.log[0].Index = args.LastIncludedIndex
rf.log[0].Command = nil
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(rf.currentTerm)
e.Encode(rf.votedFor)
e.Encode(rf.log)
raftstate := w.Bytes()
rf.persister.Save(raftstate, args.Data)
rf.lastApplied, rf.commitIndex = args.LastIncludedIndex, args.LastIncludedIndex
rf.applyCh <- ApplyMsg{
SnapshotValid: true,
Snapshot: args.Data,
SnapshotTerm: args.LastIncludedTerm,
SnapshotIndex: args.LastIncludedIndex,
}
log.Println(rf.me, "installSnapshot")
}
发送部分
注意在每次 leader 失败重发日志的时候,都需要先检查 rf.nextIndex[i] <= lastIncludedIndex
来给 follower 安装快照。同时不要忘记在发送 rpc 前后检查自身的状态是否发生改变!
// InstallSnapshot begin
lastIncludedIndex := rf.getFirstIndex()
if rf.nextIndex[i] <= lastIncludedIndex {
args := InstallSnapshotArgs{
Term: rf.currentTerm,
LeaderId: rf.me,
LastIncludedIndex: lastIncludedIndex,
LastIncludedTerm: rf.getFirstTerm(),
Data: rf.persister.ReadSnapshot(),
}
rf.mu.Unlock()
reply := InstallSnapshotReply{}
if ok := rf.sendInstallSnapshot(i, &args, &reply); ok {
rf.mu.Lock()
log.Println("installSnapshot", rf.me, "sendInstallSnapshot to", i)
if rf.state != Leader || args.Term != rf.currentTerm || reply.Term < rf.currentTerm {
rf.mu.Unlock()
return
}
if reply.Term > rf.currentTerm {
rf.state = Follower
rf.currentTerm = reply.Term
rf.votedFor = -1
rf.resetTimeout()
rf.persist()
rf.mu.Unlock()
return
}
rf.matchIndex[i] = int(math.Max(float64(rf.matchIndex[i]), float64(args.LastIncludedIndex)))
rf.nextIndex[i] = rf.matchIndex[i] + 1
// rf.commit()
rf.mu.Unlock()
}
return
} // InstallSnapshot end
创建快照
func (rf *Raft) Snapshot(index int, snapshot []byte) {
// Your code here (2D).
rf.mu.Lock()
defer rf.mu.Unlock()
lastIncludedIndex := rf.getFirstIndex()
if lastIncludedIndex >= index {
return
}
// index: fake index
// index-lastIncludedIndex: real index
rf.log = rf.log[index-lastIncludedIndex:]
rf.log[0].Command = nil
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
e.Encode(rf.currentTerm)
e.Encode(rf.votedFor)
e.Encode(rf.log)
raftstate := w.Bytes()
rf.persister.Save(raftstate, snapshot)
}
注意事项
- 死锁,强烈推荐使用
https://github.com/sasha-s/go-deadlock
,能够找出目前持有锁的地方在哪; - 往 rf.applyCh 中写数据要提前释放锁?这个需要再研究一下;
- rpc 前后状态判断;
- 加了快照之后,所有日志的索引都变了,只要记住一点,所有保存的索引都是理论上的索引;
- 还有很多细节忘记了,这个part卡了很久,直接看代码
https://github.com/BeGifted/MIT6.5840-2023
怎么写的可能会直观一点;
实验结果
跑一次 150 多秒,比很多其他博客的方案快 100 多秒,猜测可能是将心跳发送和日志复制分开来处理的结果,心跳是 leader 定时发送的,日志复制则是当 service 向 leader 添加日志后,leader 单独执行的 goroutine。具体还是看代码吧。
文章来源:https://blog.csdn.net/qq_43680965/article/details/135017459
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!