MIT6.5840-2023-Lab2D: Raft-Log Compaction

2023-12-15 17:04:47

前置知识

见上一篇 Lab2A。

实验内容

实现 RAFT,分为四个 part:leader election、log、persistence、log compaction。

实验环境

OS:WSL-Ubuntu-18.04
golang:go1.17.6 linux/amd64

Part 2D: log compaction

重新启动的服务器会遍历完整的 Raft 日志以恢复其状态。然而,对于长期运行的服务来说,永远记住完整的Raft日志是不现实的。这部分 lab 修改 Raft 以实现快照,此时 Raft 将丢弃快照之前的日志条目。
基本按照此图实现:
在这里插入图片描述
在单个 InstallSnapshot RPC 中发送整个快照, 不要实现图中的偏移机制来分割快照。

rpc格式

type InstallSnapshotArgs struct {
	Term              int
	LeaderId          int
	LastIncludedIndex int
	LastIncludedTerm  int
	Data              []byte
}

type InstallSnapshotReply struct {
	Term int
}

在这里插入图片描述
充分利用持久化的 log 日志的第0个条目,用来保存 LastIncludedIndex\LastIncludedTerm 。
需要更改之前所有用到log的地方,因为使用了快照保存前部分的日志,日志索引都通过条目新增的 Index 属性保存。

InstallSnapshot

func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	if args.Term > rf.currentTerm { // all server
		rf.state = Follower
		rf.currentTerm = args.Term
		rf.votedFor = -1
		rf.resetTimeout()
		rf.persist()
	}

	if args.Term < rf.currentTerm {
		reply.Term = rf.currentTerm
		return
	}

	rf.resetTimeout()
	reply.Term = rf.currentTerm
	if rf.commitIndex >= args.LastIncludedIndex { // snapshot timeout
		return
	}

	if rf.getLastIndex() <= args.LastIncludedIndex {
		rf.log = make([]Entry, 1)
	} else {
		rf.log = rf.log[args.LastIncludedIndex-rf.getFirstIndex():]
	}
	rf.log[0].Term = args.LastIncludedTerm
	rf.log[0].Index = args.LastIncludedIndex
	rf.log[0].Command = nil

	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	e.Encode(rf.currentTerm)
	e.Encode(rf.votedFor)
	e.Encode(rf.log)
	raftstate := w.Bytes()
	rf.persister.Save(raftstate, args.Data)

	rf.lastApplied, rf.commitIndex = args.LastIncludedIndex, args.LastIncludedIndex
	rf.applyCh <- ApplyMsg{
		SnapshotValid: true,
		Snapshot:      args.Data,
		SnapshotTerm:  args.LastIncludedTerm,
		SnapshotIndex: args.LastIncludedIndex,
	}
	log.Println(rf.me, "installSnapshot")
}

发送部分

注意在每次 leader 失败重发日志的时候,都需要先检查 rf.nextIndex[i] <= lastIncludedIndex 来给 follower 安装快照。同时不要忘记在发送 rpc 前后检查自身的状态是否发生改变!

// InstallSnapshot begin
				lastIncludedIndex := rf.getFirstIndex()
				if rf.nextIndex[i] <= lastIncludedIndex {
					args := InstallSnapshotArgs{
						Term:              rf.currentTerm,
						LeaderId:          rf.me,
						LastIncludedIndex: lastIncludedIndex,
						LastIncludedTerm:  rf.getFirstTerm(),
						Data:              rf.persister.ReadSnapshot(),
					}
					rf.mu.Unlock()
					reply := InstallSnapshotReply{}
					if ok := rf.sendInstallSnapshot(i, &args, &reply); ok {
						rf.mu.Lock()
						log.Println("installSnapshot", rf.me, "sendInstallSnapshot to", i)
						if rf.state != Leader || args.Term != rf.currentTerm || reply.Term < rf.currentTerm {
							rf.mu.Unlock()
							return
						}

						if reply.Term > rf.currentTerm {
							rf.state = Follower
							rf.currentTerm = reply.Term
							rf.votedFor = -1
							rf.resetTimeout()
							rf.persist()
							rf.mu.Unlock()
							return
						}

						rf.matchIndex[i] = int(math.Max(float64(rf.matchIndex[i]), float64(args.LastIncludedIndex)))
						rf.nextIndex[i] = rf.matchIndex[i] + 1
						// rf.commit()
						rf.mu.Unlock()

					}
					return
				} // InstallSnapshot end

创建快照

func (rf *Raft) Snapshot(index int, snapshot []byte) {
	// Your code here (2D).
	rf.mu.Lock()
	defer rf.mu.Unlock()
	lastIncludedIndex := rf.getFirstIndex()
	if lastIncludedIndex >= index {
		return
	}

	// index: fake index
	// index-lastIncludedIndex: real index
	rf.log = rf.log[index-lastIncludedIndex:]
	rf.log[0].Command = nil

	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	e.Encode(rf.currentTerm)
	e.Encode(rf.votedFor)
	e.Encode(rf.log)
	raftstate := w.Bytes()
	rf.persister.Save(raftstate, snapshot)
}

注意事项

  • 死锁,强烈推荐使用 https://github.com/sasha-s/go-deadlock,能够找出目前持有锁的地方在哪;
  • 往 rf.applyCh 中写数据要提前释放锁?这个需要再研究一下;
  • rpc 前后状态判断;
  • 加了快照之后,所有日志的索引都变了,只要记住一点,所有保存的索引都是理论上的索引;
  • 还有很多细节忘记了,这个part卡了很久,直接看代码 https://github.com/BeGifted/MIT6.5840-2023 怎么写的可能会直观一点;

实验结果

跑一次 150 多秒,比很多其他博客的方案快 100 多秒,猜测可能是将心跳发送和日志复制分开来处理的结果,心跳是 leader 定时发送的,日志复制则是当 service 向 leader 添加日志后,leader 单独执行的 goroutine。具体还是看代码吧。
在这里插入图片描述

文章来源:https://blog.csdn.net/qq_43680965/article/details/135017459
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。