prototype: compact blocks #1191

Draft: wants to merge 75 commits into base: v0.34.x-celestia
Changes shown from 1 commit (of 75).

Commits:
bf6edc0
implement prototype for compact blocks
cmwaters Jan 20, 2024
211694b
add compact block messages to the reactor
cmwaters Jan 24, 2024
567cf3f
checkpoint
cmwaters Jan 24, 2024
22639ab
checkpoint
cmwaters Jan 25, 2024
d4f62fd
fix e2e tests
cmwaters Jan 25, 2024
27a6fe3
add has compact block message
cmwaters Jan 26, 2024
f0e972e
use go 1.19
cmwaters Jan 26, 2024
1cb6666
fix tests
cmwaters Jan 27, 2024
4874e77
fix consensus tests
cmwaters Jan 27, 2024
187d26c
fix test I just broke
cmwaters Jan 27, 2024
0f529f6
linting
cmwaters Jan 27, 2024
64d3f6d
lint
cmwaters Jan 27, 2024
c530d1e
fix data race
cmwaters Jan 27, 2024
3e6c075
go fmt
cmwaters Jan 27, 2024
507f3d2
Merge remote-tracking branch 'origin/v0.34.x-celestia' into callum/co…
cmwaters Jul 19, 2024
9ab18d4
gofmt
cmwaters Jul 19, 2024
8f63838
save metrics
cmwaters Jul 19, 2024
71b752e
enable metrics
cmwaters Jul 19, 2024
c2e6331
shift where we measure compact blocks
cmwaters Jul 19, 2024
1cbd361
fix: panic in prometheus
cmwaters Jul 24, 2024
1fe8522
add the square size to the constructed data
cmwaters Jul 29, 2024
1846523
add more metrics for compact blocks
cmwaters Jul 30, 2024
1c4f76e
fix usage of metrics
cmwaters Jul 30, 2024
9ad05ed
fix error handling in cat pool and consensus
cmwaters Jul 30, 2024
93774a7
checkpoint
cmwaters Jul 31, 2024
6f8f70a
print out account sequence
cmwaters Jul 31, 2024
82a8958
track if we see missing transactions which are invalid
cmwaters Aug 8, 2024
d1b3c4d
add more logs around the mempool
cmwaters Aug 9, 2024
8a5d797
remove jitter
cmwaters Aug 9, 2024
e937057
add inclusion delay
cmwaters Aug 9, 2024
74cac47
jump to 3 second delay
cmwaters Aug 9, 2024
dfa4382
improve logs
cmwaters Aug 9, 2024
a56bbb7
checkpoint
cmwaters Aug 9, 2024
905ba36
Revert "checkpoint"
cmwaters Aug 12, 2024
2273f83
update logger
cmwaters Aug 12, 2024
513949c
checkpoint
cmwaters Aug 13, 2024
567a281
checkpoint
cmwaters Aug 13, 2024
dcea04c
add more tracing data
cmwaters Aug 13, 2024
c5fd230
return some logs to debug
cmwaters Aug 13, 2024
b4ebb57
checkpoint
cmwaters Aug 14, 2024
33004a3
sneaky fix
cmwaters Aug 14, 2024
1bb12bf
fix
cmwaters Aug 14, 2024
b1086d2
checkpoint
cmwaters Aug 15, 2024
aab522a
fix nil pointer dereference
cmwaters Aug 15, 2024
d46ad2d
fix pruning and resetting
cmwaters Aug 16, 2024
5aefb63
define a send queue capacity
cmwaters Aug 16, 2024
178f81a
add more tracing data
cmwaters Aug 20, 2024
aaf89cf
register mempool recovery table
cmwaters Aug 23, 2024
c42c673
allow the fetching of compact blocks to last the entire round
cmwaters Aug 26, 2024
9ff3319
repropose the valid block if it exists
cmwaters Aug 26, 2024
7578e10
use updateRoundStep to track consensus state changes
cmwaters Aug 27, 2024
34774cd
add compact hash to the proposal type
cmwaters Aug 27, 2024
955714f
chore: remove maverick directory
cmwaters Aug 27, 2024
5ab5034
use compact hash
cmwaters Aug 27, 2024
ec3335c
remove cases in e2e test
cmwaters Aug 27, 2024
bc72dc4
cover up a few other missing cases
cmwaters Aug 27, 2024
5b25a7c
fix the fallback mechanism
cmwaters Aug 28, 2024
cf354c5
don't evict transactions that have been proposed
cmwaters Aug 30, 2024
dddd61e
mark peer as having sent compact proposal
cmwaters Aug 30, 2024
92e20de
don't evict transactions in a proposal
cmwaters Aug 30, 2024
0a5adc0
make sure the lock is correctly used
cmwaters Aug 30, 2024
99b4727
play around with p2p settings
cmwaters Sep 2, 2024
7218a79
add a channel and modify some of the vote gossiping logic
cmwaters Sep 3, 2024
bc85fc2
implement priority based broadcast system
cmwaters Sep 5, 2024
79947c6
remove print statement
cmwaters Sep 5, 2024
a801fa3
checkpoint
cmwaters Sep 6, 2024
9f48af6
increase the send queue capacity
cmwaters Sep 6, 2024
594e303
make modifications to the consensus reactor
cmwaters Sep 6, 2024
7b76868
fix panic
cmwaters Sep 6, 2024
caeb9ef
checkpoint
cmwaters Sep 6, 2024
91f1703
Merge branch 'v0.34.x-celestia' into callum/compact-blocks
cmwaters Oct 28, 2024
a55e1c9
fix test
cmwaters Oct 28, 2024
77db525
revert some modifications to the consensus reactor logic
cmwaters Oct 28, 2024
f170aad
repurpose listen only in the mempool
cmwaters Oct 28, 2024
a31a8fe
lint
cmwaters Oct 28, 2024
implement prototype for compact blocks
cmwaters committed Jan 24, 2024
commit bf6edc023364c8d9f227b20f2212d83b041ef93f
9 changes: 5 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -692,9 +692,7 @@ func DefaultFuzzConnConfig() *FuzzConnConfig {
// MempoolConfig defines the configuration options for the CometBFT mempool
type MempoolConfig struct {
// Mempool version to use:
// 1) "v0" - FIFO mempool.
// 2) "v1" - (default) prioritized mempool.
// 3) "v2" - content addressable transaction pool
// 1) "v2" - (default) content addressable transaction pool
Version string `mapstructure:"version"`
// RootDir is the root directory for all data. This should be configured via
// the $CMTHOME env variable or --home cmd flag rather than overriding this
@@ -763,7 +761,7 @@ type MempoolConfig struct {
// DefaultMempoolConfig returns a default configuration for the CometBFT mempool
func DefaultMempoolConfig() *MempoolConfig {
return &MempoolConfig{
Version: MempoolV1,
Version: MempoolV2,
Recheck: true,
Broadcast: true,
WalPath: "",
@@ -798,6 +796,9 @@ func (cfg *MempoolConfig) WalEnabled() bool {
// ValidateBasic performs basic validation (checking param bounds, etc.) and
// returns an error if any check fails.
func (cfg *MempoolConfig) ValidateBasic() error {
if cfg.Version != MempoolV2 {
return errors.New("only v2 mempool is supported for compact blocks")
}
if cfg.Size < 0 {
return errors.New("size can't be negative")
}
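The validation above makes v2 the only accepted mempool version when compact blocks are enabled. A minimal `config.toml` fragment consistent with that constraint (a sketch: only `version` comes from this diff; `recheck` and `broadcast` mirror the defaults shown in `DefaultMempoolConfig`):

```toml
[mempool]
# only the content addressable transaction pool supports compact blocks
version = "v2"
recheck = true
broadcast = true
```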
4 changes: 1 addition & 3 deletions config/toml.go
@@ -343,9 +343,7 @@ dial_timeout = "{{ .P2P.DialTimeout }}"
[mempool]
# Mempool version to use:
# 1) "v0" - FIFO mempool.
# 2) "v1" - (default) prioritized mempool.
# 3) "v2" - content addressable transaction pool
# 1) "v2" - (default) content addressable transaction pool
version = "{{ .Mempool.Version }}"
# Recheck (default: true) defines whether CometBFT should recheck the
61 changes: 25 additions & 36 deletions consensus/byzantine_test.go
@@ -20,12 +20,8 @@ import (
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
cmtsync "github.com/tendermint/tendermint/libs/sync"
mempl "github.com/tendermint/tendermint/mempool"

cfg "github.com/tendermint/tendermint/config"
mempoolv2 "github.com/tendermint/tendermint/mempool/cat"
mempoolv0 "github.com/tendermint/tendermint/mempool/v0"
mempoolv1 "github.com/tendermint/tendermint/mempool/v1"
"github.com/tendermint/tendermint/mempool/cat"
"github.com/tendermint/tendermint/p2p"
cmtcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
cmtproto "github.com/tendermint/tendermint/proto/tendermint/types"
@@ -48,6 +44,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {

genDoc, privVals := randGenesisDoc(nValidators, false, 30)
css := make([]*State, nValidators)
catReactors := make([]*cat.Reactor, nValidators)

for i := 0; i < nValidators; i++ {
logger := consensusLogger().With("test", "byzantine", "validator", i)
@@ -72,33 +69,21 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
proxyAppConnConMem := abcicli.NewLocalClient(mtx, app)

// Make Mempool
var mempool mempl.Mempool

switch thisConfig.Mempool.Version {
case cfg.MempoolV0:
mempool = mempoolv0.NewCListMempool(config.Mempool,
proxyAppConnConMem,
state.LastBlockHeight,
mempoolv0.WithPreCheck(sm.TxPreCheck(state)),
mempoolv0.WithPostCheck(sm.TxPostCheck(state)))
case cfg.MempoolV1:
mempool = mempoolv1.NewTxMempool(logger,
config.Mempool,
proxyAppConnConMem,
state.LastBlockHeight,
mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
mempoolv1.WithPostCheck(sm.TxPostCheck(state)),
)
case cfg.MempoolV2:
mempool = mempoolv2.NewTxPool(
logger,
config.Mempool,
proxyAppConnConMem,
state.LastBlockHeight,
mempoolv2.WithPreCheck(sm.TxPreCheck(state)),
mempoolv2.WithPostCheck(sm.TxPostCheck(state)),
)
}
mempool := cat.NewTxPool(
logger,
config.Mempool,
proxyAppConnConMem,
state.LastBlockHeight,
cat.WithPreCheck(sm.TxPreCheck(state)),
cat.WithPostCheck(sm.TxPostCheck(state)),
)
var err error
catReactors[i], err = cat.NewReactor(mempool, &cat.ReactorOptions{
ListenOnly: !config.Mempool.Broadcast,
MaxTxSize: config.Mempool.MaxTxBytes,
MaxGossipDelay: config.Mempool.MaxGossipDelay,
})
require.NoError(t, err)

if thisConfig.Consensus.WaitForTxs() {
mempool.EnableTxsAvailable()
@@ -112,7 +97,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {

// Make State
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, catReactors[i], evpool)
cs.SetLogger(cs.Logger)
// set private validator
pv := privVals[i]
@@ -154,6 +139,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
// make connected switches and start all reactors
p2p.MakeConnectedSwitches(config.P2P, nValidators, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("CONSENSUS", reactors[i])
s.AddReactor("MEMPOOL", catReactors[i])
s.SetLogger(reactors[i].conS.Logger.With("module", "p2p"))
return s
}, p2p.Connect2Switches)
@@ -230,9 +216,10 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
}
proposerAddr := lazyProposer.privValidatorPubKey.Address()

block, blockParts := lazyProposer.blockExec.CreateProposalBlock(
block := lazyProposer.blockExec.CreateProposalBlock(
lazyProposer.Height, lazyProposer.state, commit, proposerAddr,
)
blockParts := block.MakePartSet(types.BlockPartSizeBytes)

// Flush the WAL. Otherwise, we may not recompute the same proposal to sign,
// and the privValidator will refuse to sign anything.
@@ -480,7 +467,8 @@ func byzantineDecideProposalFunc(t *testing.T, height int64, round int32, cs *St
// Avoid sending on internalMsgQueue and running consensus state.

// Create a new proposal block from state/txs from the mempool.
block1, blockParts1 := cs.createProposalBlock()
block1 := cs.createProposalBlock()
blockParts1 := block1.MakePartSet(types.BlockPartSizeBytes)
polRound := cs.TwoThirdPrevoteRound
propBlockID := types.BlockID{Hash: block1.Hash(), PartSetHeader: blockParts1.Header()}
proposal1 := types.NewProposal(height, round, polRound, propBlockID)
@@ -495,7 +483,8 @@ func byzantineDecideProposalFunc(t *testing.T, height int64, round int32, cs *St
deliverTxsRange(cs, 0, 1)

// Create a new proposal block from state/txs from the mempool.
block2, blockParts2 := cs.createProposalBlock()
block2 := cs.createProposalBlock()
blockParts2 := block2.MakePartSet(types.BlockPartSizeBytes)
polRound = cs.TwoThirdPrevoteRound
propBlockID = types.BlockID{Hash: block2.Hash(), PartSetHeader: blockParts2.Header()}
proposal2 := types.NewProposal(height, round, polRound, propBlockID)
6 changes: 4 additions & 2 deletions consensus/common_test.go
@@ -207,7 +207,9 @@ func decideProposal(
round int32,
) (proposal *types.Proposal, block *types.Block) {
cs1.mtx.Lock()
block, blockParts := cs1.createProposalBlock()
block = cs1.createProposalBlock()
blockParts := block.MakePartSet(types.BlockPartSizeBytes)

validRound := cs1.TwoThirdPrevoteRound
chainID := cs1.state.ChainID
cs1.mtx.Unlock()
@@ -447,7 +449,7 @@ func newStateWithConfigAndBlockStore(
}

blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, nil, evpool)
cs.SetLogger(log.TestingLogger().With("module", "consensus"))
cs.SetPrivValidator(pv)

50 changes: 50 additions & 0 deletions consensus/metrics.go
@@ -1,12 +1,16 @@
package consensus

import (
"encoding/json"
"fmt"
"path/filepath"
"strings"
"time"

"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/discard"
cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/libs/os"

prometheus "github.com/go-kit/kit/metrics/prometheus"
stdprometheus "github.com/prometheus/client_golang/prometheus"
@@ -299,3 +303,49 @@ func (m *Metrics) MarkStep(s cstypes.RoundStepType) {
}
m.stepStart = time.Now()
}

type JSONMetrics struct {
dir string
interval int
StartTime time.Time
EndTime time.Time
Blocks uint64
Rounds uint64
SentConsensusBytes uint64
SentCompactBlocks uint64
SentCompactBytes uint64
CompactBlockFailures uint64
SentBlockParts uint64
SentBlockPartsBytes uint64
}

func NewJSONMetrics(dir string) *JSONMetrics {
return &JSONMetrics{
dir: dir,
StartTime: time.Now().UTC(),
}
}

func (m *JSONMetrics) Save() {
m.EndTime = time.Now().UTC()
content, err := json.MarshalIndent(m, "", " ")
if err != nil {
panic(err)
}
path := filepath.Join(m.dir, fmt.Sprintf("metrics_%d.json", m.interval))
os.MustWriteFile(path, content, 0644)
m.StartTime = m.EndTime
m.interval++
m.reset()
}

func (m *JSONMetrics) reset() {
m.Blocks = 0
m.Rounds = 0
m.SentConsensusBytes = 0
m.SentBlockParts = 0
m.SentBlockPartsBytes = 0
m.SentCompactBlocks = 0
m.SentCompactBytes = 0
m.CompactBlockFailures = 0
}
2 changes: 1 addition & 1 deletion consensus/reactor_test.go
@@ -215,7 +215,7 @@ func TestReactorWithEvidence(t *testing.T) {

// Make State
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool2)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, nil, evpool2)
cs.SetLogger(log.TestingLogger().With("module", "consensus"))
cs.SetPrivValidator(pv)

4 changes: 2 additions & 2 deletions consensus/replay_file.go
@@ -130,7 +130,7 @@ func (pb *playback) replayReset(count int, newStepSub types.Subscription) error
pb.cs.Wait()

newCS := NewState(pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec,
pb.cs.blockStore, pb.cs.txNotifier, pb.cs.evpool)
pb.cs.blockStore, pb.cs.txNotifier, pb.cs.txFetcher, pb.cs.evpool)
newCS.SetEventBus(pb.cs.eventBus)
newCS.startForReplay()

@@ -333,7 +333,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)

consensusState := NewState(csConfig, state.Copy(), blockExec,
blockStore, mempool, evpool)
blockStore, mempool, nil, evpool)

consensusState.SetEventBus(eventBus)
return consensusState
8 changes: 4 additions & 4 deletions consensus/replay_test.go
@@ -373,7 +373,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
newValidatorTx1 := kvstore.MakeValSetChangeTx(valPubKey1ABCI, testMinPower)
err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx1, nil, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ := css[0].createProposalBlock() // changeProposer(t, cs1, vs2)
propBlock := css[0].createProposalBlock() // changeProposer(t, cs1, vs2)
propBlockParts := propBlock.MakePartSet(partSize)
blockID := types.BlockID{Hash: propBlock.Hash(), PartSetHeader: propBlockParts.Header()}

@@ -403,7 +403,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
updateValidatorTx1 := kvstore.MakeValSetChangeTx(updatePubKey1ABCI, 25)
err = assertMempool(css[0].txNotifier).CheckTx(updateValidatorTx1, nil, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ = css[0].createProposalBlock() // changeProposer(t, cs1, vs2)
propBlock = css[0].createProposalBlock() // changeProposer(t, cs1, vs2)
propBlockParts = propBlock.MakePartSet(partSize)
blockID = types.BlockID{Hash: propBlock.Hash(), PartSetHeader: propBlockParts.Header()}

@@ -440,7 +440,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
newValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, testMinPower)
err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx3, nil, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ = css[0].createProposalBlock() // changeProposer(t, cs1, vs2)
propBlock = css[0].createProposalBlock() // changeProposer(t, cs1, vs2)
propBlockParts = propBlock.MakePartSet(partSize)
blockID = types.BlockID{Hash: propBlock.Hash(), PartSetHeader: propBlockParts.Header()}
newVss := make([]*validatorStub, nVals+1)
@@ -515,7 +515,7 @@ func TestSimulateValidatorsChange(t *testing.T) {
removeValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, 0)
err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx3, nil, mempl.TxInfo{})
assert.Nil(t, err)
propBlock, _ = css[0].createProposalBlock() // changeProposer(t, cs1, vs2)
propBlock = css[0].createProposalBlock() // changeProposer(t, cs1, vs2)
propBlockParts = propBlock.MakePartSet(partSize)
blockID = types.BlockID{Hash: propBlock.Hash(), PartSetHeader: propBlockParts.Header()}
newVss = make([]*validatorStub, nVals+3)
76 changes: 70 additions & 6 deletions consensus/state.go
@@ -2,10 +2,12 @@ package consensus

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"runtime/debug"
"sort"
"time"
@@ -67,6 +69,27 @@ type txNotifier interface {
TxsAvailable() <-chan struct{}
}

type TxFetcher interface {
// For constructing the compact block
FetchKeysFromTxs(context.Context, [][]byte) ([][]byte, error)
// For reconstructing the full block from the compact block
FetchTxsFromKeys(context.Context, []byte, [][]byte) ([][]byte, error)
Review comment (Member) on lines +75 to +77: note for after prototype: we need var names plus more docs here

}

var _ TxFetcher = (*StandardTxFetcher)(nil)

// StandardTxFetcher implements TxFetcher. Unlike the CATPool, it gossips the entire transaction set
// instead of the set of keys. This aligns with Tendermint's vanilla block propagation method.
type StandardTxFetcher struct{}

func (tf *StandardTxFetcher) FetchKeysFromTxs(_ context.Context, txs [][]byte) ([][]byte, error) {
return txs, nil
}

func (tf *StandardTxFetcher) FetchTxsFromKeys(_ context.Context, _ []byte, txs [][]byte) ([][]byte, error) {
return txs, nil
}

// interface to the evidence pool
type evidencePool interface {
// reports conflicting votes to the evidence pool to be processed into evidence
@@ -92,6 +115,8 @@ type State struct {

// notify us if txs are available
txNotifier txNotifier
// fetch txs based on tx keys. Used for compact blocks.
txFetcher TxFetcher

// add evidence to the pool
// when it's detected
@@ -141,8 +166,8 @@ type State struct {
evsw cmtevents.EventSwitch

// for reporting metrics
metrics *Metrics

metrics *Metrics
jsonMetrics *JSONMetrics
traceClient *trace.Client
}

@@ -156,14 +181,21 @@ func NewState(
blockExec *sm.BlockExecutor,
blockStore sm.BlockStore,
txNotifier txNotifier,
txFetcher TxFetcher,
evpool evidencePool,
options ...StateOption,
) *State {
path := filepath.Join(config.RootDir, "data", "consensus")
if err := cmtos.EnsureDir(path, 0700); err != nil {
panic(err)
}

cs := &State{
config: config,
blockExec: blockExec,
blockStore: blockStore,
txNotifier: txNotifier,
txFetcher: txFetcher,
peerMsgQueue: make(chan msgInfo, msgQueueSize),
internalMsgQueue: make(chan msgInfo, msgQueueSize),
timeoutTicker: NewTimeoutTicker(),
@@ -174,9 +206,14 @@ func NewState(
evpool: evpool,
evsw: cmtevents.NewEventSwitch(),
metrics: NopMetrics(),
jsonMetrics: NewJSONMetrics(path),
traceClient: &trace.Client{},
}

if cs.txFetcher == nil {
cs.txFetcher = &StandardTxFetcher{}
}

// set function defaults (may be overwritten before calling Start)
cs.decideProposal = cs.defaultDecideProposal
cs.doPrevote = cs.defaultDoPrevote
@@ -1156,10 +1193,19 @@ func (cs *State) defaultDecideProposal(height int64, round int32) {
block, blockParts = cs.TwoThirdPrevoteBlock, cs.TwoThirdPrevoteBlockParts
} else {
// Create a new proposal block from state/txs from the mempool.
block, blockParts = cs.createProposalBlock()
block = cs.createProposalBlock()
if block == nil {
return
}

keys, err := cs.txFetcher.FetchKeysFromTxs(context.Background(), block.Txs.ToSliceOfBytes())
if err != nil {
cs.Logger.Error("failed to fetch tx keys", "err", err)
return
}

block.Txs = types.ToTxs(keys)
blockParts = block.MakePartSet(types.BlockPartSizeBytes)
}

// Flush the WAL. Otherwise, we may not recompute the same proposal to sign,
@@ -1211,7 +1257,7 @@ func (cs *State) isProposalComplete() bool {
//
// NOTE: keep it side-effect free for clarity.
// CONTRACT: cs.privValidator is not nil.
func (cs *State) createProposalBlock() (block *types.Block, blockParts *types.PartSet) {
func (cs *State) createProposalBlock() *types.Block {
if cs.privValidator == nil {
panic("entered createProposalBlock with privValidator being nil")
}
@@ -1229,14 +1275,14 @@ func (cs *State) createProposalBlock() (block *types.Block, blockParts *types.Pa

default: // This shouldn't happen.
cs.Logger.Error("propose step; cannot propose anything without commit for the previous block")
return
return nil
}

if cs.privValidatorPubKey == nil {
// If this node is a validator & proposer in the current round, it will
// miss the opportunity to create a block.
cs.Logger.Error("propose step; empty priv validator public key", "err", errPubKeyIsNotSet)
return
return nil
}

proposerAddr := cs.privValidatorPubKey.Address()
@@ -1949,7 +1995,25 @@ func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID p2p.ID) (add
if err != nil {
return added, err
}
blockHash := cs.ProposalBlockParts.Header().Hash
timeout := cs.config.Propose(round)

// Yield the lock while we fetch the transactions from the mempool so that votes
// and other operations can be processed.
cs.mtx.Unlock()

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

txs, err := cs.txFetcher.FetchTxsFromKeys(ctx, blockHash, block.Data.Txs.ToSliceOfBytes())

cs.mtx.Lock()
if err != nil {
cs.Logger.Error("failed to fetch transactions for compact block", "err", err)
cs.jsonMetrics.CompactBlockFailures++
return true, err
}
block.Data.Txs = types.ToTxs(txs)
cs.ProposalBlock = block

// NOTE: it's possible to receive complete proposal blocks for future rounds without having the proposal
2 changes: 1 addition & 1 deletion consensus/state_test.go
@@ -192,7 +192,7 @@ func TestStateBadProposal(t *testing.T) {
proposalCh := subscribe(cs1.eventBus, types.EventQueryCompleteProposal)
voteCh := subscribe(cs1.eventBus, types.EventQueryVote)

propBlock, _ := cs1.createProposalBlock() // changeProposer(t, cs1, vs2)
propBlock := cs1.createProposalBlock() // changeProposer(t, cs1, vs2)

// make the second validator the proposer by incrementing round
round++
2 changes: 1 addition & 1 deletion consensus/wal_generator.go
@@ -85,7 +85,7 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) {
mempool := emptyMempool{}
evpool := sm.EmptyEvidencePool{}
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
consensusState := NewState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool)
consensusState := NewState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, nil, evpool)
consensusState.SetLogger(logger)
consensusState.SetEventBus(eventBus)
if privValidator != nil {
4 changes: 3 additions & 1 deletion go.mod
@@ -1,6 +1,8 @@
module github.com/tendermint/tendermint

go 1.19
go 1.21

toolchain go1.21.6

require (
github.com/BurntSushi/toml v1.2.1
32 changes: 32 additions & 0 deletions go.sum

Large diffs are not rendered by default.

263 changes: 263 additions & 0 deletions mempool/cat/block_builder.go
@@ -0,0 +1,263 @@
package cat

import (
"context"
"fmt"
"sync"
"time"

"github.com/tendermint/tendermint/types"
)

// FetchTxsFromKeys is called by consensus upon receiving a complete compact block.
// The method iterates through the keys in the compact block. Transactions it
// already has are added to the result list; transactions that are missing are
// tracked and retrieved via a block request. Once all transactions are retrieved,
// it returns the complete set to the consensus engine. This can be called multiple
// times sequentially with the same blockID and is thread safe.
func (memR *Reactor) FetchTxsFromKeys(ctx context.Context, blockID []byte, compactData [][]byte) ([][]byte, error) {
if request, ok := memR.blockFetcher.GetRequest(blockID); ok {
memR.Logger.Debug("tracking existing request for block transactions")
// we already have a request for this block
return request.WaitForBlock(ctx)
}

txs := make([][]byte, len(compactData))
missingKeys := make(map[int]types.TxKey, len(compactData))

// iterate through the keys to know what transactions we have and what are missing
for i, key := range compactData {
txKey, err := types.TxKeyFromBytes(key)
if err != nil {
return nil, fmt.Errorf("incorrect compact blocks format: %w", err)
}
wtx := memR.mempool.store.get(txKey)
if wtx != nil {
txs[i] = wtx.tx
} else {
missingKeys[i] = txKey
}
}
memR.Logger.Info("fetching transactions from peers", "blockID", blockID, "numTxs", len(txs), "numMissing", len(missingKeys))

memR.mempool.jsonMetrics.Lock()
memR.mempool.jsonMetrics.TransactionsMissing = append(memR.mempool.jsonMetrics.TransactionsMissing, uint64(len(missingKeys)))
memR.mempool.jsonMetrics.Transactions = append(memR.mempool.jsonMetrics.Transactions, uint64(len(compactData)))
// Check if we got lucky and already had all the transactions.
if len(missingKeys) == 0 {
memR.mempool.jsonMetrics.TimeTakenFetchingTxs = append(memR.mempool.jsonMetrics.TimeTakenFetchingTxs, 0)
memR.mempool.jsonMetrics.Unlock()
return txs, nil
}
memR.mempool.jsonMetrics.Unlock()

// setup a request for this block and begin to track and retrieve all missing transactions
request := memR.blockFetcher.NewRequest(
blockID,
memR.mempool.Height(),
missingKeys,
txs,
)
defer func() {
timeTaken := request.TimeTaken()
if timeTaken == 0 {
return
}
Review comment (Member): what does this do?
Reply (Contributor Author): It's for gathering metrics.

memR.mempool.jsonMetrics.Lock()
memR.mempool.jsonMetrics.TimeTakenFetchingTxs = append(memR.mempool.jsonMetrics.TimeTakenFetchingTxs, timeTaken)
memR.mempool.jsonMetrics.Unlock()
}()

// Wait for the reactor to retrieve and post all transactions.
return request.WaitForBlock(ctx)
}

// FetchKeysFromTxs is in many ways the opposite method. It takes a full block generated by the application
// and reduces it to the set of keys that need to be gossiped from one mempool to another node's mempool
// in order to recreate the full block.
func (memR *Reactor) FetchKeysFromTxs(ctx context.Context, txs [][]byte) ([][]byte, error) {
keys := make([][]byte, len(txs))
for idx, tx := range txs {
// check if the context has been cancelled
if ctx.Err() != nil {
return nil, ctx.Err()
}
key := types.Tx(tx).Key()
keys[idx] = key[:]
has := memR.mempool.store.has(key)
if !has {
// If the mempool provided the initial transactions yet received from
// consensus a transaction it doesn't recognize, this implies that
// either a tx was mutated or was added by the application. In either
// case, it is likely no other mempool has this transaction so we
// preemptively broadcast it to all other peers
//
// We don't set the priority, gasWanted or sender fields because we
// don't know them.
wtx := newWrappedTx(tx, key, memR.mempool.Height(), 0, 0, "")
memR.broadcastNewTx(wtx)
// For safety we also store this transaction in the mempool (ignoring
// all size limits) so that we can retrieve it later if needed. Note
// as we're broadcasting it to all peers, we should not receive a `WantTx`
// unless it gets rejected by the application in CheckTx.
//
// Consensus will have an in memory copy of the entire block which includes
// this transaction so it should not need it.
memR.mempool.store.set(wtx)
}
Review comment (Member): do we need to call CheckTx to avoid accidentally including invalid txs?

}

// return the keys back to the consensus engine
return keys, nil
}

type blockFetcher struct {
// mutex to manage concurrent calls to different parts
mtx sync.Mutex
// requests are a map of all processing block requests
// by blockID.
requests map[string]*blockRequest
}

// NewBlockFetcher returns a new blockFetcher for managing block requests
func NewBlockFetcher() *blockFetcher {
return &blockFetcher{
requests: make(map[string]*blockRequest),
}
}

func (bf *blockFetcher) GetRequest(blockID []byte) (*blockRequest, bool) {
bf.mtx.Lock()
defer bf.mtx.Unlock()
request, ok := bf.requests[string(blockID)]
return request, ok
}

// NewRequest creates a new block request and returns it.
// If a request already exists it returns that instead
func (bf *blockFetcher) NewRequest(
blockID []byte,
height int64,
missingKeys map[int]types.TxKey,
txs [][]byte,
) *blockRequest {
bf.mtx.Lock()
defer bf.mtx.Unlock()
if request, ok := bf.requests[string(blockID)]; ok {
return request
}
request := NewBlockRequest(height, missingKeys, txs)
bf.requests[string(blockID)] = request
bf.pruneOldRequests(height)
return request
}

// TryAddMissingTx loops through all current requests and tries to add
// the given transaction (if it is missing).
func (bf *blockFetcher) TryAddMissingTx(key types.TxKey, tx []byte) {
bf.mtx.Lock()
defer bf.mtx.Unlock()
for _, request := range bf.requests {
request.TryAddMissingTx(key, tx)
}
}

// pruneOldRequests removes any requests that are older than the given height.
func (bf *blockFetcher) pruneOldRequests(height int64) {
for blockID, request := range bf.requests {
if request.height < height {
delete(bf.requests, blockID)
}
}
}

// blockRequests handle the lifecycle of individual block requests.
type blockRequest struct {
// immutable fields
height int64
doneCh chan struct{}

mtx sync.Mutex
// track the remaining keys that are missing
missingKeysByIndex map[int]types.TxKey
missingKeys map[string]int
// the txs in the block
txs [][]byte

// used for metrics
startTime time.Time
endTime time.Time
}

func NewBlockRequest(
height int64,
missingKeys map[int]types.TxKey,
txs [][]byte,
) *blockRequest {
mk := make(map[string]int, len(missingKeys))
for i, key := range missingKeys {
mk[key.String()] = i
}
return &blockRequest{
height: height,
missingKeysByIndex: missingKeys,
missingKeys: mk,
txs: txs,
doneCh: make(chan struct{}),
startTime: time.Now().UTC(),
}
}

// WaitForBlock is a blocking call that waits for the block to be fetched and completed.
// It can be called concurrently. If the block was already fetched it returns immediately.
func (br *blockRequest) WaitForBlock(ctx context.Context) ([][]byte, error) {
if br.IsDone() {
return br.txs, nil
}

select {
case <-ctx.Done():
return nil, ctx.Err()
case <-br.doneCh:
br.mtx.Lock()
defer br.mtx.Unlock()
br.endTime = time.Now().UTC()
return br.txs, nil
}
}

// TryAddMissingTx checks whether a given transaction was missing and,
// if so, adds it to the block request. It reports whether the tx was added.
func (br *blockRequest) TryAddMissingTx(key types.TxKey, tx []byte) bool {
br.mtx.Lock()
defer br.mtx.Unlock()
if index, ok := br.missingKeys[key.String()]; ok {
delete(br.missingKeys, key.String())
delete(br.missingKeysByIndex, index)
br.txs[index] = tx
// check if any transactions are still missing
if len(br.missingKeys) == 0 {
// Yaay! We're done!
close(br.doneCh)
}
return true
}
return false
}

// IsDone returns whether all transactions in the block have been received.
// This is done by checking the number of remaining missing keys.
func (br *blockRequest) IsDone() bool {
br.mtx.Lock()
defer br.mtx.Unlock()
return len(br.missingKeys) == 0
}

// TimeTaken returns how long fetching the block took, in milliseconds,
// or 0 if the request has not completed yet.
func (br *blockRequest) TimeTaken() uint64 {
br.mtx.Lock()
defer br.mtx.Unlock()
if br.endTime.IsZero() {
return 0
}
return uint64(br.endTime.Sub(br.startTime).Milliseconds())
}
243 changes: 243 additions & 0 deletions mempool/cat/block_builder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
package cat

import (
"context"
"fmt"
"math/rand"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/tendermint/tendermint/crypto/tmhash"
"github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p"
memproto "github.com/tendermint/tendermint/proto/tendermint/mempool"
"github.com/tendermint/tendermint/types"
)

func TestBlockRequest(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
tx1, tx2 := types.Tx("hello"), types.Tx("world")
key1, key2 := tx1.Key(), tx2.Key()
txs := make([][]byte, 2)
missingKeys := map[int]types.TxKey{
0: key1,
1: key2,
}

request := NewBlockRequest(1, missingKeys, txs)

require.True(t, request.TryAddMissingTx(key1, tx1))
// cannot add the same missing tx twice
require.False(t, request.TryAddMissingTx(key1, tx1))
require.False(t, request.IsDone())

// test that we adhere to the context deadline
shortCtx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
_, err := request.WaitForBlock(shortCtx)
require.Error(t, err)

// test that all txs mean the block `IsDone`
require.True(t, request.TryAddMissingTx(key2, tx2))
require.True(t, request.IsDone())

// waiting for the block should instantly return
txs, err = request.WaitForBlock(ctx)
require.NoError(t, err)
require.Equal(t, txs, [][]byte{tx1, tx2})
}

func TestBlockRequestConcurrently(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
const numTxs = 10
allTxs := make([][]byte, numTxs)
txKeys := make([]types.TxKey, numTxs)
missingKeys := make(map[int]types.TxKey)
txs := make([][]byte, numTxs)
for i := 0; i < numTxs; i++ {
tx := types.Tx(fmt.Sprintf("tx%d", i))
allTxs[i] = tx
txKeys[i] = tx.Key()
if i%3 == 0 {
txs[i] = tx
} else {
missingKeys[i] = txKeys[i]
}
}

request := NewBlockRequest(1, missingKeys, txs)

wg := sync.WaitGroup{}
for i := 0; i < numTxs; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
request.TryAddMissingTx(txKeys[i], allTxs[i])
}(i)
}

// wait for the block
result, err := request.WaitForBlock(ctx)
require.NoError(t, err)
require.Len(t, result, numTxs)
for i := 0; i < numTxs; i++ {
require.Equal(t, string(result[i]), string(allTxs[i]))
}
wg.Wait()
}

func TestBlockFetcherSimple(t *testing.T) {
bf := NewBlockFetcher()
tx := types.Tx("hello world")
key := tx.Key()
missingKeys := map[int]types.TxKey{
0: key,
}
blockID := []byte("blockID")
req := bf.NewRequest(blockID, 1, missingKeys, make([][]byte, 1))
req2, ok := bf.GetRequest(blockID)
require.True(t, ok)
require.Equal(t, req, req2)
// a different request for the same blockID should
// return the same original request object.
req3 := bf.NewRequest(blockID, 2, missingKeys, make([][]byte, 2))
require.Equal(t, req, req3)

req4 := bf.NewRequest([]byte("differentBlockID"), 1, missingKeys, make([][]byte, 1))

bf.TryAddMissingTx(key, tx)
require.False(t, req4.TryAddMissingTx(key, tx))
require.True(t, req.IsDone())
require.Len(t, bf.requests, 2)
}

func TestBlockFetcherConcurrentRequests(t *testing.T) {
var (
bf = NewBlockFetcher()
numBlocks = 5
numRequestsPerBlock = 5
numTxs = 5
requestWG = sync.WaitGroup{}
goRoutinesWG = sync.WaitGroup{}
allTxs = make([][]byte, numTxs)
txs = make([][]byte, numTxs)
missingKeys = make(map[int]types.TxKey)
)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

for i := 0; i < numTxs; i++ {
tx := types.Tx(fmt.Sprintf("tx%d", i))
allTxs[i] = tx
if i%3 == 0 {
txs[i] = tx
} else {
missingKeys[i] = tx.Key()
}
}

for i := 0; i < numBlocks; i++ {
requestWG.Add(1)
for j := 0; j < numRequestsPerBlock; j++ {
goRoutinesWG.Add(1)
go func(blockID []byte, routine int) {
defer goRoutinesWG.Done()
// create a copy of the missingKeys and txs
mk := make(map[int]types.TxKey)
for i, k := range missingKeys {
mk[i] = k
}
txsCopy := make([][]byte, len(txs))
copy(txsCopy, txs)
request := bf.NewRequest(blockID, 1, mk, txsCopy)
if routine == 0 {
requestWG.Done()
}
_, _ = request.WaitForBlock(ctx)
}([]byte(fmt.Sprintf("blockID%d", i)), j)
}
goRoutinesWG.Add(1)
go func() {
defer goRoutinesWG.Done()
// Wait until all the requests have started
requestWG.Wait()
for _, tx := range allTxs {
bf.TryAddMissingTx(types.Tx(tx).Key(), tx)
}
}()
}
goRoutinesWG.Wait()

for i := 0; i < numBlocks; i++ {
blockID := []byte(fmt.Sprintf("blockID%d", i))
request, ok := bf.GetRequest(blockID)
require.True(t, ok)
require.True(t, request.IsDone())
result, err := request.WaitForBlock(ctx)
require.NoError(t, err)
require.Equal(t, result, allTxs)
}
}

func TestFetchTxsFromKeys(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
reactor, pool := setupReactor(t)

numTxs := 10
txs := make([][]byte, numTxs)
keys := make([][]byte, numTxs)
peer := genPeer()
blockID := tmhash.Sum([]byte("blockID"))
Member
is the block id used to check anything or does it just serve as an index?

wg := sync.WaitGroup{}
for i := 0; i < numTxs; i++ {
tx := newDefaultTx(fmt.Sprintf("tx%d", i))
txs[i] = tx
key := tx.Key()
keys[i] = key[:]
// every third transaction proposed in the block is already in the
// node's mempool and doesn't need to be fetched
if i%3 == 0 {
t.Log("adding tx to mempool", i)
err := pool.CheckTx(tx, nil, mempool.TxInfo{})
require.NoError(t, err)
} else {
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond)
reactor.ReceiveEnvelope(p2p.Envelope{
Src: peer,
Message: &memproto.Txs{Txs: [][]byte{tx}},
ChannelID: mempool.MempoolChannel,
})
}()
}
}

reactor.InitPeer(peer)

go func() {
reactor.ReceiveEnvelope(p2p.Envelope{
Src: peer,
Message: &memproto.Txs{Txs: txs},
ChannelID: mempool.MempoolChannel,
})
}()
Comment on lines +224 to +230
Member
are we intending to send all the txs to ourselves? I'm assuming no, since we are explicitly delaying the txs earlier, and we can delete this and the test still passes.


resultTxs, err := reactor.FetchTxsFromKeys(ctx, blockID, keys)
require.NoError(t, err)
require.Equal(t, len(txs), len(resultTxs))
for idx, tx := range resultTxs {
require.Equal(t, txs[idx], tx)
}
repeatResult, err := reactor.FetchTxsFromKeys(ctx, blockID, keys)
require.NoError(t, err)
require.Equal(t, resultTxs, repeatResult)
wg.Wait()
Comment on lines +232 to +241
Member
what does repeating and comparing test? can we document this if it's something not obvious or benign?
Contributor Author
I think it might be to hit the cache

}
10 changes: 10 additions & 0 deletions mempool/cat/pool.go
@@ -3,6 +3,7 @@ package cat
import (
"errors"
"fmt"
"path/filepath"
"runtime"
"sort"
"sync"
@@ -13,6 +14,7 @@ import (
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/libs/log"
tmos "github.com/tendermint/tendermint/libs/os"
"github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types"
@@ -52,6 +54,7 @@ type TxPool struct {
config *config.MempoolConfig
proxyAppConn proxy.AppConnMempool
metrics *mempool.Metrics
jsonMetrics *mempool.JSONMetrics

// these values are modified once per height
updateMtx sync.Mutex
@@ -86,6 +89,12 @@ func NewTxPool(
height int64,
options ...TxPoolOption,
) *TxPool {
// set up the directory for tracking metrics
path := filepath.Join(cfg.RootDir, "data", "mempool")
if err := tmos.EnsureDir(path, 0700); err != nil {
panic(err)
}

txmp := &TxPool{
logger: logger,
config: cfg,
@@ -99,6 +108,7 @@ func NewTxPool(
store: newStore(),
broadcastCh: make(chan *wrappedTx),
txsToBeBroadcast: make([]types.TxKey, 0),
jsonMetrics: mempool.NewJSONMetrics(path),
}

for _, opt := range options {
25 changes: 15 additions & 10 deletions mempool/cat/reactor.go
@@ -37,11 +37,12 @@ const (
// spec under /.spec.md
type Reactor struct {
p2p.BaseReactor
opts *ReactorOptions
mempool *TxPool
ids *mempoolIDs
requests *requestScheduler
traceClient *trace.Client
opts *ReactorOptions
mempool *TxPool
ids *mempoolIDs
requests *requestScheduler
blockFetcher *blockFetcher
traceClient *trace.Client
}

type ReactorOptions struct {
@@ -87,11 +88,12 @@ func NewReactor(mempool *TxPool, opts *ReactorOptions) (*Reactor, error) {
return nil, err
}
memR := &Reactor{
opts: opts,
mempool: mempool,
ids: newMempoolIDs(),
requests: newRequestScheduler(opts.MaxGossipDelay, defaultGlobalRequestTimeout),
traceClient: &trace.Client{},
opts: opts,
mempool: mempool,
ids: newMempoolIDs(),
requests: newRequestScheduler(opts.MaxGossipDelay, defaultGlobalRequestTimeout),
blockFetcher: NewBlockFetcher(),
traceClient: &trace.Client{},
}
memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR)
return memR, nil
@@ -256,6 +258,9 @@ func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) {
memR.Logger.Info("Could not add tx", "txKey", key, "err", err)
return
}
// If a block has been proposed with this transaction and
// consensus is waiting for it, publish it now.
memR.blockFetcher.TryAddMissingTx(key, tx)
if !memR.opts.ListenOnly {
// We broadcast only transactions that we deem valid and actually have in our mempool.
memR.broadcastSeenTx(key)
50 changes: 50 additions & 0 deletions mempool/metrics.go
@@ -1,6 +1,13 @@
package mempool

import (
"encoding/json"
"fmt"
"path/filepath"
"sync"
"time"

"github.com/tendermint/tendermint/libs/os"
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/discard"
"github.com/go-kit/kit/metrics/prometheus"
@@ -154,3 +161,46 @@ func NopMetrics() *Metrics {
RerequestedTxs: discard.NewCounter(),
}
}

type JSONMetrics struct {
dir string
interval int
sync.Mutex
StartTime time.Time
EndTime time.Time
Blocks uint64
Transactions []uint64
TransactionsMissing []uint64
// measured in ms
TimeTakenFetchingTxs []uint64
}

func NewJSONMetrics(dir string) *JSONMetrics {
return &JSONMetrics{
dir: dir,
StartTime: time.Now().UTC(),
Transactions: make([]uint64, 0),
TransactionsMissing: make([]uint64, 0),
TimeTakenFetchingTxs: make([]uint64, 0),
}
}

func (m *JSONMetrics) Save() {
m.EndTime = time.Now().UTC()
content, err := json.MarshalIndent(m, "", " ")
if err != nil {
panic(err)
}
path := filepath.Join(m.dir, fmt.Sprintf("metrics_%d.json", m.interval))
os.MustWriteFile(path, content, 0644)
m.StartTime = m.EndTime
m.interval++
m.reset()
}

func (m *JSONMetrics) reset() {
m.Blocks = 0
m.Transactions = make([]uint64, 0)
m.TransactionsMissing = make([]uint64, 0)
m.TimeTakenFetchingTxs = make([]uint64, 0)
}
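Since `dir`, `interval`, and the embedded mutex are unexported (and therefore skipped by `encoding/json`), a `Save()` call as written above would produce a file such as `data/mempool/metrics_0.json` shaped roughly like this — the values below are purely illustrative, not real measurements:

```json
{
  "StartTime": "2024-07-30T12:00:00Z",
  "EndTime": "2024-07-30T12:10:00Z",
  "Blocks": 42,
  "Transactions": [120, 98, 134],
  "TransactionsMissing": [7, 0, 12],
  "TimeTakenFetchingTxs": [180, 0, 240]
}
```

Each `Save()` then resets the counters and bumps `interval`, so successive files (`metrics_1.json`, `metrics_2.json`, …) cover disjoint time windows.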
74 changes: 14 additions & 60 deletions node/node.go
@@ -33,9 +33,7 @@ import (
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/light"
mempl "github.com/tendermint/tendermint/mempool"
mempoolv2 "github.com/tendermint/tendermint/mempool/cat"
mempoolv0 "github.com/tendermint/tendermint/mempool/v0"
mempoolv1 "github.com/tendermint/tendermint/mempool/v1"
"github.com/tendermint/tendermint/mempool/cat"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/pex"
"github.com/tendermint/tendermint/privval"
@@ -379,22 +377,22 @@ func createMempoolAndMempoolReactor(
memplMetrics *mempl.Metrics,
logger log.Logger,
traceClient *trace.Client,
) (mempl.Mempool, p2p.Reactor) {
) (*cat.TxPool, *cat.Reactor) {
switch config.Mempool.Version {
case cfg.MempoolV2:
mp := mempoolv2.NewTxPool(
mp := cat.NewTxPool(
logger,
config.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
mempoolv2.WithMetrics(memplMetrics),
mempoolv2.WithPreCheck(sm.TxPreCheck(state)),
mempoolv2.WithPostCheck(sm.TxPostCheck(state)),
cat.WithMetrics(memplMetrics),
cat.WithPreCheck(sm.TxPreCheck(state)),
cat.WithPostCheck(sm.TxPostCheck(state)),
)

reactor, err := mempoolv2.NewReactor(
reactor, err := cat.NewReactor(
mp,
&mempoolv2.ReactorOptions{
&cat.ReactorOptions{
ListenOnly: !config.Mempool.Broadcast,
MaxTxSize: config.Mempool.MaxTxBytes,
TraceClient: traceClient,
@@ -410,52 +408,6 @@ func createMempoolAndMempoolReactor(
}
reactor.SetLogger(logger)

return mp, reactor
case cfg.MempoolV1:
mp := mempoolv1.NewTxMempool(
logger,
config.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
mempoolv1.WithMetrics(memplMetrics),
mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
mempoolv1.WithPostCheck(sm.TxPostCheck(state)),
mempoolv1.WithTraceClient(traceClient),
)

reactor := mempoolv1.NewReactor(
config.Mempool,
mp,
traceClient,
)
if config.Consensus.WaitForTxs() {
mp.EnableTxsAvailable()
}
reactor.SetLogger(logger)

return mp, reactor

case cfg.MempoolV0:
mp := mempoolv0.NewCListMempool(
config.Mempool,
proxyApp.Mempool(),
state.LastBlockHeight,
mempoolv0.WithMetrics(memplMetrics),
mempoolv0.WithPreCheck(sm.TxPreCheck(state)),
mempoolv0.WithPostCheck(sm.TxPostCheck(state)),
)

mp.SetLogger(logger)

reactor := mempoolv0.NewReactor(
config.Mempool,
mp,
)
if config.Consensus.WaitForTxs() {
mp.EnableTxsAvailable()
}
reactor.SetLogger(logger)

Member
it won't let me highlight the part I want: the default will return nil (if not selecting v2 in the config), which could be confusing. The consensus reactor will pick up a nil txFetcher, which will just return the keys instead of actually fetching the txs.
Contributor Author
It assumes that compact blocks is not being used, in which case the keys are the full transactions.

return mp, reactor

default:
@@ -508,7 +460,8 @@ func createConsensusReactor(config *cfg.Config,
state sm.State,
blockExec *sm.BlockExecutor,
blockStore sm.BlockStore,
mempool mempl.Mempool,
catpool *cat.TxPool,
catReactor *cat.Reactor,
evidencePool *evidence.Pool,
privValidator types.PrivValidator,
csMetrics *cs.Metrics,
@@ -522,7 +475,8 @@ func createConsensusReactor(config *cfg.Config,
state.Copy(),
blockExec,
blockStore,
mempool,
catpool,
catReactor,
evidencePool,
cs.StateMetrics(csMetrics),
cs.SetTraceClient(traceClient),
@@ -902,7 +856,7 @@ func NewNode(config *cfg.Config,
csMetrics.FastSyncing.Set(1)
}
consensusReactor, consensusState := createConsensusReactor(
config, state, blockExec, blockStore, mempool, evidencePool,
config, state, blockExec, blockStore, mempool, mempoolReactor, evidencePool,
privValidator, csMetrics, stateSync || fastSync, eventBus, consensusLogger, influxdbClient,
)

@@ -1483,7 +1437,7 @@ func makeNodeInfo(
}

if config.Mempool.Version == cfg.MempoolV2 {
nodeInfo.Channels = append(nodeInfo.Channels, mempoolv2.MempoolStateChannel)
nodeInfo.Channels = append(nodeInfo.Channels, cat.MempoolStateChannel)
}

lAddr := config.P2P.ExternalAddress
4 changes: 2 additions & 2 deletions node/node_test.go
@@ -318,7 +318,7 @@ func TestCreateProposalBlock(t *testing.T) {
)

commit := types.NewCommit(height-1, 0, types.BlockID{}, nil)
block, _ := blockExec.CreateProposalBlock(
block := blockExec.CreateProposalBlock(
height,
state, commit,
proposerAddr,
@@ -408,7 +408,7 @@ func TestMaxProposalBlockSize(t *testing.T) {
)

commit := types.NewCommit(height-1, 0, types.BlockID{}, nil)
block, _ := blockExec.CreateProposalBlock(
block := blockExec.CreateProposalBlock(
height,
state, commit,
proposerAddr,
5 changes: 3 additions & 2 deletions state/execution.go
@@ -97,7 +97,7 @@ func (blockExec *BlockExecutor) CreateProposalBlock(
height int64,
state State, commit *types.Commit,
proposerAddr []byte,
) (*types.Block, *types.PartSet) {
) *types.Block {
Member
why get rid of the partset here?


maxBytes := state.ConsensusParams.Block.MaxBytes
maxGas := state.ConsensusParams.Block.MaxGas
@@ -158,13 +158,14 @@ func (blockExec *BlockExecutor) CreateProposalBlock(
panic(err)
}

return state.MakeBlock(
block, _ := state.MakeBlock(
height,
newData,
commit,
evidence,
proposerAddr,
)
return block
}

func (blockExec *BlockExecutor) ProcessProposal(
4 changes: 3 additions & 1 deletion test/maverick/consensus/state.go
@@ -1284,7 +1284,9 @@ func (cs *State) createProposalBlock() (block *types.Block, blockParts *types.Pa
}
proposerAddr := cs.privValidatorPubKey.Address()

return cs.blockExec.CreateProposalBlock(cs.Height, cs.state, commit, proposerAddr)
block = cs.blockExec.CreateProposalBlock(cs.Height, cs.state, commit, proposerAddr)
blockParts = block.MakePartSet(types.BlockPartSizeBytes)
return
}

// Enter: any +2/3 prevotes at next round.