MIT 6.5840 分布式系统
MapReduce
要把大象装冰箱(处理 PB 级数据),分三步:
- Split (切分): 把大文件切成无数个 64MB 的小块 (Input Splits)。
- Map (映射): 把小块转化成 <Key, Value> 对。
- Reduce (归约): 把相同 Key 的数据聚在一起处理。
场景: 统计单词频率 (Word Count)。
Map 阶段:
- Worker A 读文件 1,发现 "Apple",输出 <Apple, 1>。
- Worker B 读文件 2,发现 "Apple",输出 <Apple, 1>。
- 问题: 这两个 <Apple, 1> 分散在不同机器上,Reduce 怎么求和?
Partitioning (分区):
- 系统预先规定:hash("Apple") % nReduce。假设结果是 0。
- 所有 Worker 都知道:只要遇到 "Apple",就扔进 0 号桶。
- 只要遇到 "Banana",就扔进 1 号桶。
Shuffle 阶段:
- Reduce Worker 0 启动时,它会去所有 Map Worker 那里,把 0 号桶 的数据全部拉过来。
- 这样,所有的 "Apple" 最终都会汇聚到 Reduce Worker 0 手里。
Coordinator (以前叫 Master):
- 数量:只有 1 个进程。
- 身份: 总包工头、大管家。
- 职责: 它不干累活(不读写数据),只负责分配任务、记账(谁在干什么、干完没有)和计时(谁超时了)。
Worker:
- 数量:N 个进程 (由你决定)。
- 在测试脚本中,通常会启动 3~5 个 Worker 进程并发跑。
- 身份: 通用打工人。
- 职责: 也就是你写的 worker.go。它启动后会是个死循环,不断问 Coordinator:“老板,现在有啥活?”
- 如果处于 Map 阶段,Coordinator 会给它派 Map 任务。
- 如果 Map 阶段全结束了,Coordinator 会给它派 Reduce 任务。
- Worker 自己不知道也不关心现在的阶段,它只听命行事。
[ 只有 1 个进程 ]
+-----------------+
| Coordinator | <---- 保存着任务表 (Excel)
+-----------------+ 1. Map任务状态: [Done, Idle, InProgress...]
^ ^ ^ | 2. Reduce任务状态: [Idle, Idle...]
: : : |
(RPC调用): : : | (RPC回复: "去做 Map任务 #3")
"求派活" : : : |
: : : v
+-----------+ +-----------+ +-----------+
| Worker A | | Worker B | | Worker C | <--- N 个进程
+-----------+ +-----------+ +-----------+
| | |
(1) 读取输入文件 (1) 读取输入 (可能是闲置
pg-xx.txt pg-yy.txt 或正在处理)
| |
[执行 MapF] [执行 MapF]
| |
(2) 生成中间文件 (2) 生成中间文件
mr-3-0 mr-4-0
mr-3-1 mr-4-1
... ...
| |
+---------------+
|
-------------------------------------
| W A I T (同步屏障) | <--- Map全部做完才能进下一步
-------------------------------------
|
+-----------+ +-----------+
| Worker A | | Worker B | <--- 还是那批人,现在转职做 Reduce
+-----------+ +-----------+
| |
(3) 读取中间文件 (3) 读取中间文件
所有 mr-*-0 所有 mr-*-1
| |
[执行 ReduceF] [执行 ReduceF]
| |
(4) 写最终结果 (4) 写最终结果
mr-out-0 mr-out-1对于worker 既要执行map reduce 任务 也要向coordinator发送心跳包
使用两个goruntimes
// 创建一个channel用于通知心跳goroutine任务已完成
doneChan := make(chan bool)
// 启动心跳goroutine
go func() {
for {
select {
case <-doneChan:
// worker退出
return
case <-time.After(1 * time.Second): // 每秒发送一次心跳
SendHeartbeat(&worker)
}
}
}()单独一个线程 定时发送心跳包 如果收到doneChan的话就退出 这个chennel是原来的线程与心跳包线程进行通信的通道
并行map reduce的结果总是小于线性 map reduce
< yet 365
---
> yet 383
21617c22072
< yielded 9
---
> yielded 12
21623a22079
> yoked 1
21625c22081
< yonder 31
---
> yonder 33
21627,21630c22083,22086
< you 6270
< young 321
< younger 31
< youngest 10
---
> you 7005
> young 365
> younger 36
> youngest 28
21632c22088
< your 1305
---
> your 1477
21634,21637c22090,22093
< yours 32
< yourself 155
< yourselves 11
< youth 61
---
> yours 34
> yourself 172
> yourselves 14
> youth 105
21650c22106虽然我最初在所有coordinator的方法上都加了大锁 但是还是出现了数据丢失的问题 可以看出:结果数据均小于应有的数字
// worker is not busy ask for a task
RequestTask(&worker)
if !worker.has_work_ {
time.Sleep(time.Second)
continue
}
if !worker.is_reduce_task_ {
// map task
file, err := os.Open(worker.map_file_)
if err != nil {
log.Fatalf("cannot open %v", worker.map_file_)
}
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v", worker.map_file_)
}
file.Close()
//fmt.Println(string(content[0:20]))
kva := mapf(worker.map_file_, string(content))
TaskDone(&worker)
if(!worker.can_write_files_) {
// abort all tasks
//log.Println("abort")
continue
}
// write to files
//log.Println("mid write to files")
ofiles := []*os.File{}
for i := 0; i < int(worker.nReduce_); i++ {
file_name := "mr-mid-" + strconv.Itoa(int(worker.map_id_))+ "-" + strconv.Itoa(i)
ofile, _ := os.Create(file_name)
ofiles = append(ofiles, ofile)
}
for _, k := range kva {
fmt.Fprintf(ofiles[ihash(k.Key) % int(worker.nReduce_)], "%v %v\n", k.Key, k.Value)
}
for _, ofile := range ofiles {
ofile.Close()
}目前为了实现abort逻辑 我选择了 先完成map或者reduce任务 然后通知coordinator 如果coordinator认定了这个worker就是应该完成操作的worker(不是之前被放弃的——任务已经分配给别人了)那么就能写文件 但是这里出现了竞态条件 对于coordinator 他认为任务完成了 但是其实文件还没有真正被写完 如果这个时候进入reduce 那么就会读取还没有完全写完的脏数据 导致最后统计结果偏小
Worker 告诉 Coordinator "我做完了" 和 Worker "实际把数据落盘" 之间的时间差 导致了时序错乱
解决方法 使用原子文件重命名
os.Rename("tmp-mr-out-1", "mr-out-1") (这是一个原子瞬间动作)
只要 Coordinator 认为任务完成了,磁盘上的文件就一定是完整的
如果这个worker 突然崩溃了
假如一个分布式节点突然断电 如果原来是直接写在结果文件中的 那么会留下一个损坏的文件
解决方法 还是原子文件重命名
- 崩溃发生在写临时文件时:
- 如果 Worker 在写入 mr-mid-tmp-xxx 的过程中挂掉了(断电、网络断开、进程崩溃)。
- 磁盘上只会留下一个写了一半的垃圾文件(Garbage/Partial File)。
- 但是,最终的文件名 mr-mid-X-Y 根本就没有被创建(或者如果之前有旧的,也没被碰过)。
- Coordinator 发现这个 Worker 超时(不再发送心跳),就会把任务重新分配给另一个 Worker。
- 新的 Worker 会重新创建一个新的临时文件开始写。
- 结论:系统状态是干净的,没有脏数据污染最终结果。
- 崩溃发生在重命名那一瞬间:
- 在操作系统(尤其是 POSIX 标准的文件系统)中,Rename 是原子的。
- 这意味着:要么改名成功,文件瞬间变成最终文件;要么完全没发生,文件还是临时文件名。
- 绝对不会出现“改名改了一半”或者“文件损坏”的情况。
- 这就像一个开关,只有 0 和 1,没有中间状态。
这就是所谓的“Commit Point”(提交点)
在 MapReduce Worker 代码中,os.Rename 就相当于数据库事务中的 COMMIT 命令:
- Rename 之前:所有的计算和 IO 都是“草稿”,随时可以扔掉,对外界(Coordinator 和 Reducer)不可见。
- Rename 之后:数据瞬间生效,对外界可见。
如果worker1被抛弃了 新指派的worker2完成了工作 这时worker1又完成了怎么办
paper中写道
We rely on atomic commits of map and reduce task outputs to achieve this property. Each in-progress task writes its output to private temporary files. A reduce task produces one such file, and a map task produces R such files (one per reduce task). When a map task completes, the worker sends a message to the master and includes the names of the R temporary files in the message. If the master receives a completion message for an already completed map task, it ignores the message. Otherwise, it records the names of R files in a master data structure. When a reduce task completes, the reduce worker atomically renames its temporary output file to the final output file. If the same reduce task is executed on multiple machines, multiple rename calls will be executed for the same final output file. We rely on the atomic rename operation provided by the underlying file system to guarantee that the final file system state contains just the data produced by one execution of the reduce task.
对于map 系统记录下第一个发出done的worker创建的那些文件 忽略后面的
对于reduce每次都会先创建临时文件 然后进行原子重命名 操作系统保证了最后的结果是一次操作的输出
coordinator的幂等性 和 map reduce方法的确定性
Coordinator 检查 Task X 的状态。发现已经是 Completed 了,于是什么都不做(或者仅返回一个确认,但不修改内部状态)。
公式表达:
HandleDone(Task X) = HandleDone(HandleDone(Task X))
对于 map 和 reduce 方法来说 不包含随机数 每次执行的结果一定是一样的所以就可以把一个任务分配给多个worker执行 不会影响最后结果
Google 的实现指出,如果你的函数是不确定的(比如包含随机数),MapReduce 框架只能保证输出结果是“某一次”执行的结果,但无法保证多次运行结果一致(弱一致性)。但在标准使用场景下,我们都假设函数是确定性的,这样就可以放心地让 Straggler 和 Backup Worker 互相覆盖文件,而不用担心数据错乱
RAFT
在 CAP 定理中,Raft 是一个典型的 CP 系统:它优先保证一致性,在无法达成共识时,宁可牺牲可用性(A)。
Raft 是如何维持 CAP 的?
1. 强一致性 (Consistency)
Raft 保证的是 线性一致性(Linearizability),就像只有一台机器在工作一样:
- 日志匹配原则:如果两个节点的日志在某个位置 Index 和 Term 都相同,那么它们之前的日志也一定相同。
- 选举约束:不包含最新已提交日志的节点根本当不上 Leader。
- 过半数原则:任何一个已提交的日志,必然存在于过半数节点上。而任何两个“过半数”集合必有交集。这保证了新 Leader 选举时一定能看到旧 Leader 提交过的痕迹。
2. 分区容错性 (Partition Tolerance)
Raft 天生就是为了应对网络分区设计的:
- 脑裂处理:如果 5 个节点被切成了 3 个(分区 A)和 2 个(分区 B)。
- 分区 B 凑不够 3 票,选不出 Leader,无法工作。
- 分区 A 能选出 Leader,继续工作。
- 当网络恢复,分区 B 的节点发现分区 A 的 Term 更高,会立刻放弃抵抗,同步分区 A 的日志。
- 网络隔离:即使消息丢了、延迟了、乱序了,Raft 的序列号(Index)和任期(Term)都能确保数据最终对齐。
3. 可用性 (Availability) —— Raft 牺牲了它
Raft 在某些情况下会暂时不可用,以保全数据的一致性:
- 选举空窗期:当 Leader 挂了,到新 Leader 选出来的几十毫秒内,系统无法处理请求。
- 少数派分区:如果你所在的网络分区凑不够半数,系统会直接报错,拒绝服务。
- 这就是为什么说 Raft 是 CP 而不是 AP。 它坚信:“宁可不给结果,也绝不给错的结果。”
什么时候使用AP 什么时候使用CP
A. 使用 CP 的场景(一致性至上)
如果你觉得“宁可系统暂时不可用,也绝对不能给错数据”,就选 CP。
- 金融/支付:你的银行余额。如果因为断网导致你明明花了钱但余额没减,银行就亏死了。
- 分布式锁/配置中心:如 etcd (Kubernetes 的大脑)、ZooKeeper。如果两个节点同时抢到了同一个锁,或者读到了不同的配置,系统会发生逻辑灾难。
- 数据库元数据:TiDB 等分布式数据库的索引信息。
B. 使用 AP 的场景(可用性至上)
如果你觉得“系统必须秒回,数据旧一点点没关系”,就选 AP。
- 社交媒体:你在微博发个状态,由于网络分区,你的朋友晚了 5 秒才看到,或者点赞数暂时少显示了几个。这完全没问题,但如果微博因为断网直接打不开,用户就会跑光。
- 购物车/推荐系统:亚马逊发现,如果结账系统因为一致性检查慢了 1 毫秒,就会损失数百万美元。所以他们允许购物车数据暂时不一致,等联网后再合并。
- 缓存系统:如 CDN、DNS。你修改了域名解析,全球生效需要时间(不一致),但这比整个互联网搜不到该域名要好得多。
RAFT 的工作流程
第一阶段:初始化与选举(寻找权威)
所有节点启动时都是 Follower。
- 心跳超时:每个节点都有一个随机的闹钟。谁的闹钟先响,谁就变身 Candidate。
func (rf *Raft) ticker() {
for rf.killed() == false {
rf.mu.Lock()
state := rf.state
rf.mu.Unlock()
if state == StateLeader {
go rf.BroadCastAppendEntries()
time.Sleep(100 * time.Millisecond)
} else {
rf.mu.Lock()
elapsed := time.Since(rf.lastActiveTime)
timeout := rf.electionTimeout
rf.mu.Unlock()
if elapsed > timeout {
go rf.startElection()
}
time.Sleep(10 * time.Millisecond)
}
}
}如果是leader 就发送心跳包(其实是复制log 但是空的依然会发送)
- 拉票(RequestVote):Candidate 给所有人发邮件:“我是任期(Term)X,请投我一票。”
- 投票规则(C 的保证):
- 一个任期内只能投一张票。
- 最重要:如果你的日志还没我新(Term 小或长度短),我绝不投给你。这保证了 Leader 一定拥有所有可能已经提交的日志。
upToDate := args.LastLogTerm > lastTerm || (args.LastLogTerm == lastTerm && args.LastLogIndex >= lastIndex)
// Term 代表了“权威性”的代际。一个拥有更高 Term 日志的节点,意味着它经历了更晚近的选举周期。即使它的 Index 较短,它也可能包含了已经被提交(Commit)的最完整数据。
if (rf.voteFor == -1 || rf.voteFor == args.CandidateId) && upToDate {
rf.resetElectionTimerLocked()
// 只有当你**真的授予了选票**,才能重置计时器。如果不给票也重置,攻击者可以通过发假投票请求让全集群永远无法选出 Leader(心跳被抑制)。
rf.voteFor = args.CandidateId
rf.persist()
reply.Vote = true
} else {
reply.Vote = false
}
reply.Term = rf.currentTerm- 成为 Leader:拿到超过半数(Majority)的票,正式上位。
第二阶段:日志复制(达成共识)
一旦有了 Leader,系统开始处理请求。
- 接收指令:Leader 收到命令,先在自己的日志里写一笔,状态是“未提交”。
- 分发(AppendEntries):Leader 把这行日志发给所有 Follower。
- 一致性检查(C 的核心):
- Leader 发送第 10 号日志时,会带上第 9 号日志的信息(Index 9, Term 2)。
- Follower 检查自己的第 9 号日志。如果不匹配,Follower 会拒绝。
- Leader 会不断往前退,直到找到双方一致的地方,然后把 Follower 后面不一致的部分全部覆盖掉。(Leader 永远是对的)。
情况一:越过快照边界(保护逻辑)
if args.PrevLogIndex < rf.lastIncludedIndex {
reply.Success = false
reply.ConflictIndex = rf.lastIncludedIndex + 1
return
}- 场景描述:Leader 试图同步一段非常旧的日志,其
PrevLogIndex指向的位置已经在 Follower 的快照(Snapshot)里了。 - 为什么会发生:网络延迟。Leader 发出的
AppendEntriesRPC 在路上堵了很久,当它到达时,Follower 已经完成了垃圾回收,把旧日志做成了快照并删除了。 - 逻辑详解:
- Follower 无法检查快照里的日志 Term(因为已经删了),所以不能回复
Success。 - 应对方案:告诉 Leader,我现在的日志最早是从
lastIncludedIndex + 1开始的。Leader 收到后会意识到:“哦,你太旧了,AE 同步已经跟不上了,我得给你发InstallSnapshotRPC 了。”
- Follower 无法检查快照里的日志 Term(因为已经删了),所以不能回复
情况二:Follower 日志太短(日志缺失)
if args.PrevLogIndex >= rf.GetLogLenLocked() + 1 {
reply.ConflictIndex = rf.GetLogLenLocked() + 1
reply.ConflictTerm = -1
return
}- 场景描述:Leader 认为 Follower 应该有 100 条日志,所以从 101 条开始发(
PrevLogIndex=100)。但 Follower 实际只有 50 条日志。 - 核心逻辑:
- 不一致的原因是**“空隙”**(Gap)。
- 快速回退(Optimization):如果不加这段,Leader 只能一次次减 1(从 100 试到 99, 98... 直到 50),效率极低。
- 解决策略:Follower 直接告诉 Leader:“我根本没这么长,我现在的末尾是 50。你下次直接从 51(
GetLogLenLocked() + 1)开始试吧。” ConflictTerm = -1是一个约定,告诉 Leader:冲突不是因为任期不对,而是因为我日志太短了。
情况三:任期冲突(经典不一致)
if rf.log[rf.GetLogLocked(args.PrevLogIndex)].Term != args.PrevLogTerm {
reply.ConflictTerm = rf.log[rf.GetLogLocked(args.PrevLogIndex)].Term
index := args.PrevLogIndex
for index > rf.lastIncludedIndex + 1 && rf.log[rf.GetLogLocked(index-1)].Term == reply.ConflictTerm {
index--
}
reply.ConflictIndex = index
return
}- 场景描述:这是最经典的 Raft 冲突。Leader 和 Follower 在
PrevLogIndex处都有日志,但任期号不同。这说明 Follower 这里的日志是来自某个“被废黜”的旧 Leader 的。 - 逻辑详解:
- 记录冲突任期:Follower 说:“我这里
Index=100的地方任期是 2,而你说是 3,咱俩对不上。”(reply.ConflictTerm = 2)。 - 寻找该任期的起点:Follower 往回找,发现任期 2 的日志是从
Index=80开始的。 - 快速跳过整个任期:Follower 告诉 Leader:“我这里整个任期 2 估计都是错的,你下次直接从
Index=80试试看。”
- 记录冲突任期:Follower 说:“我这里
- 为什么这么做:在 Raft 中,如果一个 Index 处的 Term 不匹配,那么这个 Term 之后的所有日志一定都是错的。直接跳过一整个 Term 远比一条条回退快得多。
最后进行覆盖和追加
// 2. 如果通过检查,处理日志覆盖和追加
reply.Success = true
for i, entry := range args.Entries {
index := args.PrevLogIndex + 1 + i
if index < rf.GetLogLenLocked() + 1 {
// 如果已有的日志和新的冲突了,才删除后面的
// 为什么不直接用 rf.log = append(rf.log[:rf.GetLogLocked(index)], args.Entries[i:]...) 这种写法?也就是,**为什么必须先判断 Term 是否不匹配,如果不匹配才截断?** 如果我收到一个已经处理过的旧 RPC 包,直接截断会导致什么问题?
// - 假设 Leader 发送了 RPC-1(Index 10-11)和 RPC-2(Index 12-13)。RPC-2 先到,Follower 存下了 10-13。然后 RPC-1 后到。 **后果:** 如果你无脑截断 rf.log[:10] 然后追加 RPC-1 里的 10-11,那么 Follower 已经存好的 12-13 就会**丢失**。
if rf.log[rf.GetLogLocked(index)].Term != entry.Term {
rf.log = rf.log[:rf.GetLogLocked(index)]
rf.log = append(rf.log, entry)
}
// 如果 Term 相同 继续往后看下一条
} else {
// 超出本地长度了,直接追加剩下所有的
rf.log = append(rf.log, args.Entries[i:]...)
break
}
}
if len(args.Entries) > 0 {
rf.persist()
}
// 3. 更新 commitIndex
if args.LeaderCommit > rf.commitedIndex {
// 论文原文:commitIndex = min(leaderCommit, index of last new entry)
lastNewEntryIndex := args.PrevLogIndex + len(args.Entries)
rf.commitedIndex = min(args.LeaderCommit, lastNewEntryIndex)
}幂等性 :通过判断 Term 是否冲突来决定是否截断日志。这样即便 RPC 请求发生乱序、重传或重叠,Follower 也能保证日志的一致性,不会因为处理旧的 RPC 包而意外删除了已经接收到的、更靠后的正确日志
- 达成共识:Leader 收到过半数的成功回复。
func (rf *Raft) UpdateCommitIndexLocked() {
for i := rf.GetLogLenLocked(); i > rf.commitedIndex; i-- {
if rf.log[rf.GetLogLocked(i)].Term == rf.currentTerm {
count := 1
for p := range rf.peers {
if p == rf.me { continue }
if rf.matchIndex[p] >= i {
count++
}
}
if count > len(rf.peers) / 2 {
rf.commitedIndex = i
break
}
}
}
}
// 每次收到成功复制日志的回复都会调用这个 遍历所有follower的match index 超过半数旧把他commit注意这里判断了要commit的日志term和当前term 这是论文中figure8的情况 leader永远不能直接commit低于当前term的任何条目 只能等到高term有一条被commit了 才被顺带commit
第三阶段:提交与安全(不可逆转)
- 提交(Commit):Leader 发现过半数都记好了,把
commitIndex推高。 - 告知:Leader 在下次心跳中告诉大家:“第 10 号已经提交了。”
- Figure 8 约束:Leader 只能通过提交当前任期的日志来顺带提交旧日志。
如何保持强一致性——PrevLogIdx PrevLogTerm
可以把 PrevLogIndex 和 PrevLogTerm 想象成拼图的接口或者拉链的锁扣。
通俗解释:拉链原理
假设 Leader 要给 Follower 发送第 100 号日志。Leader 不能直接扔过去说:“这是第 100 号,你存下”。
因为 Follower 可能比较卡,它的日志才存到第 80 号。如果它直接存下第 100 号,中间就缺了 19 条,这就完蛋了。
所以,Leader 在发第 100 号日志时,必须带上第 99 号的信息作为“验证码”:
- PrevLogIndex (前一条日志的索引): 99
- PrevLogTerm (前一条日志的任期): 比如 Term 5
Leader 对 Follower 说:“我要给你发第 100 号日志。但在接收之前,请你检查一下你那里有没有第 99 号日志,且它的任期是不是 Term 5?”
- 情况 A (匹配):Follower 看了一眼,自己确实有第 99 号,也是 Term 5。于是回复:“匹配成功,我收下第 100 号。”(拉链拉上了)。
- 情况 B (缺失):Follower 发现自己只有第 80 号。它回复:“拒绝,我没有第 99 号。” Leader 收到拒绝后,下次就会试着发第 81 号(带上第 80 号做验证)。
- 情况 C (冲突):Follower 有第 99 号,但是是 Term 4(旧 Leader 留下的垃圾数据)。它回复:“拒绝,我有第 99 号,但Term不对。” Leader 收到后,知道这里数据不一致,下次会强行覆盖它。
图解
假设现在的状态如下:
Leader 的日志:
Index: 1 2 3 4 5
Term: 1 1 2 3 3
^ ^ ^ ^ ^Leader 想要发送 Index=4 和 Index=5 的日志给 Follower。
此时,AppendEntries RPC 参数会是:
- Entries:
[Log(Index:4, Term:3), Log(Index:5, Term:3)] - PrevLogIndex: 3 (新日志的前一条)
- PrevLogTerm: 2 (Index 3 处的任期)
场景 1:Follower 是健康的
Follower A 日志:
Index: 1 2 3
Term: 1 1 2Follower A 收到 RPC:
- 看
PrevLogIndex(3)。我有 Index 3 吗?有。 - 看
PrevLogTerm(2)。我 Index 3 的 Term 是 2 吗?是。 - 成功。把新的日志 4 和 5 接在后面。
场景 2:Follower 落后了 (缺失)
Follower B 日志:
Index: 1 2
Term: 1 1Follower B 收到 RPC:
- 看
PrevLogIndex(3)。我有 Index 3 吗?没有! - 失败。返回
Success = false。 - Leader 收到失败,会将
nextIndex减小,下次改为发送 Index 3(带上 PrevLogIndex: 2)。
场景 3:Follower 此时有脏数据 (冲突)
Follower C 日志:
Index: 1 2 3
Term: 1 1 1 <-- 注意这里是 1,Leader 期望是 2Follower C 收到 RPC:
- 看
PrevLogIndex(3)。我有 Index 3 吗?有。 - 看
PrevLogTerm(2)。我 Index 3 的 Term 是 2 吗?不是,我是 1。 - 失败。返回
Success = false。 - 这表示 Follower 的 Index 3 是错的,Leader 之后会一步步回退,最终把这个 Term 1 的错误日志覆盖掉。
代码实现逻辑
为什么需要 Index 0 (Dummy Entry)?
为了方便处理 PrevLogIndex,我们通常在 rf.log 初始化时放一个空的日志占位。rf.log[0] = LogEntry{Term: 0}
这样,如果 Leader 要发第一条真的日志(Index 1),它的 PrevLogIndex 就是 0,PrevLogTerm 就是 0。你就不用写一堆 if index == 0 的特殊判断逻辑了。
Leader 端怎么计算? (发送方)
Leader 维护了 nextIndex[],表示下一个要发给某个 Peer 的日志索引。
// 假设要发给 server p
nextIdx := rf.nextIndex[p]
// PrevLogIndex 就是我们要发的这一批的前一个
prevLogIndex := nextIdx - 1
prevLogTerm := rf.log[prevLogIndex].Term
// 要发的日志切片
entries := rf.log[nextIdx:]
args := AppendEntriesArgs{
PrevLogIndex: prevLogIndex,
PrevLogTerm: prevLogTerm,
Entries: entries,
// ... 其他字段
}Follower 端怎么检查? (接收方)
这是 AppendEntries 中最关键的逻辑:
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
// 1. Term 检查
if args.Term < rf.currentTerm { ... }
// 2. 一致性检查 (Consistency Check)
// 情况 A: 我连这个位置的日志都没有 (日志太短)
if args.PrevLogIndex >= len(rf.log) {
reply.Success = false
reply.Term = rf.currentTerm
return
}
// 情况 B: 我有这个位置的日志,但是 Term 不匹配
if rf.log[args.PrevLogIndex].Term != args.PrevLogTerm {
reply.Success = false
reply.Term = rf.currentTerm
// 优化:3C 会在这里做冲突优化
return
}
// 3. 到了这里,说明前面都匹配上了!
// 即使 entries 是空的(心跳),也说明至少到 PrevLogIndex 位置是一致的。
// 4. 处理日志追加和冲突覆盖 (Log Truncation and Append)
// 遍历 args.Entries,找到第一个不匹配的位置,删除它及之后的所有,然后把新的接上去。
// ... (后续实现)
reply.Success = true
}- PrevLogIndex: 这次要传的新货之前,最后那条旧货的编号。
- PrevLogTerm: 那条旧货的生产批次。
- 作用: 只有旧货对上了,才允许接新货。这是 Raft 保证所有节点日志最终一模一样的根本原因(数学归纳法)。
持久化与提交
持久化是一个节点自己的行为 作用是将 currentTerm、voteFor 和 log[] 存储到磁盘中 防止断电 或者用于断电后的恢复
提交则是达成共识以后 交给上层的kv server进行执行 提交的信息是不能改变的 但是持久化的信息可能被新的leader覆盖掉
一定要先持久化再回复RPC
提交的前提是半数以上的节点完成了持久化 而不是完成了日志的一致
LastApplied 和 LastCommited的区别
applied是上层kvserver最后应用到状态机的日志 也就是上一次被放进apply chan的那个日志
而last commit是最后一个在raft内部达成一致的日志条目
从这里可以看出 commit条目一定至少大于等于applied的条目(只有达成共识的日志才会被应用到kv中)
raft中专门有一个协程applier用来查看commit是否前进了 他会对apply落后的每个条目创建一个apply msg 放入apply chan 让上层的kvserver进行处理
新当选的Leader不能commit旧Term的日志
如果用一句话作为技术定义,那就是:Raft 只有当前任期的日志可以触发副本数统计(Counting replicas),而旧任期的日志只能通过“日志匹配原则(Log Matching Property)”被动地被带动提交。
1. 绝不“数数”
Leader 即使看到某条旧任期的日志已经存在于 99% 的节点上,他也绝对不能把自己的 commitIndex 移动到那里。
2. 绝不“违约”
Leader 必须等到有一条当前任期的日志被过半数节点确认。
- 比如 Leader 现在是 Term 4,他在同步一条 Term 2 的旧日志。
- 他必须等到老板发来新指令,他在本子上记下
(Index: 10, Term: 4)。 - 只有当这条
(Index: 10, Term: 4)到了过半数节点,他才能宣布 Index 10 及其之前的所有日志(包括那条 Term 2)全部生效。
3. 为什么必须是“当前任期”?
因为只有当前任期的日志能够代表这个 Leader 的“权威”。
- 高任期的日志(Term 4)天生就有“压制”低任期(Term 2, Term 3)的能力。
- 一旦 Term 4 的日志在过半数节点上扎了根,任何任期小于 4 的节点都不可能再当选 Leader 了。
- 这时候再宣布之前的日志生效,才是真正的安全。
面试官可能会追问:“如果一直没有新请求怎么办?”
这是一个好坑。如果老板(Client)一直不发新请求,那旧任期的日志是不是永远没法提交了?
答案是:No-op 日志。
在很多生产级别的 Raft 实现(比如 etcd)中,Leader 当选后的第一件事就是先发一条不带内容的 No-op(空指令)日志。
- 这条 No-op 日志的任期是当前任期。
- 它一旦过半提交,就能立刻把之前所有任期的旧账全部“冲正”并提交。
- 这就解决了“没有新业务请求时,旧账无法提交”的问题。
代码
在 UpdateCommitIndexLocked 函数里,这行逻辑就是 Figure 8 的灵魂:
// i 是正在尝试更新的 commitIndex 候选值
if rf.log[rf.GetLogLocked(i)].Term == rf.currentTerm { // <--- 就是这一行!
// 只有当前任期的日志才敢去数副本数
if count > len(rf.peers) / 2 {
rf.commitedIndex = i
}
}RAFT中遇到的问题
在ticker中不能开启协程进行选举——election storm
func (rf *Raft) ticker() {
for rf.killed() == false {
rf.mu.Lock()
state := rf.state
rf.mu.Unlock()
if state == StateLeader {
go rf.BroadCastAppendEntries()
time.Sleep(100 * time.Millisecond)
} else {
rf.mu.Lock()
elapsed := time.Since(rf.lastActiveTime)
timeout := rf.electionTimeout
rf.mu.Unlock()
if elapsed > timeout {
// go rf.startElection() <- 不正确
rf.startElection()
}
time.Sleep(10 * time.Millisecond)
}
}
}如果协程调度出现问题 导致没有及时归零计时器 就会有达到超时时间并且开一个选举协程(term++)
又一次超时的时候又会触发选举导致term被抬升的很高(不会影响一致性但是会严重影响可用性)
何时重置超时计时器
“在实现
RequestVote时,重置计时器的时机必须非常谨慎。只有在真正授予选票时重置,是为了防止恶意节点通过无效请求抑制正常选举。
此外,为了防止日志落后的节点仅仅靠高 Term 干扰集群稳定性,虽然标准 Raft 规定只要看到高 Term 就转为 Follower,但在实际工程中,我们通常会配合 Pre-Vote 机制。即:节点在转为 Follower 前,会先确认 Candidate 是否真的有资格(日志够新)以及当前集群是否真的丢失了 Leader。这保证了集群的稳定性(Stability)。”
单个分区中的节点抬高Term——pre-vote解决
- 场景:节点 A 被网络分区隔离了。它在自己的小黑屋里不断超时,Term 一直增加到了 100。
- 动作:隔离解除,节点 A 回归。它发起的
RequestVote(Term=100)到达了正在正常工作的 Leader(Term=10)。 - 反应:Leader 看到 Term 100,大吃一惊,立刻变回 Follower。但 Leader 发现 A 的日志太旧,不给它投票。
- 陷阱:此时,旧 Leader 变成了 Follower,且没有重置计时器。这意味着旧 Leader 会在很短的时间内(因为它之前的计时器已经在跑了)发生超时,再次发起 Term 101 的选举。
结果:这个落后节点虽然没当上 Leader,但它像个“搅屎棍”一样,仅仅靠一个高 Term 就把现有的 Leader 给踢下来了,导致集群发生了一次无意义的震荡。
为了解决上述“搅屎棍”问题,工业级的 Raft(如 etcd)引入了 Pre-Vote 机制:
- 当一个 Candidate 想发起选举时,它先发一个 Pre-Vote(不增加自己的 Term)。
- 大家收到 Pre-Vote 后,检查:
- 你的日志够新吗?
- 我最近是不是刚收到过 Leader 的心跳?(如果我最近收到了心跳,说明集群有活着的 Leader,我直接拒绝你的预投票)。
- 只有 Candidate 拿到大多数人的 Pre-Vote 许可,它才正式增加
currentTerm并发起真正的RequestVote。
为什么这能解决问题?
- 那个落后节点 A 回归时,它发出的 Pre-Vote 会被大家拒绝(因为大家能连上 Leader),A 甚至没有机会把大家的
currentTerm顶上去。
KVRAFT 实战
Clerk/Client
这个部分只需要完成向kvserver发送的RPC构建
需要注意的是
- 需要向所有可能的server进行询问 直到找到leader kvserver才行 可用性方面是raft保证的
- 需要维护clinet id 和 seq no 来保证各种操作的线性一致性和幂等性(如果服务器发现seqno已经过时了 就不会在他的状态机上应用了)
KVserver
KVserver 实际上是一个执行者 本质上就是维护了一个内存里的map 然后能够接受三种指令 GET PUT APPEND
而raft不理解这个指令 他眼里只有字节流日志 并且他保证这个日志在所有机器中达成共识
一旦raft把这个日志成功复制给半数节点 他就会通过apply channel告诉KVserver 你可以执行这个指令了
如果不通过 Raft 给指令,而是 KVServer 收到请求直接改 map:
- Server 1 收到 Put(a, 1),改了 map。
- Server 2 没收到,它的 map 还是旧的。
结果:两个服务器的状态不一样了,分布式系统崩溃。
通过 Raft 给指令的逻辑是: - 客户端找 Server 1 说:我想 Put(a, 1)。
- Server 1 不敢直接改 map,它先问 Raft:把这个记在账本上行不行?
- Raft 告诉 Server 1、2、3:大家都记好了,Index 100 是 Put(a, 1)。
- Server 1、2、3 的 applyLoop 都从 Raft 那里拿到了这条指令。
- 大家同时改 map。
结果:全世界的 map 永远保持同步。
节点 1 节点 2 节点 3
+--------------+ +--------------+ +--------------+
| KVServer | | KVServer | | KVServer |
+--------------+ +--------------+ +--------------+
| Raft | <------> | Raft | <------> | Raft |
+--------------+ +--------------+ +--------------+只有 Leader 身份的 KVServer 会处理 Client 的请求。
- Client 的视角:Client 会轮询服务器。如果它向一个 Follower 发送请求,这个 KVServer 会看一眼它内部的 Raft 状态,说:“我不是 Leader,你去问节点 X”。
- 写操作 (Put/Append):必须由 Leader 提交到 Raft 日志,等半数以上节点确认后才能修改数据库。
- 读操作 (Get):为了保证读到的是最新的,Leader 也需要把 Get 作为一个日志走一遍 Raft 流程(或者使用更高级的 Read Index 机制),确保它在执行读的一刻依然是 Leader。
KVServer 和 Raft 的交互流程
这是一个典型的“异步转同步”的过程,分为四个步骤:
- 提议 (Proposal):
- Client 调用 Put(key, val)。
- KVServer 收到后,调用 rf.Start(op)。
- Raft 说:“好,我把它记在我的日志第 100 号位置了,但我还没确定它能不能生效。”
- 复制 (Replication):
- Leader 的 Raft 开始疯狂给其他节点的 Raft 发消息:“大家快把这行字记在第 100 号位置!”
- 提交与应用 (Commit & Apply):
- 当半数以上节点记好了,Leader 的 Raft 就会认为第 100 号日志已提交。
- 关键点:Raft 通过 applyCh 告诉 KVServer:“第 100 号日志现在安全了,你可以执行它了。”
- 响应 (Reply):
- KVServer 的 applier 协程从 applyCh 拿到消息,修改 kv.db。
- KVServer 找到正在等 100 号索引的那个 RPC Handler,通过 waitCh 喊一声:“喂!做好了!”
- RPC Handler 回复 Client:“成功!”
分布式系统最难的就是“出事的时候”。
情况 A:Leader 突然断网/宕机了
- Raft 层:其他节点会发现 Leader 消失了,自动选出新 Leader。
- KVServer 层:旧 Leader 的 Handler 还在等 waitCh。由于它不再是 Leader,它的 Raft 永远不会在 applyCh 里返回那个索引。Handler 会超时(这就是你代码里 time.After 的作用),并告诉 Client:“出事了,你去问别人吧。”
情况 B:网络分区(脑裂)
- 假设有 5 个节点,节点 1 和 2 被隔离了。节点 1 觉得自己还是 Leader。
- Client 向节点 1 发请求。节点 1 调用 rf.Start()。
- 但是!由于它只能联络到节点 2,凑不够 3 票(半数),这个日志永远无法 Commit。
- Client 会一直等,直到超时。这保证了数据不会在错误的分区里被修改。
情况 C:旧指令延迟到达
- 如果一个 Append 指令因为网络延迟,在 10 秒后才到达。
- KVServer 会检查 lastApplied 表。如果发现这个客户端的序列号(SeqNo)已经处理过了,它就会直接丢弃这个操作,不修改数据库,但会返回 OK(幂等性)。
全部流程
- Client -> KVServer (RPC Handler):
Handler创建Op{Type: Append, Key: "a", Val: "1", ClerkId: 88, SeqNo: 5}。- 调用
index, term, isLeader := rf.Start(op)。 Handler创建ch := make(chan Op, 1)并存入waitChs[index]。
- KVServer -> Raft (Log Replication):
- Raft 在集群间同步这条日志。
- 此时
Handler正在select { case <-ch: ... }处阻塞。
- Raft -> KVServer (Applier Loop):
- Raft 达成共识,把
ApplyMsg{CommandIndex: 100, Command: Op{...}}塞入applyCh Applier协程读到这条消息。
- Raft 达成共识,把
- KVServer 内部状态更新:
Applier检查lastApplied[88]。如果当前记录的序号小于 5,则执行db["a"] += "1"。- 执行完后,
Applier查找waitChs[100]。 - 关键动作:
Applier把Op发进ch。
- KVServer -> Client (RPC Response):
Handler被唤醒,收到Op。- 检查收到的
Op是否还是ClerkId=88, SeqNo=5(防止 Index 100 已经被新 Leader 的其他命令覆盖)。 - 校验通过,给 Client 回复
OK。
为什么要这样设计通信?
这种设计完美解决了分布式环境下的三个地雷:
- 解耦 (Decoupling):
KVServer 不需要知道 Raft 是怎么同步日志的,它只需要给 Raft 一个任务,然后等 Raft 的回执。 - 线性化 (Linearizability):
因为Applier是单线程(一个协程)处理applyCh的,这保证了所有节点执行指令的顺序完全一致。即便有 100 个 Handler 同时在跑,最终改数据库的顺序也只取决于 Raft 日志的顺序。 - 安全性 (Safety):
即使 Leader 在同步过程中挂了,由于 Handler 校验了ClerkId和SeqNo,它能敏锐地发现:“Raft 确实返回了 Index 100 的结果,但那个结果不是我之前要存的‘1’,而是别人存的‘2’。” 这样它就不会给客户端返回错误的成功信息。
RAFT KV 在现实生活中是怎么被使用的
假设我有五台机器 下面管理了上千个GPU机器 用来训练模型
KV RAFT其实只对外提供两个接口 一个是PUT 一个是GET 但是他能保证
- 线性一致性:只要有一个PUT成功了 剩下任何人 在任何地方 调用GET 一定能够得到这个一样的值
- 高可用性:这五个管理者如果有三个可用 就能够正常进行服务
- 持久性与唯一性
一、 这 5 台机器(KV-Raft)在地理上分布在哪里?
在现实世界中,这 5 台机器的物理位置取决于你对 “容灾” 的要求。
同机房分布(LAN):
- 放在一个机房的 5 个不同机架上。
- 优点:速度极快,延迟 < 1ms。
- 缺点:如果整个机房着火或断电,系统就彻底挂了。
跨可用区分布(Multi-AZ):
- 在同一个城市(比如北京),分布在 3 个不同的数据中心(机房 A、B、C)。
- 优点:能抵御单个机房级别的故障。
- 这种最常见(比如 AWS 或阿里云的典型架构)。
跨地域/全球分布(Geo-Distributed):
- 比如 1 台在纽约,1 台在伦敦,1 台在东京,2 台在新加坡。
- 优点:能抵御战争、地震等区域性灾难。
- 缺点:非常慢!因为光速限制,Raft 达成共识(多数派确认)可能需要几百毫秒。Google 的 Spanner 数据库就是这种架构。
二、 只有 Put 和 Get,怎么管理几千台 GPU?
这听起来不可思议,但全世界最复杂的集群管理系统(如 Kubernetes)正是这么干的。
Kubernetes 底层用的是 etcd,而 etcd 本质上和现在写的 KV-Raft 几乎一模一样。
1. 它是如何通过两个方法管理集群的?
秘诀在于:所有的管理指令,在本质上都是“状态的变化”。
场景:给 GPU 1 号分配一个 AI 训练任务
- 包工头(Scheduler) 并不需要直接给 GPU 发消息。
- 包工头 只需调用你的
Put("GPU_001_Assignment", "{task: 'Training_Cat_Model', data: 's3://bucket/data'}")。 - GPU 1 号 上的 Agent 程序(就像一个 Clerk)一直在后台不停地调你的
Get("GPU_001_Assignment")(这种技术叫轮询或长轮询)。 - 一旦 GPU 发现内容变了,它就自己去下载数据开始干活。
场景:GPU 坏了怎么办?
- GPU 上的 Agent 定期调你的
Put("GPU_001_Status", "Healthy_12:00:05")。 - 包工头 调
Get("GPU_001_Status")。如果发现时间很久没更新了,包工头就知道它挂了,于是再发一个Put把任务指派给别人。
- GPU 上的 Agent 定期调你的
2. 为什么只给两个方法?
因为 “简单即力量”。
如果接口太复杂(比如包含 StartTraining(), StopTraining()),那么你的 KV-Raft 就只能用于 AI 训练。
但如果你只提供 Put 和 Get,你就创造了一个通用的真理之源。它既可以管理 GPU 集群,也可以管理银行账本,还可以管理自动驾驶汽车的调度。
三、 总结:这种架构的“全局视角”
底层:KV-Raft (大脑)
这 5 台机器缩在数据中心的一个小角落里。它们虽然只有 5 台,但它们通过 Raft 算法产生了一个 “统一的幻觉”:大家都看到一模一样的Put/Get记录。中间层:Clerk (神经末梢)
分布在全球各地的 1000 台 GPU 机器上,都跑着你写的Clerk代码。它们像触角一样,时刻连接着这 5 台核心机器。运作过程:
- 无论地理分布多广,所有的 GPU 机器都认准这 5 台机器。
- 当 Leader 发生切换,全球各地的
Clerk会自动重定向请求。 - 地理广度由
Clerk负责连接,数据的一致性由你的 5 台机器负责死守。
Shard KV
目的是实现了一个功能完备、可水平扩展(Horizontal Scalability) 且具有强一致性的分布式分片键值数据库。
目的
得到的是一个能够自动负载均衡的数据存储系统。
- 可伸缩性:如果你发现系统压力大,只需启动一组新的 KVServer 副本组,然后向 ShardCtrler 发送一个
Join命令。系统会自动把一部分分片从旧机器迁移到新机器,整个过程对用户近乎透明。 - 高可用性:每个副本组(Group)内部运行 Raft 协议。即使某个组掉线了一两台机器,只要大多数存活,该分片的数据依然可读写。
- 强一致性:通过 Raft 和分片迁移时的状态同步,系统保证了线性一致性。
用户如何与数据库交互?
作为终端用户,你不需要手动去联系 Controller。这些复杂的逻辑都被封装在 shardkv/client.go 的 Clerk 结构体中。
从用户的视角来看,代码是这样的:
ck := shardkv.MakeClerk(ctrlers)
ck.Put("user_1_name", "Alice") // 用户只关心 Key 和 Value
val := ck.Get("user_1_name") // 用户不关心数据在哪底层交互的全过程(Clerk 帮你做了什么)
当你调用 ck.Get("user_1_name") 时,内部发生了以下步骤:
计算分片 (Hashing):
Clerk调用key2shard("user_1_name"),根据 Key 的哈希值算出这个 Key 属于哪个 Shard(比如 Shard 4)。获取路由表 (Querying Ctrler):
Clerk会检查本地缓存的Config。- 如果缓存为空或者版本太旧,
Clerk会给 ShardCtrler 发送一个Query(-1)请求。 - ShardCtrler 回复:“现在的配置是第 5 版,Shard 4 归 Group 102 管,这个组的服务器地址是
[10.0.0.1, 10.0.0.2]。”
定位服务器 (Routing):
Clerk在本地记录下:Shard 4 -> Group 102。然后随机选择 Group 102 中的一台服务器发送Get请求。处理配置变更 (Error Handling):
这是最关键的一步。如果在你发送请求的同时,管理员把这个分片移到了 Group 103:- Group 102 的服务器收到请求后,发现自己已经不负责 Shard 4 了,会返回一个
ErrWrongGroup错误。 Clerk收到这个错误后,意识到“配置变了!”,它会重新向 ShardCtrler 发送Query获取最新路由。- 获取新路由后,
Clerk自动重试发送请求到新的 Group 103。 - 这一切对调用
ck.Get的用户来说是完全无感的。
- Group 102 的服务器收到请求后,发现自己已经不负责 Shard 4 了,会返回一个
角色分配
- ShardCtrler (大脑):配置中心。它只存“地图”(哪个分片在哪),不存实际的业务 Key-Value 数据。
- ShardKV Group (肌肉):数据中心。它存储实际的 Key-Value。它会定期向大脑同步“地图”,并根据地图把数据搬运给其他的 Group。
- Clerk (导航仪):客户端库。它负责查地图、带路、并在路不通(配置改变)时自动重新查地图并绕路。
如果你是一个运维人员,你只需要监控服务器负载。一旦发现 Group A 负载过高,你就去启动 Group B,然后给 ShardCtrler 发一个 Join。剩下的数据迁移、请求重定向,全靠你现在写的这套代码自动完成。这就是自动化运维和弹性伸缩的魅力。
代码实现——controller
代码实现——ShardKV server
ShardKV结构
type ShardKV struct {
mu sync.RWMutex
dead int32
rf *raft.Raft
applyCh chan raft.ApplyMsg
makeEnd func(string) *labrpc.ClientEnd
gid int
sc *shardctrler.Clerk
maxRaftState int
lastApplied int
lastConfig shardctrler.Config
currentConfig shardctrler.Config
// 分片状态机:每个分片独立存储
Shards [shardctrler.NShards]*Shard
// 客户端去重:ClientId -> 操作上下文
lastOperations map[int64]*OperationContext
// 通知通道:LogIndex -> 等待该日志应用的通道
notifyChans map[int]chan *CommandResponse
// 缓存从其他组接收到的待应用分片数据(RPC收到但尚未通过 Raft 应用)
// 当配置变更命令被应用时,从这里读取数据进行迁移
incomingShards map[int]*ShardMigration // ShardID -> 迁移数据
// 用于持久化(Snapshot)
persister *raft.Persister
}
type Shard struct {
KV map[string]string // 该分片的键值存储
State ShardState // 分片当前状态
}快照内容设计
| 字段 | 是否快照 | 原因 |
|---|---|---|
Shards | ✅ 必须 | 核心业务数据,存储所有分片的KV对和状态(Serving/Pulling等)。重启后必须恢复,否则数据丢失。 |
lastOperations | ✅ 必须 | 客户端去重表。若重启后丢失,同一个客户端重发旧请求会被重复执行,违反线性一致性。 |
currentConfig | ✅ 必须 | 当前配置版本。必须知道当前处于哪个配置阶段,才能正确处理新请求和迁移。 |
lastConfig | ✅ 必须 | 上一个配置。用于对比检测配置变更,以及计算需要迁移哪些分片。 |
lastApplied | ✅ 必须 | 最后应用的Raft日志索引。恢复后用于防止重复应用已快照的日志。 |
incomingShards | ❌ 不需要 | 临时缓存,仅用于接收RPC数据后、Raft应用前的过渡。重启后为空即可,丢失后可通过重新拉取恢复。 |
notifyChans | ❌ 不需要 | 内存中的通知通道,用于RPC等待Raft应用。重启后无等待中的RPC,初始化为空map即可。 |
dead | ❌ 不需要 | 运行时状态,Kill()时设置,重启后默认为0。 |
sc (clerk) | ❌ 不需要 | 可重新创建,无状态。 |
rf, applyCh | ❌ 不需要 | Raft实例和通道,运行时重建。 |