Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion beacon_node/beacon_chain/src/fork_revert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ pub fn reset_fork_choice_to_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: It
// Replay blocks from finalized checkpoint back to head.
// We do not replay attestations presently, relying on the absence of other blocks
// to guarantee `head_block_root` as the head.
let blocks = store
// TODO(gloas): this code doesn't work anyway, could just delete all of it
let (blocks, _envelopes) = store
.load_blocks_to_replay(finalized_slot + 1, head_state.slot(), head_block_root)
.map_err(|e| format!("Error loading blocks to replay for fork choice: {:?}", e))?;

Expand Down
3 changes: 2 additions & 1 deletion beacon_node/beacon_chain/tests/rewards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -845,13 +845,14 @@ async fn check_all_base_rewards_for_subset(
.state_at_slot(Slot::new(slot - 1), StateSkipConfig::WithoutStateRoots)
.unwrap();

// TODO(gloas): handle payloads?
let mut pre_state = BlockReplayer::<E, BlockReplayError, IntoIter<_, 0>>::new(
parent_state,
&harness.spec,
)
.no_signature_verification()
.minimal_block_root_verification()
.apply_blocks(vec![], Some(block.slot()))
.apply_blocks(vec![], vec![], Some(block.slot()))
.unwrap()
.into_state();

Expand Down
4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/tests/store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ async fn block_replayer_hooks() {
.add_attested_blocks_at_slots(state.clone(), state_root, &block_slots, &all_validators)
.await;

let blocks = store
let (blocks, envelopes) = store
.load_blocks_to_replay(Slot::new(0), max_slot, end_block_root.into())
.unwrap();

Expand Down Expand Up @@ -724,7 +724,7 @@ async fn block_replayer_hooks() {
post_block_slots.push(block.slot());
Ok(())
}))
.apply_blocks(blocks, None)
.apply_blocks(blocks, envelopes, None)
.unwrap()
.into_state();

Expand Down
3 changes: 2 additions & 1 deletion beacon_node/http_api/src/attestation_performance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,9 @@ pub fn get_attestation_performance<T: BeaconChainTypes>(
})
.collect::<Result<Vec<_>, _>>()?;

// TODO(gloas): add payloads
replayer = replayer
.apply_blocks(blocks, None)
.apply_blocks(blocks, vec![], None)
.map_err(|e| custom_server_error(format!("{:?}", e)))?;
}

Expand Down
3 changes: 2 additions & 1 deletion beacon_node/http_api/src/block_packing_efficiency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,8 +398,9 @@ pub fn get_block_packing_efficiency<T: BeaconChainTypes>(
})
.collect::<Result<Vec<_>, _>>()?;

// TODO(gloas): add payloads
replayer = replayer
.apply_blocks(blocks, None)
.apply_blocks(blocks, vec![], None)
.map_err(|e: PackingEfficiencyError| custom_server_error(format!("{:?}", e)))?;
}

Expand Down
7 changes: 4 additions & 3 deletions beacon_node/http_api/src/block_rewards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub fn get_block_rewards<T: BeaconChainTypes>(
.map_err(unhandled_error)?
.ok_or_else(|| custom_bad_request(format!("block at end slot {} unknown", end_slot)))?;

let blocks = chain
let (blocks, envelopes) = chain
.store
.load_blocks_to_replay(start_slot, end_slot, end_block_root)
.map_err(|e| unhandled_error(BeaconChainError::from(e)))?;
Expand Down Expand Up @@ -78,7 +78,7 @@ pub fn get_block_rewards<T: BeaconChainTypes>(
)
.no_signature_verification()
.minimal_block_root_verification()
.apply_blocks(blocks, None)
.apply_blocks(blocks, envelopes, None)
.map_err(unhandled_error)?;

if block_replayer.state_root_miss() {
Expand Down Expand Up @@ -138,11 +138,12 @@ pub fn compute_block_rewards<T: BeaconChainTypes>(
))
})?;

// TODO(gloas): handle payloads?
let block_replayer = BlockReplayer::new(parent_state, &chain.spec)
.no_signature_verification()
.state_root_iter([Ok((parent_block.state_root(), parent_block.slot()))].into_iter())
.minimal_block_root_verification()
.apply_blocks(vec![], Some(block.slot()))
.apply_blocks(vec![], vec![], Some(block.slot()))
.map_err(unhandled_error::<BeaconChainError>)?;

