From bb08a8eb56c03bb27391465cdf5c940d54c85edb Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Wed, 4 Feb 2026 19:45:57 +0300 Subject: [PATCH] implement missing checks for aggregator committee --- .../v2/ssv/runner/aggregator_committee.go | 5 +- protocol/v2/ssv/runner/runner.go | 4 +- protocol/v2/ssv/runner/runner_validations.go | 84 +++++++++++++++---- protocol/v2/ssv/spectest/value_checker.go | 10 ++- protocol/v2/ssv/testing/runner.go | 10 ++- protocol/v2/ssv/value_check.go | 83 +++++++++++++++++- 6 files changed, 172 insertions(+), 24 deletions(-) diff --git a/protocol/v2/ssv/runner/aggregator_committee.go b/protocol/v2/ssv/runner/aggregator_committee.go index 9d7c56c760..23f90f9cae 100644 --- a/protocol/v2/ssv/runner/aggregator_committee.go +++ b/protocol/v2/ssv/runner/aggregator_committee.go @@ -83,7 +83,6 @@ func NewAggregatorCommitteeRunner( Share: share, QBFTController: qbftController, }, - ValCheck: ssv.NewAggregatorCommitteeChecker(), beacon: beacon, network: network, signer: signer, @@ -582,6 +581,10 @@ func (r *AggregatorCommitteeRunner) ProcessPreConsensus( return fmt.Errorf("invalid aggregator committee consensus data: %w", err) } + r.ValCheck = ssv.NewAggregatorCommitteeChecker( + duty, + r.GetBeaconNode(), + ) r.measurements.StartConsensus() if err := r.BaseRunner.decide( ctx, diff --git a/protocol/v2/ssv/runner/runner.go b/protocol/v2/ssv/runner/runner.go index 73baa76958..d006048bd1 100644 --- a/protocol/v2/ssv/runner/runner.go +++ b/protocol/v2/ssv/runner/runner.go @@ -234,7 +234,7 @@ func (b *BaseRunner) basePreConsensusMsgProcessing(ctx context.Context, logger * // Reuse the existing span instead of generating new one to keep tracing-data lightweight. span := trace.SpanFromContext(ctx) - if err := b.ValidatePreConsensusMsg(ctx, runner, signedMsg); err != nil { + if err := b.ValidatePreConsensusMsg(ctx, logger, runner, signedMsg); err != nil { return false, nil, fmt.Errorf("invalid pre-consensus message: %w", err) } @@ -330,7 +330,7 @@ func (b *BaseRunner) basePostConsensusMsgProcessing( // Reuse the existing span instead of generating new one to keep tracing-data lightweight. span := trace.SpanFromContext(ctx) - if err := b.ValidatePostConsensusMsg(ctx, runner, signedMsg); err != nil { + if err := b.ValidatePostConsensusMsg(ctx, logger, runner, signedMsg); err != nil { return false, nil, fmt.Errorf("invalid post-consensus message: %w", err) } diff --git a/protocol/v2/ssv/runner/runner_validations.go b/protocol/v2/ssv/runner/runner_validations.go index 2ec83ce864..71d713fbcc 100644 --- a/protocol/v2/ssv/runner/runner_validations.go +++ b/protocol/v2/ssv/runner/runner_validations.go @@ -10,6 +10,7 @@ import ( ssz "github.com/ferranbt/fastssz" "github.com/pkg/errors" specqbft "github.com/ssvlabs/ssv-spec/qbft" + "go.uber.org/zap" spectypes "github.com/ssvlabs/ssv-spec/types" @@ -18,6 +19,7 @@ import ( func (b *BaseRunner) ValidatePreConsensusMsg( ctx context.Context, + logger *zap.Logger, runner Runner, psigMsgs *spectypes.PartialSignatureMessages, ) error { @@ -28,27 +30,40 @@ func (b *BaseRunner) ValidatePreConsensusMsg( return spectypes.WrapError(spectypes.NoRunningDutyErrorCode, ErrRunningDutyFinished) } - // Validate the pre-consensus message differently depending on a message type. - validateMsg := func() error { - if err := b.validatePartialSigMsg(psigMsgs, b.State.CurrentDuty.DutySlot()); err != nil { - return err + if err := b.validatePartialSigMsg(psigMsgs, b.State.CurrentDuty.DutySlot()); err != nil { + return err + } + + if runner.GetRole() == spectypes.RoleAggregatorCommittee { + aggRunner, ok := runner.(*AggregatorCommitteeRunner) + if !ok { + return fmt.Errorf("unexpected runner type %T for aggregator committee role", runner) } - roots, domain, err := runner.expectedPreConsensusRootsAndDomain() + aggregatorMap, contributionMap, err := aggRunner.expectedPreConsensusRoots(ctx, logger) if err != nil { - return fmt.Errorf("compute pre-consensus roots and domain: %w", err) + return fmt.Errorf("compute pre-consensus roots: %w", err) } - return b.verifyExpectedRoot(ctx, runner, psigMsgs, roots, domain) + expectedRoots := make(map[[32]byte]struct{}) + for _, root := range aggregatorMap { + expectedRoots[root] = struct{}{} + } + for _, indexMap := range contributionMap { + for _, root := range indexMap { + expectedRoots[root] = struct{}{} + } + } + + return b.verifyExpectedSigningRoots(psigMsgs, expectedRoots) } - if runner.GetRole() == spectypes.RoleAggregatorCommittee { - validateMsg = func() error { - return b.validatePartialSigMsg(psigMsgs, b.State.CurrentDuty.DutySlot()) - } + roots, domain, err := runner.expectedPreConsensusRootsAndDomain() + if err != nil { + return fmt.Errorf("compute pre-consensus roots and domain: %w", err) } - return validateMsg() + return b.verifyExpectedRoot(ctx, runner, psigMsgs, roots, domain) } // Verify each signature in container removing the invalid ones @@ -63,7 +78,12 @@ func (b *BaseRunner) FallBackAndVerifyEachSignature(container *ssv.PartialSigCon } } -func (b *BaseRunner) ValidatePostConsensusMsg(ctx context.Context, runner Runner, psigMsgs *spectypes.PartialSignatureMessages) error { +func (b *BaseRunner) ValidatePostConsensusMsg( + ctx context.Context, + logger *zap.Logger, + runner Runner, + psigMsgs *spectypes.PartialSignatureMessages, +) error { if !b.hasDutyAssigned() { return spectypes.WrapError(spectypes.NoRunningDutyErrorCode, ErrNoDutyAssigned) } @@ -159,7 +179,31 @@ func (b *BaseRunner) ValidatePostConsensusMsg(ctx context.Context, runner Runner // Use b.State.CurrentDuty.DutySlot() since CurrentDuty never changes for AggregatorCommitteeRunner // by design, hence there is no need to store slot number on decidedValue for AggregatorCommitteeRunner. expectedSlot := b.State.CurrentDuty.DutySlot() - return b.validatePartialSigMsg(psigMsgs, expectedSlot) + if err := b.validatePartialSigMsg(psigMsgs, expectedSlot); err != nil { + return err + } + + aggRunner, ok := runner.(*AggregatorCommitteeRunner) + if !ok { + return fmt.Errorf("unexpected runner type %T for aggregator committee role", runner) + } + + aggregatorMap, contributionMap, _, err := aggRunner.expectedPostConsensusRootsAndBeaconObjects(ctx, logger) + if err != nil { + return fmt.Errorf("compute post-consensus roots: %w", err) + } + + expectedRoots := make(map[[32]byte]struct{}) + for _, root := range aggregatorMap { + expectedRoots[root] = struct{}{} + } + for _, roots := range contributionMap { + for _, root := range roots { + expectedRoots[root] = struct{}{} + } + } + + return b.verifyExpectedSigningRoots(psigMsgs, expectedRoots) } } @@ -235,3 +279,15 @@ func (b *BaseRunner) verifyExpectedRoot( } return nil } + +func (b *BaseRunner) verifyExpectedSigningRoots( + psigMsgs *spectypes.PartialSignatureMessages, + expectedRoots map[[32]byte]struct{}, +) error { + for _, msg := range psigMsgs.Messages { + if _, ok := expectedRoots[msg.SigningRoot]; !ok { + return spectypes.NewError(spectypes.RootHashInvalidErrorCode, "unexpected signing root") + } + } + return nil +} diff --git a/protocol/v2/ssv/spectest/value_checker.go b/protocol/v2/ssv/spectest/value_checker.go index 2a67c7f9e6..6fbd3db50c 100644 --- a/protocol/v2/ssv/spectest/value_checker.go +++ b/protocol/v2/ssv/spectest/value_checker.go @@ -134,7 +134,10 @@ func (test *ValCheckSpecTest) valCheckF(signer ekm.BeaconSigner) func([]byte) er ) return checker.CheckValue case spectypes.RoleAggregatorCommittee: - checker := ssv.NewAggregatorCommitteeChecker() + checker := ssv.NewAggregatorCommitteeChecker( + spectestingutils.TestingAggregatorCommitteeDutyMixed(spec.DataVersionPhase0), + spectestingutils.NewTestingBeaconNode(), + ) return checker.CheckValue default: return nil @@ -251,7 +254,10 @@ func createValueChecker(r runner.Runner, signerSource ...runner.Runner) ssv.Valu expectedVote, ) case *runner.AggregatorCommitteeRunner: - return ssv.NewAggregatorCommitteeChecker() + return ssv.NewAggregatorCommitteeChecker( + spectestingutils.TestingAggregatorCommitteeDutyMixed(spec.DataVersionPhase0), + spectestingutils.NewTestingBeaconNode(), + ) default: return nil diff --git a/protocol/v2/ssv/testing/runner.go b/protocol/v2/ssv/testing/runner.go index 4f6cfe94f9..349f464f2f 100644 --- a/protocol/v2/ssv/testing/runner.go +++ b/protocol/v2/ssv/testing/runner.go @@ -101,7 +101,10 @@ var ConstructBaseRunner = func( valCheck = ssv.NewVoteChecker(km, spectestingutils.TestingDutySlot, []phase0.BLSPubKey{phase0.BLSPubKey(share.SharePubKey)}, vote) case spectypes.RoleAggregatorCommittee: - valCheck = ssv.NewAggregatorCommitteeChecker() + valCheck = ssv.NewAggregatorCommitteeChecker( + spectestingutils.TestingAggregatorCommitteeDutyMixed(spec.DataVersionPhase0), + protocoltesting.NewTestingBeaconNodeWrapped(), + ) case spectypes.RoleProposer: valCheck = ssv.NewProposerChecker(km, networkconfig.TestNetwork.Beacon, (spectypes.ValidatorPK)(spectestingutils.TestingValidatorPubKey), spectestingutils.TestingValidatorIndex, @@ -391,7 +394,10 @@ var ConstructBaseRunnerWithShareMap = func( valCheck = ssv.NewVoteChecker(km, spectestingutils.TestingDutySlot, sharePubKeys, vote) case spectypes.RoleAggregatorCommittee: - valCheck = ssv.NewAggregatorCommitteeChecker() + valCheck = ssv.NewAggregatorCommitteeChecker( + spectestingutils.TestingAggregatorCommitteeDutyMixed(spec.DataVersionPhase0), + protocoltesting.NewTestingBeaconNodeWrapped(), + ) case spectypes.RoleProposer: valCheck = ssv.NewProposerChecker(km, networkconfig.TestNetwork.Beacon, shareInstance.ValidatorPubKey, shareInstance.ValidatorIndex, phase0.BLSPubKey(shareInstance.SharePubKey)) diff --git a/protocol/v2/ssv/value_check.go b/protocol/v2/ssv/value_check.go index 1d95f08257..b5a4696eb0 100644 --- a/protocol/v2/ssv/value_check.go +++ b/protocol/v2/ssv/value_check.go @@ -78,10 +78,47 @@ func (v *voteChecker) CheckValue(value []byte) error { return nil } -type aggregatorCommitteeChecker struct{} +type aggregatorCommitteeChecker struct { + allowedAggregators map[phase0.ValidatorIndex]map[phase0.CommitteeIndex]struct{} + allowedContributors map[phase0.ValidatorIndex]map[uint64]struct{} +} + +type syncCommitteeSubnetIDProvider interface { + SyncCommitteeSubnetID(phase0.CommitteeIndex) uint64 +} + +func NewAggregatorCommitteeChecker( + duty *spectypes.AggregatorCommitteeDuty, + subnetProvider syncCommitteeSubnetIDProvider, +) ValueChecker { + allowedAggregators := make(map[phase0.ValidatorIndex]map[phase0.CommitteeIndex]struct{}) + allowedContributors := make(map[phase0.ValidatorIndex]map[uint64]struct{}) + + for _, vDuty := range duty.ValidatorDuties { + switch vDuty.Type { + case spectypes.BNRoleAggregator: + if _, ok := allowedAggregators[vDuty.ValidatorIndex]; !ok { + allowedAggregators[vDuty.ValidatorIndex] = make(map[phase0.CommitteeIndex]struct{}) + } + allowedAggregators[vDuty.ValidatorIndex][(vDuty.CommitteeIndex)] = struct{}{} + + case spectypes.BNRoleSyncCommitteeContribution: + if _, ok := allowedContributors[vDuty.ValidatorIndex]; !ok { + allowedContributors[vDuty.ValidatorIndex] = make(map[uint64]struct{}) + } + for _, index := range vDuty.ValidatorSyncCommitteeIndices { + subnet := subnetProvider.SyncCommitteeSubnetID(phase0.CommitteeIndex(index)) + allowedContributors[vDuty.ValidatorIndex][subnet] = struct{}{} + } + default: + // Other duty types are unexpected + } + } -func NewAggregatorCommitteeChecker() ValueChecker { - return &aggregatorCommitteeChecker{} + return &aggregatorCommitteeChecker{ + allowedAggregators: allowedAggregators, + allowedContributors: allowedContributors, + } } func (v *aggregatorCommitteeChecker) CheckValue(value []byte) error { @@ -96,6 +133,46 @@ func (v *aggregatorCommitteeChecker) CheckValue(value []byte) error { return fmt.Errorf("invalid value: %w", err) } + if len(cd.Aggregators) == 0 && len(cd.Contributors) == 0 { + return spectypes.WrapError( + spectypes.AggCommConsensusDataNoValidatorErrorCode, + fmt.Errorf("no aggregators or sync committee contributors in consensus data"), + ) + } + + for _, agg := range cd.Aggregators { + allowedByValidator, ok := v.allowedAggregators[agg.ValidatorIndex] + if !ok { + return spectypes.NewError( + spectypes.QBFTValueInvalidErrorCode, + fmt.Sprintf("unexpected aggregator validator %d", agg.ValidatorIndex), + ) + } + if _, ok := allowedByValidator[phase0.CommitteeIndex(agg.CommitteeIndex)]; !ok { + return spectypes.NewError( + spectypes.QBFTValueInvalidErrorCode, + fmt.Sprintf("unexpected aggregator committee index %d for validator %d", agg.CommitteeIndex, agg.ValidatorIndex), + ) + } + } + + for _, contrib := range cd.Contributors { + allowedByValidator, ok := v.allowedContributors[contrib.ValidatorIndex] + if !ok { + return spectypes.NewError( + spectypes.QBFTValueInvalidErrorCode, + fmt.Sprintf("unexpected contributor validator %d", contrib.ValidatorIndex), + ) + } + subnetID := contrib.CommitteeIndex + if _, ok := allowedByValidator[subnetID]; !ok { + return spectypes.NewError( + spectypes.QBFTValueInvalidErrorCode, + fmt.Sprintf("unexpected contributor subnet %d for validator %d", subnetID, contrib.ValidatorIndex), + ) + } + } + return nil }