diff --git a/Makefile b/Makefile index a755502..5d2e7c8 100644 --- a/Makefile +++ b/Makefile @@ -1,2 +1,7 @@ -test: - @go test ./... --cover \ No newline at end of file +test: + @go test ./... --cover + +test-randomized: + go test --count 10000 --run TestBBARandomized + go test --count 10000 --run TestRBCRandomized + go test --count 10000 --run TestACSRandomized diff --git a/README.md b/README.md index e960ba8..dc657bb 100644 --- a/README.md +++ b/README.md @@ -41,16 +41,18 @@ make test Create a new instance of HoneyBadger. ```golang -// Create a Config struct with your prefered settings. +// Create a Config struct with your preferred settings. cfg := hbbft.Config{ // The number of nodes in the network. N: 4, + // Number of tolerated faulty nodes, use -1 to take max possible. + F: -1, // Identifier of this node. ID: 101, // Identifiers of the participating nodes. Nodes: uint64{67, 1, 99, 101}, - // The prefered batch size. If BatchSize is empty, an ideal batch size will - // be choosen for you. + // The preferred batch size. If BatchSize is empty, an ideal batch size will + // be chosen for you. BatchSize: 100, } diff --git a/acs.go b/acs.go index 8417a43..7af4864 100644 --- a/acs.go +++ b/acs.go @@ -15,7 +15,7 @@ type ACSMessage struct { // ACS implements the Asynchronous Common Subset protocol. // ACS assumes a network of N nodes that send signed messages to each other. // There can be f faulty nodes where (3 * f < N). -// Each participating node proposes an element for inlcusion. The protocol +// Each participating node proposes an element for inclusion. The protocol // guarantees that all of the good nodes output the same set, consisting of // at least (N -f) of the proposed values. // @@ -23,8 +23,8 @@ type ACSMessage struct { // ACS creates a Broadcast algorithm for each of the participating nodes. // At least (N -f) of these will eventually output the element proposed by that // node. ACS will also create and BBA instance for each participating node, to -// decide whether that node's proposed element should be inlcuded in common set. -// Whenever an element is received via broadcast, we imput "true" into the +// decide whether that node's proposed element should be included in common set. +// Whenever an element is received via broadcast, we input "true" into the // corresponding BBA instance. When (N-f) BBA instances have decided true we // input false into the remaining ones, where we haven't provided input yet. // Once all BBA instances have decided, ACS returns the set of all proposed @@ -64,9 +64,7 @@ type ( } acsInputResponse struct { - rbcMessages []*BroadcastMessage - acsMessages []*ACSMessage - err error + err error } acsInputTuple struct { @@ -78,7 +76,7 @@ type ( // NewACS returns a new ACS instance configured with the given Config and node // ids. func NewACS(cfg Config) *ACS { - if cfg.F == 0 { + if cfg.F == -1 { cfg.F = (cfg.N - 1) / 3 } acs := &ACS{ @@ -95,12 +93,23 @@ func NewACS(cfg Config) *ACS { // Create all the instances for the participating nodes for _, id := range cfg.Nodes { acs.rbcInstances[id] = NewRBC(cfg, id) - acs.bbaInstances[id] = NewBBA(cfg) + acs.bbaInstances[id] = NewBBA(cfg, id) } go acs.run() return acs } +// DebugInfo returns internal state. Should be used for debugging only. +func (a *ACS) DebugInfo() (map[uint64]*RBC, map[uint64]*BBA, map[uint64][]byte, map[uint64]bool, []MessageTuple) { // TODO: ... + return a.rbcInstances, a.bbaInstances, a.rbcResults, a.bbaResults, a.messageQue.que +} + +// Messages returns all the internal messages from the message que. Note that +// the que will be empty after invoking this method. +func (a *ACS) Messages() []MessageTuple { + return a.messageQue.messages() +} + // InputValue sets the input value for broadcast and returns an initial set of // Broadcast and ACS Messages to be broadcasted in the network. func (a *ACS) InputValue(val []byte) error { @@ -151,12 +160,17 @@ func (a *ACS) Output() map[uint64][]byte { } // Done returns true whether ACS has completed its agreements and cleared its -// messageQue. +// messageQue. It is possible that the BBA part will decide before the RBC. func (a *ACS) Done() bool { agreementsDone := true - for _, bba := range a.bbaInstances { - if !bba.done { + for i, bba := range a.bbaInstances { + if !bba.Done() { + agreementsDone = false + break + } + if a.bbaResults[i] && a.rbcResults[i] == nil { agreementsDone = false + break } } return agreementsDone && a.messageQue.len() == 0 @@ -184,7 +198,7 @@ func (a *ACS) inputValue(data []byte) error { } if output := rbc.Output(); output != nil { a.rbcResults[a.ID] = output - a.processAgreement(a.ID, func(bba *BBA) error { + return a.processAgreement(a.ID, func(bba *BBA) error { if bba.AcceptInput() { return bba.InputValue(true) } @@ -194,8 +208,14 @@ func (a *ACS) inputValue(data []byte) error { return nil } -func (a *ACS) stop() { +func (a *ACS) Stop() { close(a.closeCh) + for _, rbc := range a.rbcInstances { + rbc.Stop() + } + for _, bba := range a.bbaInstances { + bba.Stop() + } } func (a *ACS) run() { @@ -255,7 +275,10 @@ func (a *ACS) processAgreement(pid uint64, fun func(bba *BBA) error) error { if !ok { return fmt.Errorf("could not find bba instance for (%d)", pid) } - if bba.done { + if bba.Done() { + if !a.decided { + a.tryCompleteAgreement() + } return nil } if err := fun(bba); err != nil { @@ -287,6 +310,10 @@ func (a *ACS) processAgreement(pid uint64, fun func(bba *BBA) error) error { } } } + } + // Completion can be triggered either by the BBA or the RBC output, + // depending on which is completed first. Both variants are possible. + if _, ok := a.bbaResults[pid]; ok { a.tryCompleteAgreement() } return nil @@ -308,8 +335,9 @@ func (a *ACS) tryCompleteAgreement() { } bcResults := make(map[uint64][]byte) for _, id := range nodesThatProvidedTrue { - val, _ := a.rbcResults[id] - bcResults[id] = val + if val, bcOk := a.rbcResults[id]; bcOk { + bcResults[id] = val + } } if len(nodesThatProvidedTrue) == len(bcResults) { a.output = bcResults diff --git a/acs_test.go b/acs_test.go index db28df7..6a5dcc2 100644 --- a/acs_test.go +++ b/acs_test.go @@ -1,10 +1,14 @@ package hbbft import ( + "bytes" + "fmt" + "math/rand" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) // Test ACS with 4 good nodes. The result should be that at least the output @@ -78,6 +82,7 @@ func TestNewACS(t *testing.T) { nodes = []uint64{0, 1, 2, 3} acs = NewACS(Config{ N: len(nodes), + F: -1, // Use default. ID: id, Nodes: nodes, }) @@ -97,7 +102,7 @@ func TestNewACS(t *testing.T) { } func TestACSOutputIsNilAfterConsuming(t *testing.T) { - acs := NewACS(Config{N: 4}) + acs := NewACS(Config{N: 4, F: -1}) // Use default for F. output := map[uint64][]byte{ 1: []byte("this is it"), } @@ -106,15 +111,97 @@ func TestACSOutputIsNilAfterConsuming(t *testing.T) { assert.Nil(t, acs.Output()) } +// This test checks, if messages sent in any order are still handled correctly. +// It is expected to run this test multiple times. +func TestACSRandomized(t *testing.T) { + if err := testACSRandomized(t); err != nil { + t.Fatalf("Failed, reason=%+v", err) + } +} +func testACSRandomized(t *testing.T) error { + var err error + var N, T = 7, 5 + + msgs := make([]*testMsg, 0) + nodes := make([]uint64, N) + for n := range nodes { + nodes[n] = uint64(n) + } + + cfg := make([]Config, N) + for i := range cfg { + cfg[i] = Config{ + N: N, + F: N - T, + ID: uint64(i), + Nodes: nodes, + BatchSize: 21254, // Should be unused. + } + + } + + acs := make([]*ACS, N) + for a := range acs { + acs[a] = NewACS(cfg[a]) + if err = acs[a].InputValue([]byte{1, 2, byte(a)}); err != nil { + return fmt.Errorf("Failed to process ACS.InputValue: %+v", err) + } + msgs = appendTestMsgs(acs[a].Messages(), nodes[a], msgs) + } + + // var done bool + for len(msgs) != 0 { + m := rand.Intn(len(msgs)) + msg := msgs[m] + + msgTo := msg.msg.To + if acsMsg, ok := msg.msg.Payload.(*ACSMessage); ok { + if err = acs[msgTo].HandleMessage(uint64(msg.from), acsMsg); err != nil { + return fmt.Errorf("Failed to ACS.HandleMessage: %+v", err) + } + } else { + return fmt.Errorf("Unexpected message type: %+v", msg.msg.Payload) + } + + // Remove the message from the buffer and append the new messages, if any. + msgs[m] = msgs[len(msgs)-1] + msgs = msgs[:len(msgs)-1] + msgs = appendTestMsgs(acs[msgTo].Messages(), nodes[msgTo], msgs) + } + + out0 := acs[0].Output() + for a := range acs { + require.True(t, acs[a].Done()) + if a == 0 { + continue + } + var outA map[uint64][]byte = acs[a].Output() + require.Equal(t, len(out0), len(outA)) + for i := range out0 { + require.Equal(t, bytes.Compare(out0[i], outA[i]), 0) + } + acs[a].Stop() + } + return nil +} + type testMsg struct { from uint64 msg MessageTuple } +func appendTestMsgs(msgs []MessageTuple, senderID uint64, buf []*testMsg) []*testMsg { + output := buf[:] + for m := range msgs { + output = append(output, &testMsg{from: senderID, msg: msgs[m]}) + } + return output +} + func makeACSNetwork(n int) []*ACS { network := make([]*ACS, n) for i := 0; i < n; i++ { - network[i] = NewACS(Config{N: n, ID: uint64(i), Nodes: makeids(n)}) + network[i] = NewACS(Config{N: n, F: -1, ID: uint64(i), Nodes: makeids(n)}) // Use default for F. go network[i].run() } return network diff --git a/bba.go b/bba.go index caee0ba..7a12270 100644 --- a/bba.go +++ b/bba.go @@ -33,31 +33,66 @@ type AuxRequest struct { Value bool } +// CCRequest is not part of the HB protocol. We use it +// to interact with the CC asynchronously, to avoid blocking +// and to have a uniform interface from the user of the BBA. +type CCRequest struct { + Payload interface{} +} + +// DoneRequest is not part of the HB protocol, but we use +// it here to terminate the protocol in a graceful way. +type DoneRequest struct{} + // BBA is the Binary Byzantine Agreement build from a common coin protocol. type BBA struct { // Config holds the BBA configuration. Config + + // Common Coin implementation to use. + commonCoin CommonCoin + commonCoinAsked bool + commonCoinValue *bool + // Current epoch. epoch uint32 - // Bval requests we accepted this epoch. + + // Bval requests we accepted this epoch (received from a quorum). binValues []bool - // sentBvals are the binary values this instance sent. + + // sentBvals are the binary values this instance sent in this epoch. sentBvals []bool - // recvBval is a mapping of the sender and the receveived binary value. - recvBval map[uint64]bool - // recvAux is a mapping of the sender and the receveived Aux value. + + // recvBval is a mapping of the sender and the received binary value. + // We need to collect both values, because each node can send several + // different bval messages in a single round. + recvBvalT map[uint64]bool + recvBvalF map[uint64]bool + + // recvAux is a mapping of the sender and the received Aux value. recvAux map[uint64]bool + + // recvDone contains received info, on which epoch which peer has decided. + recvDone map[uint64]uint32 + // Whether this bba is terminated or not. + // It can have an output, but should be still running, + // because it has to help other peers to decide. done bool - // output and estimated of the bba protocol. This can be either nil or a - // boolean. + + // output and estimated of the bba protocol. + // This can be either nil or a boolean. + // The decision is kept permanently, while the output is cleared on first fetch. output, estimated, decision interface{} + //delayedMessages are messages that are received by a node that is already - // in a later epoch. These messages will be qued an handled the next epoch. - delayedMessages []delayedMessage + // in a later epoch. These messages will be queued an handled the next epoch. + delayedMessages []*delayedMessage + // For all the external access, like Done, Output, etc. lock sync.RWMutex - // Que of AgreementMessages that need to be broadcasted after each received + + // Queue of AgreementMessages that need to be broadcasted after each received // message. messages []*AgreementMessage @@ -69,21 +104,32 @@ type BBA struct { } // NewBBA returns a new instance of the Binary Byzantine Agreement. -func NewBBA(cfg Config) *BBA { - if cfg.F == 0 { +func NewBBA(cfg Config, nodeID uint64) *BBA { + if cfg.F == -1 { cfg.F = (cfg.N - 1) / 3 } + var cc CommonCoin + if cfg.CommonCoin != nil { + cc = cfg.CommonCoin + } else { + cc = NewFakeCoin() // Use it by default to avoid breaking changes in the API. + } bba := &BBA{ Config: cfg, - recvBval: make(map[uint64]bool), + commonCoin: cc.ForNodeID(nodeID), + commonCoinAsked: false, + commonCoinValue: nil, + recvBvalT: make(map[uint64]bool), + recvBvalF: make(map[uint64]bool), recvAux: make(map[uint64]bool), + recvDone: make(map[uint64]uint32), sentBvals: []bool{}, binValues: []bool{}, closeCh: make(chan struct{}), inputCh: make(chan bbaInputTuple), messageCh: make(chan bbaMessageTuple), messages: []*AgreementMessage{}, - delayedMessages: []delayedMessage{}, + delayedMessages: []*delayedMessage{}, } go bba.run() return bba @@ -120,8 +166,8 @@ func (b *BBA) InputValue(val bool) error { return <-t.err } -// HandleMessage will process the given rpc message. The caller is resposible to -// make sure only RPC messages are passed that are elligible for the BBA protocol. +// HandleMessage will process the given rpc message. The caller is responsible to +// make sure only RPC messages are passed that are eligible for the BBA protocol. func (b *BBA) HandleMessage(senderID uint64, msg *AgreementMessage) error { b.msgCount++ t := bbaMessageTuple{ @@ -133,7 +179,7 @@ func (b *BBA) HandleMessage(senderID uint64, msg *AgreementMessage) error { return <-t.err } -// AcceptInput returns true whether this bba instance is elligable for accepting +// AcceptInput returns true whether this bba instance is eligable for accepting // a new input value. func (b *BBA) AcceptInput() bool { return b.epoch == 0 && b.estimated == nil @@ -143,7 +189,9 @@ func (b *BBA) AcceptInput() bool { // then it will return the output else nil. Note that after consuming the output // its will be set to nil forever. func (b *BBA) Output() interface{} { - if b.output != nil { + b.lock.Lock() + defer b.lock.Unlock() + if b.done && b.output != nil { out := b.output b.output = nil return out @@ -155,23 +203,31 @@ func (b *BBA) Output() interface{} { // processing a protocol message. After calling this method the que will // be empty. Hence calling Messages can only occur once in a single roundtrip. func (b *BBA) Messages() []*AgreementMessage { - b.lock.RLock() - msgs := b.messages - b.lock.RUnlock() - b.lock.Lock() defer b.lock.Unlock() + msgs := b.messages b.messages = []*AgreementMessage{} return msgs } +// addMessage adds single message to be broadcasted. +// The actual recipients are set in the ACS part. func (b *BBA) addMessage(msg *AgreementMessage) { b.lock.Lock() defer b.lock.Unlock() b.messages = append(b.messages, msg) } -func (b *BBA) stop() { +// Done indicates, if the process is already decided and can be terminated. +// It can be so that the BBA has decided, but needs to support other peers to decide. +func (b *BBA) Done() bool { + b.lock.Lock() + defer b.lock.Unlock() + return b.done +} + +// Stop the BBA thread. +func (b *BBA) Stop() { close(b.closeCh) } @@ -182,10 +238,14 @@ func (b *BBA) run() { select { case <-b.closeCh: return - case t := <-b.inputCh: - t.err <- b.inputValue(t.value) - case t := <-b.messageCh: - t.err <- b.handleMessage(t.senderID, t.msg) + case t, ok := <-b.inputCh: + if ok { + t.err <- b.inputValue(t.value) + } + case t, ok := <-b.messageCh: + if ok { + t.err <- b.handleMessage(t.senderID, t.msg) + } } } } @@ -193,19 +253,21 @@ func (b *BBA) run() { // inputValue will set the given val as the initial value to be proposed in the // Agreement. func (b *BBA) inputValue(val bool) error { - // Make sure we are in the first epoch round. + // Make sure we are in the first epoch and the value is not estimated yet. if b.epoch != 0 || b.estimated != nil { return nil } b.estimated = val - b.sentBvals = append(b.sentBvals, val) - b.addMessage(NewAgreementMessage(int(b.epoch), &BvalRequest{val})) - return b.handleBvalRequest(b.ID, val) + return b.sendBval(val) } -// handleMessage will process the given rpc message. The caller is resposible to -// make sure only RPC messages are passed that are elligible for the BBA protocol. +// handleMessage will process the given rpc message. The caller is responsible to +// make sure only RPC messages are passed that are eligible for the BBA protocol. func (b *BBA) handleMessage(senderID uint64, msg *AgreementMessage) error { + if _, ok := msg.Message.(*DoneRequest); ok { + // We handle done messages from all the epochs. + return b.handleDoneRequest(senderID, uint32(msg.Epoch)) + } if b.done { return nil } @@ -217,9 +279,9 @@ func (b *BBA) handleMessage(senderID uint64, msg *AgreementMessage) error { ) return nil } - // Messages from later epochs will be qued and processed later. + // Messages from later epochs will be queued and processed later. if msg.Epoch > int(b.epoch) { - b.delayedMessages = append(b.delayedMessages, delayedMessage{senderID, msg}) + b.delayedMessages = append(b.delayedMessages, &delayedMessage{senderID, msg}) return nil } @@ -228,6 +290,8 @@ func (b *BBA) handleMessage(senderID uint64, msg *AgreementMessage) error { return b.handleBvalRequest(senderID, t.Value) case *AuxRequest: return b.handleAuxRequest(senderID, t.Value) + case *CCRequest: + return b.handleCCRequest(t.Payload) default: return fmt.Errorf("unknown BBA message received: %v", t) } @@ -236,64 +300,110 @@ func (b *BBA) handleMessage(senderID uint64, msg *AgreementMessage) error { // handleBvalRequest processes the received binary value and fills up the // message que if there are any messages that need to be broadcasted. func (b *BBA) handleBvalRequest(senderID uint64, val bool) error { - b.lock.Lock() - b.recvBval[senderID] = val - b.lock.Unlock() - lenBval := b.countBvals(val) + if b.commonCoinAsked { + return nil + } + b.addRecvBval(senderID, val) + lenBval := b.countRecvBvals(val) // When receiving n bval(b) messages from 2f+1 nodes: inputs := inputs u {b} - if lenBval == 2*b.F+1 { + // The set binValues can increase over time (more that a single element). + if lenBval > 2*b.F { wasEmptyBinValues := len(b.binValues) == 0 - b.binValues = append(b.binValues, val) + b.addBinValue(val) // If inputs > 0 broadcast output(b) and handle the output ourselfs. // Wait until binValues > 0, then broadcast AUX(b). The AUX(b) broadcast - // may only occure once per epoch. + // may only occur once per epoch. if wasEmptyBinValues { b.addMessage(NewAgreementMessage(int(b.epoch), &AuxRequest{val})) - b.handleAuxRequest(b.ID, val) + if err := b.handleAuxRequest(b.ID, val); err != nil { + return err + } } - return nil } // When receiving input(b) messages from f + 1 nodes, if inputs(b) is not // been sent yet broadcast input(b) and handle the input ourselfs. - if lenBval == b.F+1 && !b.hasSentBval(val) { - b.sentBvals = append(b.sentBvals, val) - b.addMessage(NewAgreementMessage(int(b.epoch), &BvalRequest{val})) - return b.handleBvalRequest(b.ID, val) + if lenBval > b.F && !b.hasSentBval(val) { + return b.sendBval(val) } - return nil + + // It is possible that we have the needed aux messages already, + // therefore we need to try to make a decision. + return b.tryOutputAgreement() } func (b *BBA) handleAuxRequest(senderID uint64, val bool) error { - b.lock.Lock() + if b.commonCoinAsked { + return nil + } + if _, ok := b.recvAux[senderID]; ok { + // Only a single aux can be received from a peer. + return fmt.Errorf("aux already received, recvNode=%v, epoch=%v, aux=%+v, new %v->%v", b.ID, b.epoch, b.recvAux, senderID, val) + } b.recvAux[senderID] = val - b.lock.Unlock() - b.tryOutputAgreement() + return b.tryOutputAgreement() +} + +func (b *BBA) handleCCRequest(payload interface{}) error { + coin, outPayloads, err := b.commonCoin.HandleRequest(b.epoch, payload) + if err != nil { + return err + } + if err := b.sendCC(outPayloads); err != nil { + return err + } + if b.commonCoinValue == nil { + b.commonCoinValue = coin + } + return b.tryOutputAgreement() +} + +func (b *BBA) handleDoneRequest(senderID uint64, doneInEpoch uint32) error { + b.recvDone[senderID] = doneInEpoch + if b.canMarkDoneNow() { + b.done = true + } return nil } // tryOutputAgreement waits until at least (N - f) output messages received, // once the (N - f) messages are received, make a common coin and uses it to // compute the next decision estimate and output the optional decision value. -func (b *BBA) tryOutputAgreement() { +func (b *BBA) tryOutputAgreement() error { if len(b.binValues) == 0 { - return + return nil } // Wait longer till eventually receive (N - F) aux messages. - lenOutputs, values := b.countOutputs() + lenOutputs, values := b.countGoodAux() if lenOutputs < b.N-b.F { - return + return nil } - // TODO: implement a real common coin algorithm. - coin := b.epoch%2 == 0 + if !b.commonCoinAsked { + maybeCoin, ccPayloads, err := b.commonCoin.StartCoinFlip(b.epoch) + if err != nil { + return err + } + if err := b.sendCC(ccPayloads); err != nil { + return err + } + b.commonCoinAsked = true + if b.commonCoinValue == nil { + b.commonCoinValue = maybeCoin + } + } + if b.commonCoinValue == nil { + // Still waiting for the common coin. + return nil + } + coin := *b.commonCoinValue // Continue the BBA until both: // - a value b is output in some epoch r // - the value (coin r) = b for some round r' > r - if b.done || b.decision != nil && b.decision.(bool) == coin { + if b.done || (b.decision != nil && b.decision.(bool) == coin) { b.done = true - return + return nil } log.Debugf( @@ -304,30 +414,55 @@ func (b *BBA) tryOutputAgreement() { // Start the next epoch. b.advanceEpoch() - if len(values) != 1 { - b.estimated = coin - } else { + if len(values) == 1 { b.estimated = values[0] - // Output may be set only once. - if b.decision == nil && values[0] == coin { + if b.decision == nil && values[0] == coin { // Output may be set only once. b.output = values[0] b.decision = values[0] log.Debugf("id (%d) outputed a decision (%v) after (%d) msgs", b.ID, values[0], b.msgCount) b.msgCount = 0 + if err := b.sendDone(); err != nil { + return err + } } + } else { + b.estimated = coin + } + if err := b.sendBval(b.estimated.(bool)); err != nil { + return err } - estimated := b.estimated.(bool) - b.sentBvals = append(b.sentBvals, estimated) - b.addMessage(NewAgreementMessage(int(b.epoch), &BvalRequest{estimated})) - // handle the delayed messages. - for _, que := range b.delayedMessages { - if err := b.handleMessage(que.sid, que.msg); err != nil { - // TODO: Handle this error properly. - log.Warn(err) + // Handle the delayed messages. They can be re-added to the delayed list, + // if their epoch is still in the future (in the handleMessage function). + deliveryCandidates := b.delayedMessages + b.delayedMessages = []*delayedMessage{} + for _, delayed := range deliveryCandidates { + if err := b.handleMessage(delayed.sid, delayed.msg); err != nil { + return err } } - b.delayedMessages = []delayedMessage{} + return nil +} + +func (b *BBA) sendBval(val bool) error { + b.sentBvals = append(b.sentBvals, val) + b.addMessage(NewAgreementMessage(int(b.epoch), &BvalRequest{val})) + return b.handleBvalRequest(b.ID, val) +} + +func (b *BBA) sendCC(outPayloads []interface{}) error { + for _, outPayload := range outPayloads { + b.addMessage(NewAgreementMessage(int(b.epoch), &CCRequest{Payload: outPayload})) + } + return nil +} + +func (b *BBA) sendDone() error { + if _, ok := b.recvDone[b.ID]; ok { + return nil + } + b.addMessage(NewAgreementMessage(int(b.epoch), &DoneRequest{})) + return b.handleDoneRequest(b.ID, b.epoch) } // advanceEpoch will reset all the values that are bound to an epoch and increments @@ -336,37 +471,59 @@ func (b *BBA) advanceEpoch() { b.binValues = []bool{} b.sentBvals = []bool{} b.recvAux = make(map[uint64]bool) - b.recvBval = make(map[uint64]bool) + b.recvBvalT = make(map[uint64]bool) + b.recvBvalF = make(map[uint64]bool) + b.commonCoinAsked = false + b.commonCoinValue = nil b.epoch++ } -// countOutputs returns the number of received (aux) messages, the corresponding +// countGoodAux returns the number of received (aux) messages, the corresponding // values that where also in our inputs. -func (b *BBA) countOutputs() (int, []bool) { - m := map[bool]int{} - for i, val := range b.recvAux { - m[val] = int(i) - } - vals := []bool{} - for _, val := range b.binValues { - if _, ok := m[val]; ok { - vals = append(vals, val) +func (b *BBA) countGoodAux() (int, []bool) { + // Collect aux messages, that were received with values also present in binValues. + goodAux := map[uint64]bool{} + for i := range b.recvAux { + for _, binVal := range b.binValues { + if b.recvAux[i] == binVal { + goodAux[i] = binVal + } + } + } + // Take only the values present in the goodAux + values := []bool{} + for _, auxVal := range goodAux { + found := false + for _, val := range values { + if auxVal == val { + found = true + break + } + } + if !found { + values = append(values, auxVal) } } - return len(b.recvAux), vals + return len(goodAux), values } -// countBvals counts all the received Bval inputs matching b. -func (b *BBA) countBvals(ok bool) int { - b.lock.RLock() - defer b.lock.RUnlock() - n := 0 - for _, val := range b.recvBval { - if val == ok { - n++ - } +// addRecvBval marks the specified BVAL_r(val) as received. +// The same node can send multiple values, we track them separatelly. +func (b *BBA) addRecvBval(senderID uint64, val bool) { + if val { + b.recvBvalT[senderID] = val + } else { + b.recvBvalF[senderID] = val + } +} + +// countRecvBvals counts all the received Bval inputs matching b. +func (b *BBA) countRecvBvals(val bool) int { + if val { + return len(b.recvBvalT) + } else { + return len(b.recvBvalF) } - return n } // hasSentBval return true if we already sent out the given value. @@ -378,3 +535,29 @@ func (b *BBA) hasSentBval(val bool) bool { } return false } + +func (b *BBA) addBinValue(val bool) { + for _, bv := range b.binValues { + if bv == val { + return + } + } + b.binValues = append(b.binValues, val) +} + +// If others (more than F) have decided in previous epochs, then we are +// among the others, who decided in a subsequent round, therefore we don't +// need to wait for more epochs to close the process. +func (b *BBA) canMarkDoneNow() bool { + if _, ok := b.recvDone[b.ID]; !ok { + // We have not decided yet, can't close the process. + return false + } + count := 0 + for _, e := range b.recvDone { + if e < b.recvDone[b.ID] { + count++ + } + } + return count > b.F +} diff --git a/bba_test.go b/bba_test.go index 9f6cb59..8347b93 100644 --- a/bba_test.go +++ b/bba_test.go @@ -1,16 +1,19 @@ package hbbft import ( + "fmt" + "math/rand" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) // Testing BBA should cover all of the following specifications. // // 1. If a correct node outputs the value (b), then every good node outputs (b). // 2. If all good nodes receive input, then every good node outputs a value. -// 3. If any good node ouputs value (b), then at least one good ndoe receives (b) +// 3. If any good node outputs value (b), then at least one good node receives (b) // as input. // func TestAllNodesFaultyAgreement(t *testing.T) { @@ -38,13 +41,13 @@ func TestAgreementGoodNodes(t *testing.T) { } func TestBBAStepByStep(t *testing.T) { - bba := NewBBA(Config{N: 4, ID: 0}) + bba := NewBBA(Config{N: 4, F: -1, ID: 0}, 0) // Set our input value. assert.Nil(t, bba.InputValue(true)) assert.Equal(t, 1, len(bba.sentBvals)) assert.True(t, bba.sentBvals[0]) - assert.True(t, bba.recvBval[0]) // we are id (0) + assert.True(t, bba.recvBvalT[0]) // we are id (0) msgs := bba.Messages() assert.Equal(t, 1, len(msgs)) assert.IsType(t, &BvalRequest{}, msgs[0].Message) @@ -52,13 +55,13 @@ func TestBBAStepByStep(t *testing.T) { // Sent input from node 1 bba.handleBvalRequest(uint64(1), true) - assert.True(t, bba.recvBval[1]) + assert.True(t, bba.recvBvalT[1]) // Sent input from node 2 // The algorithm decribes that after receiving (N - f) bval messages we // broadcast AUX(b) bba.handleBvalRequest(uint64(2), true) - assert.True(t, bba.recvBval[2]) + assert.True(t, bba.recvBvalT[2]) msg := bba.Messages() assert.Equal(t, 1, len(msg)) assert.IsType(t, &AuxRequest{}, msg[0].Message) @@ -79,10 +82,11 @@ func TestBBAStepByStep(t *testing.T) { } func TestNewBBA(t *testing.T) { - cfg := Config{N: 4} - bba := NewBBA(cfg) + cfg := Config{N: 4, F: -1} + bba := NewBBA(cfg, 0) assert.Equal(t, 0, len(bba.binValues)) - assert.Equal(t, 0, len(bba.recvBval)) + assert.Equal(t, 0, len(bba.recvBvalT)) + assert.Equal(t, 0, len(bba.recvBvalF)) assert.Equal(t, 0, len(bba.recvAux)) assert.Equal(t, 0, len(bba.sentBvals)) assert.Equal(t, uint32(0), bba.epoch) @@ -91,8 +95,8 @@ func TestNewBBA(t *testing.T) { } func TestAdvanceEpochInBBA(t *testing.T) { - cfg := Config{N: 4} - bba := NewBBA(cfg) + cfg := Config{N: 4, F: -1} + bba := NewBBA(cfg, 0) bba.epoch = 8 bba.binValues = []bool{false, true, true} bba.sentBvals = []bool{false, true} @@ -178,8 +182,7 @@ func excludeID(ids []uint64, id uint64) []uint64 { func makeBBAInstances(n int) []*BBA { bbas := make([]*BBA, n) for i := 0; i < n; i++ { - bbas[i] = NewBBA(Config{N: n, ID: uint64(i)}) - go bbas[i].run() + bbas[i] = NewBBA(Config{N: n, F: -1, ID: uint64(i)}, uint64(i)) } return bbas } @@ -189,3 +192,81 @@ type testAgreementMessage struct { to uint64 msg *AgreementMessage } + +func appendTestAgreementMessages(msgs []*AgreementMessage, senderID uint64, nodes []uint64, buf []*testAgreementMessage) []*testAgreementMessage { + output := buf[:] + for n := range nodes { + if nodes[n] != senderID { + for m := range msgs { + output = append(output, &testAgreementMessage{from: senderID, to: nodes[n], msg: msgs[m]}) + } + } + } + return output +} + +// This test should be run repeatedly for some time. +func TestBBARandomized(t *testing.T) { + if err := testBBARandomized(t); err != nil { + t.Fatalf("Failed, reason=%+v", err) + } +} +func testBBARandomized(t *testing.T) error { + var err error + var N, T = 7, 5 + + msgs := make([]*testAgreementMessage, 0) + nodes := make([]uint64, N) + for n := range nodes { + nodes[n] = uint64(n) + } + + cfg := make([]Config, N) + for i := range cfg { + cfg[i] = Config{ + N: N, + F: N - T, + ID: uint64(i), + Nodes: nodes, + BatchSize: 21254, // Should be unused. + } + } + + inputs := make([]bool, N) + for i := range inputs { + inputs[i] = rand.Int()%2 == 0 + } + + bba := make([]*BBA, N) + for i := range bba { + bba[i] = NewBBA(cfg[i], nodes[i]) + if err = bba[i].InputValue(inputs[i]); err != nil { + return fmt.Errorf("Failed to process BBA.InputValue: %v", err) + } + msgs = appendTestAgreementMessages(bba[i].Messages(), nodes[i], nodes, msgs) + } + + for len(msgs) != 0 { + m := rand.Intn(len(msgs)) + msg := msgs[m] + msgTo := msg.to + if err = bba[msgTo].HandleMessage(msg.from, msg.msg); err != nil { + return fmt.Errorf("Failed to BBA.HandleMessage: %v", err) + } + + // Remove the message from the buffer and add the new messages. + msgs[m] = msgs[len(msgs)-1] + msgs = msgs[:len(msgs)-1] + msgs = appendTestAgreementMessages(bba[msgTo].Messages(), nodes[msgTo], nodes, msgs) + } + + out0 := bba[0].Output().(bool) + for i := range bba { + require.True(t, bba[i].done) + if i != 0 { + require.Equal(t, bba[i].Output(), out0) + } + bba[i].Stop() + } + return nil +} diff --git a/bench/main.go b/bench/main.go index 84bcf27..1337f9a 100644 --- a/bench/main.go +++ b/bench/main.go @@ -70,6 +70,7 @@ func makeNodes(n, ntx, txsize, batchSize int) []*hbbft.HoneyBadger { for i := 0; i < n; i++ { cfg := hbbft.Config{ N: n, + F: -1, ID: uint64(i), Nodes: makeids(n), BatchSize: batchSize, diff --git a/cc.go b/cc.go new file mode 100644 index 0000000..250d46a --- /dev/null +++ b/cc.go @@ -0,0 +1,31 @@ +package hbbft + +// CommonCoin is an interface to be provided by the real CC implementation. +type CommonCoin interface { + ForNodeID(nodeID uint64) CommonCoin + HandleRequest(epoch uint32, payload interface{}) (*bool, []interface{}, error) + StartCoinFlip(epoch uint32) (*bool, []interface{}, error) +} + +// fakeCoin is a trivial incorrect implementation of the common coin interface. +// It is used here just to avoid dependencies to particular crypto libraries. +type fakeCoin struct{} + +// Let's return the same coin for 2 times in a row. +// That can lead to faster consensus, because the next round is successful +// when the coin is the same as the first one with some decisions. +// It is beneficial to start such a sequence with [true, true, ...]. +func NewFakeCoin() CommonCoin { + return &fakeCoin{} +} + +func (fc *fakeCoin) ForNodeID(nodeID uint64) CommonCoin { + return &fakeCoin{} +} +func (fc *fakeCoin) HandleRequest(epoch uint32, payload interface{}) (*bool, []interface{}, error) { + panic("HandleRequest is not used in this implementation") +} +func (fc *fakeCoin) StartCoinFlip(epoch uint32) (*bool, []interface{}, error) { + coin := (epoch/2)%2 == 0 + return &coin, []interface{}{}, nil +} diff --git a/go.mod b/go.mod index ddaa570..af80e25 100644 --- a/go.mod +++ b/go.mod @@ -7,4 +7,5 @@ require ( github.com/klauspost/cpuid v1.2.1 // indirect github.com/klauspost/reedsolomon v1.9.2 github.com/sirupsen/logrus v1.4.2 + github.com/stretchr/testify v1.2.2 ) diff --git a/go.sum b/go.sum index 821fffe..4fa9787 100644 --- a/go.sum +++ b/go.sum @@ -1,15 +1,18 @@ github.com/NebulousLabs/merkletree v0.0.0-20181203152040-08d5d54b07f5 h1:pk9SclNGplPbF6YDIDKMhHh9SaUWcoxPkMr7zdu1hfk= github.com/NebulousLabs/merkletree v0.0.0-20181203152040-08d5d54b07f5/go.mod h1:Cn056wBLKay+uIS9LJn7ymwhgC5mqbOtG6iOhEvyy4M= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/klauspost/cpuid v1.2.1 h1:vJi+O/nMdFt0vqm8NZBI6wzALWdA2X+egi0ogNyrC/w= github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/reedsolomon v1.9.2 h1:E9CMS2Pqbv+C7tsrYad4YC9MfhnMVWhMRsTi7U0UB18= github.com/klauspost/reedsolomon v1.9.2/go.mod h1:CwCi+NUr9pqSVktrkN+Ondf06rkhYZ/pcNv7fu+8Un4= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/honey_badger.go b/honey_badger.go index cdf999f..e82c60e 100644 --- a/honey_badger.go +++ b/honey_badger.go @@ -28,8 +28,10 @@ type Config struct { ID uint64 // Identifiers of the participating nodes. Nodes []uint64 - // Maximum number of transactions that will be comitted in one epoch. + // Maximum number of transactions that will be committed in one epoch. BatchSize int + // Common Coin to use. + CommonCoin CommonCoin } // HoneyBadger represents the top-level protocol of the hbbft consensus. @@ -213,13 +215,7 @@ func (hb *HoneyBadger) removeOldEpochs(epoch uint64) { if i >= hb.epoch-1 { continue } - for _, t := range acs.bbaInstances { - t.stop() - } - for _, t := range acs.rbcInstances { - t.stop() - } - acs.stop() + acs.Stop() delete(hb.acsInstances, i) } } diff --git a/honey_badger_test.go b/honey_badger_test.go index cd6b9a8..2a81c95 100644 --- a/honey_badger_test.go +++ b/honey_badger_test.go @@ -11,6 +11,7 @@ import ( func TestEngineAddTransaction(t *testing.T) { cfg := Config{ N: 6, + F: -1, ID: 0, Nodes: []uint64{0, 1, 2, 3, 4, 5, 6}, } @@ -75,6 +76,7 @@ func makeTestNodes(n int) []*testNode { cfg := Config{ ID: uint64(i), N: len(transports), + F: -1, Nodes: makeids(n), } nodes[i] = newTestNode(NewHoneyBadger(cfg), tr) diff --git a/rbc.go b/rbc.go index 7bf705d..1e9bcc7 100644 --- a/rbc.go +++ b/rbc.go @@ -20,9 +20,8 @@ type BroadcastMessage struct { // ProofRequest holds the RootHash along with the Shard of the erasure encoded // payload. type ProofRequest struct { - RootHash []byte - // Proof[0] will containt the actual data. - Proof [][]byte + RootHash []byte + Proof [][]byte // Proof[0] will containt the actual data. Index, Leaves int } @@ -52,9 +51,9 @@ type RBC struct { // The reedsolomon encoder to encode the proposed value into shards. enc reedsolomon.Encoder // recvReadys is a mapping between the sender and the root hash that was - // inluded in the ReadyRequest. + // included in the ReadyRequest. recvReadys map[uint64][]byte - // revcEchos is a mapping between the sender and the EchoRequest. + // recvEchos is a mapping between the sender and the EchoRequest. recvEchos map[uint64]*EchoRequest // Number of the parity and data shards that will be used for erasure encoding // the given value. @@ -94,13 +93,15 @@ type ( // NewRBC returns a new instance of the ReliableBroadcast configured // with the given config func NewRBC(cfg Config, proposerID uint64) *RBC { - if cfg.F == 0 { + if cfg.F == -1 { cfg.F = (cfg.N - 1) / 3 } - var ( - parityShards = 2 * cfg.F - dataShards = cfg.N - parityShards - ) + parityShards := 2 * cfg.F + if parityShards == 0 { + parityShards = 1 + } + dataShards := cfg.N - parityShards + enc, err := reedsolomon.New(dataShards, parityShards) if err != nil { panic(err) @@ -137,8 +138,8 @@ func (r *RBC) InputValue(data []byte) ([]*BroadcastMessage, error) { } // HandleMessage will process the given rpc message and will return a possible -// outcome. The caller is resposible to make sure only RPC messages are passed -// that are elligible for the RBC protocol. +// outcome. The caller is responsible to make sure only RPC messages are passed +// that are eligible for the RBC protocol. func (r *RBC) HandleMessage(senderID uint64, msg *BroadcastMessage) error { t := rbcMessageTuple{ senderID: senderID, @@ -149,7 +150,7 @@ func (r *RBC) HandleMessage(senderID uint64, msg *BroadcastMessage) error { return <-t.err } -func (r *RBC) stop() { +func (r *RBC) Stop() { close(r.closeCh) } @@ -259,8 +260,8 @@ func (r *RBC) handleEchoRequest(senderID uint64, req *EchoRequest) error { return fmt.Errorf( "received invalid proof from (%d) my id (%d)", senderID, r.ID) } - r.recvEchos[senderID] = req + if r.readySent || r.countEchos(req.RootHash) < r.N-r.F { return r.tryDecodeValue(req.RootHash) } @@ -283,10 +284,11 @@ func (r *RBC) handleReadyRequest(senderID uint64, req *ReadyRequest) error { } r.recvReadys[senderID] = req.RootHash - if r.countReadys(req.RootHash) == r.F+1 && !r.readySent { + if !r.readySent && r.countReadys(req.RootHash) == r.F+1 { r.readySent = true ready := &ReadyRequest{req.RootHash} r.messages = append(r.messages, &BroadcastMessage{ready}) + return r.handleReadyRequest(r.ID, ready) } return r.tryDecodeValue(req.RootHash) } @@ -299,7 +301,6 @@ func (r *RBC) tryDecodeValue(hash []byte) error { } // At this point we can decode the shards. First we create a new slice of // only sortable proof values. - r.outputDecoded = true var prfs proofs for _, echo := range r.recvEchos { prfs = append(prfs, echo.ProofRequest) @@ -312,13 +313,17 @@ func (r *RBC) tryDecodeValue(hash []byte) error { shards[p.Index] = p.Proof[0] } if err := r.enc.Reconstruct(shards); err != nil { - return nil + if err == reedsolomon.ErrTooFewShards { + return nil + } + return err } var value []byte for _, data := range shards[:r.numDataShards] { value = append(value, data...) } r.output = value + r.outputDecoded = true return nil } @@ -326,7 +331,7 @@ func (r *RBC) tryDecodeValue(hash []byte) error { func (r *RBC) countEchos(hash []byte) int { n := 0 for _, e := range r.recvEchos { - if bytes.Compare(hash, e.RootHash) == 0 { + if bytes.Equal(hash, e.RootHash) { n++ } } @@ -337,7 +342,7 @@ func (r *RBC) countEchos(hash []byte) int { func (r *RBC) countReadys(hash []byte) int { n := 0 for _, h := range r.recvReadys { - if bytes.Compare(hash, h) == 0 { + if bytes.Equal(hash, h) { n++ } } @@ -350,7 +355,9 @@ func makeProofRequests(shards [][]byte) ([]*ProofRequest, error) { reqs := make([]*ProofRequest, len(shards)) for i := 0; i < len(reqs); i++ { tree := merkletree.New(sha256.New()) - tree.SetIndex(uint64(i)) + if err := tree.SetIndex(uint64(i)); err != nil { + return nil, err + } for i := 0; i < len(shards); i++ { tree.Push(shards[i]) } @@ -371,7 +378,9 @@ func makeBroadcastMessages(shards [][]byte) ([]*BroadcastMessage, error) { msgs := make([]*BroadcastMessage, len(shards)) for i := 0; i < len(msgs); i++ { tree := merkletree.New(sha256.New()) - tree.SetIndex(uint64(i)) + if err := tree.SetIndex(uint64(i)); err != nil { + return nil, err + } for i := 0; i < len(shards); i++ { tree.Push(shards[i]) } diff --git a/rbc_test.go b/rbc_test.go index eff6df0..1b103ce 100644 --- a/rbc_test.go +++ b/rbc_test.go @@ -1,13 +1,17 @@ package hbbft import ( + "bytes" + "fmt" "log" + "math/rand" "sync" "testing" "github.com/klauspost/reedsolomon" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) // Test RBC where 1 node will not provide its value. We use 4 nodes that will @@ -73,6 +77,7 @@ func TestRBC4GoodNodes(t *testing.T) { func TestRBCInputValue(t *testing.T) { rbc := NewRBC(Config{ N: 4, + F: -1, }, 0) reqs, err := rbc.InputValue([]byte("this is a test string")) assert.Nil(t, err) @@ -105,7 +110,7 @@ func TestNewReliableBroadcast(t *testing.T) { } func TestRBCOutputIsNilAfterConsuming(t *testing.T) { - rbc := NewRBC(Config{N: 4}, 0) + rbc := NewRBC(Config{N: 4, F: -1}, 0) output := []byte("a") rbc.output = output assert.Equal(t, output, rbc.Output()) @@ -113,8 +118,8 @@ func TestRBCOutputIsNilAfterConsuming(t *testing.T) { } func TestRBCMessagesIsEmptyAfterConsuming(t *testing.T) { - rbc := NewRBC(Config{N: 4}, 0) - rbc.messages = []*BroadcastMessage{&BroadcastMessage{}} + rbc := NewRBC(Config{N: 4, F: -1}, 0) + rbc.messages = []*BroadcastMessage{{}} assert.Equal(t, 1, len(rbc.Messages())) assert.Equal(t, 0, len(rbc.Messages())) } @@ -155,6 +160,98 @@ func TestMakeProofRequests(t *testing.T) { } } +// This test should be run repeatedly for some time. +func TestRBCRandomized(t *testing.T) { + if err := testRBCRandomized(t); err != nil { + t.Fatalf("Failed, reason=%+v", err) + } +} +func testRBCRandomized(t *testing.T) error { + var err error + var N, T = 7, 5 + + msgs := make([]*testRBCMsg, 0) + nodes := make([]uint64, N) + for n := range nodes { + nodes[n] = uint64(n) + } + + cfg := make([]Config, N) + for i := range cfg { + cfg[i] = Config{ + N: N, + F: N - T, + ID: uint64(i), + Nodes: nodes, + BatchSize: 21254, // Should be unused. + } + } + + var input [10000]byte + rand.Read(input[:]) + + rbc := make([]*RBC, N) + proposerID := uint64(rand.Intn(N)) + for i := range rbc { + rbc[i] = NewRBC(cfg[i], proposerID) + } + var inMsgs []*BroadcastMessage + if inMsgs, err = rbc[proposerID].InputValue(input[:]); err != nil { + return fmt.Errorf("Failed to process RBC.InputValue: %v", err) + } + msgs = appendTestRBCMsgsExplicit(inMsgs, proposerID, nodes, msgs) + for len(msgs) != 0 { + m := rand.Intn(len(msgs)) + msg := msgs[m] + msgTo := msg.to + if err = rbc[msgTo].HandleMessage(msg.from, msg.msg); err != nil { + return fmt.Errorf("Failed to RBC.HandleMessage: %v", err) + } + + // Remove the message from the buffer and add the new messages. + msgs[m] = msgs[len(msgs)-1] + msgs = msgs[:len(msgs)-1] + msgs = appendTestRBCMsgsBroadcast(rbc[msgTo].Messages(), msgTo, nodes, msgs) + } + + for i := range rbc { + out := rbc[i].Output() + require.NotNil(t, out) + require.Equal(t, bytes.Compare(input[:], out[:len(input)]), 0) // RBC adds zeros to the end. + rbc[i].Stop() + } + return nil +} + +type testRBCMsg struct { + from uint64 + to uint64 + msg *BroadcastMessage +} + +func appendTestRBCMsgsExplicit(msgs []*BroadcastMessage, senderID uint64, nodes []uint64, buf []*testRBCMsg) []*testRBCMsg { + output := buf[:] + msgPos := 0 + for n := range nodes { + if nodes[n] != senderID { + output = append(output, &testRBCMsg{from: senderID, to: nodes[n], msg: msgs[msgPos]}) + msgPos++ + } + } + return output +} +func appendTestRBCMsgsBroadcast(msgs []*BroadcastMessage, senderID uint64, nodes []uint64, buf []*testRBCMsg) []*testRBCMsg { + output := buf[:] + for n := range nodes { + if nodes[n] != senderID { + for m := range msgs { + output = append(output, &testRBCMsg{from: senderID, to: nodes[n], msg: msgs[m]}) + } + } + } + return output +} + type bcResult struct { nodeID uint64 value []byte @@ -188,11 +285,13 @@ func (e *testRBCEngine) run() { continue } for _, msg := range e.rbc.Messages() { - e.transport.Broadcast(e.rbc.ID, msg) + if err := e.transport.Broadcast(e.rbc.ID, msg); err != nil { + panic(err) + } } if output := e.rbc.Output(); output != nil { // Faulty node will refuse to send its produced output, causing - // potential disturb of conensus liveness. + // potential disturb of consensus liveness. if e.faulty { continue } @@ -214,8 +313,7 @@ func (e *testRBCEngine) inputValue(data []byte) error { for i := 0; i < len(reqs); i++ { msgs[i] = reqs[i] } - e.transport.SendProofMessages(e.rbc.ID, msgs) - return nil + return e.transport.SendProofMessages(e.rbc.ID, msgs) } func makeRBCNodes(n, pid int, resCh chan bcResult) []*testRBCEngine { @@ -227,6 +325,7 @@ func makeRBCNodes(n, pid int, resCh chan bcResult) []*testRBCEngine { cfg := Config{ ID: uint64(i), N: len(transports), + F: -1, } nodes[i] = newTestRBCEngine(resCh, NewRBC(cfg, uint64(pid)), tr) go nodes[i].run() diff --git a/simulation/main.go b/simulation/main.go index a1dc63d..9c0230c 100644 --- a/simulation/main.go +++ b/simulation/main.go @@ -84,6 +84,7 @@ type Server struct { func newServer(id uint64, tr hbbft.Transport, nodes []uint64) *Server { hb := hbbft.NewHoneyBadger(hbbft.Config{ N: len(nodes), + F: -1, ID: id, Nodes: nodes, BatchSize: batchSize,