Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Prune finalized execution payloads #3565

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,13 @@ where

self.genesis_time = Some(genesis_state.genesis_time());

// Prune finalized execution payloads.
if store.get_config().prune_payloads_on_init {
store
.try_prune_execution_payloads(false)
.map_err(|e| format!("Error pruning execution payloads: {e:?}"))?;
}

self.op_pool = Some(
store
.get_item::<PersistedOperationPool<TEthSpec>>(&OP_POOL_DB_KEY)
Expand Down
33 changes: 16 additions & 17 deletions beacon_node/beacon_chain/tests/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,28 +41,27 @@ async fn get_chain_segment() -> Vec<BeaconSnapshot<E>> {
)
.await;

harness
let mut segment = Vec::with_capacity(CHAIN_SEGMENT_LENGTH);
for snapshot in harness
.chain
.chain_dump()
.expect("should dump chain")
.into_iter()
.map(|snapshot| {
let full_block = harness
.chain
.store
.make_full_block(
&snapshot.beacon_block_root,
snapshot.beacon_block.as_ref().clone(),
)
.unwrap();
BeaconSnapshot {
beacon_block_root: snapshot.beacon_block_root,
beacon_block: Arc::new(full_block),
beacon_state: snapshot.beacon_state,
}
})
.skip(1)
.collect()
{
let full_block = harness
.chain
.get_block(&snapshot.beacon_block_root)
.await
.unwrap()
.unwrap();
segment.push(BeaconSnapshot {
beacon_block_root: snapshot.beacon_block_root,
beacon_block: Arc::new(full_block),
beacon_state: snapshot.beacon_state,
});
}
segment
}

