当前位置: 首页 > news >正文

6.5840 Lab 3: Raft

论文很重要 raft-zh_cn/raft-zh_cn.md at master · maemual/raft-zh_cn · GitHub

Part 3A: leader election (moderate)

十次test都过了

实现 Raft 的领导者选举和心跳机制(AppendEntries RPC,无日志条目)。第 3A 部分的目标是实现以下功能:

  1. 集群中能够成功选举出一个领导者。
  2. 如果没有节点故障,当前领导者能够保持其领导地位。
  3. 如果当前领导者发生故障,或者其与其他节点之间的通信中断,集群能够选举出新的领导者接替其位置。

首先定义raft结构体字段并初始化,论文中全部给出了

type LogEntry struct {Term         int         // 用于区分不同的Leader任期CommandValid bool        // 当前指令是否有效。如果无效,follower 可以拒绝复制Command      interface{} // 表示可以存储任意类型的指令。
}type Role intconst (Leader Role = iotaFollowerCandidate
)// A Go object implementing a single Raft peer.
type Raft struct {mu        sync.Mutex          // Lock to protect shared access to this peer's statepeers     []*labrpc.ClientEnd // RPC end points of all peerspersister *Persister          // Object to hold this peer's persisted stateme        int                 // this peer's index into peers[]dead      int32               // set by Kill()// Your data here (3A, 3B, 3C).// Look at the paper's Figure 2 for a description of what// state a Raft server must maintain.log []LogEntrycurrentTerm     intvotedFor        introle            RoleelectionStart   time.TimeelectionTimeout time.DurationcommitIndex int //已知已提交的最高的日志条目的索引(初始值为0,单调递增)lastApplied int //已知已应用到状态机的日志条目的索引(初始值为0,单调递增)nextIndex  []int //对于每一台服务器,发送到该服务器的下一个日志条目的索引(初始值为领导人最后的日志条目的索引+1)matchIndex []int //对于每一台服务器,已知的已经复制到该服务器的最高日志条目的索引(初始值为0,单调递增)
}func Make(peers []*labrpc.ClientEnd, me int,persister *Persister, applyCh chan ApplyMsg) *Raft {rf := &Raft{}rf.peers = peersrf.persister = persisterrf.me = merf.role = Followerrf.electionStart = time.Now()rf.votedFor = -1rf.currentTerm = 0rf.commitIndex = 0rf.lastApplied = 0rf.log = make([]LogEntry, 1)rand.Seed(time.Now().UnixNano())rf.electionTimeout = time.Duration(450+rand.Intn(150)) * time.Millisecondrf.nextIndex = make([]int, len(rf.peers))rf.matchIndex = make([]int, len(rf.peers))// Your initialization code here (3A, 3B, 3C).// initialize from state persisted before a crashrf.readPersist(persister.ReadRaftState())// start ticker goroutine to start electionsgo rf.ticker()return rf
}

在ticker()中,题目提到"不要使用Go的 time.Timer 或 time.Ticker ,它们很难正确使用",所以用原本代码框架中的time.Sleep()来实现定时操作,sleep的时间也是leader心跳的间隔时间,对于节点选举超时的定时器,用time.Since(rf.electionStart) >= rf.electionTimeout实现。

对不同的role的节点,执行不同操作,leader是心跳,其他则是开始选举,这里使用一把大锁保平安。

