From 1701b9c3c9d1a32e66d7052c65c9e74b91b812db Mon Sep 17 00:00:00 2001 From: danliu Date: Thu, 8 Nov 2018 16:11:17 +0800 Subject: [PATCH] Add priority lock for CheckTx in mempool and ReCheck ABCI interface --- CHANGELOG.md | 13 ++- CHANGELOG_PENDING.md | 2 - abci/client/client.go | 1 + abci/client/grpc_client.go | 9 ++ abci/client/local_client.go | 10 ++ abci/client/socket_client.go | 4 + abci/example/kvstore/persistent_kvstore.go | 4 + abci/types/application.go | 7 +- consensus/mempool_test.go | 13 ++- mempool/mempool.go | 45 ++++++--- mempool/mempool_test.go | 102 +++++++++++++++++++++ proxy/app_conn.go | 5 + version/version.go | 4 +- 13 files changed, 194 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6032fc204..4d4d78789 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,14 @@ # Changelog +## v0.25.1 + +*October 17, 2018* + +BUG FIXES: + +- [state] [\#2616](https://github.com/tendermint/tendermint/issues/2616) Fix panic when genesis file's `validators` field is nil +- [consensus] [\#2634](https://github.com/tendermint/tendermint/issues/2634) Set `NextValidators` during replay + ## v0.25.0 *September 22, 2018* @@ -164,8 +173,8 @@ BUG FIXES: *August 22nd, 2018* BUG FIXES: -- [libs/autofile] \#2261 Fix log rotation so it actually happens. - - Fixes issues with consensus WAL growing unbounded ala \#2259 +- [libs/autofile] [\#2261](https://github.com/tendermint/tendermint/issues/2261) Fix log rotation so it actually happens. + - Fixes issues with consensus WAL growing unbounded ala [\#2259](https://github.com/tendermint/tendermint/issues/2259) ## 0.23.0 diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 1453630f3..81c7a3a29 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -15,5 +15,3 @@ FEATURES: IMPROVEMENTS: BUG FIXES: -- \#2616 Pass nil to NewValidatorSet() when genesis file's Validators field is nil -- \#2634 Set next validators along with validators while replay diff --git a/abci/client/client.go b/abci/client/client.go index 558588107..5b85c41ad 100644 --- a/abci/client/client.go +++ b/abci/client/client.go @@ -30,6 +30,7 @@ type Client interface { SetOptionAsync(types.RequestSetOption) *ReqRes DeliverTxAsync(tx []byte) *ReqRes CheckTxAsync(tx []byte) *ReqRes + ReCheckTxAsync(tx []byte) *ReqRes QueryAsync(types.RequestQuery) *ReqRes CommitAsync() *ReqRes InitChainAsync(types.RequestInitChain) *ReqRes diff --git a/abci/client/grpc_client.go b/abci/client/grpc_client.go index 4f37b17b6..178192920 100644 --- a/abci/client/grpc_client.go +++ b/abci/client/grpc_client.go @@ -177,6 +177,15 @@ func (cli *grpcClient) CheckTxAsync(tx []byte) *ReqRes { return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_CheckTx{res}}) } +func (cli *grpcClient) ReCheckTxAsync(tx []byte) *ReqRes { + req := types.ToRequestCheckTx(tx) + res, err := cli.client.CheckTx(context.Background(), req.GetCheckTx(), grpc.FailFast(true)) + if err != nil { + cli.StopForError(err) + } + return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_CheckTx{res}}) +} + func (cli *grpcClient) QueryAsync(params types.RequestQuery) *ReqRes { req := types.ToRequestQuery(params) res, err := cli.client.Query(context.Background(), req.GetQuery(), grpc.FailFast(true)) diff --git a/abci/client/local_client.go b/abci/client/local_client.go index 3ac3b6afa..22f08eb1b 100644 --- a/abci/client/local_client.go +++ b/abci/client/local_client.go @@ -91,6 +91,16 @@ func (app *localClient) CheckTxAsync(tx []byte) *ReqRes { ) } +func (app *localClient) ReCheckTxAsync(tx []byte) *ReqRes { + app.mtx.Lock() + res := app.Application.ReCheckTx(tx) + app.mtx.Unlock() + return app.callback( + types.ToRequestCheckTx(tx), + types.ToResponseCheckTx(res), + ) +} + func (app *localClient) QueryAsync(req types.RequestQuery) *ReqRes { app.mtx.Lock() res := app.Application.Query(req) diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go index affea1a9e..8c9615449 100644 --- a/abci/client/socket_client.go +++ b/abci/client/socket_client.go @@ -246,6 +246,10 @@ func (cli *socketClient) CheckTxAsync(tx []byte) *ReqRes { return cli.queueRequest(types.ToRequestCheckTx(tx)) } +func (cli *socketClient) ReCheckTxAsync(tx []byte) *ReqRes { + return cli.queueRequest(types.ToRequestCheckTx(tx)) +} + func (cli *socketClient) QueryAsync(req types.RequestQuery) *ReqRes { return cli.queueRequest(types.ToRequestQuery(req)) } diff --git a/abci/example/kvstore/persistent_kvstore.go b/abci/example/kvstore/persistent_kvstore.go index f969eebfe..f67920222 100644 --- a/abci/example/kvstore/persistent_kvstore.go +++ b/abci/example/kvstore/persistent_kvstore.go @@ -78,6 +78,10 @@ func (app *PersistentKVStoreApplication) CheckTx(tx []byte) types.ResponseCheckT return app.app.CheckTx(tx) } +func (app *PersistentKVStoreApplication) ReCheckTx(tx []byte) types.ResponseCheckTx { + return app.app.CheckTx(tx) +} + // Commit will panic if InitChain was not called func (app *PersistentKVStoreApplication) Commit() types.ResponseCommit { return app.app.Commit() diff --git a/abci/types/application.go b/abci/types/application.go index 88f8d001e..bf5b6c059 100644 --- a/abci/types/application.go +++ b/abci/types/application.go @@ -15,7 +15,8 @@ type Application interface { Query(RequestQuery) ResponseQuery // Query for state // Mempool Connection - CheckTx(tx []byte) ResponseCheckTx // Validate a tx for the mempool + CheckTx(tx []byte) ResponseCheckTx // Validate a tx for the mempool + ReCheckTx(tx []byte) ResponseCheckTx // Validate a tx for the mempool // Consensus Connection InitChain(RequestInitChain) ResponseInitChain // Initialize blockchain with validators and other info from TendermintCore @@ -53,6 +54,10 @@ func (BaseApplication) CheckTx(tx []byte) ResponseCheckTx { return ResponseCheckTx{Code: CodeTypeOK} } +func (BaseApplication) ReCheckTx(tx []byte) ResponseCheckTx { + return ResponseCheckTx{Code: CodeTypeOK} +} + func (BaseApplication) Commit() ResponseCommit { return ResponseCommit{} } diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index 950cf67d8..d7af80a08 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -95,7 +95,11 @@ func deliverTxsRange(cs *ConsensusState, start, end int) { func TestMempoolTxConcurrentWithCommit(t *testing.T) { state, privVals := randGenesisState(1, false, 10) - cs := newConsensusState(state, privVals[0], NewCounterApplication()) + // checkTx and block mutex are not purely FIFO, so we don't need to stick + // to the counter sequence + app := NewCounterApplication() + app.serial = false + cs := newConsensusState(state, privVals[0], app) height, round := cs.Height, cs.Round newBlockCh := subscribe(cs.eventBus, types.EventQueryNewBlock) @@ -182,10 +186,11 @@ type CounterApplication struct { txCount int mempoolTxCount int + serial bool } func NewCounterApplication() *CounterApplication { - return &CounterApplication{} + return &CounterApplication{serial: true} } func (app *CounterApplication) Info(req abci.RequestInfo) abci.ResponseInfo { @@ -194,7 +199,7 @@ func (app *CounterApplication) Info(req abci.RequestInfo) abci.ResponseInfo { func (app *CounterApplication) DeliverTx(tx []byte) abci.ResponseDeliverTx { txValue := txAsUint64(tx) - if txValue != uint64(app.txCount) { + if app.serial && txValue != uint64(app.txCount) { return abci.ResponseDeliverTx{ Code: code.CodeTypeBadNonce, Log: fmt.Sprintf("Invalid nonce. Expected %v, got %v", app.txCount, txValue)} @@ -205,7 +210,7 @@ func (app *CounterApplication) DeliverTx(tx []byte) abci.ResponseDeliverTx { func (app *CounterApplication) CheckTx(tx []byte) abci.ResponseCheckTx { txValue := txAsUint64(tx) - if txValue != uint64(app.mempoolTxCount) { + if app.serial && txValue != uint64(app.mempoolTxCount) { return abci.ResponseCheckTx{ Code: code.CodeTypeBadNonce, Log: fmt.Sprintf("Invalid nonce. Expected %v, got %v", app.mempoolTxCount, txValue)} diff --git a/mempool/mempool.go b/mempool/mempool.go index 2096912f5..268af5ed1 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -101,7 +101,9 @@ func TxID(tx []byte) string { type Mempool struct { config *cfg.MempoolConfig - proxyMtx sync.Mutex + proxyLowMtx sync.Mutex + proxyNextMtx sync.Mutex + proxyBlockingMtx sync.Mutex proxyAppConn proxy.AppConnMempool txs *clist.CList // concurrent linked-list of good txs counter int64 // simple incrementing counter @@ -196,8 +198,8 @@ func (mem *Mempool) CloseWAL() bool { return false } - mem.proxyMtx.Lock() - defer mem.proxyMtx.Unlock() + mem.Lock() + defer mem.Unlock() if mem.wal == nil { return false @@ -226,12 +228,27 @@ func (mem *Mempool) InitWAL() { // Lock locks the mempool. The consensus must be able to hold lock to safely update. func (mem *Mempool) Lock() { - mem.proxyMtx.Lock() + mem.proxyNextMtx.Lock() + mem.proxyBlockingMtx.Lock() + mem.proxyNextMtx.Unlock() } // Unlock unlocks the mempool. func (mem *Mempool) Unlock() { - mem.proxyMtx.Unlock() + mem.proxyBlockingMtx.Unlock() +} + +//LockLow uses triple mutex to low the priority of CheckTx() +func (mem *Mempool) LockLow() { + mem.proxyLowMtx.Lock() + mem.proxyNextMtx.Lock() + mem.proxyBlockingMtx.Lock() + mem.proxyNextMtx.Unlock() +} + +func (mem *Mempool) UnlockLow() { + mem.proxyBlockingMtx.Unlock() + mem.proxyLowMtx.Unlock() } // Size returns the number of transactions in the mempool. @@ -247,8 +264,8 @@ func (mem *Mempool) FlushAppConn() error { // Flush removes all transactions from the mempool and cache func (mem *Mempool) Flush() { - mem.proxyMtx.Lock() - defer mem.proxyMtx.Unlock() + mem.Lock() + defer mem.Unlock() mem.cache.Reset() @@ -278,8 +295,8 @@ func (mem *Mempool) TxsWaitChan() <-chan struct{} { // It gets called from another goroutine. // CONTRACT: Either cb will get called, or err returned. func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) { - mem.proxyMtx.Lock() - defer mem.proxyMtx.Unlock() + mem.LockLow() + defer mem.UnlockLow() if mem.Size() >= mem.config.Size { return ErrMempoolIsFull @@ -428,8 +445,8 @@ func (mem *Mempool) notifyTxsAvailable() { // If both maxes are negative, there is no cap on the size of all returned // transactions (~ all available transactions). func (mem *Mempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { - mem.proxyMtx.Lock() - defer mem.proxyMtx.Unlock() + mem.Lock() + defer mem.Unlock() for atomic.LoadInt32(&mem.rechecking) > 0 { // TODO: Something better? @@ -464,8 +481,8 @@ func (mem *Mempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { // If max is negative, there is no cap on the size of all returned // transactions (~ all available transactions). func (mem *Mempool) ReapMaxTxs(max int) types.Txs { - mem.proxyMtx.Lock() - defer mem.proxyMtx.Unlock() + mem.Lock() + defer mem.Unlock() if max < 0 { max = mem.txs.Len() @@ -560,7 +577,7 @@ func (mem *Mempool) recheckTxs(goodTxs []types.Tx) { // Push txs to proxyAppConn // NOTE: resCb() may be called concurrently. for _, tx := range goodTxs { - mem.proxyAppConn.CheckTxAsync(tx) + mem.proxyAppConn.ReCheckTxAsync(tx) } mem.proxyAppConn.FlushAsync() } diff --git a/mempool/mempool_test.go b/mempool/mempool_test.go index 4f66da36c..0ff566175 100644 --- a/mempool/mempool_test.go +++ b/mempool/mempool_test.go @@ -8,6 +8,7 @@ import ( "io/ioutil" "os" "path/filepath" + "sync" "testing" "time" @@ -219,6 +220,107 @@ func TestTxsAvailable(t *testing.T) { ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) } +type SleepCounterApplication struct { + *counter.CounterApplication + wg *sync.WaitGroup +} + +func NewSleepCounterApplication(f bool, i int) *SleepCounterApplication { + wg := &sync.WaitGroup{} + wg.Add(i) + return &SleepCounterApplication{counter.NewCounterApplication(f), wg} +} + +func (app *SleepCounterApplication) CheckTx(tx []byte) abci.ResponseCheckTx { + res := app.CounterApplication.CheckTx(tx) + app.wg.Wait() + return res +} + +func TestReapPriority(t *testing.T) { + TotalTx := 15 + app := NewSleepCounterApplication(false, TotalTx) + cc := proxy.NewLocalClientCreator(app) + + mempool := newMempoolWithApp(cc) + appConnCon, _ := cc.NewABCIClient() + appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus")) + err := appConnCon.Start() + require.Nil(t, err) + + testResult := make(chan string, 100) + var wg sync.WaitGroup + j := 0 + checkTxs := func(i int) { + txBytes := make([]byte, 8) + binary.BigEndian.PutUint64(txBytes, uint64(i)) + mempool.CheckTx(txBytes, nil) + wg.Done() + } + //seqReap := make(chan int, 5) + + reapCheck := func(threshold int) { + //for threshold := range seqReap { + txs := mempool.ReapMaxBytesMaxGas(-1, -1) + + if len(txs) >= threshold { + str := fmt.Sprintf("Reap failed to have priority, %v > %v\n", len(txs), threshold) + fmt.Print(str) + testResult <- str + } else { + fmt.Printf("Priority reaping: %v < %v\n", len(txs), threshold) + } + j += len(txs) + if err := mempool.Update(0, txs, nil, nil); err != nil { + testResult <- err.Error() + } + for _, txBytes := range txs { + + res, err := appConnCon.DeliverTxSync(txBytes) + if err != nil { + testResult <- fmt.Sprintf("Client error committing tx: %v", err) + } + if res.IsErr() { + testResult <- fmt.Sprintf("Error committing tx. Code:%v result:%X log:%v", + res.Code, res.Data, res.Log) + } + // fmt.Println("delivered") + } + + res, err := appConnCon.CommitSync() + if err != nil { + testResult <- fmt.Sprintf("Client error committing: %v", err) + } + if len(res.Data) != 8 { + testResult <- fmt.Sprintf("Error committing. Hash:%X", res.Data) + } + + //} + + } + + //go reapCheck() + wg.Add(TotalTx) + for i := 1; i <= TotalTx; i++ { + fmt.Printf("Insert checkTX:%v\n", i) + go checkTxs(i) + } + //close(seqReap) + time.Sleep(time.Millisecond) + for i := 1; i <= TotalTx; i++ { + app.wg.Done() + } + reapCheck(TotalTx) + wg.Wait() + close(testResult) + k := 0 + for s := range testResult { + t.Log(s) + k++ + } + require.Equal(t, 0, k) +} + func TestSerialReap(t *testing.T) { app := counter.NewCounterApplication(true) app.SetOption(abci.RequestSetOption{Key: "serial", Value: "on"}) diff --git a/proxy/app_conn.go b/proxy/app_conn.go index 2f792671e..1870adc61 100644 --- a/proxy/app_conn.go +++ b/proxy/app_conn.go @@ -25,6 +25,7 @@ type AppConnMempool interface { Error() error CheckTxAsync(tx []byte) *abcicli.ReqRes + ReCheckTxAsync(tx []byte) *abcicli.ReqRes FlushAsync() *abcicli.ReqRes FlushSync() error @@ -114,6 +115,10 @@ func (app *appConnMempool) CheckTxAsync(tx []byte) *abcicli.ReqRes { return app.appConn.CheckTxAsync(tx) } +func (app *appConnMempool) ReCheckTxAsync(tx []byte) *abcicli.ReqRes { + return app.appConn.ReCheckTxAsync(tx) +} + //------------------------------------------------ // Implements AppConnQuery (subset of abcicli.Client) diff --git a/version/version.go b/version/version.go index d8bab5772..5c09ce36e 100644 --- a/version/version.go +++ b/version/version.go @@ -4,13 +4,13 @@ package version const ( Maj = "0" Min = "25" - Fix = "0" + Fix = "1" ) var ( // Version is the current version of Tendermint // Must be a string because scripts like dist.sh read this file. - Version = "0.25.0" + Version = "0.25.1" // GitCommit is the current HEAD set using ldflags. GitCommit string