fn get_harness(validator_count: usize) -> BeaconChainHarness<EphemeralHarnessType<E>> {
Expand Down
29 changes: 23 additions & 6 deletions beacon_node/beacon_chain/tests/store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2114,25 +2114,26 @@ async fn weak_subjectivity_sync() {
assert_eq!(new_blocks[0].beacon_block.slot(), wss_slot + 1);

for snapshot in new_blocks {
let block = &snapshot.beacon_block;
let full_block = harness
.chain
.store
.make_full_block(&snapshot.beacon_block_root, block.as_ref().clone())
.get_block(&snapshot.beacon_block_root)
.await
.unwrap()
.unwrap();
let slot = full_block.slot();
let state_root = full_block.state_root();

beacon_chain.slot_clock.set_slot(block.slot().as_u64());
beacon_chain.slot_clock.set_slot(slot.as_u64());
beacon_chain
.process_block(Arc::new(full_block), CountUnrealized::True)
.await
.unwrap();
beacon_chain.recompute_head_at_current_slot().await;

// Check that the new block's state can be loaded correctly.
let state_root = block.state_root();
let mut state = beacon_chain
.store
.get_state(&state_root, Some(block.slot()))
.get_state(&state_root, Some(slot))
.unwrap()
.unwrap();
assert_eq!(state.update_tree_hash_cache().unwrap(), state_root);
Expand Down Expand Up @@ -2583,6 +2584,7 @@ fn check_split_slot(harness: &TestHarness, store: Arc<HotColdDB<E, LevelDB<E>, L
/// Check that all the states in a chain dump have the correct tree hash.
fn check_chain_dump(harness: &TestHarness, expected_len: u64) {
let chain_dump = harness.chain.chain_dump().unwrap();
let split_slot = harness.chain.store.get_split_slot();

assert_eq!(chain_dump.len() as u64, expected_len);

Expand All @@ -2606,6 +2608,21 @@ fn check_chain_dump(harness: &TestHarness, expected_len: u64) {
.slot(),
checkpoint.beacon_state.slot()
);

// Check presence of execution payload on disk.
if harness.chain.spec.bellatrix_fork_epoch.is_some() {
assert_eq!(
harness
.chain
.store
.execution_payload_exists(&checkpoint.beacon_block_root)
.unwrap(),
checkpoint.beacon_block.slot() >= split_slot,
"incorrect payload storage for block at slot {}: {:?}",
checkpoint.beacon_block.slot(),
checkpoint.beacon_block_root,
);
}
}

// Check the forwards block roots iterator against the chain dump
Expand Down
7 changes: 7 additions & 0 deletions beacon_node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,13 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
.takes_value(true)
.default_value("true")
)
.arg(
Arg::with_name("prune-payloads-on-startup")
.long("prune-payloads-on-startup")
.help("Check for execution payloads to prune on start-up.")
.takes_value(true)
.default_value("true")
)

/*
* Misc.
Expand Down
6 changes: 6 additions & 0 deletions beacon_node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,12 @@ pub fn get_config<E: EthSpec>(
.map_err(|_| "auto-compact-db takes a boolean".to_string())?;
}

if let Some(prune_payloads_on_init) =
clap_utils::parse_optional(cli_args, "prune-payloads-on-startup")?
{
client_config.store.prune_payloads_on_init = prune_payloads_on_init;
}

/*
* Zero-ports
*
Expand Down
3 changes: 3 additions & 0 deletions beacon_node/store/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ pub struct StoreConfig {
pub compact_on_init: bool,
/// Whether to compact the database during database pruning.
pub compact_on_prune: bool,
/// Whether to try pruning execution payloads on initialization.
pub prune_payloads_on_init: bool,
}

/// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params.
Expand All @@ -43,6 +45,7 @@ impl Default for StoreConfig {
block_cache_size: DEFAULT_BLOCK_CACHE_SIZE,
compact_on_init: false,
compact_on_prune: true,
prune_payloads_on_init: true,
}
}
}
Expand Down
110 changes: 104 additions & 6 deletions beacon_node/store/src/hot_cold_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::config::{
};
use crate::forwards_iter::{HybridForwardsBlockRootsIterator, HybridForwardsStateRootsIterator};
use crate::impls::beacon_state::{get_full_state, store_full_state};
use crate::iter::{ParentRootBlockIterator, StateRootsIterator};
use crate::iter::{BlockRootsIterator, ParentRootBlockIterator, RootsIterator};
use crate::leveldb_store::BytesKey;
use crate::leveldb_store::LevelDB;
use crate::memory_store::MemoryStore;
Expand Down Expand Up @@ -438,6 +438,12 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
.ok_or_else(|| HotColdDBError::MissingExecutionPayload(*block_root).into())
}

/// Check if the execution payload for a block exists on disk.
pub fn execution_payload_exists(&self, block_root: &Hash256) -> Result<bool, Error> {
self.get_item::<ExecutionPayload<E>>(block_root)
.map(|payload| payload.is_some())
}

/// Determine whether a block exists in the database.
pub fn block_exists(&self, block_root: &Hash256) -> Result<bool, Error> {
self.hot_db
Expand Down Expand Up @@ -1418,6 +1424,93 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
&CompactionTimestamp(compaction_timestamp.as_secs()),
)
}

/// Try to prune all execution payloads, returning early if there is no need to prune.
pub fn try_prune_execution_payloads(&self, force: bool) -> Result<(), Error> {
let split = self.get_split_info();

if split.slot == 0 {
return Ok(());
}

let bellatrix_fork_slot = if let Some(epoch) = self.spec.bellatrix_fork_epoch {
epoch.start_slot(E::slots_per_epoch())
} else {
return Ok(());
};

// Load the split state so we can backtrack to find execution payloads.
let split_state = self.get_state(&split.state_root, Some(split.slot))?.ok_or(
HotColdDBError::MissingSplitState(split.state_root, split.slot),
)?;

// The finalized block may or may not have its execution payload stored, depending on
// whether it was at a skipped slot. However for a fully pruned database its parent
// should *always* have been pruned.
let split_parent_block_root = split_state.get_block_root(split.slot - 1)?;
if !self.execution_payload_exists(split_parent_block_root)? && !force {
info!(self.log, "Execution payloads are pruned");
return Ok(());
}

// Iterate block roots backwards to the Bellatrix fork or the anchor slot, whichever comes
// first.
let split_block_root = split_state.get_latest_block_root(split.state_root);
let anchor_slot = self.get_anchor_info().map(|info| info.anchor_slot);

let mut ops = vec![];

for res in std::iter::once(Ok((split_block_root, split.slot)))
.chain(BlockRootsIterator::new(self, &split_state))
{
let (block_root, slot) = match res {
Ok(tuple) => tuple,
Err(e) => {
warn!(
self.log,
"Stopping backtrack early";
"error" => ?e,
);
break;
}
};

if slot < bellatrix_fork_slot {
info!(
self.log,
"Finished backtrack to Bellatrix fork";
);
break;
}

if self.execution_payload_exists(&block_root)? {
debug!(
self.log,
"Pruning execution payload";
"slot" => slot,
"block_root" => ?block_root,
);
ops.push(StoreOp::DeleteExecutionPayload(block_root));
}

if Some(slot) == anchor_slot {
info!(
self.log,
"Finished backtrack to anchor state";
"slot" => slot
);
break;
}
}
let payloads_pruned = ops.len();
self.do_atomically(ops)?;
info!(
self.log,
"Execution payload pruning complete";
"payloads_pruned" => payloads_pruned,
);
Ok(())
}
}

/// Advance the split point of the store, moving new finalized states to the freezer.
Expand Down Expand Up @@ -1457,16 +1550,16 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
let mut hot_db_ops: Vec<StoreOp<E>> = Vec::new();

// 1. Copy all of the states between the head and the split slot, from the hot DB
// to the cold DB.
let state_root_iter = StateRootsIterator::new(&store, frozen_head);
for maybe_pair in state_root_iter.take_while(|result| match result {
Ok((_, slot)) => {
// to the cold DB. Delete the execution payloads of these now-finalized blocks.
let state_root_iter = RootsIterator::new(&store, frozen_head);
for maybe_tuple in state_root_iter.take_while(|result| match result {
Ok((_, _, slot)) => {
slot >= &current_split_slot
&& anchor_slot.map_or(true, |anchor_slot| slot >= &anchor_slot)
}
Err(_) => true,
}) {
let (state_root, slot) = maybe_pair?;
let (block_root, state_root, slot) = maybe_tuple?;

let mut cold_db_ops: Vec<KeyValueStoreOp> = Vec::new();

Expand All @@ -1489,6 +1582,11 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(

// Delete the old summary, and the full state if we lie on an epoch boundary.
hot_db_ops.push(StoreOp::DeleteState(state_root, Some(slot)));

// Delete the execution payload. Even if this execution payload is the payload of the
// new finalized block it is OK to delete it, as `try_get_full_block` looks at the split
// slot when determining whether to reconstruct payloads.
hot_db_ops.push(StoreOp::DeleteExecutionPayload(block_root));
}

// Warning: Critical section. We have to take care not to put any of the two databases in an
Expand Down
32 changes: 32 additions & 0 deletions database_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ pub fn inspect_cli_app<'a, 'b>() -> App<'a, 'b> {
)
}

pub fn prune_payloads_app<'a, 'b>() -> App<'a, 'b> {
App::new("prune_payloads")
.setting(clap::AppSettings::ColoredHelp)
.about("Prune finalized execution payloads")
}

pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
App::new(CMD)
.visible_aliases(&["db"])
Expand All @@ -85,6 +91,7 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
.subcommand(migrate_cli_app())
.subcommand(version_cli_app())
.subcommand(inspect_cli_app())
.subcommand(prune_payloads_app())
}

fn parse_client_config<E: EthSpec>(
Expand Down Expand Up @@ -257,6 +264,30 @@ pub fn migrate_db<E: EthSpec>(
)
}

pub fn prune_payloads<E: EthSpec>(
client_config: ClientConfig,
runtime_context: &RuntimeContext<E>,
log: Logger,
) -> Result<(), Error> {
let spec = &runtime_context.eth2_config.spec;
let hot_path = client_config.get_db_path();
let cold_path = client_config.get_freezer_db_path();

let db = HotColdDB::<E, LevelDB<E>, LevelDB<E>>::open(
&hot_path,
&cold_path,
|_, _, _| Ok(()),
client_config.store,
spec.clone(),
log,
)?;

// If we're trigging a prune manually then ignore the check on the split's parent that bails
// out early.
let force = true;
db.try_prune_execution_payloads(force)
}

/// Run the database manager, returning an error string if the operation did not succeed.
pub fn run<T: EthSpec>(cli_args: &ArgMatches<'_>, mut env: Environment<T>) -> Result<(), String> {
let client_config = parse_client_config(cli_args, &env)?;
Expand All @@ -273,6 +304,7 @@ pub fn run<T: EthSpec>(cli_args: &ArgMatches<'_>, mut env: Environment<T>) -> Re
let inspect_config = parse_inspect_config(cli_args)?;
inspect_db(inspect_config, client_config, &context, log)
}
("prune_payloads", Some(_)) => prune_payloads(client_config, &context, log),
_ => {
return Err("Unknown subcommand, for help `lighthouse database_manager --help`".into())
}
Expand Down
13 changes: 13 additions & 0 deletions lighthouse/tests/beacon_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1227,6 +1227,19 @@ fn compact_db_flag() {
.with_config(|config| assert!(config.store.compact_on_init));
}
#[test]
fn prune_payloads_on_startup_default() {
CommandLineTest::new()
.run_with_zero_port()
.with_config(|config| assert!(config.store.prune_payloads_on_init));
}
#[test]
fn prune_payloads_on_startup_false() {
CommandLineTest::new()
.flag("prune-payloads-on-startup", Some("false"))
.run_with_zero_port()
.with_config(|config| assert!(!config.store.prune_payloads_on_init));
}
#[test]
fn reconstruct_historic_states_flag() {
CommandLineTest::new()
.flag("reconstruct-historic-states", None)
Expand Down