fix(mempool/v1): data race #1655

Merged · 4 commits · Mar 5, 2025
10 changes: 5 additions & 5 deletions mempool/v1/mempool.go
@@ -205,6 +205,11 @@ func (txmp *TxMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo memp

 	txKey := tx.Key()
 
+	// At this point, we need to ensure that passing CheckTx and adding to
+	// the mempool is atomic.
+	txmp.Lock()
+	defer txmp.Unlock()
+
 	// Check for the transaction in the cache.
 	if !txmp.cache.Push(tx) {
 		// If the cached transaction is also in the pool, record its sender.
@@ -216,11 +221,6 @@ func (txmp *TxMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo memp
 		return mempool.ErrTxInCache
 	}
 
-	// At this point, we need to ensure that passing CheckTx and adding to
-	// the mempool is atomic.
-	txmp.Lock()
-	defer txmp.Unlock()
-
 	// Invoke an ABCI CheckTx for this transaction.
 	rsp, err := txmp.proxyAppConn.CheckTxSync(abci.RequestCheckTx{Tx: tx})
 	if err != nil {
84 changes: 84 additions & 0 deletions mempool/v1/mempool_test.go
@@ -748,3 +748,87 @@ func abciResponses(n int, code uint32) []*abci.ResponseDeliverTx {
 	}
 	return responses
 }
+
+// TestConcurrentCheckTxDataRace reproduces the concurrent map read/write panic
+// that happens when multiple goroutines call CheckTx simultaneously.
+// This test should be run with the race detector enabled.
+//
+// See https://github.com/celestiaorg/celestia-app/issues/4379
+func TestConcurrentCheckTxDataRace(t *testing.T) {
+	// Skip in short mode as this test is timing-dependent.
+	if testing.Short() {
+		t.Skip("skipping in short mode")
+	}
+
+	// Use a WaitGroup to ensure all goroutines finish.
+	var wg sync.WaitGroup
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+
+		// Create a mempool for testing.
+		app := kvstore.NewApplication()
+		cc := proxy.NewLocalClientCreator(app)
+		appConnMem, _ := cc.NewABCIClient()
+		err := appConnMem.Start()
+		require.NoError(t, err)
+		defer func() {
+			err := appConnMem.Stop()
+			require.NoError(t, err)
+		}()
+
+		logger := log.TestingLogger()
+		cfg := config.DefaultMempoolConfig()
+		txmp := NewTxMempool(logger, cfg, appConnMem, 0)
+
+		// Create test transactions.
+		tx1 := types.Tx("test_transaction_1")
+		tx2 := types.Tx("test_transaction_2")
+		tx3 := types.Tx("test_transaction_3")
+
+		// First, add tx1 to both the cache and mempool so it shows up in txByKey.
+		_ = txmp.CheckTx(tx1, nil, mempool.TxInfo{})
+
+		// Create multiple goroutines that all try to check the same transaction
+		// simultaneously, which should trigger the race condition.
+		numGoroutines := 100
+		var startWg sync.WaitGroup
+		startCh := make(chan struct{})
+
+		for i := 0; i < numGoroutines; i++ {
+			startWg.Add(1)
+
+			go func(id int) {
+				startWg.Done()
+				// Wait for the signal to start.
+				<-startCh
+
+				// Use different transactions for different goroutines,
+				// but ensure some overlap to trigger the race.
+				var tx types.Tx
+				if id%3 == 0 {
+					tx = tx1 // Already in the cache, so this will read txByKey.
+				} else if id%3 == 1 {
+					tx = tx2 // New transaction.
+				} else {
+					tx = tx3 // New transaction.
+				}
+
+				// This call races with the other goroutines.
+				_ = txmp.CheckTx(tx, nil, mempool.TxInfo{})
+			}(i)
+		}
+
+		// Wait for all goroutines to be ready.
+		startWg.Wait()
+		// Signal all goroutines to start simultaneously.
+		close(startCh)
+
+		// Wait a short time for the panic to occur.
+		time.Sleep(1 * time.Second)
+	}()
+
+	// Wait for the test goroutine to complete.
+	wg.Wait()
+}
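To exercise the test, run it with the race detector enabled, as the test comment suggests (standard go test flags; the package path follows the diff above):

go test -race -run TestConcurrentCheckTxDataRace ./mempool/v1/

Against the pre-fix CheckTx, this should produce a race report rather than a pass; with the lock moved before the cache check, it completes cleanly.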