func (rf *Raft) ticker() {for rf.killed() == false {// Your code here (3A)// Check if a leader election should be started.rf.mu.Lock()// 如果是 Follower 或 Candidate,检查是否超时if rf.role == Follower {if time.Since(rf.electionStart) >= rf.electionTimeout {// 超时,开始选举rf.BecomeCandidate()rf.startElection()}}if rf.role == Candidate {if time.Since(rf.electionStart) >= rf.electionTimeout {rf.electionStart = time.Now()rf.startElection()}}// 如果是 Leader,定期发送心跳if rf.role == Leader {rf.sendHeartbeat()}rf.mu.Unlock()// pause for a random amount of time between 50 and 350// milliseconds.ms := 40 + (rand.Int63() % 100)time.Sleep(time.Duration(ms) * time.Millisecond)}
}

实现追加条目(AppendEntries)RPC以及请求投票(RequestVote)RPC,参数和返回值的字段以及方法的逻辑在论文中均有记录,这里要注意的是什么时候要进行选举超时定时器的重置rf.electionStart = time.Now(),对每个节点发送心跳和请求投票都需要用groutine,不同发送操作是异步的。若成为Leader,则之前必须是候选人。

在节点处理投票请求时注意对方和自己都是候选人的情况

请求投票(RequestVote)RPC

type RequestVoteArgs struct {Term         int // 候选人的任期号CandidateID  int // 请求选票的候选人的 IDLastLogIndex int // 候选人的最后日志条目的索引值LastLogTerm  int // 候选人的最后日志条目的任期值
}type RequestVoteReply struct {Term        int  // 当前任期号,以便于候选人去更新自己的任期号VoteGranted bool // 候选人赢得了此张选票时为真
}func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {// Your code here (3A, 3B).rf.mu.Lock()defer rf.mu.Unlock()if args.Term < rf.currentTerm {reply.Term = rf.currentTermreply.VoteGranted = falsereturn}if args.Term > rf.currentTerm {rf.BecomeFollower(args.Term, -1)}if rf.votedFor == -1 || rf.votedFor == args.CandidateID {DPrintf("candidate %d get vote from %d", args.CandidateID, rf.me)rf.votedFor = args.CandidateIDrf.electionStart = time.Now()reply.VoteGranted = true} else {//处理两个节点同为候选人的情况reply.VoteGranted = false}
}func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {ok := rf.peers[server].Call("Raft.RequestVote", args, reply)return ok
}

 追加条目(AppendEntries)

type AppendEntriesArgs struct {Term         int        // 领导人的任期号LeaderID     int        // 领导人的 IDPrevLogIndex int        // 紧邻新日志条目之前的那个日志条目的索引值PrevLogTerm  int        // 紧邻新日志条目之前的那个日志条目的任期值Entries      []LogEntry // 准备存储的日志条目(表示心跳时为空;一次性发送多个是为了提高效率)LeaderCommit int        // 领导人已经提交的日志的索引值
}type AppendEntriesReply struct {Term    int  // 当前的任期号,用于领导人更新自己Success bool // 如果 Follower 包含了匹配上 `PrevLogIndex` 和 `PrevLogTerm` 的日志条目时为 true
}func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {rf.mu.Lock()defer rf.mu.Unlock()if args.Term < rf.currentTerm {reply.Term = rf.currentTermreply.Success = falsereturn}rf.votedFor = args.LeaderIDrf.electionStart = time.Now()reply.Success = truereturn}func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {return rf.peers[server].Call("Raft.AppendEntries", args, reply)
}

开始发送心跳和开始选举

func (rf *Raft) BecomeFollower(term, votedFor int) {rf.role = Followerrf.currentTerm = termrf.votedFor = votedForrf.electionStart = time.Now()DPrintf("Pod %d become follower by %d when term %d ", rf.me, votedFor, rf.currentTerm)
}func (rf *Raft) BecomeCandidate() {rf.role = Candidaterf.currentTerm++rf.votedFor = rf.merf.electionStart = time.Now()DPrintf("Pod %d become candidate when term %d ", rf.me, rf.currentTerm)
}func (rf *Raft) BecomeLeader() {rf.role = Leaderfor i := range rf.peers {rf.nextIndex[i] = len(rf.log) // 领导者的 nextIndex 为日志最后一条之后rf.matchIndex[i] = 0          // 初始 matchIndex 为 0}DPrintf("Pod %d become leader when term %d ", rf.me, rf.currentTerm)
}func (rf *Raft) sendHeartbeat() {term := rf.currentTermleaderCommit := rf.commitIndex// 遍历所有 Follower,发送 AppendEntries RPCfor i := range rf.peers {if i != rf.me {go func(server int) {prevLogIndex := rf.nextIndex[server] - 1prevLogTerm := 0if prevLogIndex >= 0 {prevLogTerm = rf.log[prevLogIndex].Term}args := &AppendEntriesArgs{Term:         term,LeaderID:     rf.me,PrevLogIndex: prevLogIndex,PrevLogTerm:  prevLogTerm,Entries:      nil, // 心跳中无日志条目LeaderCommit: leaderCommit,}reply := &AppendEntriesReply{}if rf.sendAppendEntries(server, args, reply) {if !reply.Success && reply.Term > rf.currentTerm {rf.BecomeFollower(reply.Term, -1)}}}(i)}}rf.electionStart = time.Now()
}func (rf *Raft) startElection() {// 统计自己的票数votes := 1term := rf.currentTerm// 获取当前日志信息lastLogIndex := len(rf.log) - 1lastLogTerm := 0if lastLogIndex >= 0 {lastLogTerm = rf.log[lastLogIndex].Term}// 遍历所有其他节点,发送 RequestVote RPCfor i := range rf.peers {if i != rf.me {go func(server int) {args := &RequestVoteArgs{Term:         term,CandidateID:  rf.me,LastLogIndex: lastLogIndex,LastLogTerm:  lastLogTerm,}reply := &RequestVoteReply{}if rf.sendRequestVote(server, args, reply) {if reply.Term > rf.currentTerm || !reply.VoteGranted {rf.BecomeFollower(reply.Term, -1)return}if reply.VoteGranted {votes++if votes > len(rf.peers)/2 && rf.role == Candidate {rf.BecomeLeader()rf.sendHeartbeat()}}}}(i)}}
}

Part 3B: log (hard)


抱歉!

由于本人实习的原因,这个轮子项目搁浅了,估计不会再更新


http://www.mrgr.cn/news/95340.html

相关文章:

  • Jmeter分布式集群压测
  • C++学习之QT中HTTP正则表达式
  • 【算法】DFS、BFS、floodfill、记忆化搜索
  • slq-labs日志
  • 调用feapder作为子程序时setting.py文件不起作用
  • 基于 EMA12 指标结合 iTick 外汇报价 API 、股票报价API、指数报价API的量化策略编写与回测
  • 实用工具--OfficeAI 助手 v0.3.20(长期免费,2025-03-18 本地支持WPSWord联动)
  • AsyncHttpClient使用说明书
  • MySQL0基础学习记录-下载与安装
  • RocketMQ面试题:基础部分
  • go命令使用
  • 超硬核区块链算法仿真:联盟链PBFT多线程仿真实现 :c语言完全详解版
  • 【Linux】应用层自定义协议 + 序列化和反序列化
  • 【leetcode hot 100 994】腐烂的橘子
  • 算法及数据结构系列 - 二分查找
  • PocketBase: Small but mighty backend in a single file
  • 【AI】AI编程助手:Cursor、Codeium、GitHub Copilot、Roo Cline、Tabnine
  • 论文阅读:2024-NAACL Semstamp、2024-ACL (Findings) k-SemStamp
  • #pandas #python#数据标注 pd.crosstab()
  • 算法刷题记录——LeetCode篇(1) [第1~100题](持续更新)