From 1278ed0287af426f4682e6440731c8b9d8d3f699 Mon Sep 17 00:00:00 2001
From: Eitan Seri-Levi
Date: Fri, 24 Jan 2025 12:06:13 +0300
Subject: [PATCH 1/5] Fix redb compilation errors

---
 .github/workflows/test-suite.yml                  |  2 +-
 .../beacon_chain/src/light_client_server_cache.rs |  2 +-
 .../src/schema_change/migration_schema_v21.rs     |  4 ++--
 .../src/schema_change/migration_schema_v22.rs     |  2 +-
 beacon_node/store/src/database/leveldb_impl.rs    |  8 ++++----
 beacon_node/store/src/database/redb_impl.rs       |  6 ++----
 beacon_node/store/src/forwards_iter.rs            |  2 ++
 beacon_node/store/src/garbage_collection.rs       |  2 +-
 beacon_node/store/src/hot_cold_store.rs           | 14 +++++++-------
 beacon_node/store/src/lib.rs                      |  5 +++--
 beacon_node/store/src/memory_store.rs             | 12 ++++++++----
 database_manager/src/lib.rs                       |  1 +
 12 files changed, 33 insertions(+), 27 deletions(-)

diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml
index 0ee9dbb622f..87dd03dd426 100644
--- a/.github/workflows/test-suite.yml
+++ b/.github/workflows/test-suite.yml
@@ -27,7 +27,7 @@ env:
   # Disable incremental compilation
   CARGO_INCREMENTAL: 0
   # Enable portable to prevent issues with caching `blst` for the wrong CPU type
-  TEST_FEATURES: portable
+  TEST_FEATURES: portable,beacon-node-leveldb,beacon-node-redb
 jobs:
   check-labels:
     runs-on: ubuntu-latest
diff --git a/beacon_node/beacon_chain/src/light_client_server_cache.rs b/beacon_node/beacon_chain/src/light_client_server_cache.rs
index 78442d8df08..23ef0b00c91 100644
--- a/beacon_node/beacon_chain/src/light_client_server_cache.rs
+++ b/beacon_node/beacon_chain/src/light_client_server_cache.rs
@@ -277,7 +277,7 @@ impl<T: BeaconChainTypes> LightClientServerCache<T> {
         let mut light_client_updates = vec![];
         for res in store
             .hot_db
-            .iter_column_from::<Vec<u8>>(column, &start_period.to_le_bytes())
+            .iter_column_from::<Vec<u8>>(column, &start_period.to_le_bytes())?
         {
             let (sync_committee_bytes, light_client_update_bytes) = res?;
             let sync_committee_period = u64::from_ssz_bytes(&sync_committee_bytes)
diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v21.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v21.rs
index f02f5ee6f3a..5ca482e5e8b 100644
--- a/beacon_node/beacon_chain/src/schema_change/migration_schema_v21.rs
+++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v21.rs
@@ -19,7 +19,7 @@ pub fn upgrade_to_v21(
     // Iterate through all pubkeys and decompress them.
     for (i, res) in db
         .hot_db
-        .iter_column::<Hash256>(DBColumn::PubkeyCache)
+        .iter_column::<Hash256>(DBColumn::PubkeyCache)?
         .enumerate()
     {
         let (key, value) = res?;
@@ -51,7 +51,7 @@ pub fn downgrade_from_v21(
     // Iterate through all pubkeys and recompress them.
     for (i, res) in db
         .hot_db
-        .iter_column::<Hash256>(DBColumn::PubkeyCache)
+        .iter_column::<Hash256>(DBColumn::PubkeyCache)?
        .enumerate()
     {
         let (key, value) = res?;
diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v22.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v22.rs
index 982c3ded467..6450edfd262 100644
--- a/beacon_node/beacon_chain/src/schema_change/migration_schema_v22.rs
+++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v22.rs
@@ -133,7 +133,7 @@ pub fn delete_old_schema_freezer_data(
     ];
 
     for column in columns {
-        for res in db.cold_db.iter_column_keys::<Vec<u8>>(column) {
+        for res in db.cold_db.iter_column_keys::<Vec<u8>>(column)? {
             let key = res?;
             cold_ops.push(KeyValueStoreOp::DeleteKey(column, key));
         }
diff --git a/beacon_node/store/src/database/leveldb_impl.rs b/beacon_node/store/src/database/leveldb_impl.rs
index 3d8bbe14737..17b9aedb268 100644
--- a/beacon_node/store/src/database/leveldb_impl.rs
+++ b/beacon_node/store/src/database/leveldb_impl.rs
@@ -222,7 +222,7 @@ impl<E: EthSpec> LevelDB<E> {
         let iter = self.db.iter(self.read_options());
         iter.seek(&start_key);
 
-        Box::new(
+        Ok(Box::new(
             iter.take_while(move |(key, _)| key.matches_column(column))
                 .map(move |(bytes_key, value)| {
                     metrics::inc_counter_vec(&metrics::DISK_DB_READ_COUNT, &[column.into()]);
@@ -238,7 +238,7 @@ impl<E: EthSpec> LevelDB<E> {
                     })?;
                     Ok((K::from_bytes(key)?, value))
                 }),
-        )
+        ))
     }
 
     pub fn iter_column_keys_from<K: Key>(&self, column: DBColumn, from: &[u8]) -> ColumnKeyIter<K> {
@@ -247,7 +247,7 @@ impl<E: EthSpec> LevelDB<E> {
         let iter = self.db.keys_iter(self.read_options());
         iter.seek(&start_key);
 
-        Box::new(
+        Ok(Box::new(
             iter.take_while(move |key| key.matches_column(column))
                 .map(move |bytes_key| {
                     metrics::inc_counter_vec(&metrics::DISK_DB_KEY_READ_COUNT, &[column.into()]);
@@ -259,7 +259,7 @@ impl<E: EthSpec> LevelDB<E> {
                     let key = &bytes_key.key[column.as_bytes().len()..];
                     K::from_bytes(key)
                 }),
-        )
+        ))
     }
 
     /// Iterate through all keys and values in a particular column.
diff --git a/beacon_node/store/src/database/redb_impl.rs b/beacon_node/store/src/database/redb_impl.rs
index 6a776da7b17..4cb91a43f15 100644
--- a/beacon_node/store/src/database/redb_impl.rs
+++ b/beacon_node/store/src/database/redb_impl.rs
@@ -231,7 +231,7 @@ impl<E: EthSpec> Redb<E> {
             })
         };
 
-        Box::new(iter)
+        Ok(Box::new(iter))
     }
 
     /// Iterate through all keys and values in a particular column.
@@ -243,8 +243,6 @@ impl<E: EthSpec> Redb<E> {
         let table_definition: TableDefinition<'_, &[u8], &[u8]> =
             TableDefinition::new(column.into());
 
-        let prefix = from.to_vec();
-
         let iter = {
             let open_db = self.db.read();
             let read_txn = open_db.begin_read()?;
@@ -272,7 +270,7 @@ impl<E: EthSpec> Redb<E> {
     }
 
     pub fn iter_column<K: Key>(&self, column: DBColumn) -> ColumnIter<K> {
-        self.iter_column_from(column, &vec![0; column.key_size()], |_, _| true)
+        self.iter_column_from(column, &vec![0; column.key_size()])
     }
 
     pub fn delete_batch(&self, col: DBColumn, ops: HashSet<&[u8]>) -> Result<(), Error> {
diff --git a/beacon_node/store/src/forwards_iter.rs b/beacon_node/store/src/forwards_iter.rs
index 5300a74c060..cc70499ffbd 100644
--- a/beacon_node/store/src/forwards_iter.rs
+++ b/beacon_node/store/src/forwards_iter.rs
@@ -158,6 +158,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Iterator
             return None;
         }
         self.inner
+            .as_mut()
+            .ok()?
             .next()?
             .and_then(|(slot_bytes, root_bytes)| {
                 let slot = slot_bytes
diff --git a/beacon_node/store/src/garbage_collection.rs b/beacon_node/store/src/garbage_collection.rs
index 06393f2d219..38c1f7752bc 100644
--- a/beacon_node/store/src/garbage_collection.rs
+++ b/beacon_node/store/src/garbage_collection.rs
@@ -18,7 +18,7 @@ where
     /// Delete the temporary states that were leftover by failed block imports.
     pub fn delete_temp_states(&self) -> Result<(), Error> {
         let mut ops = vec![];
-        self.iter_temporary_state_roots().for_each(|state_root| {
+        self.iter_temporary_state_roots()?.for_each(|state_root| {
             if let Ok(state_root) = state_root {
                 ops.push(state_root);
             }
diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index 75251cb5fb4..a64438dfd7b 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -14,8 +14,8 @@ use crate::metadata::{
 };
 use crate::state_cache::{PutStateOutcome, StateCache};
 use crate::{
-    get_data_column_key, metrics, parse_data_column_key, BlobSidecarListFromRoot, DBColumn,
-    DatabaseBlock, Error, ItemStore, KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp,
+    get_data_column_key, metrics, parse_data_column_key, BlobSidecarListFromRoot, ColumnKeyIter,
+    DBColumn, DatabaseBlock, Error, ItemStore, KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp,
 };
 use itertools::{process_results, Itertools};
 use lru::LruCache;
@@ -405,7 +405,7 @@ impl<E: EthSpec> HotColdDB<E, BeaconNodeBackend<E>, BeaconNodeBackend<E>> {
     }
 
     /// Return an iterator over the state roots of all temporary states.
-    pub fn iter_temporary_state_roots(&self) -> impl Iterator<Item = Result<Hash256, Error>> + '_ {
+    pub fn iter_temporary_state_roots(&self) -> ColumnKeyIter<Hash256> {
         self.hot_db
             .iter_column_keys::<Hash256>(DBColumn::BeaconStateTemporary)
     }
@@ -778,7 +778,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         let mut light_client_updates = vec![];
         for res in self
             .hot_db
-            .iter_column_from::<Vec<u8>>(column, &start_period.to_le_bytes())
+            .iter_column_from::<Vec<u8>>(column, &start_period.to_le_bytes())?
         {
             let (sync_committee_bytes, light_client_update_bytes) = res?;
             let sync_committee_period = u64::from_ssz_bytes(&sync_committee_bytes)?;
@@ -2077,7 +2077,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
     /// Fetch all keys in the data_column column with prefix `block_root`
     pub fn get_data_column_keys(&self, block_root: Hash256) -> Result<Vec<ColumnIndex>, Error> {
         self.blobs_db
-            .iter_column_from::<Vec<u8>>(DBColumn::BeaconDataColumn, block_root.as_slice())
+            .iter_column_from::<Vec<u8>>(DBColumn::BeaconDataColumn, block_root.as_slice())?
             .take_while(|res| {
                 let Ok((key, _)) = res else { return false };
 
@@ -2932,7 +2932,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         columns.extend(previous_schema_columns);
 
         for column in columns {
-            for res in self.cold_db.iter_column_keys::<Vec<u8>>(column) {
+            for res in self.cold_db.iter_column_keys::<Vec<u8>>(column)? {
                 let key = res?;
                 cold_ops.push(KeyValueStoreOp::DeleteKey(column, key));
             }
@@ -2976,7 +2976,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         let mut state_delete_batch = vec![];
         for res in self
             .hot_db
-            .iter_column::<Hash256>(DBColumn::BeaconStateSummary)
+            .iter_column::<Hash256>(DBColumn::BeaconStateSummary)?
         {
             let (state_root, summary_bytes) = res?;
             let summary = HotStateSummary::from_ssz_bytes(&summary_bytes)?;
diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs
index 0cfc42ab156..57dc846af7d 100644
--- a/beacon_node/store/src/lib.rs
+++ b/beacon_node/store/src/lib.rs
@@ -47,8 +47,9 @@ pub use types::*;
 
 const DATA_COLUMN_DB_KEY_SIZE: usize = 32 + 8;
 
-pub type ColumnIter<'a, K> = Box<dyn Iterator<Item = Result<(K, Vec<u8>), Error>> + 'a>;
-pub type ColumnKeyIter<'a, K> = Box<dyn Iterator<Item = Result<K, Error>> + 'a>;
+pub type ColumnIter<'a, K> =
+    Result<Box<dyn Iterator<Item = Result<(K, Vec<u8>), Error>> + 'a>, Error>;
+pub type ColumnKeyIter<'a, K> = Result<Box<dyn Iterator<Item = Result<K, Error>> + 'a>, Error>;
 
 pub type RawEntryIter<'a> =
     Result<Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>), Error>> + 'a>, Error>;
diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs
index 6070a2d3f0c..16036eebe2f 100644
--- a/beacon_node/store/src/memory_store.rs
+++ b/beacon_node/store/src/memory_store.rs
@@ -94,17 +94,19 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
             .take_while(|(k, _)| k.remove_column_variable(column).is_some())
             .filter_map(|(k, _)| k.remove_column_variable(column).map(|k| k.to_vec()))
             .collect::<Vec<_>>();
-        Box::new(keys.into_iter().filter_map(move |key| {
+        Ok(Box::new(keys.into_iter().filter_map(move |key| {
             self.get_bytes(column, &key).transpose().map(|res| {
                 let k = K::from_bytes(&key)?;
                 let v = res?;
                 Ok((k, v))
             })
-        }))
+        })))
     }
 
     fn iter_column_keys<K: Key>(&self, column: DBColumn) -> ColumnKeyIter<K> {
-        Box::new(self.iter_column(column).map(|res| res.map(|(k, _)| k)))
+        Ok(Box::new(
+            self.iter_column(column)?.map(|res| res.map(|(k, _)| k)),
+        ))
     }
 
     fn begin_rw_transaction(&self) -> MutexGuard<()> {
@@ -127,7 +129,9 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
             .take_while(|(k, _)| k.remove_column_variable(column).is_some())
             .filter_map(|(k, _)| k.remove_column_variable(column).map(|k| k.to_vec()))
             .collect::<Vec<_>>();
-        Box::new(keys.into_iter().map(move |key| K::from_bytes(&key)))
+        Ok(Box::new(
+            keys.into_iter().map(move |key| K::from_bytes(&key)),
+        ))
     }
 
     fn delete_batch(&self, col: DBColumn, ops: HashSet<&[u8]>) -> Result<(), DBError> {
diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs
index bed90df9df0..9d3ebc23676 100644
--- a/database_manager/src/lib.rs
+++ b/database_manager/src/lib.rs
@@ -172,6 +172,7 @@ pub fn inspect_db(
 
     for res in sub_db
         .iter_column::<Vec<u8>>(inspect_config.column)
+        .map_err(|e| format!("Unable to iterate column: {:?}", e))?
         .skip(skip)
         .take(limit)
     {

From 6c6717a21f54c553be349cddd36ff9f64ed93e33 Mon Sep 17 00:00:00 2001
From: Eitan Seri-Levi
Date: Fri, 24 Jan 2025 12:31:53 +0300
Subject: [PATCH 2/5] fix tests

---
 Makefile                                      | 2 +-
 beacon_node/beacon_chain/tests/store_tests.rs | 8 ++++----
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/Makefile b/Makefile
index e8b44cb7807..0f08afd1687 100644
--- a/Makefile
+++ b/Makefile
@@ -222,7 +222,7 @@ lint-fix:
 
 # Also run the lints on the optimized-only tests
 lint-full:
-	RUSTFLAGS="-C debug-assertions=no $(RUSTFLAGS)" $(MAKE) lint
+	TEST_FEATURES="beacon-node-leveldb,beacon-node-redb,${TEST_FEATURES}" RUSTFLAGS="-C debug-assertions=no $(RUSTFLAGS)" $(MAKE) lint
 
 # Runs the makefile in the `ef_tests` repo.
 #
diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs
index d1a38b1cdec..9fae1b1c1a6 100644
--- a/beacon_node/beacon_chain/tests/store_tests.rs
+++ b/beacon_node/beacon_chain/tests/store_tests.rs
@@ -2155,7 +2155,7 @@ async fn garbage_collect_temp_states_from_failed_block_on_startup() {
         .unwrap_err();
 
     assert_eq!(
-        store.iter_temporary_state_roots().count(),
+        store.iter_temporary_state_roots().unwrap().count(),
         block_slot.as_usize() - 1
     );
     store
@@ -2174,7 +2174,7 @@ async fn garbage_collect_temp_states_from_failed_block_on_startup() {
 
     // On startup, the store should garbage collect all the temporary states.
     let store = get_store(&db_path);
-    assert_eq!(store.iter_temporary_state_roots().count(), 0);
+    assert_eq!(store.iter_temporary_state_roots().unwrap().count(), 0);
 }
 
 #[tokio::test]
@@ -2210,7 +2210,7 @@ async fn garbage_collect_temp_states_from_failed_block_on_finalization() {
         .unwrap_err();
 
     assert_eq!(
-        store.iter_temporary_state_roots().count(),
+        store.iter_temporary_state_roots().unwrap().count(),
         block_slot.as_usize() - 1
     );
 
@@ -2229,7 +2229,7 @@ async fn garbage_collect_temp_states_from_failed_block_on_finalization() {
     assert_ne!(store.get_split_slot(), 0);
 
     // Check that temporary states have been pruned.
-    assert_eq!(store.iter_temporary_state_roots().count(), 0);
+    assert_eq!(store.iter_temporary_state_roots().unwrap().count(), 0);
 }
 
 #[tokio::test]

From d3c57ab718d6e36a348177effdd44fac26754556 Mon Sep 17 00:00:00 2001
From: Eitan Seri-Levi
Date: Fri, 24 Jan 2025 12:34:03 +0300
Subject: [PATCH 3/5] fix tests

---
 .github/workflows/test-suite.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/test-suite.yml b/.github/workflows/test-suite.yml
index 87dd03dd426..0ee9dbb622f 100644
--- a/.github/workflows/test-suite.yml
+++ b/.github/workflows/test-suite.yml
@@ -27,7 +27,7 @@ env:
   # Disable incremental compilation
   CARGO_INCREMENTAL: 0
   # Enable portable to prevent issues with caching `blst` for the wrong CPU type
-  TEST_FEATURES: portable,beacon-node-leveldb,beacon-node-redb
+  TEST_FEATURES: portable
 jobs:
   check-labels:
     runs-on: ubuntu-latest

From 9d4ae5c190c9e6ec7885a056073af06d063e87f0 Mon Sep 17 00:00:00 2001
From: Eitan Seri-Levi
Date: Tue, 28 Jan 2025 19:04:20 +0300
Subject: [PATCH 4/5] dont wrap Result around column iter type

---
 .../src/light_client_server_cache.rs          |  2 +-
 .../src/schema_change/migration_schema_v21.rs |  4 +--
 .../src/schema_change/migration_schema_v22.rs |  2 +-
 beacon_node/beacon_chain/tests/store_tests.rs |  8 ++---
 .../store/src/database/leveldb_impl.rs        |  8 ++---
 beacon_node/store/src/database/redb_impl.rs   | 29 ++++++++++++-------
 beacon_node/store/src/forwards_iter.rs        |  1 -
 beacon_node/store/src/garbage_collection.rs   |  2 +-
 beacon_node/store/src/hot_cold_store.rs       |  8 ++---
 beacon_node/store/src/lib.rs                  |  5 ++--
 beacon_node/store/src/memory_store.rs         | 12 +++-----
 database_manager/src/lib.rs                   |  1 -
 lighthouse/Cargo.toml                         |  2 +-
 13 files changed, 42 insertions(+), 42 deletions(-)

diff --git a/beacon_node/beacon_chain/src/light_client_server_cache.rs b/beacon_node/beacon_chain/src/light_client_server_cache.rs
index 23ef0b00c91..78442d8df08 100644
--- a/beacon_node/beacon_chain/src/light_client_server_cache.rs
+++ b/beacon_node/beacon_chain/src/light_client_server_cache.rs
@@ -277,7 +277,7 @@ impl<T: BeaconChainTypes> LightClientServerCache<T> {
         let mut light_client_updates = vec![];
         for res in store
             .hot_db
-            .iter_column_from::<Vec<u8>>(column, &start_period.to_le_bytes())?
+            .iter_column_from::<Vec<u8>>(column, &start_period.to_le_bytes())
         {
             let (sync_committee_bytes, light_client_update_bytes) = res?;
             let sync_committee_period = u64::from_ssz_bytes(&sync_committee_bytes)
diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v21.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v21.rs
index 5ca482e5e8b..f02f5ee6f3a 100644
--- a/beacon_node/beacon_chain/src/schema_change/migration_schema_v21.rs
+++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v21.rs
@@ -19,7 +19,7 @@ pub fn upgrade_to_v21(
     // Iterate through all pubkeys and decompress them.
     for (i, res) in db
         .hot_db
-        .iter_column::<Hash256>(DBColumn::PubkeyCache)?
+        .iter_column::<Hash256>(DBColumn::PubkeyCache)
         .enumerate()
     {
         let (key, value) = res?;
@@ -51,7 +51,7 @@ pub fn downgrade_from_v21(
     // Iterate through all pubkeys and recompress them.
     for (i, res) in db
         .hot_db
-        .iter_column::<Hash256>(DBColumn::PubkeyCache)?
+        .iter_column::<Hash256>(DBColumn::PubkeyCache)
         .enumerate()
     {
         let (key, value) = res?;
diff --git a/beacon_node/beacon_chain/src/schema_change/migration_schema_v22.rs b/beacon_node/beacon_chain/src/schema_change/migration_schema_v22.rs
index 6450edfd262..982c3ded467 100644
--- a/beacon_node/beacon_chain/src/schema_change/migration_schema_v22.rs
+++ b/beacon_node/beacon_chain/src/schema_change/migration_schema_v22.rs
@@ -133,7 +133,7 @@ pub fn delete_old_schema_freezer_data(
     ];
 
     for column in columns {
-        for res in db.cold_db.iter_column_keys::<Vec<u8>>(column)? {
+        for res in db.cold_db.iter_column_keys::<Vec<u8>>(column) {
             let key = res?;
             cold_ops.push(KeyValueStoreOp::DeleteKey(column, key));
         }
diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs
index 9fae1b1c1a6..d1a38b1cdec 100644
--- a/beacon_node/beacon_chain/tests/store_tests.rs
+++ b/beacon_node/beacon_chain/tests/store_tests.rs
@@ -2155,7 +2155,7 @@ async fn garbage_collect_temp_states_from_failed_block_on_startup() {
         .unwrap_err();
 
     assert_eq!(
-        store.iter_temporary_state_roots().unwrap().count(),
+        store.iter_temporary_state_roots().count(),
         block_slot.as_usize() - 1
     );
     store
@@ -2174,7 +2174,7 @@ async fn garbage_collect_temp_states_from_failed_block_on_startup() {
 
     // On startup, the store should garbage collect all the temporary states.
     let store = get_store(&db_path);
-    assert_eq!(store.iter_temporary_state_roots().unwrap().count(), 0);
+    assert_eq!(store.iter_temporary_state_roots().count(), 0);
 }
 
 #[tokio::test]
@@ -2210,7 +2210,7 @@ async fn garbage_collect_temp_states_from_failed_block_on_finalization() {
         .unwrap_err();
 
     assert_eq!(
-        store.iter_temporary_state_roots().unwrap().count(),
+        store.iter_temporary_state_roots().count(),
         block_slot.as_usize() - 1
     );
 
@@ -2229,7 +2229,7 @@ async fn garbage_collect_temp_states_from_failed_block_on_finalization() {
     assert_ne!(store.get_split_slot(), 0);
 
     // Check that temporary states have been pruned.
-    assert_eq!(store.iter_temporary_state_roots().unwrap().count(), 0);
+    assert_eq!(store.iter_temporary_state_roots().count(), 0);
 }
 
 #[tokio::test]
diff --git a/beacon_node/store/src/database/leveldb_impl.rs b/beacon_node/store/src/database/leveldb_impl.rs
index 17b9aedb268..3d8bbe14737 100644
--- a/beacon_node/store/src/database/leveldb_impl.rs
+++ b/beacon_node/store/src/database/leveldb_impl.rs
@@ -222,7 +222,7 @@ impl<E: EthSpec> LevelDB<E> {
         let iter = self.db.iter(self.read_options());
         iter.seek(&start_key);
 
-        Ok(Box::new(
+        Box::new(
             iter.take_while(move |(key, _)| key.matches_column(column))
                 .map(move |(bytes_key, value)| {
                     metrics::inc_counter_vec(&metrics::DISK_DB_READ_COUNT, &[column.into()]);
@@ -238,7 +238,7 @@ impl<E: EthSpec> LevelDB<E> {
                     })?;
                     Ok((K::from_bytes(key)?, value))
                 }),
-        ))
+        )
     }
 
     pub fn iter_column_keys_from<K: Key>(&self, column: DBColumn, from: &[u8]) -> ColumnKeyIter<K> {
@@ -247,7 +247,7 @@ impl<E: EthSpec> LevelDB<E> {
         let iter = self.db.keys_iter(self.read_options());
         iter.seek(&start_key);
 
-        Ok(Box::new(
+        Box::new(
             iter.take_while(move |key| key.matches_column(column))
                 .map(move |bytes_key| {
                     metrics::inc_counter_vec(&metrics::DISK_DB_KEY_READ_COUNT, &[column.into()]);
@@ -259,7 +259,7 @@ impl<E: EthSpec> LevelDB<E> {
                     let key = &bytes_key.key[column.as_bytes().len()..];
                     K::from_bytes(key)
                 }),
-        ))
+        )
     }
 
     /// Iterate through all keys and values in a particular column.
diff --git a/beacon_node/store/src/database/redb_impl.rs b/beacon_node/store/src/database/redb_impl.rs
index 4cb91a43f15..cbe575d184e 100644
--- a/beacon_node/store/src/database/redb_impl.rs
+++ b/beacon_node/store/src/database/redb_impl.rs
@@ -215,11 +215,12 @@ impl<E: EthSpec> Redb<E> {
         let table_definition: TableDefinition<'_, &[u8], &[u8]> =
             TableDefinition::new(column.into());
 
-        let iter = {
+        let result = (|| {
             let open_db = self.db.read();
             let read_txn = open_db.begin_read()?;
             let table = read_txn.open_table(table_definition)?;
-            table.range(from..)?.map(move |res| {
+            let range = table.range(from..)?;
+            Ok(range.map(move |res| {
                 let (key, _) = res?;
                 metrics::inc_counter_vec(&metrics::DISK_DB_KEY_READ_COUNT, &[column.into()]);
                 metrics::inc_counter_vec_by(
@@ -228,10 +229,13 @@ impl<E: EthSpec> Redb<E> {
                     key.value().len() as u64,
                 );
                 K::from_bytes(key.value())
-            })
-        };
+            }))
+        })();
 
-        Ok(Box::new(iter))
+        match result {
+            Ok(iter) => Box::new(iter),
+            Err(err) => Box::new(std::iter::once(Err(err))),
+        }
     }
 
     /// Iterate through all keys and values in a particular column.
@@ -243,13 +247,13 @@ impl<E: EthSpec> Redb<E> {
         let table_definition: TableDefinition<'_, &[u8], &[u8]> =
             TableDefinition::new(column.into());
 
-        let iter = {
+        let result = (|| {
             let open_db = self.db.read();
             let read_txn = open_db.begin_read()?;
             let table = read_txn.open_table(table_definition)?;
+            let range = table.range(from..)?;
 
-            table
-                .range(from..)?
+            Ok(range
                 .take_while(move |res| match res.as_ref() {
                     Ok((_, _)) => true,
                     Err(_) => false,
                 })
@@ -263,10 +267,13 @@ impl<E: EthSpec> Redb<E> {
                         value.value().len() as u64,
                     );
                     Ok((K::from_bytes(key.value())?, value.value().to_vec()))
-                })
-        };
+                }))
+        })();
 
-        Ok(Box::new(iter))
+        match result {
+            Ok(iter) => Box::new(iter),
+            Err(err) => Box::new(std::iter::once(Err(err))),
+        }
     }
 
     pub fn iter_column<K: Key>(&self, column: DBColumn) -> ColumnIter<K> {
diff --git a/beacon_node/store/src/forwards_iter.rs b/beacon_node/store/src/forwards_iter.rs
index cc70499ffbd..255b7d8eac8 100644
--- a/beacon_node/store/src/forwards_iter.rs
+++ b/beacon_node/store/src/forwards_iter.rs
@@ -159,7 +159,6 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> Iterator
         }
         self.inner
             .as_mut()
-            .ok()?
             .next()?
             .and_then(|(slot_bytes, root_bytes)| {
                 let slot = slot_bytes
diff --git a/beacon_node/store/src/garbage_collection.rs b/beacon_node/store/src/garbage_collection.rs
index 38c1f7752bc..06393f2d219 100644
--- a/beacon_node/store/src/garbage_collection.rs
+++ b/beacon_node/store/src/garbage_collection.rs
@@ -18,7 +18,7 @@ where
     /// Delete the temporary states that were leftover by failed block imports.
     pub fn delete_temp_states(&self) -> Result<(), Error> {
         let mut ops = vec![];
-        self.iter_temporary_state_roots()?.for_each(|state_root| {
+        self.iter_temporary_state_roots().for_each(|state_root| {
             if let Ok(state_root) = state_root {
                 ops.push(state_root);
             }
diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index a64438dfd7b..45b19834925 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -778,7 +778,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         let mut light_client_updates = vec![];
         for res in self
             .hot_db
-            .iter_column_from::<Vec<u8>>(column, &start_period.to_le_bytes())?
+            .iter_column_from::<Vec<u8>>(column, &start_period.to_le_bytes())
         {
             let (sync_committee_bytes, light_client_update_bytes) = res?;
             let sync_committee_period = u64::from_ssz_bytes(&sync_committee_bytes)?;
@@ -2077,7 +2077,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
     /// Fetch all keys in the data_column column with prefix `block_root`
     pub fn get_data_column_keys(&self, block_root: Hash256) -> Result<Vec<ColumnIndex>, Error> {
         self.blobs_db
-            .iter_column_from::<Vec<u8>>(DBColumn::BeaconDataColumn, block_root.as_slice())?
+            .iter_column_from::<Vec<u8>>(DBColumn::BeaconDataColumn, block_root.as_slice())
             .take_while(|res| {
                 let Ok((key, _)) = res else { return false };
 
@@ -2932,7 +2932,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         columns.extend(previous_schema_columns);
 
         for column in columns {
-            for res in self.cold_db.iter_column_keys::<Vec<u8>>(column)? {
+            for res in self.cold_db.iter_column_keys::<Vec<u8>>(column) {
                 let key = res?;
                 cold_ops.push(KeyValueStoreOp::DeleteKey(column, key));
             }
@@ -2976,7 +2976,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         let mut state_delete_batch = vec![];
         for res in self
             .hot_db
-            .iter_column::<Hash256>(DBColumn::BeaconStateSummary)?
+            .iter_column::<Hash256>(DBColumn::BeaconStateSummary)
         {
             let (state_root, summary_bytes) = res?;
             let summary = HotStateSummary::from_ssz_bytes(&summary_bytes)?;
diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs
index 57dc846af7d..0cfc42ab156 100644
--- a/beacon_node/store/src/lib.rs
+++ b/beacon_node/store/src/lib.rs
@@ -47,9 +47,8 @@ pub use types::*;
 
 const DATA_COLUMN_DB_KEY_SIZE: usize = 32 + 8;
 
-pub type ColumnIter<'a, K> =
-    Result<Box<dyn Iterator<Item = Result<(K, Vec<u8>), Error>> + 'a>, Error>;
-pub type ColumnKeyIter<'a, K> = Result<Box<dyn Iterator<Item = Result<K, Error>> + 'a>, Error>;
+pub type ColumnIter<'a, K> = Box<dyn Iterator<Item = Result<(K, Vec<u8>), Error>> + 'a>;
+pub type ColumnKeyIter<'a, K> = Box<dyn Iterator<Item = Result<K, Error>> + 'a>;
 
 pub type RawEntryIter<'a> =
     Result<Box<dyn Iterator<Item = Result<(Vec<u8>, Vec<u8>), Error>> + 'a>, Error>;
diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs
index 16036eebe2f..6070a2d3f0c 100644
--- a/beacon_node/store/src/memory_store.rs
+++ b/beacon_node/store/src/memory_store.rs
@@ -94,19 +94,17 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
             .take_while(|(k, _)| k.remove_column_variable(column).is_some())
             .filter_map(|(k, _)| k.remove_column_variable(column).map(|k| k.to_vec()))
             .collect::<Vec<_>>();
-        Ok(Box::new(keys.into_iter().filter_map(move |key| {
+        Box::new(keys.into_iter().filter_map(move |key| {
             self.get_bytes(column, &key).transpose().map(|res| {
                 let k = K::from_bytes(&key)?;
                 let v = res?;
                 Ok((k, v))
             })
-        })))
+        }))
     }
 
     fn iter_column_keys<K: Key>(&self, column: DBColumn) -> ColumnKeyIter<K> {
-        Ok(Box::new(
-            self.iter_column(column)?.map(|res| res.map(|(k, _)| k)),
-        ))
+        Box::new(self.iter_column(column).map(|res| res.map(|(k, _)| k)))
     }
 
     fn begin_rw_transaction(&self) -> MutexGuard<()> {
@@ -129,9 +127,7 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
             .take_while(|(k, _)| k.remove_column_variable(column).is_some())
             .filter_map(|(k, _)| k.remove_column_variable(column).map(|k| k.to_vec()))
             .collect::<Vec<_>>();
-        Ok(Box::new(
-            keys.into_iter().map(move |key| K::from_bytes(&key)),
-        ))
+        Box::new(keys.into_iter().map(move |key| K::from_bytes(&key)))
     }
 
     fn delete_batch(&self, col: DBColumn, ops: HashSet<&[u8]>) -> Result<(), DBError> {
diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs
index 9d3ebc23676..bed90df9df0 100644
--- a/database_manager/src/lib.rs
+++ b/database_manager/src/lib.rs
@@ -172,7 +172,6 @@ pub fn inspect_db(
 
     for res in sub_db
         .iter_column::<Vec<u8>>(inspect_config.column)
-        .map_err(|e| format!("Unable to iterate column: {:?}", e))?
         .skip(skip)
         .take(limit)
     {
diff --git a/lighthouse/Cargo.toml b/lighthouse/Cargo.toml
index c3035113387..93fd81f107d 100644
--- a/lighthouse/Cargo.toml
+++ b/lighthouse/Cargo.toml
@@ -7,7 +7,7 @@ autotests = false
 rust-version = "1.80.0"
 
 [features]
-default = ["slasher-lmdb", "beacon-node-leveldb"]
+default = ["slasher-lmdb", "beacon-node-redb"]
 # Writes debugging .ssz files to /tmp during block processing.
 write_ssz_files = ["beacon_node/write_ssz_files"]
 # Compiles the BLS crypto code so that the binary is portable across machines.
From 2d47cc1413052e5443d8d799d2d7c9d145b88c4c Mon Sep 17 00:00:00 2001
From: Eitan Seri-Levi
Date: Tue, 28 Jan 2025 19:41:29 +0300
Subject: [PATCH 5/5] default to leveldb

---
 lighthouse/Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lighthouse/Cargo.toml b/lighthouse/Cargo.toml
index 93fd81f107d..c3035113387 100644
--- a/lighthouse/Cargo.toml
+++ b/lighthouse/Cargo.toml
@@ -7,7 +7,7 @@ autotests = false
 rust-version = "1.80.0"
 
 [features]
-default = ["slasher-lmdb", "beacon-node-redb"]
+default = ["slasher-lmdb", "beacon-node-leveldb"]
 # Writes debugging .ssz files to /tmp during block processing.
 write_ssz_files = ["beacon_node/write_ssz_files"]
 # Compiles the BLS crypto code so that the binary is portable across machines.
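A note on the design the series converges on: patch 1 made the iterator constructors themselves fallible (`ColumnIter` became `Result<Box<dyn Iterator<...>>, Error>`), which pushed `?` and `.unwrap()` into every call site; patch 4 reverts the aliases and instead has the redb backend run its fallible setup (`begin_read()`, `open_table()`) inside an immediately-invoked closure and, if that setup fails, hand back an iterator whose single item is the error. The sketch below distills that pattern in isolation; `Table`, `poisoned`, and the `String` error type are hypothetical stand-ins for illustration, not the Lighthouse `KeyValueStore` API.

use std::collections::BTreeMap;

type Error = String;
// Mirrors the final `ColumnIter` alias from patch 4: constructing the
// iterator is infallible; failures are delivered through its items.
type ColumnIter<'a> = Box<dyn Iterator<Item = Result<(String, Vec<u8>), Error>> + 'a>;

struct Table {
    rows: BTreeMap<String, Vec<u8>>,
    poisoned: bool, // stand-in for a failed `begin_read()`/`open_table()`
}

impl Table {
    fn iter_column(&self) -> ColumnIter<'_> {
        // Run the fallible setup in an immediately-invoked closure, the same
        // `let result = (|| { ... })();` shape patch 4 uses in redb_impl.rs.
        let result = (|| {
            if self.poisoned {
                return Err("begin_read failed".to_string());
            }
            Ok(self.rows.iter().map(|(k, v)| Ok((k.clone(), v.clone()))))
        })();

        // On success, return the real iterator; on failure, an iterator whose
        // only item is the setup error, so callers still observe it through
        // the usual `let x = res?;` handling when they consume items.
        match result {
            Ok(iter) => Box::new(iter),
            Err(err) => Box::new(std::iter::once(Err(err))),
        }
    }
}

fn main() {
    let mut rows = BTreeMap::new();
    rows.insert("a".to_string(), vec![1]);
    let good = Table { rows: rows.clone(), poisoned: false };
    let bad = Table { rows, poisoned: true };

    // Healthy table: every item is Ok.
    assert!(good.iter_column().all(|res| res.is_ok()));
    // Broken table: iteration yields exactly one Err, rather than forcing
    // every call site to unwrap a Result-wrapped iterator.
    assert_eq!(bad.iter_column().count(), 1);
    assert!(bad.iter_column().next().unwrap().is_err());
}

The trade-off is that a setup failure is only observed once the iterator is polled, but call sites keep the plain `for res in iter { ... }` shape the rest of the store code already uses, which is why the `.unwrap()` churn that patch 2 added to store_tests.rs disappears again in patch 4.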