@@ -2,9 +2,10 @@ package raftengine
22
33import (
44 "context"
5- "encoding/json"
65
6+ "github.com/shaj13/raft/internal/raftpb"
77 "github.com/shaj13/raft/raftlog"
8+ "go.etcd.io/etcd/pkg/v3/pbutil"
89 "go.etcd.io/raft/v3"
910 etcdraftpb "go.etcd.io/raft/v3/raftpb"
1011)
@@ -105,7 +106,7 @@ func (m *mux) Start() {
105106 if n .rn .HasReady () {
106107 rd := n .rn .Ready ()
107108 advcs [gid ] = rd
108- n .readyc <- hb .suppress (rd )
109+ n .readyc <- hb .suppress (gid , rd )
109110 }
110111 }
111112 }
@@ -397,7 +398,6 @@ func (m *muxNode) Stop() {
397398
398399func newHeartbeats () * heartbeats {
399400 return & heartbeats {
400- pending : map [string ]struct {}{},
401401 step : func (ns * nodeState , m etcdraftpb.Message ) {
402402 _ = ns .rn .Step (m )
403403 },
@@ -410,37 +410,29 @@ func newHeartbeats() *heartbeats {
410410// no matter how many raft nodes they have in common.
411411type heartbeats struct {
412412 id uint64
413- pending map [ string ] struct {}
413+ pending []raftpb. Heartbeat
414414 // abstracted for testing purposes.
415415 step func (* nodeState , etcdraftpb.Message )
416416}
417417
418- func (h * heartbeats ) suppress (rd raft.Ready ) raft.Ready {
418+ func (h * heartbeats ) suppress (gid uint64 , rd raft.Ready ) raft.Ready {
419419 for i := 0 ; i < len (rd .Messages ); i ++ {
420420 msg := rd .Messages [i ]
421421 switch msg .Type {
422- case etcdraftpb .MsgHeartbeat :
423- // record heartbeats by context,
424- // typically ctx is embty unless there a safe read index request.
425- // individual ctx coalesced later.
426- // Same as etcd.Raft https://github.com/etcd-io/etcd/blob/main/raft/read_only.go#L68.
427- h .pending [string (msg .Context )] = struct {}{}
428- fallthrough
429- case etcdraftpb .MsgHeartbeatResp :
430- // MsgHeartbeatResp must be ignored from being pend, otherwise,
431- // the number of heartbeats will be doubled,
432- // instead, the fanout should respond accordingly.
433- // i.e
434- // incorrect flow:
422+ case etcdraftpb .MsgHeartbeatResp , etcdraftpb .MsgHeartbeat :
435423 // node A -> heartbeats -> node B (coalesced)
436424 // node B -> heartbeats -> node A (coalesced)
437425 // node B -> heartbeats resp -> node A (fanout)
438426 // correct flow:
439427 // node A -> heartbeats -> node B (coalesced)
440428 // node B -> heartbeats resp -> node A (fanout)
441-
429+ //
442430 // suppress both MsgHeartbeatResp & MsgHeartbeat.
443431 // individual heartbeats coalesced later.
432+ h .pending = append (h .pending , raftpb.Heartbeat {
433+ GID : gid ,
434+ Msg : msg ,
435+ })
444436 rd .Messages = append (rd .Messages [:i ], rd .Messages [i + 1 :]... )
445437 i --
446438 }
@@ -453,135 +445,52 @@ func (h *heartbeats) coalesced(nodes map[uint64]*nodeState) {
453445 return
454446 }
455447
456- // sent avoid sending the heartbeat to the same node
457- // that is participating in multiple consensus groups.
458- // i.e Nodes A,B groups C,D
459- // node A send msg once to B either in C or D.
460- sent := make (map [uint64 ]struct {})
461- // don't heartbeat yourself.
462- sent [h .id ] = struct {}{}
463-
464- cc := new (coalescedContext )
465- for ctx := range h .pending {
466- cc .Buffers = append (cc .Buffers , []byte (ctx ))
448+ nodeshb := map [uint64 ][]raftpb.Heartbeat {}
449+ for _ , b := range h .pending {
450+ nodeshb [b .Msg .To ] = append (nodeshb [b .Msg .To ], b )
467451 }
468452
469- bcc , err := json .Marshal (cc )
470- if err != nil {
471- raftlog .Warningf ("raft: marshal coalesced heartbeats context: %v" , err )
472- }
453+ groups := map [uint64 ][]etcdraftpb.Message {}
473454
474- for _ , node := range nodes {
475- cfg := node .rn .Status ().Config
476- msgs := []etcdraftpb.Message {}
477- for _ , v := range []map [uint64 ]struct {}{
478- cfg .Voters .IDs (),
479- cfg .Learners ,
480- } {
481- for id := range v {
482- if _ , ok := sent [id ]; ok {
483- continue
455+ for id , msgs := range nodeshb {
456+ for gid , node := range nodes {
457+ cfg := node .rn .Status ().Config
458+ if _ , ok := cfg .Voters .IDs ()[id ]; ok {
459+ cb := & raftpb.CoalescedHeartbeat {
460+ Heartbeats : msgs ,
484461 }
485462
486- sent [id ] = struct {}{}
487-
488- mh := etcdraftpb.Message {
463+ m := etcdraftpb.Message {
489464 From : h .id ,
490465 To : id ,
491466 Type : etcdraftpb .MsgHeartbeat ,
492- Context : bcc ,
467+ Context : pbutil . MustMarshal ( cb ) ,
493468 }
494- msgs = append (msgs , mh )
469+ groups [gid ] = append (groups [gid ], m )
470+ break
495471 }
496472 }
473+ }
497474
498- node .readyc <- raft.Ready {
499- Messages : msgs ,
500- }
475+ for gid , msgs := range groups {
476+ node := nodes [gid ]
477+ go func (msgs []etcdraftpb.Message ) {
478+ node .readyc <- raft.Ready {
479+ Messages : msgs ,
480+ }
481+ }(msgs )
501482 }
502483
503484 // reset the heartbeats state.
504- h .pending = map [ string ] struct {}{}
485+ h .pending = h . pending [: 0 ]
505486}
506487
507488func (h * heartbeats ) fanout (nodes map [uint64 ]* nodeState , msg etcdraftpb.Message ) {
508- var (
509- node * nodeState
510- success bool
511- )
512-
513- defer func () {
514- if ! success && raftlog .V (8 ).Enabled () {
515- str := raft .DescribeMessage (msg , nil )
516- raftlog .V (8 ).Infof ("raft: not fanning out msg: %s" , str )
517- }
518- }()
519-
520- isResp := msg .Type == etcdraftpb .MsgHeartbeatResp
521- ctx := msg .Context
522-
523- cc := new (coalescedContext )
524- if err := json .Unmarshal (ctx , cc ); err != nil {
525- raftlog .Warningf ("raft: unmarshal coalesced heartbeats context: %v" , err )
526- }
527-
528- for _ , n := range nodes {
529- // ok allow sends the given heartbeat to all groups which believe that
530- // their leader resides on the sending node.
531- ok := (n .lead == msg .From || n .lead == raft .None ) && ! isResp
532- // resp sends the given heartbeat response to all groups
533- // which overlap with the sender's groups and consider themselves leader.
534- okResp := n .lead == h .id && isResp
535-
536- // assign a node to reply to the given heartbeat.
537- // the assigned node must recognize the sender.
538- //
539- // Note: when a node is barely fresh the leader id may not exist in the conf state.
540- _ , found := n .rn .Status ().Config .Voters .IDs ()[msg .From ]
541- if ok || found {
542- node = n
543- }
544-
545- if ! (ok || okResp ) {
546- continue
547- }
489+ var cb raftpb.CoalescedHeartbeat
490+ pbutil .MustUnmarshal (& cb , msg .Context )
548491
549- for _ , ctx := range cc .Buffers {
550- m := etcdraftpb.Message {
551- From : msg .From ,
552- To : msg .To ,
553- Type : msg .Type ,
554- Context : ctx ,
555- }
556- h .step (n , m )
557- }
558-
559- success = true
560- }
561-
562- if isResp {
563- return
564- }
565-
566- if node == nil {
567- raftlog .Warningf ("raft: ignored heartbeat from unknown member %x" , msg .From )
568- return
492+ for _ , hb := range cb .Heartbeats {
493+ n := nodes [hb .GID ]
494+ h .step (n , hb .Msg )
569495 }
570-
571- // must respond whether the msg forwarded to any groups or not.
572- node .readyc <- raft.Ready {
573- Messages : []etcdraftpb.Message {
574- {
575- From : msg .To ,
576- To : msg .From ,
577- Type : etcdraftpb .MsgHeartbeatResp ,
578- Context : ctx ,
579- },
580- },
581- }
582- }
583-
584- // coalescedContext hold heartbeats context if any.
585- type coalescedContext struct {
586- Buffers [][]byte
587496}
0 commit comments