Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
test:
@go test ./... --cover
test:
@go test ./... --cover

test-randomized:
go test --count 10000 --run TestBBARandomized
go test --count 10000 --run TestRBCRandomized
go test --count 10000 --run TestACSRandomized
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,18 @@ make test
Create a new instance of HoneyBadger.

```golang
// Create a Config struct with your prefered settings.
// Create a Config struct with your preferred settings.
cfg := hbbft.Config{
// The number of nodes in the network.
N: 4,
// Number of tolerated faulty nodes, use -1 to take max possible.
F: -1,
// Identifier of this node.
ID: 101,
// Identifiers of the participating nodes.
Nodes: uint64{67, 1, 99, 101},
// The prefered batch size. If BatchSize is empty, an ideal batch size will
// be choosen for you.
// The preferred batch size. If BatchSize is empty, an ideal batch size will
// be chosen for you.
BatchSize: 100,
}

Expand Down
60 changes: 44 additions & 16 deletions acs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@ type ACSMessage struct {
// ACS implements the Asynchronous Common Subset protocol.
// ACS assumes a network of N nodes that send signed messages to each other.
// There can be f faulty nodes where (3 * f < N).
// Each participating node proposes an element for inlcusion. The protocol
// Each participating node proposes an element for inclusion. The protocol
// guarantees that all of the good nodes output the same set, consisting of
// at least (N -f) of the proposed values.
//
// Algorithm:
// ACS creates a Broadcast algorithm for each of the participating nodes.
// At least (N -f) of these will eventually output the element proposed by that
// node. ACS will also create and BBA instance for each participating node, to
// decide whether that node's proposed element should be inlcuded in common set.
// Whenever an element is received via broadcast, we imput "true" into the
// decide whether that node's proposed element should be included in common set.
// Whenever an element is received via broadcast, we input "true" into the
// corresponding BBA instance. When (N-f) BBA instances have decided true we
// input false into the remaining ones, where we haven't provided input yet.
// Once all BBA instances have decided, ACS returns the set of all proposed
Expand Down Expand Up @@ -64,9 +64,7 @@ type (
}

acsInputResponse struct {
rbcMessages []*BroadcastMessage
acsMessages []*ACSMessage
err error
err error
}

acsInputTuple struct {
Expand All @@ -78,7 +76,7 @@ type (
// NewACS returns a new ACS instance configured with the given Config and node
// ids.
func NewACS(cfg Config) *ACS {
if cfg.F == 0 {
if cfg.F == -1 {
cfg.F = (cfg.N - 1) / 3
}
acs := &ACS{
Expand All @@ -95,12 +93,23 @@ func NewACS(cfg Config) *ACS {
// Create all the instances for the participating nodes
for _, id := range cfg.Nodes {
acs.rbcInstances[id] = NewRBC(cfg, id)
acs.bbaInstances[id] = NewBBA(cfg)
acs.bbaInstances[id] = NewBBA(cfg, id)
}
go acs.run()
return acs
}

// DebugInfo returns internal state. Should be used for debugging only.
func (a *ACS) DebugInfo() (map[uint64]*RBC, map[uint64]*BBA, map[uint64][]byte, map[uint64]bool, []MessageTuple) { // TODO: ...
return a.rbcInstances, a.bbaInstances, a.rbcResults, a.bbaResults, a.messageQue.que
}

// Messages returns all the internal messages from the message que. Note that
// the que will be empty after invoking this method.
func (a *ACS) Messages() []MessageTuple {
return a.messageQue.messages()
}

// InputValue sets the input value for broadcast and returns an initial set of
// Broadcast and ACS Messages to be broadcasted in the network.
func (a *ACS) InputValue(val []byte) error {
Expand Down Expand Up @@ -151,12 +160,17 @@ func (a *ACS) Output() map[uint64][]byte {
}

// Done returns true whether ACS has completed its agreements and cleared its
// messageQue.
// messageQue. It is possible that the BBA part will decide before the RBC.
func (a *ACS) Done() bool {
agreementsDone := true
for _, bba := range a.bbaInstances {
if !bba.done {
for i, bba := range a.bbaInstances {
if !bba.Done() {
agreementsDone = false
break
}
if a.bbaResults[i] && a.rbcResults[i] == nil {
agreementsDone = false
break
}
}
return agreementsDone && a.messageQue.len() == 0
Expand Down Expand Up @@ -184,7 +198,7 @@ func (a *ACS) inputValue(data []byte) error {
}
if output := rbc.Output(); output != nil {
a.rbcResults[a.ID] = output
a.processAgreement(a.ID, func(bba *BBA) error {
return a.processAgreement(a.ID, func(bba *BBA) error {
if bba.AcceptInput() {
return bba.InputValue(true)
}
Expand All @@ -194,8 +208,14 @@ func (a *ACS) inputValue(data []byte) error {
return nil
}

func (a *ACS) stop() {
func (a *ACS) Stop() {
close(a.closeCh)
for _, rbc := range a.rbcInstances {
rbc.Stop()
}
for _, bba := range a.bbaInstances {
bba.Stop()
}
}

func (a *ACS) run() {
Expand Down Expand Up @@ -255,7 +275,10 @@ func (a *ACS) processAgreement(pid uint64, fun func(bba *BBA) error) error {
if !ok {
return fmt.Errorf("could not find bba instance for (%d)", pid)
}
if bba.done {
if bba.Done() {
if !a.decided {
a.tryCompleteAgreement()
}
return nil
}
if err := fun(bba); err != nil {
Expand Down Expand Up @@ -287,6 +310,10 @@ func (a *ACS) processAgreement(pid uint64, fun func(bba *BBA) error) error {
}
}
}
}
// Completion can be triggered either by the BBA or the RBC output,
// depending on which is completed first. Both variants are possible.
if _, ok := a.bbaResults[pid]; ok {
a.tryCompleteAgreement()
}
return nil
Expand All @@ -308,8 +335,9 @@ func (a *ACS) tryCompleteAgreement() {
}
bcResults := make(map[uint64][]byte)
for _, id := range nodesThatProvidedTrue {
val, _ := a.rbcResults[id]
bcResults[id] = val
if val, bcOk := a.rbcResults[id]; bcOk {
bcResults[id] = val
}
}
if len(nodesThatProvidedTrue) == len(bcResults) {
a.output = bcResults
Expand Down
91 changes: 89 additions & 2 deletions acs_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package hbbft

import (
"bytes"
"fmt"
"math/rand"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// Test ACS with 4 good nodes. The result should be that at least the output
Expand Down Expand Up @@ -78,6 +82,7 @@ func TestNewACS(t *testing.T) {
nodes = []uint64{0, 1, 2, 3}
acs = NewACS(Config{
N: len(nodes),
F: -1, // Use default.
ID: id,
Nodes: nodes,
})
Expand All @@ -97,7 +102,7 @@ func TestNewACS(t *testing.T) {
}

func TestACSOutputIsNilAfterConsuming(t *testing.T) {
acs := NewACS(Config{N: 4})
acs := NewACS(Config{N: 4, F: -1}) // Use default for F.
output := map[uint64][]byte{
1: []byte("this is it"),
}
Expand All @@ -106,15 +111,97 @@ func TestACSOutputIsNilAfterConsuming(t *testing.T) {
assert.Nil(t, acs.Output())
}

// This test checks, if messages sent in any order are still handled correctly.
// It is expected to run this test multiple times.
func TestACSRandomized(t *testing.T) {
if err := testACSRandomized(t); err != nil {
t.Fatalf("Failed, reason=%+v", err)
}
}
func testACSRandomized(t *testing.T) error {
var err error
var N, T = 7, 5

msgs := make([]*testMsg, 0)
nodes := make([]uint64, N)
for n := range nodes {
nodes[n] = uint64(n)
}

cfg := make([]Config, N)
for i := range cfg {
cfg[i] = Config{
N: N,
F: N - T,
ID: uint64(i),
Nodes: nodes,
BatchSize: 21254, // Should be unused.
}

}

acs := make([]*ACS, N)
for a := range acs {
acs[a] = NewACS(cfg[a])
if err = acs[a].InputValue([]byte{1, 2, byte(a)}); err != nil {
return fmt.Errorf("Failed to process ACS.InputValue: %+v", err)
}
msgs = appendTestMsgs(acs[a].Messages(), nodes[a], msgs)
}

// var done bool
for len(msgs) != 0 {
m := rand.Intn(len(msgs))
msg := msgs[m]

msgTo := msg.msg.To
if acsMsg, ok := msg.msg.Payload.(*ACSMessage); ok {
if err = acs[msgTo].HandleMessage(uint64(msg.from), acsMsg); err != nil {
return fmt.Errorf("Failed to ACS.HandleMessage: %+v", err)
}
} else {
return fmt.Errorf("Unexpected message type: %+v", msg.msg.Payload)
}

// Remove the message from the buffer and append the new messages, if any.
msgs[m] = msgs[len(msgs)-1]
msgs = msgs[:len(msgs)-1]
msgs = appendTestMsgs(acs[msgTo].Messages(), nodes[msgTo], msgs)
}

out0 := acs[0].Output()
for a := range acs {
require.True(t, acs[a].Done())
if a == 0 {
continue
}
var outA map[uint64][]byte = acs[a].Output()
require.Equal(t, len(out0), len(outA))
for i := range out0 {
require.Equal(t, bytes.Compare(out0[i], outA[i]), 0)
}
acs[a].Stop()
}
return nil
}

type testMsg struct {
from uint64
msg MessageTuple
}

func appendTestMsgs(msgs []MessageTuple, senderID uint64, buf []*testMsg) []*testMsg {
output := buf[:]
for m := range msgs {
output = append(output, &testMsg{from: senderID, msg: msgs[m]})
}
return output
}

func makeACSNetwork(n int) []*ACS {
network := make([]*ACS, n)
for i := 0; i < n; i++ {
network[i] = NewACS(Config{N: n, ID: uint64(i), Nodes: makeids(n)})
network[i] = NewACS(Config{N: n, F: -1, ID: uint64(i), Nodes: makeids(n)}) // Use default for F.
go network[i].run()
}
return network
Expand Down
Loading