Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 79 additions & 82 deletions anchor/message_validator/src/consensus_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ use types::Epoch;

use crate::{
FIRST_ROUND, ValidatedSSVMessage, ValidationContext, ValidationFailure, compute_quorum_size,
duty_state::DutyState, hash_data, slot_start_time, validate_beacon_duty, validate_duty_count,
validate_slot_time, verify_message_signatures,
duty_state::OperatorState, hash_data, slot_start_time, validate_beacon_duty,
validate_duty_count, validate_slot_time, verify_message_signatures,
};

pub(crate) fn validate_consensus_message(
validation_context: ValidationContext<impl SlotClock>,
duty_state: &mut DutyState,
operator_state: &mut OperatorState,
duty_provider: Arc<impl DutiesProvider>,
) -> Result<ValidatedSSVMessage, ValidationFailure> {
// Decode message to QbftMessage
Expand Down Expand Up @@ -54,12 +54,12 @@ pub(crate) fn validate_consensus_message(
validation_context.operator_pub_keys,
)?;

validate_qbft_logic(&validation_context, &consensus_message, duty_state)?;
validate_qbft_logic(&validation_context, &consensus_message, operator_state)?;

validate_qbft_message_by_duty_logic(
&validation_context,
&consensus_message,
duty_state,
operator_state,
duty_provider,
)?;

Expand All @@ -68,10 +68,13 @@ pub(crate) fn validate_consensus_message(
validation_context.operator_pub_keys,
)?;

duty_state.update_for_consensus_message(
let msg_slot = Slot::from(consensus_message.height);
let msg_epoch = msg_slot.epoch(validation_context.slots_per_epoch);
operator_state.update(
validation_context.signed_ssv_message,
&consensus_message,
validation_context.slots_per_epoch,
&msg_slot,
&msg_epoch,
);

// Return the validated message
Expand Down Expand Up @@ -232,7 +235,7 @@ fn validate_justification_list<N: Unsigned>(
pub(crate) fn validate_qbft_logic(
validation_context: &ValidationContext<impl SlotClock>,
consensus_message: &QbftMessage,
duty_state: &mut DutyState,
operator_state: &mut OperatorState,
) -> Result<(), ValidationFailure> {
let signed_ssv_message = validation_context.signed_ssv_message;

Expand Down Expand Up @@ -260,55 +263,49 @@ pub(crate) fn validate_qbft_logic(
// Create slot from height
let msg_slot = Slot::new(consensus_message.height);

// Check validation rules for each signer
for signer in signers {
// Get or create the operator state first, then check if there's a signer state
let Some(signer_state) = duty_state
.get_or_create_operator(signer)
.get_signer_state(&msg_slot)
else {
continue;
};

if signers.len() == 1 {
// Single-signer validation rules (non-decided messages)

// Rule: Ignore if peer already advanced to a later round
if consensus_message.round < signer_state.round {
// Signers aren't allowed to decrease their round.
// If they've sent a future message due to clock error,
// they'd have to wait for the next slot/round to be accepted.
return Err(ValidationFailure::RoundAlreadyAdvanced {
got: consensus_message.round,
want: signer_state.round,
});
}
// Get or create the operator state first, then check if there's a signer state
let Some(signer_state) = operator_state.get_signer_state(&msg_slot) else {
return Ok(());
Copy link
Member

@shane-moore shane-moore Feb 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR! This `return Ok(())` causes `validate_round_in_allowed_spread` to be skipped for single-signer messages when there's no prior state. The old `continue` fell through to the round-spread check after the loop.

perhaps move the round spread check before the early return:

if signers.len() == 1 {
    validate_round_in_allowed_spread(consensus_message, validation_context)?;
}

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I missed that — will make the change.

};

if consensus_message.round == signer_state.round {
// Rule: Peer must not send two proposals with different data.
// We separately verify that the root in the message matches the data.
if !signed_ssv_message.full_data().is_empty()
&& signer_state
.proposal_hash
.as_ref()
.is_some_and(|hash| hash != consensus_message.root)
{
return Err(ValidationFailure::DifferentProposalData);
}
if signers.len() == 1 {
// Single-signer validation rules (non-decided messages)

// Rule: Ignore if peer already advanced to a later round
if consensus_message.round < signer_state.round {
// Signers aren't allowed to decrease their round.
// If they've sent a future message due to clock error,
// they'd have to wait for the next slot/round to be accepted.
return Err(ValidationFailure::RoundAlreadyAdvanced {
got: consensus_message.round,
want: signer_state.round,
});
}

signer_state
.message_counts
.validate_consensus_message_limits(
signed_ssv_message,
consensus_message.qbft_message_type,
)?;
}
} else if signers.len() > 1 {
// Rule: Decided msg can't have the same signers as previously sent before for the same
// duty
if signer_state.has_seen_signers(signers) {
return Err(ValidationFailure::DecidedWithSameSigners);
if consensus_message.round == signer_state.round {
// Rule: Peer must not send two proposals with different data.
// We separately verify that the root in the message matches the data.
if !signed_ssv_message.full_data().is_empty()
&& signer_state
.proposal_hash
.as_ref()
.is_some_and(|hash| hash != consensus_message.root)
{
return Err(ValidationFailure::DifferentProposalData);
}

signer_state
.message_counts
.validate_consensus_message_limits(
signed_ssv_message,
consensus_message.qbft_message_type,
)?;
}
} else if signers.len() > 1 {
// Rule: Decided msg can't have the same signers as previously sent before for the same
// duty
if signer_state.has_seen_signers(signers) {
return Err(ValidationFailure::DecidedWithSameSigners);
}
}

Expand Down Expand Up @@ -435,24 +432,20 @@ fn current_estimated_round(since_slot_start: Duration) -> Round {
pub(crate) fn validate_qbft_message_by_duty_logic(
validation_context: &ValidationContext<impl SlotClock>,
consensus_message: &QbftMessage,
duty_state: &mut DutyState,
operator_state: &mut OperatorState,
duty_provider: Arc<impl DutiesProvider>,
) -> Result<(), ValidationFailure> {
let role = validation_context.role;
let signed_ssv_message = validation_context.signed_ssv_message;

// Rule: Height must not be "old". I.e., signer must not have already advanced to a later slot.
// Skip for committee roles
if !role.is_committee_role() {
for &signer in signed_ssv_message.operator_ids() {
let signer_state = duty_state.get_or_create_operator(&signer);
let max_slot = signer_state.max_slot();
if max_slot > consensus_message.height {
return Err(ValidationFailure::SlotAlreadyAdvanced {
got: consensus_message.height,
want: max_slot.as_u64(),
});
}
let max_slot = operator_state.max_slot();
if max_slot > consensus_message.height {
return Err(ValidationFailure::SlotAlreadyAdvanced {
got: consensus_message.height,
want: max_slot.as_u64(),
});
}
}

Expand All @@ -472,15 +465,12 @@ pub(crate) fn validate_qbft_message_by_duty_logic(
validate_slot_time(msg_slot, validation_context)?;

// Rule: valid number of duties per epoch
for &signer in signed_ssv_message.operator_ids() {
let signer_state = duty_state.get_or_create_operator(&signer);
validate_duty_count(
validation_context,
msg_slot,
signer_state,
duty_provider.clone(),
)?;
}
validate_duty_count(
validation_context,
msg_slot,
operator_state,
duty_provider.clone(),
)?;

Ok(())
}
Expand Down Expand Up @@ -601,7 +591,7 @@ mod tests {
let expected_duty_count = 5;
let result = validate_ssv_message(
validation_context,
&mut DutyState::new(2),
&mut OperatorState::new(2),
Arc::new(MockDutiesProvider {
voluntary_exit_duty_count: expected_duty_count,
}),
Expand Down Expand Up @@ -660,7 +650,7 @@ mod tests {

let result = validate_ssv_message(
validation_context,
&mut DutyState::new(2),
&mut OperatorState::new(2),
Arc::new(MockDutiesProvider {
voluntary_exit_duty_count: 0,
}),
Expand Down Expand Up @@ -715,7 +705,7 @@ mod tests {

let result = validate_ssv_message(
validation_context,
&mut DutyState::new(2),
&mut OperatorState::new(2),
Arc::new(MockDutiesProvider {
voluntary_exit_duty_count: 0,
}),
Expand Down Expand Up @@ -767,7 +757,7 @@ mod tests {

let result = validate_ssv_message(
validation_context,
&mut DutyState::new(2),
&mut OperatorState::new(2),
Arc::new(MockDutiesProvider {
voluntary_exit_duty_count: 0,
}),
Expand Down Expand Up @@ -1600,7 +1590,7 @@ mod tests {
};

// Create a duty state where the operator has already advanced to slot 10
let mut duty_state = DutyState::new(64);
let mut operator_state = OperatorState::new(64);
// Process a dummy consensus message for slot 10 to advance the operator's max_slot
let mut dummy_qbft =
QbftMessageBuilder::new(Role::AggregatorCommittee, QbftMessageType::Prepare).build();
Expand All @@ -1611,13 +1601,20 @@ mod tests {
vec![],
vec![],
);
duty_state.update_for_consensus_message(&dummy_signed_msg, &dummy_qbft, 32);
let msg_slot = Slot::from(dummy_qbft.height);
let estimated_msg_epoch = Epoch::new(msg_slot.as_u64() / 32);
operator_state.update(
&dummy_signed_msg,
&dummy_qbft,
&msg_slot,
&estimated_msg_epoch,
);

// Now validate a consensus message for height 1 (which is "old")
let result = validate_qbft_message_by_duty_logic(
&validation_context,
&qbft_message,
&mut duty_state,
&mut operator_state,
Arc::new(MockDutiesProvider {
voluntary_exit_duty_count: 0,
}),
Expand Down Expand Up @@ -1647,7 +1644,7 @@ mod tests {
let result = validate_qbft_message_by_duty_logic(
&validation_context_proposer,
&qbft_message,
&mut duty_state,
&mut operator_state,
Arc::new(MockDutiesProvider {
voluntary_exit_duty_count: 0,
}),
Expand Down
Loading
Loading