# Refined payload pruning (sigp#3587)
## Proposed Changes

Improve the payload pruning feature in several ways:

- Payload pruning is now entirely optional. It is enabled by default but can be disabled with `--prune-payloads false` (see the example after this list). The previous `--prune-payloads-on-startup` flag from sigp#3565 is removed.
- Initial payload pruning on startup now runs in a background thread. This thread always loads the split state (a small fraction of its total work, up to ~300ms) and then backtracks from that state. This pruning process ran in 2m5s on one Prater node with good I/O and in 16m on a node with slower I/O.
- To support optional payload pruning, the database function `try_load_full_block` will now attempt to load execution payloads for finalized slots _if_ pruning is currently disabled. This gives users a way to opt out of the extensive traffic between the CL and EL required to reconstruct payloads.
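
For example, pruning can be disabled when starting the beacon node (an illustrative invocation, with all other flags elided):

```
lighthouse bn --prune-payloads false
```

Restarting without the flag (or with `--prune-payloads true`, the default) re-enables pruning.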

## Additional Info

If the `prune-payloads` flag is toggled on and off, the on-startup check may not see any payloads to delete and will fail to clean them up. In this case the `lighthouse db prune_payloads` command should be used to force a manual sweep of the database.
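
A minimal sketch of that manual sweep, assuming the beacon node has been stopped (so the database is not locked) and that the command is pointed at the same datadir/network as the node:

```
# Run against the beacon node's database while the node is offline.
lighthouse db prune_payloads
```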
michaelsproul authored and Woodpile37 committed Jan 6, 2024
1 parent d1228a1 commit b22cc50
Showing 6 changed files with 85 additions and 39 deletions.
21 changes: 14 additions & 7 deletions beacon_node/beacon_chain/src/builder.rs
@@ -266,13 +266,6 @@ where

self.genesis_time = Some(genesis_state.genesis_time());

// Prune finalized execution payloads.
if store.get_config().prune_payloads_on_init {
store
.try_prune_execution_payloads(false)
.map_err(|e| format!("Error pruning execution payloads: {e:?}"))?;
}

self.op_pool = Some(
store
.get_item::<PersistedOperationPool<TEthSpec>>(&OP_POOL_DB_KEY)
@@ -863,6 +856,20 @@ where
beacon_chain.store_migrator.process_reconstruction();
}

// Prune finalized execution payloads in the background.
if beacon_chain.store.get_config().prune_payloads {
let store = beacon_chain.store.clone();
let log = log.clone();
beacon_chain.task_executor.spawn_blocking(
move || {
if let Err(e) = store.try_prune_execution_payloads(false) {
error!(log, "Error pruning payloads in background"; "error" => ?e);
}
},
"prune_payloads_background",
);
}

Ok(beacon_chain)
}
}
8 changes: 5 additions & 3 deletions beacon_node/src/cli.rs
@@ -516,9 +516,11 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> {
.default_value("true")
)
.arg(
Arg::with_name("prune-payloads-on-startup")
.long("prune-payloads-on-startup")
.help("Check for execution payloads to prune on start-up.")
Arg::with_name("prune-payloads")
.long("prune-payloads")
.help("Prune execution payloads from Lighthouse's database. This saves space but \
imposes load on the execution client, as payloads need to be \
reconstructed and sent to syncing peers.")
.takes_value(true)
.default_value("true")
)
6 changes: 2 additions & 4 deletions beacon_node/src/config.rs
@@ -358,10 +358,8 @@ pub fn get_config<E: EthSpec>(
.map_err(|_| "auto-compact-db takes a boolean".to_string())?;
}

if let Some(prune_payloads_on_init) =
clap_utils::parse_optional(cli_args, "prune-payloads-on-startup")?
{
client_config.store.prune_payloads_on_init = prune_payloads_on_init;
if let Some(prune_payloads) = clap_utils::parse_optional(cli_args, "prune-payloads")? {
client_config.store.prune_payloads = prune_payloads;
}

/*
6 changes: 3 additions & 3 deletions beacon_node/store/src/config.rs
@@ -21,8 +21,8 @@ pub struct StoreConfig {
pub compact_on_init: bool,
/// Whether to compact the database during database pruning.
pub compact_on_prune: bool,
/// Whether to try pruning execution payloads on initialization.
pub prune_payloads_on_init: bool,
/// Whether to prune payloads on initialization and finalization.
pub prune_payloads: bool,
}

/// Variant of `StoreConfig` that gets written to disk. Contains immutable configuration params.
@@ -45,7 +45,7 @@ impl Default for StoreConfig {
block_cache_size: DEFAULT_BLOCK_CACHE_SIZE,
compact_on_init: false,
compact_on_prune: true,
prune_payloads_on_init: true,
prune_payloads: true,
}
}
}
75 changes: 57 additions & 18 deletions beacon_node/store/src/hot_cold_store.rs
@@ -21,6 +21,7 @@ use crate::{
get_key_for_col, DBColumn, DatabaseBlock, Error, ItemStore, KeyValueStoreOp,
PartialBeaconState, StoreItem, StoreOp,
};
use itertools::process_results;
use leveldb::iterator::LevelDBIterator;
use lru::LruCache;
use parking_lot::{Mutex, RwLock};
@@ -334,8 +335,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
};

// If the block is after the split point then we should have the full execution payload
// stored in the database. Otherwise, just return the blinded block.
// Hold the split lock so that it can't change.
// stored in the database. If it isn't but payload pruning is disabled, try to load it
// on-demand.
//
// Hold the split lock so that it can't change while loading the payload.
let split = self.split.read_recursive();

let block = if blinded_block.message().execution_payload().is_err()
@@ -348,6 +351,18 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
self.block_cache.lock().put(*block_root, full_block.clone());

DatabaseBlock::Full(full_block)
} else if !self.config.prune_payloads {
// If payload pruning is disabled there's a chance we may have the payload of
// this finalized block. Attempt to load it but don't error in case it's missing.
if let Some(payload) = self.get_execution_payload(block_root)? {
DatabaseBlock::Full(
blinded_block
.try_into_full_block(Some(payload))
.ok_or(Error::AddPayloadLogicError)?,
)
} else {
DatabaseBlock::Blinded(blinded_block)
}
} else {
DatabaseBlock::Blinded(blinded_block)
};
@@ -388,7 +403,9 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
blinded_block: SignedBeaconBlock<E, BlindedPayload<E>>,
) -> Result<SignedBeaconBlock<E>, Error> {
if blinded_block.message().execution_payload().is_ok() {
let execution_payload = self.get_execution_payload(block_root)?;
let execution_payload = self
.get_execution_payload(block_root)?
.ok_or(HotColdDBError::MissingExecutionPayload(*block_root))?;
blinded_block.try_into_full_block(Some(execution_payload))
} else {
blinded_block.try_into_full_block(None)
@@ -433,9 +450,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
pub fn get_execution_payload(
&self,
block_root: &Hash256,
) -> Result<ExecutionPayload<E>, Error> {
self.get_item(block_root)?
.ok_or_else(|| HotColdDBError::MissingExecutionPayload(*block_root).into())
) -> Result<Option<ExecutionPayload<E>>, Error> {
self.get_item(block_root)
}

/// Check if the execution payload for a block exists on disk.
@@ -1446,19 +1462,36 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>

// The finalized block may or may not have its execution payload stored, depending on
// whether it was at a skipped slot. However for a fully pruned database its parent
// should *always* have been pruned.
let split_parent_block_root = split_state.get_block_root(split.slot - 1)?;
if !self.execution_payload_exists(split_parent_block_root)? && !force {
// should *always* have been pruned. In case of a long split (no parent found) we
// continue as if the payloads are pruned, as the node probably has other things to worry
// about.
let split_block_root = split_state.get_latest_block_root(split.state_root);

let already_pruned =
process_results(split_state.rev_iter_block_roots(&self.spec), |mut iter| {
iter.find(|(_, block_root)| *block_root != split_block_root)
.map_or(Ok(true), |(_, split_parent_root)| {
self.execution_payload_exists(&split_parent_root)
.map(|exists| !exists)
})
})??;

if already_pruned && !force {
info!(self.log, "Execution payloads are pruned");
return Ok(());
}

// Iterate block roots backwards to the Bellatrix fork or the anchor slot, whichever comes
// first.
let split_block_root = split_state.get_latest_block_root(split.state_root);
warn!(
self.log,
"Pruning finalized payloads";
"info" => "you may notice degraded I/O performance while this runs"
);
let anchor_slot = self.get_anchor_info().map(|info| info.anchor_slot);

let mut ops = vec![];
let mut last_pruned_block_root = None;

for res in std::iter::once(Ok((split_block_root, split.slot)))
.chain(BlockRootsIterator::new(self, &split_state))
@@ -1468,7 +1501,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
Err(e) => {
warn!(
self.log,
"Stopping backtrack early";
"Stopping payload pruning early";
"error" => ?e,
);
break;
@@ -1478,25 +1511,28 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
if slot < bellatrix_fork_slot {
info!(
self.log,
"Finished backtrack to Bellatrix fork";
"Payload pruning reached Bellatrix boundary";
);
break;
}

if self.execution_payload_exists(&block_root)? {
if Some(block_root) != last_pruned_block_root
&& self.execution_payload_exists(&block_root)?
{
debug!(
self.log,
"Pruning execution payload";
"slot" => slot,
"block_root" => ?block_root,
);
last_pruned_block_root = Some(block_root);
ops.push(StoreOp::DeleteExecutionPayload(block_root));
}

if Some(slot) == anchor_slot {
info!(
self.log,
"Finished backtrack to anchor state";
"Payload pruning reached anchor state";
"slot" => slot
);
break;
@@ -1583,10 +1619,13 @@ pub fn migrate_database<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>>(
// Delete the old summary, and the full state if we lie on an epoch boundary.
hot_db_ops.push(StoreOp::DeleteState(state_root, Some(slot)));

// Delete the execution payload. Even if this execution payload is the payload of the
// new finalized block it is OK to delete it, as `try_get_full_block` looks at the split
// slot when determining whether to reconstruct payloads.
hot_db_ops.push(StoreOp::DeleteExecutionPayload(block_root));
// Delete the execution payload if payload pruning is enabled. At a skipped slot we may
// delete the payload for the finalized block itself, but that's OK as we only guarantee
// that payloads are present for slots >= the split slot. The payload fetching code is also
// forgiving of missing payloads.
if store.config.prune_payloads {
hot_db_ops.push(StoreOp::DeleteExecutionPayload(block_root));
}
}

// Warning: Critical section. We have to take care not to put any of the two databases in an
8 changes: 4 additions & 4 deletions lighthouse/tests/beacon_node.rs
@@ -1227,17 +1227,17 @@ fn compact_db_flag() {
.with_config(|config| assert!(config.store.compact_on_init));
}
#[test]
fn prune_payloads_on_startup_default() {
fn prune_payloads_default() {
CommandLineTest::new()
.run_with_zero_port()
.with_config(|config| assert!(config.store.prune_payloads_on_init));
.with_config(|config| assert!(config.store.prune_payloads));
}
#[test]
fn prune_payloads_on_startup_false() {
CommandLineTest::new()
.flag("prune-payloads-on-startup", Some("false"))
.flag("prune-payloads", Some("false"))
.run_with_zero_port()
.with_config(|config| assert!(!config.store.prune_payloads_on_init));
.with_config(|config| assert!(!config.store.prune_payloads));
}
#[test]
fn reconstruct_historic_states_flag() {