if block_replayer.state_root_miss() {
Expand Down
3 changes: 2 additions & 1 deletion beacon_node/http_api/src/sync_committee_rewards.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,12 @@ pub fn get_state_before_applying_block<T: BeaconChainTypes>(
})
.map_err(|e| custom_not_found(format!("Parent state is not available! {:?}", e)))?;

// TODO(gloas): handle payloads?
let replayer = BlockReplayer::new(parent_state, &chain.spec)
.no_signature_verification()
.state_root_iter([Ok((parent_block.state_root(), parent_block.slot()))].into_iter())
.minimal_block_root_verification()
.apply_blocks(vec![], Some(block.slot()))
.apply_blocks(vec![], vec![], Some(block.slot()))
.map_err(unhandled_error::<BeaconChainError>)?;

Ok(replayer.into_state())
Expand Down
106 changes: 91 additions & 15 deletions beacon_node/store/src/hot_cold_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ pub enum HotColdDBError {
MissingHotHDiff(Hash256),
MissingHDiff(Slot),
MissingExecutionPayload(Hash256),
MissingExecutionPayloadEnvelope(Hash256),
MissingFullBlockExecutionPayloadPruned(Hash256, Slot),
MissingAnchorInfo,
MissingFrozenBlockSlot(Hash256),
Expand Down Expand Up @@ -2020,7 +2021,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
return Ok(base_state);
}

let blocks = self.load_blocks_to_replay(base_state.slot(), slot, latest_block_root)?;
let (blocks, envelopes) =
self.load_blocks_to_replay(base_state.slot(), slot, latest_block_root)?;
let _t = metrics::start_timer(&metrics::STORE_BEACON_REPLAY_HOT_BLOCKS_TIME);

// If replaying blocks, and `update_cache` is true, also cache the epoch boundary
Expand Down Expand Up @@ -2053,6 +2055,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.replay_blocks(
base_state,
blocks,
envelopes,
slot,
no_state_root_iter(),
Some(Box::new(state_cache_hook)),
Expand Down Expand Up @@ -2356,7 +2359,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
return Ok(base_state);
}

let blocks = self.load_cold_blocks(base_state.slot() + 1, slot)?;
let (blocks, envelopes) = self.load_cold_blocks(base_state.slot() + 1, slot)?;

// Include state root for base state as it is required by block processing to not
// have to hash the state.
Expand All @@ -2365,7 +2368,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.forwards_state_roots_iterator_until(base_state.slot(), slot, || {
Err(Error::StateShouldNotBeRequired(slot))
})?;
let state = self.replay_blocks(base_state, blocks, slot, Some(state_root_iter), None)?;
let state = self.replay_blocks(
base_state,
blocks,
envelopes,
slot,
Some(state_root_iter),
None,
)?;
debug!(
target_slot = %slot,
replay_time_ms = metrics::stop_timer_with_duration(replay_timer).as_millis(),
Expand Down Expand Up @@ -2458,40 +2468,70 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}
}

/// Load cold blocks between `start_slot` and `end_slot` inclusive.
/// Load cold blocks and payload envelopes between `start_slot` and `end_slot` inclusive.
#[allow(clippy::type_complexity)]
pub fn load_cold_blocks(
&self,
start_slot: Slot,
end_slot: Slot,
) -> Result<Vec<SignedBlindedBeaconBlock<E>>, Error> {
) -> Result<
(
Vec<SignedBlindedBeaconBlock<E>>,
Vec<SignedExecutionPayloadEnvelope<E>>,
),
Error,
> {
let _t = metrics::start_timer(&metrics::STORE_BEACON_LOAD_COLD_BLOCKS_TIME);
let block_root_iter =
self.forwards_block_roots_iterator_until(start_slot, end_slot, || {
Err(Error::StateShouldNotBeRequired(end_slot))
})?;
process_results(block_root_iter, |iter| {
let blocks = process_results(block_root_iter, |iter| {
iter.map(|(block_root, _slot)| block_root)
.dedup()
.map(|block_root| {
self.get_blinded_block(&block_root)?
.ok_or(Error::MissingBlock(block_root))
})
.collect()
})?
.collect::<Result<Vec<_>, Error>>()
})??;

// If Gloas is not enabled for any slots in the range, just return `blocks`.
if !self.spec.fork_name_at_slot::<E>(start_slot).gloas_enabled()
&& !self.spec.fork_name_at_slot::<E>(end_slot).gloas_enabled()
{
return Ok((blocks, vec![]));
}
let envelopes = self.load_payload_envelopes_for_blocks(&blocks)?;

Ok((blocks, envelopes))
}

