diff --git a/cmd/evm/internal/t8ntool/transition.go b/cmd/evm/internal/t8ntool/transition.go index d94c1c2d3bb..f998216f026 100644 --- a/cmd/evm/internal/t8ntool/transition.go +++ b/cmd/evm/internal/t8ntool/transition.go @@ -307,7 +307,7 @@ func Main(ctx *cli.Context) error { } defer tx.Rollback() - sd, err := libstate.NewSharedDomains(tx, log.New()) + sd, err := libstate.NewSharedDomains(tx, db, log.New()) if err != nil { return err } @@ -330,7 +330,7 @@ func Main(ctx *cli.Context) error { } // state root calculation - root, err := CalculateStateRoot(tx) + root, err := CalculateStateRoot(tx, db) if err != nil { return err } @@ -621,7 +621,7 @@ func NewHeader(env stEnv) *types.Header { return &header } -func CalculateStateRoot(tx kv.RwTx) (*libcommon.Hash, error) { +func CalculateStateRoot(tx kv.RwTx, db kv.RoDB) (*libcommon.Hash, error) { // Generate hashed state c, err := tx.RwCursor(kv.PlainState) if err != nil { @@ -630,7 +630,7 @@ func CalculateStateRoot(tx kv.RwTx) (*libcommon.Hash, error) { defer c.Close() h := libcommon.NewHasher() defer libcommon.ReturnHasherToPool(h) - domains, err := libstate.NewSharedDomains(tx, log.New()) + domains, err := libstate.NewSharedDomains(tx, db, log.New()) if err != nil { return nil, fmt.Errorf("NewSharedDomains: %w", err) } diff --git a/cmd/evm/runner.go b/cmd/evm/runner.go index be5330beb0f..04e02fa79ff 100644 --- a/cmd/evm/runner.go +++ b/cmd/evm/runner.go @@ -186,7 +186,7 @@ func runCmd(ctx *cli.Context) error { } defer tx.Rollback() - sd, err := state2.NewSharedDomains(tx, log.Root()) + sd, err := state2.NewSharedDomains(tx, db, log.Root()) if err != nil { return err } diff --git a/cmd/evm/staterunner.go b/cmd/evm/staterunner.go index a231ca88576..a60367ce7ff 100644 --- a/cmd/evm/staterunner.go +++ b/cmd/evm/staterunner.go @@ -165,7 +165,7 @@ func aggregateResultsFromStateTests( // Run the test and aggregate the result result := &StatetestResult{Name: key, Fork: st.Fork, Pass: true} - statedb, root, err := test.Run(tx, st, cfg, dirs) + statedb, root, err := test.Run(tx, db, st, cfg, dirs) if err != nil { // Test failed, mark as so and dump any state to aid debugging result.Pass, result.Error = false, err.Error() diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index 81c44355106..98f4ed56ffb 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -664,7 +664,7 @@ func stageSnapshots(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) ac := agg.BeginFilesRo() defer ac.Close() - domains, err := libstate.NewSharedDomains(tx, logger) + domains, err := libstate.NewSharedDomains(tx, db, logger) if err != nil { return err } @@ -1116,7 +1116,7 @@ func stageExec(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error if unwind > 0 { u := sync.NewUnwindState(stages.Execution, s.BlockNumber-unwind, s.BlockNumber, true, false) - err := stagedsync.UnwindExecutionStage(u, s, txc, ctx, cfg, logger) + err := stagedsync.UnwindExecutionStage(u, s, txc, db, ctx, cfg, logger) if err != nil { return err } @@ -1143,7 +1143,7 @@ func stageExec(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error return err } if execProgress == 0 { - doms, err := libstate.NewSharedDomains(tx, log.New()) + doms, err := libstate.NewSharedDomains(tx, db, log.New()) if err != nil { panic(err) } @@ -1170,7 +1170,7 @@ func stageExec(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error defer tx.Rollback() for bn := execProgress; bn < block; bn++ { txc.Tx = tx - if err := 
stagedsync.SpawnExecuteBlocksStage(s, sync, txc, bn, ctx, cfg, logger); err != nil { + if err := stagedsync.SpawnExecuteBlocksStage(s, sync, txc, db, bn, ctx, cfg, logger); err != nil { return err } } @@ -1178,7 +1178,7 @@ func stageExec(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error if err := db.Update(ctx, func(tx kv.RwTx) error { for bn := execProgress; bn < block; bn++ { txc.Tx = tx - if err := stagedsync.SpawnExecuteBlocksStage(s, sync, txc, bn, ctx, cfg, logger); err != nil { + if err := stagedsync.SpawnExecuteBlocksStage(s, sync, txc, db, bn, ctx, cfg, logger); err != nil { return err } } @@ -1190,7 +1190,7 @@ func stageExec(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error return nil } - if err := stagedsync.SpawnExecuteBlocksStage(s, sync, txc, block, ctx, cfg, logger); err != nil { + if err := stagedsync.SpawnExecuteBlocksStage(s, sync, txc, db, block, ctx, cfg, logger); err != nil { return err } diff --git a/cmd/integration/commands/state_domains.go b/cmd/integration/commands/state_domains.go index 5d839881b14..d231c2fd847 100644 --- a/cmd/integration/commands/state_domains.go +++ b/cmd/integration/commands/state_domains.go @@ -471,7 +471,7 @@ func requestDomains(chainDb, stateDb kv.RwDB, ctx context.Context, readDomain st stateTx, err := stateDb.BeginRw(ctx) must(err) defer stateTx.Rollback() - domains, err := state3.NewSharedDomains(stateTx, logger) + domains, err := state3.NewSharedDomains(stateTx, chainDb, logger) if err != nil { return err } diff --git a/cmd/integration/commands/state_stages.go b/cmd/integration/commands/state_stages.go index a70d9398594..ece73cb077f 100644 --- a/cmd/integration/commands/state_stages.go +++ b/cmd/integration/commands/state_stages.go @@ -172,7 +172,7 @@ func syncBySmallSteps(db kv.TemporalRwDB, miningConfig params.MiningConfig, ctx } defer tx.Rollback() - sd, err := stateLib.NewSharedDomains(tx, logger1) + sd, err := stateLib.NewSharedDomains(tx, db, logger1) if err != nil { return err } @@ -193,7 +193,7 @@ func syncBySmallSteps(db kv.TemporalRwDB, miningConfig params.MiningConfig, ctx execUntilFunc := func(execToBlock uint64) stagedsync.ExecFunc { return func(badBlockUnwind bool, s *stagedsync.StageState, unwinder stagedsync.Unwinder, txc wrap.TxContainer, logger log.Logger) error { - if err := stagedsync.SpawnExecuteBlocksStage(s, unwinder, txc, execToBlock, ctx, execCfg, logger); err != nil { + if err := stagedsync.SpawnExecuteBlocksStage(s, unwinder, txc, db, execToBlock, ctx, execCfg, logger); err != nil { return fmt.Errorf("spawnExecuteBlocksStage: %w", err) } return nil @@ -425,7 +425,7 @@ func loopExec(db kv.TemporalRwDB, ctx context.Context, unwind uint64, logger log // set block limit of execute stage sync.MockExecFunc(stages.Execution, func(badBlockUnwind bool, stageState *stagedsync.StageState, unwinder stagedsync.Unwinder, txc wrap.TxContainer, logger log.Logger) error { - if err = stagedsync.SpawnExecuteBlocksStage(stageState, sync, txc, to, ctx, cfg, logger); err != nil { + if err = stagedsync.SpawnExecuteBlocksStage(stageState, sync, txc, db, to, ctx, cfg, logger); err != nil { return fmt.Errorf("spawnExecuteBlocksStage: %w", err) } return nil diff --git a/core/chain_makers.go b/core/chain_makers.go index f167f52d554..47f3567e032 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -332,7 +332,7 @@ func GenerateChain(config *chain.Config, parent *types.Block, engine consensus.E defer tx.Rollback() logger := log.New("generate-chain", config.ChainName) - domains, err := 
libstate.NewSharedDomains(tx, logger) + domains, err := libstate.NewSharedDomains(tx, db, logger) if err != nil { return nil, err } @@ -454,8 +454,8 @@ func hashKeyAndAddIncarnation(k []byte, h *libcommon.Hasher) (newK []byte, err e return newK, nil } -func CalcHashRootForTests(tx kv.RwTx, header *types.Header, histV4, trace bool) (hashRoot libcommon.Hash, err error) { - domains, err := libstate.NewSharedDomains(tx, log.New()) +func CalcHashRootForTests(tx kv.RwTx, db kv.RwDB, header *types.Header, histV4, trace bool) (hashRoot libcommon.Hash, err error) { + domains, err := libstate.NewSharedDomains(tx, db, log.New()) if err != nil { return hashRoot, fmt.Errorf("NewSharedDomains: %w", err) } diff --git a/core/genesis_write.go b/core/genesis_write.go index 11ec524f70c..53144214ae9 100644 --- a/core/genesis_write.go +++ b/core/genesis_write.go @@ -522,7 +522,7 @@ func GenesisToBlock(g *types.Genesis, dirs datadir.Dirs, logger log.Logger) (*ty } defer tx.Rollback() - sd, err := state2.NewSharedDomains(tx, logger) + sd, err := state2.NewSharedDomains(tx, tdb, logger) if err != nil { return err } diff --git a/core/rawdb/rawtemporaldb/accessors_receipt_test.go b/core/rawdb/rawtemporaldb/accessors_receipt_test.go index 43dcca7249a..d6177318295 100644 --- a/core/rawdb/rawtemporaldb/accessors_receipt_test.go +++ b/core/rawdb/rawtemporaldb/accessors_receipt_test.go @@ -21,7 +21,7 @@ func TestAppendReceipt(t *testing.T) { defer tx.Rollback() ttx := tx.(kv.TemporalTx) - doms, err := state.NewSharedDomains(ttx, log.New()) + doms, err := state.NewSharedDomains(ttx, db, log.New()) require.NoError(err) defer doms.Close() doms.SetTx(ttx) diff --git a/core/state/access_list_test.go b/core/state/access_list_test.go index cd53e62af65..c07e3f08368 100644 --- a/core/state/access_list_test.go +++ b/core/state/access_list_test.go @@ -89,9 +89,9 @@ func TestAccessList(t *testing.T) { addr := common.HexToAddress slot := common.HexToHash - _, tx, _ := NewTestTemporalDb(t) + db, tx, _ := NewTestTemporalDb(t) - domains, err := stateLib.NewSharedDomains(tx, log.New()) + domains, err := stateLib.NewSharedDomains(tx, db, log.New()) require.NoError(t, err) defer domains.Close() diff --git a/core/state/database_test.go b/core/state/database_test.go index 4ca21c21839..20dadcc1473 100644 --- a/core/state/database_test.go +++ b/core/state/database_test.go @@ -923,8 +923,8 @@ func TestReproduceCrash(t *testing.T) { storageKey2 := libcommon.HexToHash("0x0e4c0e7175f9d22279a4f63ff74f7fa28b7a954a6454debaa62ce43dd9132542") value2 := uint256.NewInt(0x58c00a51) - _, tx, _ := state.NewTestTemporalDb(t) - sd, err := state3.NewSharedDomains(tx, log.New()) + db, tx, _ := state.NewTestTemporalDb(t) + sd, err := state3.NewSharedDomains(tx, db, log.New()) require.NoError(t, err) t.Cleanup(sd.Close) @@ -1343,8 +1343,8 @@ func TestChangeAccountCodeBetweenBlocks(t *testing.T) { t.Parallel() contract := libcommon.HexToAddress("0x71dd1027069078091B3ca48093B00E4735B20624") - _, tx, _ := state.NewTestTemporalDb(t) - sd, err := state3.NewSharedDomains(tx, log.New()) + db, tx, _ := state.NewTestTemporalDb(t) + sd, err := state3.NewSharedDomains(tx, db, log.New()) require.NoError(t, err) t.Cleanup(sd.Close) @@ -1393,8 +1393,8 @@ func TestCacheCodeSizeSeparately(t *testing.T) { contract := libcommon.HexToAddress("0x71dd1027069078091B3ca48093B00E4735B20624") //root := libcommon.HexToHash("0xb939e5bcf5809adfb87ab07f0795b05b95a1d64a90f0eddd0c3123ac5b433854") - _, tx, _ := state.NewTestTemporalDb(t) - sd, err := state3.NewSharedDomains(tx, log.New()) + 
db, tx, _ := state.NewTestTemporalDb(t) + sd, err := state3.NewSharedDomains(tx, db, log.New()) require.NoError(t, err) t.Cleanup(sd.Close) @@ -1431,8 +1431,8 @@ func TestCacheCodeSizeInTrie(t *testing.T) { contract := libcommon.HexToAddress("0x71dd1027069078091B3ca48093B00E4735B20624") root := libcommon.HexToHash("0xb939e5bcf5809adfb87ab07f0795b05b95a1d64a90f0eddd0c3123ac5b433854") - _, tx, _ := state.NewTestTemporalDb(t) - sd, err := state3.NewSharedDomains(tx, log.New()) + db, tx, _ := state.NewTestTemporalDb(t) + sd, err := state3.NewSharedDomains(tx, db, log.New()) require.NoError(t, err) t.Cleanup(sd.Close) diff --git a/core/state/intra_block_state_logger_test.go b/core/state/intra_block_state_logger_test.go index 64cc73d59f6..8381b6fb162 100644 --- a/core/state/intra_block_state_logger_test.go +++ b/core/state/intra_block_state_logger_test.go @@ -114,9 +114,9 @@ func TestStateLogger(t *testing.T) { for _, tt := range cases { t.Run(tt.name, func(t *testing.T) { - _, tx, _ := NewTestTemporalDb(t) + db, tx, _ := NewTestTemporalDb(t) - domains, err := stateLib.NewSharedDomains(tx, log.New()) + domains, err := stateLib.NewSharedDomains(tx, db, log.New()) require.NoError(t, err) defer domains.Close() diff --git a/core/state/intra_block_state_test.go b/core/state/intra_block_state_test.go index 0c463a473c7..b244156dad3 100644 --- a/core/state/intra_block_state_test.go +++ b/core/state/intra_block_state_test.go @@ -261,7 +261,7 @@ func (test *snapshotTest) run() bool { } defer tx.Rollback() - domains, err := stateLib.NewSharedDomains(tx, log.New()) + domains, err := stateLib.NewSharedDomains(tx, _db, log.New()) if err != nil { test.err = err return false diff --git a/core/state/state_test.go b/core/state/state_test.go index c1a3e561f08..904ee6c959b 100644 --- a/core/state/state_test.go +++ b/core/state/state_test.go @@ -139,7 +139,7 @@ func (s *StateSuite) SetUpTest(c *checker.C) { } defer tx.Rollback() - domains, err := stateLib.NewSharedDomains(tx, log.New()) + domains, err := stateLib.NewSharedDomains(tx, _db, log.New()) if err != nil { panic(err) } @@ -250,9 +250,9 @@ func (s *StateSuite) TestSnapshotEmpty(c *checker.C) { func TestSnapshot2(t *testing.T) { //TODO: why I shouldn't recreate writer here? And why domains.SetBlockNum(1) is enough for green test? t.Parallel() - _, tx, _ := NewTestTemporalDb(t) + db, tx, _ := NewTestTemporalDb(t) - domains, err := stateLib.NewSharedDomains(tx, log.New()) + domains, err := stateLib.NewSharedDomains(tx, db, log.New()) require.NoError(t, err) defer domains.Close() @@ -414,9 +414,9 @@ func NewTestTemporalDb(tb testing.TB) (kv.TemporalRwDB, kv.TemporalRwTx, *state. 
func TestDump(t *testing.T) { t.Parallel() - _, tx, _ := NewTestTemporalDb(t) + db, tx, _ := NewTestTemporalDb(t) - domains, err := state.NewSharedDomains(tx, log.New()) + domains, err := state.NewSharedDomains(tx, db, log.New()) require.NoError(t, err) defer domains.Close() diff --git a/core/test/domains_restart_test.go b/core/test/domains_restart_test.go index cb542d46b18..a90fa0f8328 100644 --- a/core/test/domains_restart_test.go +++ b/core/test/domains_restart_test.go @@ -103,7 +103,7 @@ func Test_AggregatorV3_RestartOnDatadir_WithoutDB(t *testing.T) { domCtx := agg.BeginFilesRo() defer domCtx.Close() - domains, err := state.NewSharedDomains(tx, log.New()) + domains, err := state.NewSharedDomains(tx, db, log.New()) require.NoError(t, err) defer domains.Close() domains.SetTxNum(0) @@ -215,7 +215,7 @@ func Test_AggregatorV3_RestartOnDatadir_WithoutDB(t *testing.T) { require.NoError(t, err) domCtx = agg.BeginFilesRo() defer domCtx.Close() - domains, err = state.NewSharedDomains(tx, log.New()) + domains, err = state.NewSharedDomains(tx, db, log.New()) require.NoError(t, err) defer domains.Close() @@ -249,7 +249,7 @@ func Test_AggregatorV3_RestartOnDatadir_WithoutDB(t *testing.T) { defer tx.Rollback() domCtx = agg.BeginFilesRo() defer domCtx.Close() - domains, err = state.NewSharedDomains(tx, log.New()) + domains, err = state.NewSharedDomains(tx, db, log.New()) require.NoError(t, err) defer domains.Close() writer = state2.NewWriterV4(domains) @@ -309,7 +309,7 @@ func Test_AggregatorV3_RestartOnDatadir_WithoutAnything(t *testing.T) { domCtx := agg.BeginFilesRo() defer domCtx.Close() - domains, err := state.NewSharedDomains(tx, log.New()) + domains, err := state.NewSharedDomains(tx, db, log.New()) require.NoError(t, err) defer domains.Close() domains.SetTxNum(0) @@ -399,7 +399,7 @@ func Test_AggregatorV3_RestartOnDatadir_WithoutAnything(t *testing.T) { domCtx = agg.BeginFilesRo() defer domCtx.Close() - domains, err = state.NewSharedDomains(tx, log.New()) + domains, err = state.NewSharedDomains(tx, db, log.New()) require.NoError(t, err) defer domains.Close() @@ -419,7 +419,7 @@ func Test_AggregatorV3_RestartOnDatadir_WithoutAnything(t *testing.T) { defer tx.Rollback() domCtx = agg.BeginFilesRo() defer domCtx.Close() - domains, err = state.NewSharedDomains(tx, log.New()) + domains, err = state.NewSharedDomains(tx, db, log.New()) require.NoError(t, err) defer domains.Close() @@ -486,7 +486,7 @@ func TestCommit(t *testing.T) { domCtx := agg.BeginFilesRo() defer domCtx.Close() - domains, err := state.NewSharedDomains(tx, log.New()) + domains, err := state.NewSharedDomains(tx, db, log.New()) require.NoError(t, err) defer domains.Close() @@ -519,7 +519,7 @@ func TestCommit(t *testing.T) { require.NoError(t, err) core.GenerateTrace = true - oldHash, err := core.CalcHashRootForTests(tx, &types.Header{Number: big.NewInt(1)}, true, true) + oldHash, err := core.CalcHashRootForTests(tx, db, &types.Header{Number: big.NewInt(1)}, true, true) require.NoError(t, err) t.Logf("old hash %x\n", oldHash) diff --git a/core/vm/gas_table_test.go b/core/vm/gas_table_test.go index 3a2bd414611..3cfecbd8686 100644 --- a/core/vm/gas_table_test.go +++ b/core/vm/gas_table_test.go @@ -118,7 +118,7 @@ func testTemporalTxSD(t *testing.T, db *temporal.DB) (kv.RwTx, *state3.SharedDom require.NoError(t, err) t.Cleanup(tx.Rollback) - sd, err := state3.NewSharedDomains(tx, log.New()) + sd, err := state3.NewSharedDomains(tx, db, log.New()) require.NoError(t, err) t.Cleanup(sd.Close) @@ -200,7 +200,7 @@ func TestCreateGas(t 
*testing.T) { eface := *(*[2]uintptr)(unsafe.Pointer(&tx)) fmt.Printf("init tx %x\n", eface[1]) - domains, err := state3.NewSharedDomains(txc.Tx, log.New()) + domains, err := state3.NewSharedDomains(txc.Tx, db, log.New()) require.NoError(t, err) defer domains.Close() txc.Doms = domains diff --git a/core/vm/runtime/runtime.go b/core/vm/runtime/runtime.go index cdf3d26dd89..396bb5cd48d 100644 --- a/core/vm/runtime/runtime.go +++ b/core/vm/runtime/runtime.go @@ -145,7 +145,7 @@ func Execute(code, input []byte, cfg *Config, tempdir string) ([]byte, *state.In return nil, nil, err } defer tx.Rollback() - sd, err := state3.NewSharedDomains(tx, log.New()) + sd, err := state3.NewSharedDomains(tx, db, log.New()) if err != nil { return nil, nil, err } @@ -207,7 +207,7 @@ func Create(input []byte, cfg *Config, blockNr uint64) ([]byte, libcommon.Addres return nil, [20]byte{}, 0, err } defer tx.Rollback() - sd, err := state3.NewSharedDomains(tx, log.New()) + sd, err := state3.NewSharedDomains(tx, db, log.New()) if err != nil { return nil, [20]byte{}, 0, err } diff --git a/core/vm/runtime/runtime_test.go b/core/vm/runtime/runtime_test.go index 9df1c03717b..9ae2b8a1161 100644 --- a/core/vm/runtime/runtime_test.go +++ b/core/vm/runtime/runtime_test.go @@ -146,8 +146,8 @@ func TestExecute(t *testing.T) { func TestCall(t *testing.T) { t.Parallel() - _, tx, _ := NewTestTemporalDb(t) - domains, err := stateLib.NewSharedDomains(tx, log.New()) + db, tx, _ := NewTestTemporalDb(t) + domains, err := stateLib.NewSharedDomains(tx, db, log.New()) require.NoError(t, err) defer domains.Close() state := state.New(state.NewReaderV3(domains)) @@ -191,7 +191,7 @@ func testTemporalTxSD(t testing.TB, db *temporal.DB) (kv.RwTx, *stateLib.SharedD require.NoError(t, err) t.Cleanup(tx.Rollback) - sd, err := stateLib.NewSharedDomains(tx, log.New()) + sd, err := stateLib.NewSharedDomains(tx, db, log.New()) require.NoError(t, err) t.Cleanup(sd.Close) @@ -243,7 +243,7 @@ func benchmarkEVM_Create(b *testing.B, code string) { db := testTemporalDB(b) tx, err := db.BeginTemporalRw(context.Background()) require.NoError(b, err) - domains, err := stateLib.NewSharedDomains(tx, log.New()) + domains, err := stateLib.NewSharedDomains(tx, db, log.New()) require.NoError(b, err) defer domains.Close() @@ -467,7 +467,7 @@ func benchmarkNonModifyingCode(b *testing.B, gas uint64, code []byte, name strin db := testTemporalDB(b) tx, err := db.BeginTemporalRw(context.Background()) require.NoError(b, err) - domains, err := stateLib.NewSharedDomains(tx, log.New()) + domains, err := stateLib.NewSharedDomains(tx, db, log.New()) require.NoError(b, err) defer domains.Close() diff --git a/erigon-lib/commitment/commitment.go b/erigon-lib/commitment/commitment.go index a6a6d61d2af..c1c936d7e32 100644 --- a/erigon-lib/commitment/commitment.go +++ b/erigon-lib/commitment/commitment.go @@ -22,12 +22,13 @@ import ( "encoding/binary" "errors" "fmt" - "github.com/erigontech/erigon-lib/types/accounts" "math/bits" "sort" "strings" "unsafe" + "github.com/erigontech/erigon-lib/types/accounts" + "github.com/holiman/uint256" "github.com/google/btree" @@ -136,7 +137,7 @@ func InitializeTrieAndUpdates(tv TrieVariant, mode Mode, tmpdir string) (Trie, * fallthrough default: - trie := NewHexPatriciaHashed(length.Addr, nil, tmpdir) + trie := NewParallelHexPatriciaHashed(length.Addr, nil, tmpdir) tree := NewUpdates(mode, tmpdir, KeyToHexNibbleHash) return trie, tree } diff --git a/erigon-lib/commitment/hex_patricia_hashed.go b/erigon-lib/commitment/hex_patricia_hashed.go index 
1ee923607e0..655da0c7f8c 100644 --- a/erigon-lib/commitment/hex_patricia_hashed.go +++ b/erigon-lib/commitment/hex_patricia_hashed.go @@ -30,6 +30,7 @@ import ( "runtime" "sort" "strings" + "sync" "sync/atomic" "time" @@ -48,6 +49,7 @@ import ( ecrypto "github.com/erigontech/erigon-lib/crypto" "github.com/erigontech/erigon-lib/rlp" "golang.org/x/crypto/sha3" + "golang.org/x/sync/errgroup" ) // keccakState wraps sha3.state. In addition to the usual hash methods, it also supports @@ -95,6 +97,84 @@ type HexPatriciaHashed struct { accValBuf rlp.RlpEncodedBytes } +type nibbleUpdate struct { + hashedKey, plainKey []byte + stateUpdate *Update +} + +type ParallelPatriciaHashed struct { + HexPatriciaHashed + rootLocker sync.Mutex +} + +func (phph *ParallelPatriciaHashed) StartKeyProcessing(ctx context.Context, keyUpdates []*nibbleUpdate) error { + var ( + update *Update + err error + ) + phph.rootLocker.Lock() + defer phph.rootLocker.Unlock() + + for _, keyUpdate := range keyUpdates { + hashedKey := keyUpdate.hashedKey + plainKey := keyUpdate.plainKey + stateUpdate := keyUpdate.stateUpdate + // Keep folding until the currentKey is the prefix of the key we modify + for phph.needFolding(hashedKey) { + if err = phph.fold(); err != nil { + return err + } + } + + // Now unfold until we step on an empty cell + for unfolding := phph.needUnfolding(hashedKey); unfolding > 0; unfolding = phph.needUnfolding(hashedKey) { + if err = phph.unfold(hashedKey, unfolding); err != nil { + return fmt.Errorf("unfold: %w", err) + } + } + + if stateUpdate == nil { + // Update the cell + if len(plainKey) == phph.accountKeyLen { + update, err = phph.ctx.Account(plainKey) + if err != nil { + return fmt.Errorf("GetAccount for key %x failed: %w", plainKey, err) + } + } else { + update, err = phph.ctx.Storage(plainKey) + if err != nil { + return fmt.Errorf("GetStorage for key %x failed: %w", plainKey, err) + } + } + } else { + if update == nil { + update = stateUpdate + } else { + update.Reset() + update.Merge(stateUpdate) + } + } + phph.updateCell(plainKey, hashedKey, update) + mxTrieProcessedKeys.Inc() + + } + return nil +} + +func NewParallelHexPatriciaHashed(accountKeyLen int, ctx PatriciaContext, tmpdir string) *ParallelPatriciaHashed { + hph := &HexPatriciaHashed{ + ctx: ctx, + keccak: sha3.NewLegacyKeccak256().(keccakState), + keccak2: sha3.NewLegacyKeccak256().(keccakState), + accountKeyLen: accountKeyLen, + auxBuffer: bytes.NewBuffer(make([]byte, 8192)), + hadToLoadL: make(map[uint64]skipStat), + accValBuf: make(rlp.RlpEncodedBytes, 128), + } + hph.branchEncoder = NewBranchEncoder(1024, filepath.Join(tmpdir, "branch-encoder")) + return &ParallelPatriciaHashed{HexPatriciaHashed: *hph} +} + func NewHexPatriciaHashed(accountKeyLen int, ctx PatriciaContext, tmpdir string) *HexPatriciaHashed { hph := &HexPatriciaHashed{ ctx: ctx, @@ -162,6 +242,7 @@ func (f loadFlags) addFlag(loadFlags loadFlags) loadFlags { } const ( + TotalNibbles = 16 cellLoadNone = loadFlags(0) cellLoadAccount = loadFlags(1) cellLoadStorage = loadFlags(2) @@ -1998,6 +2079,97 @@ func (hph *HexPatriciaHashed) GenerateWitness(ctx context.Context, updates *Upda return witnessTrie, rootHash, nil } +func (phph *ParallelPatriciaHashed) Process(ctx context.Context, updates *Updates, logPrefix string) (rootHash []byte, err error) { + var ( + ki uint64 + + updatesCount = updates.Size() + start = time.Now() + logEvery = time.NewTicker(20 * time.Second) + ) + defer logEvery.Stop() + //hph.trace = true + nibbleUpdates := make([]*nibbleUpdate, 0) + currentNibble := 
16 // invalid nibble value + + keyProcessing, ctx := errgroup.WithContext(ctx) + keyProcessing.SetLimit(TotalNibbles) + + // collect updates for each starting nibble and submit them to a goroutine for asynchronous unfolding/processing + updates.HashSort(ctx, func(hashedKey, plainKey []byte, stateUpdate *Update) error { + if currentNibble != int(hashedKey[0]) { + nu := append([]*nibbleUpdate{}, nibbleUpdates...) + keyProcessing.Go(func() error { + return phph.StartKeyProcessing(ctx, nu) + }) + nibbleUpdates = make([]*nibbleUpdate, 0) + } + currentNibble = int(hashedKey[0]) + nibbleUpdates = append(nibbleUpdates, &nibbleUpdate{hashedKey: append([]byte{}, hashedKey...), plainKey: append([]byte{}, plainKey...), stateUpdate: stateUpdate}) + return nil + }) + // for the last nibble from the updates + keyProcessing.Go(func() error { + return phph.StartKeyProcessing(ctx, append([]*nibbleUpdate{}, nibbleUpdates...)) + }) + + if err := keyProcessing.Wait(); err != nil { + return nil, fmt.Errorf("processing keys failed: %w", err) + } + + // Folding everything up to the root + for phph.activeRows > 0 { + if err := phph.fold(); err != nil { + return nil, fmt.Errorf("final fold: %w", err) + } + } + + rootHash, err = phph.RootHash() + if err != nil { + return nil, fmt.Errorf("root hash evaluation failed: %w", err) + } + if phph.trace { + fmt.Printf("root hash %x updates %d\n", rootHash, updatesCount) + } + err = phph.branchEncoder.Load(phph.ctx, etl.TransformArgs{Quit: ctx.Done()}) + if err != nil { + return nil, fmt.Errorf("branch update failed: %w", err) + } + if dbg.KVReadLevelledMetrics { + log.Debug("commitment finished, counters updated (no reset)", + //"hadToLoad", common.PrettyCounter(hadToLoad.Load()), "skippedLoad", common.PrettyCounter(skippedLoad.Load()), + //"hadToReset", common.PrettyCounter(hadToReset.Load()), + "skip ratio", fmt.Sprintf("%.1f%%", 100*(float64(skippedLoad.Load())/float64(hadToLoad.Load()+skippedLoad.Load()))), + "reset ratio", fmt.Sprintf("%.1f%%", 100*(float64(hadToReset.Load())/float64(hadToLoad.Load()))), + "keys", common.PrettyCounter(ki), "spent", time.Since(start), + ) + ends := make([]uint64, 0, len(phph.hadToLoadL)) + for k := range phph.hadToLoadL { + ends = append(ends, k) + } + sort.Slice(ends, func(i, j int) bool { return ends[i] > ends[j] }) + var Li int + for _, k := range ends { + v := phph.hadToLoadL[k] + accs := fmt.Sprintf("load=%s skip=%s (%.1f%%) reset %.1f%%", common.PrettyCounter(v.accLoaded), common.PrettyCounter(v.accSkipped), 100*(float64(v.accSkipped)/float64(v.accLoaded+v.accSkipped)), 100*(float64(v.accReset)/float64(v.accReset+v.accSkipped))) + stors := fmt.Sprintf("load=%s skip=%s (%.1f%%) reset %.1f%%", common.PrettyCounter(v.storLoaded), common.PrettyCounter(v.storSkipped), 100*(float64(v.storSkipped)/float64(v.storLoaded+v.storSkipped)), 100*(float64(v.storReset)/float64(v.storReset+v.storSkipped))) + if k == 0 { + log.Debug("branchData memoization, new branches", "endStep", k, "accounts", accs, "storages", stors) + } else { + log.Debug("branchData memoization", "L", Li, "endStep", k, "accounts", accs, "storages", stors) + Li++ + + mxTrieStateLevelledSkipRatesAccount[min(Li, 5)].Add(float64(v.accSkipped)) + mxTrieStateLevelledSkipRatesStorage[min(Li, 5)].Add(float64(v.storSkipped)) + mxTrieStateLevelledLoadRatesAccount[min(Li, 5)].Add(float64(v.accLoaded)) + mxTrieStateLevelledLoadRatesStorage[min(Li, 5)].Add(float64(v.storLoaded)) + } + } + } + + return rootHash, nil +} + func (hph *HexPatriciaHashed) Process(ctx context.Context, updates 
*Updates, logPrefix string) (rootHash []byte, err error) { var ( m runtime.MemStats diff --git a/erigon-lib/kv/kvcache/cache_test.go b/erigon-lib/kv/kvcache/cache_test.go index ced48cd0d3e..d48e419dfd8 100644 --- a/erigon-lib/kv/kvcache/cache_test.go +++ b/erigon-lib/kv/kvcache/cache_test.go @@ -228,7 +228,7 @@ func TestAPI(t *testing.T) { var txID uint64 require.NoError(db.Update(context.Background(), func(tx kv.RwTx) error { txID = tx.ViewID() - d, err := state.NewSharedDomains(tx, log.New()) + d, err := state.NewSharedDomains(tx, db, log.New()) if err != nil { return err } diff --git a/erigon-lib/state/aggregator_bench_test.go b/erigon-lib/state/aggregator_bench_test.go index 24ee3871c78..4652949733d 100644 --- a/erigon-lib/state/aggregator_bench_test.go +++ b/erigon-lib/state/aggregator_bench_test.go @@ -80,7 +80,7 @@ func BenchmarkAggregator_Processing(b *testing.B) { ac := agg.BeginFilesRo() defer ac.Close() - domains, err := NewSharedDomains(WrapTxWithCtx(tx, ac), log.New()) + domains, err := NewSharedDomains(WrapTxWithCtx(tx, ac), db, log.New()) require.NoError(b, err) defer domains.Close() diff --git a/erigon-lib/state/aggregator_fuzz_test.go b/erigon-lib/state/aggregator_fuzz_test.go index 470aa21fe23..97234979e65 100644 --- a/erigon-lib/state/aggregator_fuzz_test.go +++ b/erigon-lib/state/aggregator_fuzz_test.go @@ -21,10 +21,11 @@ package state import ( "context" "encoding/binary" - "github.com/erigontech/erigon-lib/types/accounts" "testing" "time" + "github.com/erigontech/erigon-lib/types/accounts" + "github.com/c2h5oh/datasize" "github.com/erigontech/erigon-lib/common" "github.com/erigontech/erigon-lib/common/datadir" @@ -64,7 +65,7 @@ func Fuzz_AggregatorV3_Merge(f *testing.F) { ac := agg.BeginFilesRo() defer ac.Close() - domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), db, log.New()) require.NoError(f, err) defer domains.Close() @@ -191,7 +192,7 @@ func Fuzz_AggregatorV3_MergeValTransform(f *testing.F) { }() ac := agg.BeginFilesRo() defer ac.Close() - domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), db, log.New()) require.NoError(f, err) defer domains.Close() diff --git a/erigon-lib/state/aggregator_test.go b/erigon-lib/state/aggregator_test.go index 10fe639777b..0b83eb0c2da 100644 --- a/erigon-lib/state/aggregator_test.go +++ b/erigon-lib/state/aggregator_test.go @@ -31,6 +31,8 @@ import ( "testing" "time" + "github.com/erigontech/erigon-lib/types/accounts" + "github.com/c2h5oh/datasize" "github.com/erigontech/erigon-lib/commitment" @@ -47,7 +49,6 @@ import ( "github.com/erigontech/erigon-lib/kv/stream" "github.com/erigontech/erigon-lib/log/v3" "github.com/erigontech/erigon-lib/seg" - "github.com/erigontech/erigon-lib/types/accounts" "github.com/holiman/uint256" "github.com/stretchr/testify/require" ) @@ -65,7 +66,7 @@ func TestAggregatorV3_Merge(t *testing.T) { ac := agg.BeginFilesRo() defer ac.Close() - domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), db, log.New()) require.NoError(t, err) defer domains.Close() @@ -186,7 +187,7 @@ func TestAggregatorV3_MergeValTransform(t *testing.T) { }() ac := agg.BeginFilesRo() defer ac.Close() - domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), db, log.New()) require.NoError(t, err) defer domains.Close() @@ -317,7 +318,7 
@@ func aggregatorV3_RestartOnDatadir(t *testing.T, rc runCfg) { ac := agg.BeginFilesRo() defer ac.Close() - domains, err := NewSharedDomains(WrapTxWithCtx(tx, ac), log.New()) + domains, err := NewSharedDomains(WrapTxWithCtx(tx, ac), db, log.New()) require.NoError(t, err) defer domains.Close() @@ -394,7 +395,7 @@ func aggregatorV3_RestartOnDatadir(t *testing.T, rc runCfg) { startTx := anotherAgg.EndTxNumMinimax() ac2 := anotherAgg.BeginFilesRo() defer ac2.Close() - dom2, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac2), log.New()) + dom2, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac2), db, log.New()) require.NoError(t, err) defer dom2.Close() @@ -460,7 +461,7 @@ func TestAggregatorV3_PruneSmallBatches(t *testing.T) { ac := agg.BeginFilesRo() defer ac.Close() - domains, err := NewSharedDomains(WrapTxWithCtx(tx, ac), log.New()) + domains, err := NewSharedDomains(WrapTxWithCtx(tx, ac), db, log.New()) require.NoError(t, err) defer domains.Close() @@ -792,7 +793,7 @@ func TestAggregatorV3_RestartOnFiles(t *testing.T) { }() ac := agg.BeginFilesRo() defer ac.Close() - domains, err := NewSharedDomains(WrapTxWithCtx(tx, ac), log.New()) + domains, err := NewSharedDomains(WrapTxWithCtx(tx, ac), db, log.New()) require.NoError(t, err) defer domains.Close() @@ -864,7 +865,7 @@ func TestAggregatorV3_RestartOnFiles(t *testing.T) { ac = newAgg.BeginFilesRo() defer ac.Close() - newDoms, err := NewSharedDomains(WrapTxWithCtx(newTx, ac), log.New()) + newDoms, err := NewSharedDomains(WrapTxWithCtx(newTx, ac), newDb, log.New()) require.NoError(t, err) defer newDoms.Close() @@ -922,7 +923,7 @@ func TestAggregatorV3_ReplaceCommittedKeys(t *testing.T) { ac := agg.BeginFilesRo() defer ac.Close() - domains, err := NewSharedDomains(WrapTxWithCtx(tx, ac), log.New()) + domains, err := NewSharedDomains(WrapTxWithCtx(tx, ac), db, log.New()) require.NoError(t, err) defer domains.Close() @@ -936,7 +937,7 @@ func TestAggregatorV3_ReplaceCommittedKeys(t *testing.T) { tx, err = db.BeginRw(context.Background()) require.NoError(t, err) ac = agg.BeginFilesRo() - domains, err = NewSharedDomains(WrapTxWithCtx(tx, ac), log.New()) + domains, err = NewSharedDomains(WrapTxWithCtx(tx, ac), db, log.New()) require.NoError(t, err) atomic.StoreUint64(&latestCommitTxNum, txn) return nil @@ -1175,7 +1176,7 @@ func TestAggregatorV3_SharedDomains(t *testing.T) { require.NoError(t, err) defer rwTx.Rollback() - domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), db, log.New()) require.NoError(t, err) defer domains.Close() changesetAt5 := &StateChangeSet{} @@ -1227,7 +1228,7 @@ func TestAggregatorV3_SharedDomains(t *testing.T) { ac = agg.BeginFilesRo() defer ac.Close() - domains, err = NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + domains, err = NewSharedDomains(WrapTxWithCtx(rwTx, ac), db, log.New()) require.NoError(t, err) defer domains.Close() diffs := [kv.DomainLen][]DomainEntryDiff{} @@ -1272,7 +1273,7 @@ func TestAggregatorV3_SharedDomains(t *testing.T) { ac = agg.BeginFilesRo() defer ac.Close() - domains, err = NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + domains, err = NewSharedDomains(WrapTxWithCtx(rwTx, ac), db, log.New()) require.NoError(t, err) defer domains.Close() for idx := range changesetAt3.Diffs { diff --git a/erigon-lib/state/domain_shared.go b/erigon-lib/state/domain_shared.go index 811b7476753..285ba995b75 100644 --- a/erigon-lib/state/domain_shared.go +++ b/erigon-lib/state/domain_shared.go @@ -109,14 +109,17 @@ type HasAgg 
interface { Agg() any } -func NewSharedDomains(tx kv.Tx, logger log.Logger) (*SharedDomains, error) { +func NewSharedDomains(tx kv.Tx, db kv.RoDB, logger log.Logger) (*SharedDomains, error) { sd := &SharedDomains{ logger: logger, storage: btree2.NewMap[string, dataWithPrevStep](128), //trace: true, } - sd.SetTx(tx) + + if err := sd.SetTx(tx); err != nil { + return nil, err + } sd.iiWriters = make([]*invertedIndexBufferedWriter, len(sd.aggTx.iis)) for id, ii := range sd.aggTx.iis { @@ -670,21 +673,22 @@ func (sd *SharedDomains) IndexAdd(table kv.InvertedIdx, key []byte) (err error) panic(fmt.Errorf("unknown index %s", table)) } -func (sd *SharedDomains) SetTx(tx kv.Tx) { +func (sd *SharedDomains) SetTx(tx kv.Tx) error { if tx == nil { - panic("tx is nil") + return errors.New("tx is nil") } sd.roTx = tx casted, ok := tx.(HasAggTx) if !ok { - panic(fmt.Errorf("type %T need AggTx method", tx)) + return fmt.Errorf("type %T needs AggTx method", tx) } sd.aggTx = casted.AggTx().(*AggregatorRoTx) if sd.aggTx == nil { - panic(errors.New("aggtx is nil")) + return errors.New("aggtx is nil") } + return nil } func (sd *SharedDomains) StepSize() uint64 { return sd.aggTx.StepSize() } @@ -1181,6 +1185,11 @@ func (sdc *SharedDomainsCommitmentContext) Witness(ctx context.Context, expected return hexPatriciaHashed.GenerateWitness(ctx, sdc.updates, nil, expectedRoot, logPrefix) } + parallelHexPatriciaHashed, ok := sdc.Trie().(*commitment.ParallelPatriciaHashed) + if ok { + return parallelHexPatriciaHashed.HexPatriciaHashed.GenerateWitness(ctx, sdc.updates, nil, expectedRoot, logPrefix) + } + return nil, nil, errors.New("shared domains commitment context doesn't have HexPatriciaHashed") } @@ -1260,6 +1269,11 @@ func (sdc *SharedDomainsCommitmentContext) encodeCommitmentState(blockNum, txNum if err != nil { return nil, err } + case *commitment.ParallelPatriciaHashed: + state, err = trie.EncodeCurrentState(nil) + if err != nil { + return nil, err + } default: return nil, fmt.Errorf("unsupported state storing for patricia trie type: %T", sdc.patriciaTrie) } @@ -1337,10 +1351,25 @@ func (sdc *SharedDomainsCommitmentContext) restorePatriciaState(value []byte) (u } fmt.Printf("[commitment] restored state: block=%d txn=%d rootHash=%x\n", cs.blockNum, cs.txNum, rootHash) } - } else { - return 0, 0, errors.New("state storing is only supported hex patricia trie") + return cs.blockNum, cs.txNum, nil + } + + if hext, ok := sdc.patriciaTrie.(*commitment.ParallelPatriciaHashed); ok { + if err := hext.SetState(cs.trieState); err != nil { + return 0, 0, fmt.Errorf("failed restore state : %w", err) + } + sdc.justRestored.Store(true) // to prevent double reset + if sdc.sharedDomains.trace { + rootHash, err := hext.RootHash() + if err != nil { + return 0, 0, fmt.Errorf("failed to get root hash after state restore: %w", err) + } + fmt.Printf("[commitment] restored state: block=%d txn=%d rootHash=%x\n", cs.blockNum, cs.txNum, rootHash) + } + return cs.blockNum, cs.txNum, nil } - return cs.blockNum, cs.txNum, nil + + return 0, 0, errors.New("state storing is only supported for hex patricia trie or parallel hex patricia trie") } func toStringZeroCopy(v []byte) string { return unsafe.String(&v[0], len(v)) } diff --git a/erigon-lib/state/domain_shared_bench_test.go b/erigon-lib/state/domain_shared_bench_test.go index f5157b4c17a..7358a169ed9 100644 --- a/erigon-lib/state/domain_shared_bench_test.go +++ b/erigon-lib/state/domain_shared_bench_test.go @@ -40,7 +40,7 @@ func Benchmark_SharedDomains_GetLatest(t *testing.B) { ac := 
agg.BeginFilesRo() defer ac.Close() - domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), db, log.New()) require.NoError(t, err) defer domains.Close() maxTx := stepSize * 258 @@ -127,7 +127,7 @@ func BenchmarkSharedDomains_ComputeCommitment(b *testing.B) { ac := agg.BeginFilesRo() defer ac.Close() - domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), db, log.New()) require.NoError(b, err) defer domains.Close() diff --git a/erigon-lib/state/domain_shared_test.go b/erigon-lib/state/domain_shared_test.go index 7fa9ea86e3a..761c87c359f 100644 --- a/erigon-lib/state/domain_shared_test.go +++ b/erigon-lib/state/domain_shared_test.go @@ -20,11 +20,12 @@ import ( "context" "encoding/binary" "fmt" - "github.com/erigontech/erigon-lib/common" - accounts3 "github.com/erigontech/erigon-lib/types/accounts" "testing" "time" + "github.com/erigontech/erigon-lib/common" + accounts3 "github.com/erigontech/erigon-lib/types/accounts" + "github.com/holiman/uint256" "github.com/stretchr/testify/require" @@ -48,7 +49,7 @@ func TestSharedDomain_CommitmentKeyReplacement(t *testing.T) { ac := agg.BeginFilesRo() defer ac.Close() - domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), db, log.New()) require.NoError(t, err) defer domains.Close() @@ -93,7 +94,7 @@ func TestSharedDomain_CommitmentKeyReplacement(t *testing.T) { defer rwTx.Rollback() // 4. restart on same (replaced keys) files - domains, err = NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + domains, err = NewSharedDomains(WrapTxWithCtx(rwTx, ac), db, log.New()) require.NoError(t, err) defer domains.Close() @@ -123,7 +124,7 @@ func TestSharedDomain_Unwind(t *testing.T) { ac := agg.BeginFilesRo() defer ac.Close() - domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), db, log.New()) require.NoError(t, err) defer domains.Close() @@ -145,7 +146,7 @@ Loop: ac = agg.BeginFilesRo() defer ac.Close() - domains, err = NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + domains, err = NewSharedDomains(WrapTxWithCtx(rwTx, ac), db, log.New()) require.NoError(t, err) defer domains.Close() @@ -244,7 +245,7 @@ func TestSharedDomain_IteratePrefix(t *testing.T) { ac = agg.BeginFilesRo() defer ac.Close() wrwTx := WrapTxWithCtx(rwTx, ac) - domains, err := NewSharedDomains(wrwTx, log.New()) + domains, err := NewSharedDomains(wrwTx, db, log.New()) require.NoError(err) defer domains.Close() @@ -274,7 +275,7 @@ func TestSharedDomain_IteratePrefix(t *testing.T) { require.NoError(err) domains.Close() - domains, err = NewSharedDomains(wrwTx, log.New()) + domains, err = NewSharedDomains(wrwTx, db, log.New()) require.NoError(err) defer domains.Close() require.Equal(int(stepSize), iterCount(domains)) @@ -282,7 +283,7 @@ func TestSharedDomain_IteratePrefix(t *testing.T) { { // delete marker is in RAM require.NoError(domains.Flush(ctx, rwTx)) domains.Close() - domains, err = NewSharedDomains(wrwTx, log.New()) + domains, err = NewSharedDomains(wrwTx, db, log.New()) require.NoError(err) defer domains.Close() require.Equal(int(stepSize), iterCount(domains)) @@ -312,7 +313,7 @@ func TestSharedDomain_IteratePrefix(t *testing.T) { require.NoError(err) domains.Close() - domains, err = NewSharedDomains(wrwTx, log.New()) + domains, err = NewSharedDomains(wrwTx, db, log.New()) 
require.NoError(err) defer domains.Close() require.Equal(int(stepSize*2+2-2), iterCount(domains)) @@ -337,7 +338,7 @@ func TestSharedDomain_IteratePrefix(t *testing.T) { require.NoError(err) wrwTx = WrapTxWithCtx(rwTx, ac) - domains, err = NewSharedDomains(wrwTx, log.New()) + domains, err = NewSharedDomains(wrwTx, db, log.New()) require.NoError(err) defer domains.Close() require.Equal(int(stepSize*2+2-2), iterCount(domains)) @@ -346,7 +347,7 @@ func TestSharedDomain_IteratePrefix(t *testing.T) { { // delete/update more keys in RAM require.NoError(domains.Flush(ctx, rwTx)) domains.Close() - domains, err = NewSharedDomains(wrwTx, log.New()) + domains, err = NewSharedDomains(wrwTx, db, log.New()) require.NoError(err) defer domains.Close() @@ -366,7 +367,7 @@ func TestSharedDomain_IteratePrefix(t *testing.T) { require.NoError(err) domains.Close() - domains, err = NewSharedDomains(wrwTx, log.New()) + domains, err = NewSharedDomains(wrwTx, db, log.New()) require.NoError(err) defer domains.Close() require.Equal(int(stepSize*2+2-3), iterCount(domains)) @@ -376,7 +377,7 @@ func TestSharedDomain_IteratePrefix(t *testing.T) { require.NoError(err) domains.Close() - domains, err = NewSharedDomains(wrwTx, log.New()) + domains, err = NewSharedDomains(wrwTx, db, log.New()) require.NoError(err) defer domains.Close() domains.SetTxNum(domains.TxNum() + 1) @@ -403,14 +404,14 @@ func TestSharedDomain_StorageIter(t *testing.T) { defer ac.Close() wtxRw := WrapTxWithCtx(rwTx, ac) - domains, err := NewSharedDomains(wtxRw, log.New()) + domains, err := NewSharedDomains(wtxRw, db, log.New()) require.NoError(t, err) defer domains.Close() maxTx := 3*stepSize + 10 hashes := make([][]byte, maxTx) - domains, err = NewSharedDomains(wtxRw, log.New()) + domains, err = NewSharedDomains(wtxRw, db, log.New()) require.NoError(t, err) defer domains.Close() @@ -488,7 +489,7 @@ func TestSharedDomain_StorageIter(t *testing.T) { rwTx, err = db.BeginRw(ctx) require.NoError(t, err) - domains, err = NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + domains, err = NewSharedDomains(WrapTxWithCtx(rwTx, ac), db, log.New()) require.NoError(t, err) defer domains.Close() diff --git a/erigon-lib/state/squeeze.go b/erigon-lib/state/squeeze.go index 72f6595c6c9..91a2d0a5b15 100644 --- a/erigon-lib/state/squeeze.go +++ b/erigon-lib/state/squeeze.go @@ -457,7 +457,7 @@ func (a *Aggregator) RebuildCommitmentFiles(ctx context.Context, rwDb kv.RwDB, t } defer rwTx.Rollback() - domains, err = NewSharedDomains(rwTx, log.New()) + domains, err = NewSharedDomains(rwTx, rwDb, log.New()) if err != nil { return nil, err } @@ -472,7 +472,7 @@ func (a *Aggregator) RebuildCommitmentFiles(ctx context.Context, rwDb kv.RwDB, t // case when we do testing and temporal db with aggtx is not available ac = a.BeginFilesRo() - domains, err = NewSharedDomains(wrapTxWithCtxForTest(roTx, ac), log.New()) + domains, err = NewSharedDomains(wrapTxWithCtxForTest(roTx, ac), a.db, log.New()) if err != nil { ac.Close() return nil, err diff --git a/erigon-lib/state/squeeze_test.go b/erigon-lib/state/squeeze_test.go index e5529f378d4..fad08c1a807 100644 --- a/erigon-lib/state/squeeze_test.go +++ b/erigon-lib/state/squeeze_test.go @@ -2,11 +2,12 @@ package state import ( "context" - "github.com/erigontech/erigon-lib/common" - accounts3 "github.com/erigontech/erigon-lib/types/accounts" "math" "testing" + "github.com/erigontech/erigon-lib/common" + accounts3 "github.com/erigontech/erigon-lib/types/accounts" + "github.com/erigontech/erigon-lib/commitment" 
"github.com/erigontech/erigon-lib/common/length" "github.com/erigontech/erigon-lib/kv" @@ -37,7 +38,7 @@ func testDbAggregatorWithFiles(tb testing.TB, cfg *testAggConfig) (kv.RwDB, *Agg require.NoError(tb, err) defer rwTx.Rollback() - domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), db, log.New()) require.NoError(tb, err) defer domains.Close() @@ -87,7 +88,6 @@ func TestAggregator_SqueezeCommitment(t *testing.T) { cfgd := &testAggConfig{stepSize: 32, disableCommitmentBranchTransform: true} db, agg := testDbAggregatorWithFiles(t, cfgd) defer db.Close() - ac := agg.BeginFilesRo() defer ac.Close() @@ -95,10 +95,9 @@ func TestAggregator_SqueezeCommitment(t *testing.T) { require.NoError(t, err) defer rwTx.Rollback() - domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + domains, err := NewSharedDomains(WrapTxWithCtx(rwTx, ac), db, log.New()) require.NoError(t, err) defer domains.Close() - // get latest commited root latestRoot, err := domains.ComputeCommitment(context.Background(), false, domains.BlockNum(), "") require.NoError(t, err) @@ -117,7 +116,7 @@ func TestAggregator_SqueezeCommitment(t *testing.T) { ac = agg.BeginFilesRo() defer ac.Close() - domains, err = NewSharedDomains(WrapTxWithCtx(rwTx, ac), log.New()) + domains, err = NewSharedDomains(WrapTxWithCtx(rwTx, ac), db, log.New()) require.NoError(t, err) // collect account keys to trigger commitment diff --git a/eth/stagedsync/default_stages.go b/eth/stagedsync/default_stages.go index a71ba0ff041..cdec7410682 100644 --- a/eth/stagedsync/default_stages.go +++ b/eth/stagedsync/default_stages.go @@ -28,6 +28,7 @@ import ( ) func DefaultStages(ctx context.Context, + db kv.RwDB, snapshots SnapshotsCfg, headers HeadersCfg, borHeimdallCfg BorHeimdallCfg, @@ -62,7 +63,7 @@ func DefaultStages(ctx context.Context, if badBlockUnwind { return nil } - return SpawnStageHeaders(s, u, ctx, txc.Tx, headers, test, logger) + return SpawnStageHeaders(s, u, ctx, txc.Tx, db, headers, test, logger) }, Unwind: func(u *UnwindState, s *StageState, txc wrap.TxContainer, logger log.Logger) error { return HeadersUnwind(ctx, u, s, txc.Tx, headers, test) @@ -131,10 +132,10 @@ func DefaultStages(ctx context.Context, Description: "Execute blocks w/o hash checks", Disabled: dbg.StagesOnlyBlocks, Forward: func(badBlockUnwind bool, s *StageState, u Unwinder, txc wrap.TxContainer, logger log.Logger) error { - return SpawnExecuteBlocksStage(s, u, txc, 0, ctx, exec, logger) + return SpawnExecuteBlocksStage(s, u, txc, db, 0, ctx, exec, logger) }, Unwind: func(u *UnwindState, s *StageState, txc wrap.TxContainer, logger log.Logger) error { - return UnwindExecutionStage(u, s, txc, ctx, exec, logger) + return UnwindExecutionStage(u, s, txc, db, ctx, exec, logger) }, Prune: func(p *PruneState, tx kv.RwTx, logger log.Logger) error { return PruneExecutionStage(p, tx, exec, ctx, logger) @@ -187,7 +188,7 @@ func DefaultStages(ctx context.Context, } } -func PipelineStages(ctx context.Context, snapshots SnapshotsCfg, blockHashCfg BlockHashesCfg, senders SendersCfg, exec ExecuteBlockCfg, txLookup TxLookupCfg, finish FinishCfg, test bool) []*Stage { +func PipelineStages(ctx context.Context, db kv.RwDB, snapshots SnapshotsCfg, blockHashCfg BlockHashesCfg, senders SendersCfg, exec ExecuteBlockCfg, txLookup TxLookupCfg, finish FinishCfg, test bool) []*Stage { return []*Stage{ { ID: stages.Snapshots, @@ -235,10 +236,10 @@ func PipelineStages(ctx context.Context, snapshots SnapshotsCfg, 
blockHashCfg Bl ID: stages.Execution, Description: "Execute blocks w/o hash checks", Forward: func(badBlockUnwind bool, s *StageState, u Unwinder, txc wrap.TxContainer, logger log.Logger) error { - return SpawnExecuteBlocksStage(s, u, txc, 0, ctx, exec, logger) + return SpawnExecuteBlocksStage(s, u, txc, db, 0, ctx, exec, logger) }, Unwind: func(u *UnwindState, s *StageState, txc wrap.TxContainer, logger log.Logger) error { - return UnwindExecutionStage(u, s, txc, ctx, exec, logger) + return UnwindExecutionStage(u, s, txc, db, ctx, exec, logger) }, Prune: func(p *PruneState, tx kv.RwTx, logger log.Logger) error { return PruneExecutionStage(p, tx, exec, ctx, logger) @@ -275,7 +276,7 @@ func PipelineStages(ctx context.Context, snapshots SnapshotsCfg, blockHashCfg Bl } // UploaderPipelineStages when uploading - potentially from zero we need to include headers and bodies stages otherwise we won't recover the POW portion of the chain -func UploaderPipelineStages(ctx context.Context, snapshots SnapshotsCfg, headers HeadersCfg, blockHashCfg BlockHashesCfg, senders SendersCfg, bodies BodiesCfg, exec ExecuteBlockCfg, txLookup TxLookupCfg, finish FinishCfg, test bool) []*Stage { +func UploaderPipelineStages(ctx context.Context, db kv.RwDB, snapshots SnapshotsCfg, headers HeadersCfg, blockHashCfg BlockHashesCfg, senders SendersCfg, bodies BodiesCfg, exec ExecuteBlockCfg, txLookup TxLookupCfg, finish FinishCfg, test bool) []*Stage { return []*Stage{ { ID: stages.Snapshots, @@ -300,7 +301,7 @@ func UploaderPipelineStages(ctx context.Context, snapshots SnapshotsCfg, headers if badBlockUnwind { return nil } - return SpawnStageHeaders(s, u, ctx, txc.Tx, headers, test, logger) + return SpawnStageHeaders(s, u, ctx, txc.Tx, db, headers, test, logger) }, Unwind: func(u *UnwindState, s *StageState, txc wrap.TxContainer, logger log.Logger) error { return HeadersUnwind(ctx, u, s, txc.Tx, headers, test) @@ -352,10 +353,10 @@ func UploaderPipelineStages(ctx context.Context, snapshots SnapshotsCfg, headers ID: stages.Execution, Description: "Execute blocks w/o hash checks", Forward: func(badBlockUnwind bool, s *StageState, u Unwinder, txc wrap.TxContainer, logger log.Logger) error { - return SpawnExecuteBlocksStage(s, u, txc, 0, ctx, exec, logger) + return SpawnExecuteBlocksStage(s, u, txc, db, 0, ctx, exec, logger) }, Unwind: func(u *UnwindState, s *StageState, txc wrap.TxContainer, logger log.Logger) error { - return UnwindExecutionStage(u, s, txc, ctx, exec, logger) + return UnwindExecutionStage(u, s, txc, db, ctx, exec, logger) }, Prune: func(p *PruneState, tx kv.RwTx, logger log.Logger) error { return PruneExecutionStage(p, tx, exec, ctx, logger) @@ -391,7 +392,7 @@ func UploaderPipelineStages(ctx context.Context, snapshots SnapshotsCfg, headers } // StateStages are all stages necessary for basic unwind and stage computation, it is primarily used to process side forks and memory execution. 
-func StateStages(ctx context.Context, headers HeadersCfg, bodies BodiesCfg, blockHashCfg BlockHashesCfg, senders SendersCfg, exec ExecuteBlockCfg) []*Stage { +func StateStages(ctx context.Context, headers HeadersCfg, db kv.RwDB, bodies BodiesCfg, blockHashCfg BlockHashesCfg, senders SendersCfg, exec ExecuteBlockCfg) []*Stage { return []*Stage{ { ID: stages.Headers, @@ -437,10 +438,10 @@ func StateStages(ctx context.Context, headers HeadersCfg, bodies BodiesCfg, bloc ID: stages.Execution, Description: "Execute blocks w/o hash checks", Forward: func(badBlockUnwind bool, s *StageState, u Unwinder, txc wrap.TxContainer, logger log.Logger) error { - return SpawnExecuteBlocksStage(s, u, txc, 0, ctx, exec, logger) + return SpawnExecuteBlocksStage(s, u, txc, db, 0, ctx, exec, logger) }, Unwind: func(u *UnwindState, s *StageState, txc wrap.TxContainer, logger log.Logger) error { - return UnwindExecutionStage(u, s, txc, ctx, exec, logger) + return UnwindExecutionStage(u, s, txc, db, ctx, exec, logger) }, }, } @@ -448,6 +449,7 @@ func StateStages(ctx context.Context, headers HeadersCfg, bodies BodiesCfg, bloc func PolygonSyncStages( ctx context.Context, + db kv.RwDB, snapshots SnapshotsCfg, polygonSyncStageCfg PolygonSyncStageCfg, senders SendersCfg, @@ -503,10 +505,10 @@ func PolygonSyncStages( Description: "Execute blocks w/o hash checks", Disabled: dbg.StagesOnlyBlocks, Forward: func(badBlockUnwind bool, s *StageState, u Unwinder, txc wrap.TxContainer, logger log.Logger) error { - return SpawnExecuteBlocksStage(s, u, txc, 0, ctx, exec, logger) + return SpawnExecuteBlocksStage(s, u, txc, db, 0, ctx, exec, logger) }, Unwind: func(u *UnwindState, s *StageState, txc wrap.TxContainer, logger log.Logger) error { - return UnwindExecutionStage(u, s, txc, ctx, exec, logger) + return UnwindExecutionStage(u, s, txc, db, ctx, exec, logger) }, Prune: func(p *PruneState, tx kv.RwTx, logger log.Logger) error { return PruneExecutionStage(p, tx, exec, ctx, logger) diff --git a/eth/stagedsync/exec3.go b/eth/stagedsync/exec3.go index 98bbe1170f0..3fcf3f37e05 100644 --- a/eth/stagedsync/exec3.go +++ b/eth/stagedsync/exec3.go @@ -199,7 +199,7 @@ func nothingToExec(applyTx kv.Tx, txNumsReader rawdbv3.TxNumsReader, inputTxNum } func ExecV3(ctx context.Context, - execStage *StageState, u Unwinder, workerCount int, cfg ExecuteBlockCfg, txc wrap.TxContainer, + execStage *StageState, u Unwinder, workerCount int, cfg ExecuteBlockCfg, txc wrap.TxContainer, db kv.RwDB, parallel bool, //nolint maxBlockNum uint64, logger log.Logger, @@ -255,7 +255,7 @@ func ExecV3(ctx context.Context, doms = txc.Doms } else { var err error - doms, err = state2.NewSharedDomains(applyTx, log.New()) + doms, err = state2.NewSharedDomains(applyTx, db, log.New()) // if we are behind the commitment, we can't execute anything // this can heppen if progress in domain is higher than progress in blocks if errors.Is(err, state2.ErrBehindCommitment) { diff --git a/eth/stagedsync/exec3_serial.go b/eth/stagedsync/exec3_serial.go index 6d2214be0b2..e3f2959c3f9 100644 --- a/eth/stagedsync/exec3_serial.go +++ b/eth/stagedsync/exec3_serial.go @@ -170,7 +170,7 @@ func (se *serialExecutor) commit(ctx context.Context, txNum uint64, blockNum uin return t2, err } } - se.doms, err = state2.NewSharedDomains(se.applyTx, se.logger) + se.doms, err = state2.NewSharedDomains(se.applyTx, se.cfg.db, se.logger) if err != nil { return t2, err } diff --git a/eth/stagedsync/stage_custom_trace.go b/eth/stagedsync/stage_custom_trace.go index a53dc54ab09..41d6d241207 100644 --- 
a/eth/stagedsync/stage_custom_trace.go +++ b/eth/stagedsync/stage_custom_trace.go @@ -134,7 +134,7 @@ func customTraceBatchProduce(ctx context.Context, cfg *exec3.ExecArgs, db kv.Tem } defer tx.Rollback() - doms, err := state2.NewSharedDomains(tx, logger) + doms, err := state2.NewSharedDomains(tx, db, logger) if err != nil { return err } diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go index 9c0c9fda10f..2f8e460c774 100644 --- a/eth/stagedsync/stage_execute.go +++ b/eth/stagedsync/stage_execute.go @@ -139,7 +139,7 @@ func StageExecuteBlocksCfg( // ================ Erigon3 ================ -func ExecBlockV3(s *StageState, u Unwinder, txc wrap.TxContainer, toBlock uint64, ctx context.Context, cfg ExecuteBlockCfg, initialCycle bool, logger log.Logger, isMining bool) (err error) { +func ExecBlockV3(s *StageState, u Unwinder, txc wrap.TxContainer, db kv.RwDB, toBlock uint64, ctx context.Context, cfg ExecuteBlockCfg, initialCycle bool, logger log.Logger, isMining bool) (err error) { workersCount := cfg.syncCfg.ExecWorkerCount if !initialCycle { workersCount = 1 @@ -159,7 +159,7 @@ func ExecBlockV3(s *StageState, u Unwinder, txc wrap.TxContainer, toBlock uint64 } parallel := txc.Tx == nil - if err := ExecV3(ctx, s, u, workersCount, cfg, txc, parallel, to, logger, initialCycle, isMining); err != nil { + if err := ExecV3(ctx, s, u, workersCount, cfg, txc, db, parallel, to, logger, initialCycle, isMining); err != nil { return err } return nil @@ -167,10 +167,10 @@ func ExecBlockV3(s *StageState, u Unwinder, txc wrap.TxContainer, toBlock uint64 var ErrTooDeepUnwind = errors.New("too deep unwind") -func unwindExec3(u *UnwindState, s *StageState, txc wrap.TxContainer, ctx context.Context, br services.FullBlockReader, accumulator *shards.Accumulator, logger log.Logger) (err error) { +func unwindExec3(u *UnwindState, s *StageState, txc wrap.TxContainer, ctx context.Context, db kv.RoDB, br services.FullBlockReader, accumulator *shards.Accumulator, logger log.Logger) (err error) { var domains *libstate.SharedDomains if txc.Doms == nil { - domains, err = libstate.NewSharedDomains(txc.Tx, logger) + domains, err = libstate.NewSharedDomains(txc.Tx, db, logger) if err != nil { return err } @@ -249,11 +249,11 @@ func BorHeimdallStageProgress(tx kv.Tx, cfg BorHeimdallCfg) (prevStageProgress u // ================ Erigon3 End ================ -func SpawnExecuteBlocksStage(s *StageState, u Unwinder, txc wrap.TxContainer, toBlock uint64, ctx context.Context, cfg ExecuteBlockCfg, logger log.Logger) (err error) { +func SpawnExecuteBlocksStage(s *StageState, u Unwinder, txc wrap.TxContainer, db kv.RwDB, toBlock uint64, ctx context.Context, cfg ExecuteBlockCfg, logger log.Logger) (err error) { if dbg.StagesOnlyBlocks { return nil } - if err = ExecBlockV3(s, u, txc, toBlock, ctx, cfg, s.CurrentSyncCycle.IsInitialCycle, logger, false); err != nil { + if err = ExecBlockV3(s, u, txc, db, toBlock, ctx, cfg, s.CurrentSyncCycle.IsInitialCycle, logger, false); err != nil { return err } return nil @@ -320,7 +320,7 @@ func blocksReadAheadFunc(ctx context.Context, tx kv.Tx, cfg *ExecuteBlockCfg, bl return nil } -func UnwindExecutionStage(u *UnwindState, s *StageState, txc wrap.TxContainer, ctx context.Context, cfg ExecuteBlockCfg, logger log.Logger) (err error) { +func UnwindExecutionStage(u *UnwindState, s *StageState, txc wrap.TxContainer, db kv.RoDB, ctx context.Context, cfg ExecuteBlockCfg, logger log.Logger) (err error) { //fmt.Printf("unwind: %d -> %d\n", u.CurrentBlockNumber, u.UnwindPoint) if 
u.UnwindPoint >= s.BlockNumber { return nil @@ -344,7 +344,7 @@ func UnwindExecutionStage(u *UnwindState, s *StageState, txc wrap.TxContainer, c return fmt.Errorf("%w: %d < %d", ErrTooDeepUnwind, u.UnwindPoint, unwindToLimit) } - if err = unwindExecutionStage(u, s, txc, ctx, cfg, logger); err != nil { + if err = unwindExecutionStage(u, s, txc, db, ctx, cfg, logger); err != nil { return err } if err = u.Done(txc.Tx); err != nil { @@ -360,7 +360,7 @@ func UnwindExecutionStage(u *UnwindState, s *StageState, txc wrap.TxContainer, c return nil } -func unwindExecutionStage(u *UnwindState, s *StageState, txc wrap.TxContainer, ctx context.Context, cfg ExecuteBlockCfg, logger log.Logger) error { +func unwindExecutionStage(u *UnwindState, s *StageState, txc wrap.TxContainer, db kv.RoDB, ctx context.Context, cfg ExecuteBlockCfg, logger log.Logger) error { var accumulator *shards.Accumulator if cfg.stateStream && s.BlockNumber-u.UnwindPoint < stateStreamLimit { accumulator = cfg.notifications.Accumulator @@ -386,7 +386,7 @@ func unwindExecutionStage(u *UnwindState, s *StageState, txc wrap.TxContainer, c accumulator.StartChange(header, txs, true) } - return unwindExec3(u, s, txc, ctx, cfg.blockReader, accumulator, logger) + return unwindExec3(u, s, txc, ctx, db, cfg.blockReader, accumulator, logger) } func PruneExecutionStage(s *PruneState, tx kv.RwTx, cfg ExecuteBlockCfg, ctx context.Context, logger log.Logger) (err error) { diff --git a/eth/stagedsync/stage_execute_test.go b/eth/stagedsync/stage_execute_test.go index f2f58c9d098..4119ba98c5e 100644 --- a/eth/stagedsync/stage_execute_test.go +++ b/eth/stagedsync/stage_execute_test.go @@ -26,8 +26,8 @@ import ( "github.com/erigontech/erigon/params" ) -func apply(tx kv.RwTx, logger log.Logger) (beforeBlock, afterBlock testGenHook, w state.StateWriter) { - domains, err := libstate.NewSharedDomains(tx, logger) +func apply(tx kv.RwTx, db kv.RoDB, logger log.Logger) (beforeBlock, afterBlock testGenHook, w state.StateWriter) { + domains, err := libstate.NewSharedDomains(tx, db, logger) if err != nil { panic(err) } diff --git a/eth/stagedsync/stage_headers.go b/eth/stagedsync/stage_headers.go index 3ada10f2c3b..2438552b9b3 100644 --- a/eth/stagedsync/stage_headers.go +++ b/eth/stagedsync/stage_headers.go @@ -104,7 +104,7 @@ func StageHeadersCfg( } } -func SpawnStageHeaders(s *StageState, u Unwinder, ctx context.Context, tx kv.RwTx, cfg HeadersCfg, test bool, logger log.Logger) error { +func SpawnStageHeaders(s *StageState, u Unwinder, ctx context.Context, tx kv.RwTx, db kv.RwDB, cfg HeadersCfg, test bool, logger log.Logger) error { useExternalTx := tx != nil if !useExternalTx { var err error @@ -120,12 +120,12 @@ func SpawnStageHeaders(s *StageState, u Unwinder, ctx context.Context, tx kv.RwT } } cfg.hd.Progress() - return HeadersPOW(s, u, ctx, tx, cfg, test, useExternalTx, logger) + return HeadersPOW(s, u, ctx, tx, db, cfg, test, useExternalTx, logger) } // HeadersPOW progresses Headers stage for Proof-of-Work headers -func HeadersPOW(s *StageState, u Unwinder, ctx context.Context, tx kv.RwTx, cfg HeadersCfg, test bool, useExternalTx bool, logger log.Logger) error { +func HeadersPOW(s *StageState, u Unwinder, ctx context.Context, tx kv.RwTx, db kv.RwDB, cfg HeadersCfg, test bool, useExternalTx bool, logger log.Logger) error { var err error startTime := time.Now() @@ -313,7 +313,7 @@ Loop: } if headerInserter.Unwind() { unwindTo := headerInserter.UnwindPoint() - doms, err := state.NewSharedDomains(tx, logger) //TODO: if remove this line 
TestBlockchainHeaderchainReorgConsistency failing + doms, err := state.NewSharedDomains(tx, db, logger) //TODO: if remove this line TestBlockchainHeaderchainReorgConsistency failing if err != nil { return err } diff --git a/eth/stagedsync/stage_mining_exec.go b/eth/stagedsync/stage_mining_exec.go index 70a48bcd221..be41d8427ac 100644 --- a/eth/stagedsync/stage_mining_exec.go +++ b/eth/stagedsync/stage_mining_exec.go @@ -92,7 +92,7 @@ func StageMiningExecCfg( // SpawnMiningExecStage // TODO: // - resubmitAdjustCh - variable is not implemented -func SpawnMiningExecStage(s *StageState, txc wrap.TxContainer, cfg MiningExecCfg, sendersCfg SendersCfg, execCfg ExecuteBlockCfg, ctx context.Context, logger log.Logger, u Unwinder) error { +func SpawnMiningExecStage(s *StageState, txc wrap.TxContainer, db kv.RwDB, cfg MiningExecCfg, sendersCfg SendersCfg, execCfg ExecuteBlockCfg, ctx context.Context, logger log.Logger, u Unwinder) error { cfg.vmConfig.NoReceipts = false chainID, _ := uint256.FromBig(cfg.chainConfig.ChainID) logPrefix := s.LogPrefix() @@ -135,7 +135,7 @@ func SpawnMiningExecStage(s *StageState, txc wrap.TxContainer, cfg MiningExecCfg mb := membatchwithdb.NewMemoryBatch(txc.Tx, cfg.tmpdir, logger) defer mb.Close() - sd, err := state2.NewSharedDomains(mb, logger) + sd, err := state2.NewSharedDomains(mb, db, logger) if err != nil { return err } @@ -232,7 +232,7 @@ func SpawnMiningExecStage(s *StageState, txc wrap.TxContainer, cfg MiningExecCfg // This flag will skip checking the state root execCfg.blockProduction = true execS := &StageState{state: s.state, ID: stages.Execution, BlockNumber: blockHeight - 1} - if err = ExecBlockV3(execS, u, txc, blockHeight, context.Background(), execCfg, false, logger, true); err != nil { + if err = ExecBlockV3(execS, u, txc, db, blockHeight, context.Background(), execCfg, false, logger, true); err != nil { logger.Error("cannot execute block execution", "err", err) return err } diff --git a/eth/stagedsync/stage_witness.go b/eth/stagedsync/stage_witness.go index 2c62ac65e8e..fc841887fa2 100644 --- a/eth/stagedsync/stage_witness.go +++ b/eth/stagedsync/stage_witness.go @@ -95,7 +95,7 @@ func PrepareForWitness(tx kv.TemporalTx, block *types.Block, prevRoot libcommon. } // RewindStagesForWitness rewinds the Execution stage to previous block. 
-func RewindStagesForWitness(batch *membatchwithdb.MemoryMutation, blockNr, latestBlockNr uint64, cfg *WitnessCfg, regenerateHash bool, ctx context.Context, logger log.Logger) error { +func RewindStagesForWitness(batch *membatchwithdb.MemoryMutation, db kv.RoDB, blockNr, latestBlockNr uint64, cfg *WitnessCfg, regenerateHash bool, ctx context.Context, logger log.Logger) error { // Rewind the Execution stage to previous block unwindState := &UnwindState{ID: stages.Execution, UnwindPoint: blockNr - 1, CurrentBlockNumber: latestBlockNr} stageState := &StageState{ID: stages.Execution, BlockNumber: blockNr} @@ -119,7 +119,7 @@ func RewindStagesForWitness(batch *membatchwithdb.MemoryMutation, blockNr, lates /*stateStream=*/ false, /*badBlockHalt=*/ true, dirs, blockReader, nil, nil, syncCfg, nil) - if err := UnwindExecutionStage(unwindState, stageState, txc, ctx, execCfg, logger); err != nil { + if err := UnwindExecutionStage(unwindState, stageState, txc, db, ctx, execCfg, logger); err != nil { return err } diff --git a/eth/stagedsync/stagebuilder.go b/eth/stagedsync/stagebuilder.go index cbde166e0ea..ded9f502dcf 100644 --- a/eth/stagedsync/stagebuilder.go +++ b/eth/stagedsync/stagebuilder.go @@ -76,7 +76,7 @@ func MiningStages( ID: stages.MiningExecution, Description: "Mining: execute new block from txn pool", Forward: func(badBlockUnwind bool, s *StageState, u Unwinder, txc wrap.TxContainer, logger log.Logger) error { - return SpawnMiningExecStage(s, txc, execCfg, sendersCfg, executeBlockCfg, ctx, logger, nil) + return SpawnMiningExecStage(s, txc, createBlockCfg.db, execCfg, sendersCfg, executeBlockCfg, ctx, logger, nil) }, Unwind: func(u *UnwindState, s *StageState, txc wrap.TxContainer, logger log.Logger) error { return nil diff --git a/eth/stagedsync/stagedsynctest/harness.go b/eth/stagedsync/stagedsynctest/harness.go index 69865589fe6..48a2f6537ee 100644 --- a/eth/stagedsync/stagedsynctest/harness.go +++ b/eth/stagedsync/stagedsynctest/harness.go @@ -86,7 +86,7 @@ func InitHarness(ctx context.Context, t *testing.T, cfg HarnessCfg) Harness { false, nil, ) - stateSyncStages := stagedsync.DefaultStages(ctx, stagedsync.SnapshotsCfg{}, stagedsync.HeadersCfg{}, bhCfg, stagedsync.BlockHashesCfg{}, stagedsync.BodiesCfg{}, stagedsync.SendersCfg{}, stagedsync.ExecuteBlockCfg{}, stagedsync.TxLookupCfg{}, stagedsync.FinishCfg{}, true) + stateSyncStages := stagedsync.DefaultStages(ctx, m.DB, stagedsync.SnapshotsCfg{}, stagedsync.HeadersCfg{}, bhCfg, stagedsync.BlockHashesCfg{}, stagedsync.BodiesCfg{}, stagedsync.SendersCfg{}, stagedsync.ExecuteBlockCfg{}, stagedsync.TxLookupCfg{}, stagedsync.FinishCfg{}, true) stateSync := stagedsync.New( ethconfig.Defaults.Sync, stateSyncStages, diff --git a/eth/tracers/internal/tracetest/calltrace_test.go b/eth/tracers/internal/tracetest/calltrace_test.go index 45f84822c10..05e0edadf76 100644 --- a/eth/tracers/internal/tracetest/calltrace_test.go +++ b/eth/tracers/internal/tracetest/calltrace_test.go @@ -153,7 +153,7 @@ func testCallTracer(tracerName string, dirPath string, t *testing.T) { dbTx, err := m.DB.BeginRw(m.Ctx) require.NoError(t, err) defer dbTx.Rollback() - statedb, err := tests.MakePreState(rules, dbTx, test.Genesis.Alloc, uint64(test.Context.Number)) + statedb, err := tests.MakePreState(rules, dbTx, m.DB, test.Genesis.Alloc, uint64(test.Context.Number)) require.NoError(t, err) tracer, err := tracers.New(tracerName, new(tracers.Context), test.TracerConfig) if err != nil { @@ -262,7 +262,7 @@ func benchTracer(b *testing.B, tracerName string, test 
*callTracerTest) { dbTx, err := m.DB.BeginRw(m.Ctx) require.NoError(b, err) defer dbTx.Rollback() - statedb, _ := tests.MakePreState(rules, dbTx, test.Genesis.Alloc, uint64(test.Context.Number)) + statedb, _ := tests.MakePreState(rules, dbTx, m.DB, test.Genesis.Alloc, uint64(test.Context.Number)) b.ReportAllocs() b.ResetTimer() @@ -339,7 +339,7 @@ func TestZeroValueToNotExitCall(t *testing.T) { require.NoError(t, err) defer dbTx.Rollback() - statedb, _ := tests.MakePreState(rules, dbTx, alloc, context.BlockNumber) + statedb, _ := tests.MakePreState(rules, dbTx, m.DB, alloc, context.BlockNumber) // Create the tracer, the EVM environment and run it tracer, err := tracers.New("callTracer", nil, nil) if err != nil { diff --git a/eth/tracers/internal/tracetest/prestate_test.go b/eth/tracers/internal/tracetest/prestate_test.go index 7598f0a3261..ed6a6d4322d 100644 --- a/eth/tracers/internal/tracetest/prestate_test.go +++ b/eth/tracers/internal/tracetest/prestate_test.go @@ -118,7 +118,7 @@ func testPrestateTracer(tracerName string, dirPath string, t *testing.T) { dbTx, err := m.DB.BeginRw(m.Ctx) require.NoError(t, err) defer dbTx.Rollback() - statedb, err := tests.MakePreState(rules, dbTx, test.Genesis.Alloc, context.BlockNumber) + statedb, err := tests.MakePreState(rules, dbTx, m.DB, test.Genesis.Alloc, context.BlockNumber) require.NoError(t, err) tracer, err := tracers.New(tracerName, new(tracers.Context), test.TracerConfig) if err != nil { diff --git a/eth/tracers/tracers_test.go b/eth/tracers/tracers_test.go index 5b33f6b736b..156ce93f9a8 100644 --- a/eth/tracers/tracers_test.go +++ b/eth/tracers/tracers_test.go @@ -105,7 +105,7 @@ func TestPrestateTracerCreate2(t *testing.T) { require.NoError(t, err) defer tx.Rollback() rules := params.AllProtocolChanges.Rules(context.BlockNumber, context.Time) - statedb, _ := tests.MakePreState(rules, tx, alloc, context.BlockNumber) + statedb, _ := tests.MakePreState(rules, tx, m.DB, alloc, context.BlockNumber) // Create the tracer, the EVM environment and run it tracer, err := tracers.New("prestateTracer", new(tracers.Context), json.RawMessage("{}")) diff --git a/tests/state_test.go b/tests/state_test.go index 9b308a99a57..74cd089a560 100644 --- a/tests/state_test.go +++ b/tests/state_test.go @@ -71,7 +71,7 @@ func TestState(t *testing.T) { t.Fatal(err) } defer tx.Rollback() - _, _, err = test.Run(tx, subtest, vmconfig, dirs) + _, _, err = test.Run(tx, db, subtest, vmconfig, dirs) tx.Rollback() if err != nil && len(test.json.Post[subtest.Fork][subtest.Index].ExpectException) > 0 { // Ignore expected errors diff --git a/tests/state_test_util.go b/tests/state_test_util.go index 292f4d16440..863e1f95814 100644 --- a/tests/state_test_util.go +++ b/tests/state_test_util.go @@ -166,8 +166,8 @@ func (t *StateTest) Subtests() []StateSubtest { } // Run executes a specific subtest and verifies the post-state and logs -func (t *StateTest) Run(tx kv.RwTx, subtest StateSubtest, vmconfig vm.Config, dirs datadir.Dirs) (*state.IntraBlockState, libcommon.Hash, error) { - state, root, err := t.RunNoVerify(tx, subtest, vmconfig, dirs) +func (t *StateTest) Run(tx kv.RwTx, db kv.RoDB, subtest StateSubtest, vmconfig vm.Config, dirs datadir.Dirs) (*state.IntraBlockState, libcommon.Hash, error) { + state, root, err := t.RunNoVerify(tx, db, subtest, vmconfig, dirs) if err != nil { return state, types.EmptyRootHash, err } @@ -184,7 +184,7 @@ func (t *StateTest) Run(tx kv.RwTx, subtest StateSubtest, vmconfig vm.Config, di } // RunNoVerify runs a specific subtest and returns the 
statedb and post-state root -func (t *StateTest) RunNoVerify(tx kv.RwTx, subtest StateSubtest, vmconfig vm.Config, dirs datadir.Dirs) (*state.IntraBlockState, libcommon.Hash, error) { +func (t *StateTest) RunNoVerify(tx kv.RwTx, db kv.RoDB, subtest StateSubtest, vmconfig vm.Config, dirs datadir.Dirs) (*state.IntraBlockState, libcommon.Hash, error) { config, eips, err := GetChainConfig(subtest.Fork) if err != nil { return nil, libcommon.Hash{}, UnsupportedForkError{subtest.Fork} @@ -198,14 +198,14 @@ func (t *StateTest) RunNoVerify(tx kv.RwTx, subtest StateSubtest, vmconfig vm.Co readBlockNr := block.NumberU64() writeBlockNr := readBlockNr + 1 - _, err = MakePreState(&chain.Rules{}, tx, t.json.Pre, readBlockNr) + _, err = MakePreState(&chain.Rules{}, tx, db, t.json.Pre, readBlockNr) if err != nil { return nil, libcommon.Hash{}, UnsupportedForkError{subtest.Fork} } var txc wrap.TxContainer txc.Tx = tx - domains, err := state2.NewSharedDomains(tx, log.New()) + domains, err := state2.NewSharedDomains(tx, db, log.New()) if err != nil { return nil, libcommon.Hash{}, UnsupportedForkError{subtest.Fork} } @@ -288,7 +288,7 @@ func (t *StateTest) RunNoVerify(tx kv.RwTx, subtest StateSubtest, vmconfig vm.Co return statedb, libcommon.BytesToHash(rootBytes), nil } -func MakePreState(rules *chain.Rules, tx kv.RwTx, accounts types.GenesisAlloc, blockNr uint64) (*state.IntraBlockState, error) { +func MakePreState(rules *chain.Rules, tx kv.RwTx, db kv.RoDB, accounts types.GenesisAlloc, blockNr uint64) (*state.IntraBlockState, error) { r := rpchelper.NewLatestStateReader(tx) statedb := state.New(r) statedb.SetTxContext(0) @@ -320,7 +320,7 @@ func MakePreState(rules *chain.Rules, tx kv.RwTx, accounts types.GenesisAlloc, b var txc wrap.TxContainer txc.Tx = tx - domains, err := state2.NewSharedDomains(tx, log.New()) + domains, err := state2.NewSharedDomains(tx, db, log.New()) if err != nil { return nil, err } diff --git a/turbo/engineapi/engine_helpers/fork_validator.go b/turbo/engineapi/engine_helpers/fork_validator.go index 87fd2ddf7ae..805b4787325 100644 --- a/turbo/engineapi/engine_helpers/fork_validator.go +++ b/turbo/engineapi/engine_helpers/fork_validator.go @@ -179,7 +179,7 @@ type HasDiff interface { // if the payload extends the canonical chain, then we stack it in extendingFork without any unwind. // if the payload is a fork then we unwind to the point where the fork meets the canonical chain, and there we check whether it is valid. // if for any reason none of the actions above can be performed due to lack of information, we accept the payload and avoid validation. 
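
// Illustrative aside — not part of the patch. A minimal sketch, assuming the
// import paths below, of how a state-test caller looks after this change: the
// database handle that opened the transaction is now passed to StateTest.Run
// (and through it to MakePreState and NewSharedDomains), mirroring the
// tests/state_test.go and tests/state_test_util.go hunks above. The helper
// name runSubtest is hypothetical.
package example

import (
	"context"

	"github.com/erigontech/erigon-lib/common/datadir"
	"github.com/erigontech/erigon-lib/kv"
	"github.com/erigontech/erigon/core/vm"
	"github.com/erigontech/erigon/tests"
)

func runSubtest(ctx context.Context, db kv.RwDB, test *tests.StateTest, subtest tests.StateSubtest, dirs datadir.Dirs) error {
	tx, err := db.BeginRw(ctx)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	// Previously: test.Run(tx, subtest, vm.Config{}, dirs).
	// The db handle that produced tx now travels with it.
	_, _, err = test.Run(tx, db, subtest, vm.Config{}, dirs)
	return err
}
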
-func (fv *ForkValidator) ValidatePayload(tx kv.RwTx, header *types.Header, body *types.RawBody, logger log.Logger) (status engine_types.EngineStatus, latestValidHash libcommon.Hash, validationError error, criticalError error) { +func (fv *ForkValidator) ValidatePayload(tx kv.RwTx, db kv.RwDB, header *types.Header, body *types.RawBody, logger log.Logger) (status engine_types.EngineStatus, latestValidHash libcommon.Hash, validationError error, criticalError error) { fv.lock.Lock() defer fv.lock.Unlock() if fv.validatePayload == nil { @@ -264,7 +264,7 @@ func (fv *ForkValidator) ValidatePayload(tx kv.RwTx, header *types.Header, body if fv.sharedDom != nil { fv.sharedDom.Close() } - fv.sharedDom, criticalError = state.NewSharedDomains(tx, logger) + fv.sharedDom, criticalError = state.NewSharedDomains(tx, db, logger) if criticalError != nil { criticalError = fmt.Errorf("failed to create shared domains: %w", criticalError) return diff --git a/turbo/execution/eth1/ethereum_execution.go b/turbo/execution/eth1/ethereum_execution.go index 5fd3d876150..4450e548529 100644 --- a/turbo/execution/eth1/ethereum_execution.go +++ b/turbo/execution/eth1/ethereum_execution.go @@ -249,7 +249,7 @@ func (e *EthereumExecutionModule) ValidateChain(ctx context.Context, req *execut } defer tx.Rollback() - status, lvh, validationError, criticalError := e.forkValidator.ValidatePayload(tx, header, body.RawBody(), e.logger) + status, lvh, validationError, criticalError := e.forkValidator.ValidatePayload(tx, e.db, header, body.RawBody(), e.logger) if criticalError != nil { return nil, criticalError } diff --git a/turbo/execution/eth1/forkchoice.go b/turbo/execution/eth1/forkchoice.go index 6da592bd324..4b86542b9ec 100644 --- a/turbo/execution/eth1/forkchoice.go +++ b/turbo/execution/eth1/forkchoice.go @@ -69,8 +69,8 @@ func sendForkchoiceErrorWithoutWaiting(logger log.Logger, ch chan forkchoiceOutc } } -func isDomainAheadOfBlocks(tx kv.RwTx) bool { - doms, err := state.NewSharedDomains(tx, log.New()) +func isDomainAheadOfBlocks(tx kv.RwTx, db kv.RoDB) bool { + doms, err := state.NewSharedDomains(tx, db, log.New()) if err != nil { return errors.Is(err, state.ErrBehindCommitment) } @@ -395,7 +395,7 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original } } } - if isDomainAheadOfBlocks(tx) { + if isDomainAheadOfBlocks(tx, e.db) { if err := tx.Commit(); err != nil { sendForkchoiceErrorWithoutWaiting(e.logger, outcomeCh, err, false) return diff --git a/turbo/jsonrpc/eth_call.go b/turbo/jsonrpc/eth_call.go index c73b020458e..a3562ae84a2 100644 --- a/turbo/jsonrpc/eth_call.go +++ b/turbo/jsonrpc/eth_call.go @@ -335,7 +335,7 @@ func (api *APIImpl) getProof(ctx context.Context, roTx *kv.Tx, address libcommon return nil, err } - domains, err := libstate.NewSharedDomains(*roTx, log.New()) + domains, err := libstate.NewSharedDomains(*roTx, db, log.New()) if err != nil { return nil, err } @@ -579,7 +579,7 @@ func (api *BaseAPI) getWitness(ctx context.Context, db kv.RoDB, blockNrOrHash rp // Unwind to blockNr cfg := stagedsync.StageWitnessCfg(true, 0, chainConfig, engine, api._blockReader, api.dirs) - err = stagedsync.RewindStagesForWitness(txBatch2, blockNr, latestBlock, &cfg, regenerateHash, ctx, logger) + err = stagedsync.RewindStagesForWitness(txBatch2, db, blockNr, latestBlock, &cfg, regenerateHash, ctx, logger) if err != nil { return nil, err } @@ -589,7 +589,7 @@ func (api *BaseAPI) getWitness(ctx context.Context, db kv.RoDB, blockNrOrHash rp return nil, err } - domains, err := 
libstate.NewSharedDomains(txBatch2, log.New()) + domains, err := libstate.NewSharedDomains(txBatch2, db, log.New()) if err != nil { return nil, err } diff --git a/turbo/jsonrpc/trace_adhoc_test.go b/turbo/jsonrpc/trace_adhoc_test.go index fe63ba1edaa..61b829189dc 100644 --- a/turbo/jsonrpc/trace_adhoc_test.go +++ b/turbo/jsonrpc/trace_adhoc_test.go @@ -411,7 +411,7 @@ func TestOeTracer(t *testing.T) { require.NoError(t, err) defer dbTx.Rollback() - statedb, _ := tests.MakePreState(rules, dbTx, test.Genesis.Alloc, context.BlockNumber) + statedb, _ := tests.MakePreState(rules, dbTx, m.DB, test.Genesis.Alloc, context.BlockNumber) msg, err := tx.AsMessage(*signer, (*big.Int)(test.Context.BaseFee), rules) require.NoError(t, err) txContext := core.NewEVMTxContext(msg) diff --git a/turbo/stages/mock/mock_sentry.go b/turbo/stages/mock/mock_sentry.go index 4cc66899f9e..cb38ef7dddd 100644 --- a/turbo/stages/mock/mock_sentry.go +++ b/turbo/stages/mock/mock_sentry.go @@ -519,7 +519,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK mock.agg.SetProduceMod(mock.BlockReader.FreezingCfg().ProduceE3) mock.Sync = stagedsync.New( cfg.Sync, - stagedsync.DefaultStages(mock.Ctx, stagedsync.StageSnapshotsCfg(mock.DB, *mock.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, mock.BlockReader, mock.Notifications, false, false, false, nil, prune), stagedsync.StageHeadersCfg(mock.DB, mock.sentriesClient.Hd, mock.sentriesClient.Bd, *mock.ChainConfig, cfg.Sync, sendHeaderRequest, propagateNewBlockHashes, penalize, cfg.BatchSize, false, mock.BlockReader, blockWriter, dirs.Tmp, mock.Notifications), stagedsync.StageBorHeimdallCfg(mock.DB, snapDb, stagedsync.MiningState{}, *mock.ChainConfig, nil, nil, nil, mock.BlockReader, nil, nil, recents, signatures, false, nil), stagedsync.StageBlockHashesCfg(mock.DB, mock.Dirs.Tmp, mock.ChainConfig, blockWriter), stagedsync.StageBodiesCfg(mock.DB, mock.sentriesClient.Bd, sendBodyRequest, penalize, blockPropagator, cfg.Sync.BodyDownloadTimeoutSeconds, *mock.ChainConfig, mock.BlockReader, blockWriter), stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, cfg.Sync, false, dirs.Tmp, prune, mock.BlockReader, mock.sentriesClient.Hd), stagedsync.StageExecuteBlocksCfg( + stagedsync.DefaultStages(mock.Ctx, db, stagedsync.StageSnapshotsCfg(mock.DB, *mock.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, mock.BlockReader, mock.Notifications, false, false, false, nil, prune), stagedsync.StageHeadersCfg(mock.DB, mock.sentriesClient.Hd, mock.sentriesClient.Bd, *mock.ChainConfig, cfg.Sync, sendHeaderRequest, propagateNewBlockHashes, penalize, cfg.BatchSize, false, mock.BlockReader, blockWriter, dirs.Tmp, mock.Notifications), stagedsync.StageBorHeimdallCfg(mock.DB, snapDb, stagedsync.MiningState{}, *mock.ChainConfig, nil, nil, nil, mock.BlockReader, nil, nil, recents, signatures, false, nil), stagedsync.StageBlockHashesCfg(mock.DB, mock.Dirs.Tmp, mock.ChainConfig, blockWriter), stagedsync.StageBodiesCfg(mock.DB, mock.sentriesClient.Bd, sendBodyRequest, penalize, blockPropagator, cfg.Sync.BodyDownloadTimeoutSeconds, *mock.ChainConfig, mock.BlockReader, blockWriter), stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, cfg.Sync, false, dirs.Tmp, prune, mock.BlockReader, mock.sentriesClient.Hd), stagedsync.StageExecuteBlocksCfg( mock.DB, prune, cfg.BatchSize, diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index a956ebbbb13..8f7ae06e068 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -520,7 +520,7 @@ func 
MiningStep(ctx context.Context, db kv.RwDB, mining *stagedsync.Sync, tmpDir defer mb.Close() txc := wrap.TxContainer{Tx: mb} - sd, err := state.NewSharedDomains(mb, logger) + sd, err := state.NewSharedDomains(mb, db, logger) if err != nil { return err } @@ -692,6 +692,7 @@ func NewDefaultStages(ctx context.Context, runInTestMode := cfg.ImportMode return stagedsync.DefaultStages(ctx, + db, stagedsync.StageSnapshotsCfg(db, *controlServer.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, blockReader, notifications, cfg.InternalCL && cfg.CaplinConfig.ArchiveBlocks, cfg.CaplinConfig.ArchiveBlobs, cfg.CaplinConfig.ArchiveStates, silkworm, cfg.Prune), stagedsync.StageHeadersCfg(db, controlServer.Hd, controlServer.Bd, *controlServer.ChainConfig, cfg.Sync, controlServer.SendHeaderRequest, controlServer.PropagateNewBlockHashes, controlServer.Penalize, cfg.BatchSize, p2pCfg.NoDiscovery, blockReader, blockWriter, dirs.Tmp, notifications), stagedsync.StageBorHeimdallCfg(db, snapDb, stagedsync.MiningState{}, *controlServer.ChainConfig, heimdallClient, heimdallStore, bridgeStore, blockReader, controlServer.Hd, controlServer.Penalize, recents, signatures, cfg.WithHeimdallWaypointRecording, nil), @@ -732,6 +733,7 @@ func NewPipelineStages(ctx context.Context, if len(cfg.Sync.UploadLocation) == 0 { return stagedsync.PipelineStages(ctx, + db, stagedsync.StageSnapshotsCfg(db, *controlServer.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, blockReader, notifications, cfg.InternalCL && cfg.CaplinConfig.ArchiveBlocks, cfg.CaplinConfig.ArchiveBlobs, cfg.CaplinConfig.ArchiveStates, silkworm, cfg.Prune), stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter), stagedsync.StageSendersCfg(db, controlServer.ChainConfig, cfg.Sync, false, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd), @@ -741,6 +743,7 @@ func NewPipelineStages(ctx context.Context, } return stagedsync.UploaderPipelineStages(ctx, + db, stagedsync.StageSnapshotsCfg(db, *controlServer.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, blockReader, notifications, cfg.InternalCL && cfg.CaplinConfig.ArchiveBlocks, cfg.CaplinConfig.ArchiveBlobs, cfg.CaplinConfig.ArchiveStates, silkworm, cfg.Prune), stagedsync.StageHeadersCfg(db, controlServer.Hd, controlServer.Bd, *controlServer.ChainConfig, cfg.Sync, controlServer.SendHeaderRequest, controlServer.PropagateNewBlockHashes, controlServer.Penalize, cfg.BatchSize, p2pCfg.NoDiscovery, blockReader, blockWriter, dirs.Tmp, notifications), stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter), @@ -756,6 +759,7 @@ func NewInMemoryExecution(ctx context.Context, db kv.RwDB, cfg *ethconfig.Config return stagedsync.New( cfg.Sync, stagedsync.StateStages(ctx, stagedsync.StageHeadersCfg(db, controlServer.Hd, controlServer.Bd, *controlServer.ChainConfig, cfg.Sync, controlServer.SendHeaderRequest, controlServer.PropagateNewBlockHashes, controlServer.Penalize, cfg.BatchSize, false, blockReader, blockWriter, dirs.Tmp, nil), + db, stagedsync.StageBodiesCfg(db, controlServer.Bd, controlServer.SendBodyRequest, controlServer.Penalize, controlServer.BroadcastNewBlock, cfg.Sync.BodyDownloadTimeoutSeconds, *controlServer.ChainConfig, blockReader, blockWriter), stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter), stagedsync.StageSendersCfg(db, controlServer.ChainConfig, cfg.Sync, true, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd), stagedsync.StageExecuteBlocksCfg(db, cfg.Prune, cfg.BatchSize, 
controlServer.ChainConfig, controlServer.Engine, &vm.Config{}, notifications, cfg.StateStream, true, cfg.Dirs, blockReader, controlServer.Hd, cfg.Genesis, cfg.Sync, SilkwormForExecutionStage(silkworm, cfg))), stagedsync.StateUnwindOrder, @@ -789,6 +793,7 @@ func NewPolygonSyncStages( ) []*stagedsync.Stage { return stagedsync.PolygonSyncStages( ctx, + db, stagedsync.StageSnapshotsCfg( db, *chainConfig, diff --git a/txnprovider/txpool/pool_test.go b/txnprovider/txpool/pool_test.go index 31f9b513eed..c07142626d6 100644 --- a/txnprovider/txpool/pool_test.go +++ b/txnprovider/txpool/pool_test.go @@ -930,7 +930,7 @@ func TestShanghaiValidateTxn(t *testing.T) { tx, err := coreDB.BeginTemporalRw(ctx) defer tx.Rollback() asrt.NoError(err) - sd, err := state.NewSharedDomains(tx, logger) + sd, err := state.NewSharedDomains(tx, coreDB, logger) asrt.NoError(err) defer sd.Close() cache := kvcache.NewDummy() @@ -1052,7 +1052,7 @@ func TestSetCodeTxnValidationWithLargeAuthorizationValues(t *testing.T) { tx, err := coreDB.BeginRw(ctx) defer tx.Rollback() assert.NoError(t, err) - sd, err := state.NewSharedDomains(tx, logger) + sd, err := state.NewSharedDomains(tx, coreDB, logger) assert.NoError(t, err) defer sd.Close()
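
// Illustrative sketch only — not part of the patch above. It shows the call
// shape this change applies throughout the tree: state.NewSharedDomains now
// receives the backing database handle in addition to the transaction. The
// import paths and the helper name openSharedDomains are assumptions based on
// identifiers visible in the hunks; judging by the updated signatures
// (e.g. UnwindExecutionStage, RunNoVerify), the new parameter appears to
// accept any kv.RoDB, so kv.RwDB and kv.TemporalRwDB values are passed at the
// call sites.
package example

import (
	"context"

	"github.com/erigontech/erigon-lib/kv"
	"github.com/erigontech/erigon-lib/log/v3"
	"github.com/erigontech/erigon-lib/state"
)

func openSharedDomains(ctx context.Context, db kv.RwDB, logger log.Logger) error {
	tx, err := db.BeginRw(ctx)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	// Previously: state.NewSharedDomains(tx, logger).
	// Now the db that opened tx is threaded through as the second argument.
	sd, err := state.NewSharedDomains(tx, db, logger)
	if err != nil {
		return err
	}
	defer sd.Close()

	// A real caller would read/write through sd, flush, and commit tx here;
	// this sketch only demonstrates the constructor's new call shape.
	return nil
}
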