/// Load the blocks between `start_slot` and `end_slot` by backtracking from `end_block_hash`.
/// Load the blocks & envelopes between `start_slot` and `end_slot` by backtracking from
/// `end_block_root`.
///
/// Blocks are returned in slot-ascending order, suitable for replaying on a state with slot
/// equal to `start_slot`, to reach a state with slot equal to `end_slot`.
///
/// Payloads are also returned in slot-ascending order, but only payloads forming part of
/// the chain are loaded (payloads for EMPTY slots are omitted). Prior to Gloas, an empty
/// vec of payloads will be returned.
// TODO(gloas): handle last payload
#[allow(clippy::type_complexity)]
pub fn load_blocks_to_replay(
&self,
start_slot: Slot,
end_slot: Slot,
end_block_hash: Hash256,
) -> Result<Vec<SignedBeaconBlock<E, BlindedPayload<E>>>, Error> {
end_block_root: Hash256,
) -> Result<
(
Vec<SignedBlindedBeaconBlock<E>>,
Vec<SignedExecutionPayloadEnvelope<E>>,
),
Error,
> {
let _t = metrics::start_timer(&metrics::STORE_BEACON_LOAD_HOT_BLOCKS_TIME);
let mut blocks = ParentRootBlockIterator::new(self, end_block_hash)
let mut blocks = ParentRootBlockIterator::new(self, end_block_root)
.map(|result| result.map(|(_, block)| block))
// Include the block at the end slot (if any), it needs to be
// replayed in order to construct the canonical state at `end_slot`.
Expand All @@ -2518,7 +2558,42 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
})
.collect::<Result<Vec<_>, _>>()?;
blocks.reverse();
Ok(blocks)

// If Gloas is not enabled for any slots in the range, just return `blocks`.
if !self.spec.fork_name_at_slot::<E>(start_slot).gloas_enabled()
&& !self.spec.fork_name_at_slot::<E>(end_slot).gloas_enabled()
{
return Ok((blocks, vec![]));
}

let envelopes = self.load_payload_envelopes_for_blocks(&blocks)?;

Ok((blocks, envelopes))
}

pub fn load_payload_envelopes_for_blocks(
&self,
blocks: &[SignedBlindedBeaconBlock<E>],
) -> Result<Vec<SignedExecutionPayloadEnvelope<E>>, Error> {
let mut envelopes = vec![];

for (block, next_block) in blocks.iter().tuple_windows() {
if block.fork_name_unchecked().gloas_enabled() {
// Check next block to see if this block's payload is canonical on this chain.
let block_hash = block.payload_bid_block_hash()?;
if !next_block.is_parent_block_full(block_hash) {
// No payload at this slot (empty), nothing to load.
continue;
}
// Using `parent_root` avoids computation.
let block_root = next_block.parent_root();
let envelope = self
.get_payload_envelope(&block_root)?
.ok_or(HotColdDBError::MissingExecutionPayloadEnvelope(block_root))?;
envelopes.push(envelope);
}
}
Ok(envelopes)
}

/// Replay `blocks` on top of `state` until `target_slot` is reached.
Expand All @@ -2528,7 +2603,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
pub fn replay_blocks(
&self,
state: BeaconState<E>,
blocks: Vec<SignedBeaconBlock<E, BlindedPayload<E>>>,
blocks: Vec<SignedBlindedBeaconBlock<E>>,
envelopes: Vec<SignedExecutionPayloadEnvelope<E>>,
target_slot: Slot,
state_root_iter: Option<impl Iterator<Item = Result<(Hash256, Slot), Error>>>,
pre_slot_hook: Option<PreSlotHook<E, Error>>,
Expand All @@ -2549,7 +2625,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
}

block_replayer
.apply_blocks(blocks, Some(target_slot))
.apply_blocks(blocks, envelopes, Some(target_slot))
.map(|block_replayer| {
if have_state_root_iterator && block_replayer.state_root_miss() {
warn!(
Expand Down
1 change: 1 addition & 0 deletions beacon_node/store/src/reconstruct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ where

state.build_caches(&self.spec)?;

// TODO(gloas): handle payload envelope replay
process_results(block_root_iter, |iter| -> Result<(), Error> {
let mut io_batch = vec![];

Expand Down
Loading