Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support recovering parts from the mempool #1664

Merged
merged 20 commits into from
Mar 12, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion consensus/byzantine_test.go
Original file line number Diff line number Diff line change
@@ -116,7 +116,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
// Make State
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)

propagationReactor := propagation.NewReactor(nodeKey.ID(), nil, blockStore)
propagationReactor := propagation.NewReactor(nodeKey.ID(), nil, blockStore, mempool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, propagationReactor, mempool, evpool)
cs.SetLogger(cs.Logger)
// set private validator
2 changes: 1 addition & 1 deletion consensus/common_test.go
Original file line number Diff line number Diff line change
@@ -453,7 +453,7 @@ func newStateWithConfigAndBlockStore(
if err != nil {
panic(err)
}
propagator := propagation.NewReactor(key.ID(), nil, blockStore)
propagator := propagation.NewReactor(key.ID(), nil, blockStore, mempool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, propagator, mempool, evpool)
cs.SetLogger(log.TestingLogger().With("module", "consensus"))
cs.SetPrivValidator(pv)
57 changes: 57 additions & 0 deletions consensus/propagation/commitment.go
Original file line number Diff line number Diff line change
@@ -3,6 +3,8 @@ package propagation
import (
"fmt"

"github.com/gogo/protobuf/proto"
"github.com/tendermint/tendermint/proto/tendermint/mempool"
"github.com/tendermint/tendermint/proto/tendermint/propagation"

proptypes "github.com/tendermint/tendermint/consensus/propagation/types"
@@ -88,6 +90,61 @@ func (blockProp *Reactor) handleCompactBlock(cb *proptypes.CompactBlock, peer p2
if added {
blockProp.broadcastCompactBlock(cb, peer)
}

// check if we have any transactions that are in the compact block
parts := blockProp.compactBlockToParts(cb)
_, partSet, found := blockProp.GetProposal(cb.Proposal.Height, cb.Proposal.Round)
if !found {
return
}
for _, part := range parts {
added, err := partSet.AddPartWithoutProof(part)
if err != nil {
blockProp.Logger.Error("failed to add locally recovered part", "err", err)
continue
}
if !added {
blockProp.Logger.Error("failed to add locally recovered part", "part", part.Index)
continue
}
}
}

// compactBlockToParts queries the mempool to see if we can recover any block parts locally.
func (blockProp *Reactor) compactBlockToParts(cb *proptypes.CompactBlock) []*types.Part {
// find the compact block transactions that exist in our mempool
txsFound := make([]proptypes.UnmarshalledTx, 0)
for _, txMetaData := range cb.Blobs {
txKey, err := types.TxKeyFromBytes(txMetaData.Hash)
if err != nil {
blockProp.Logger.Error("failed to decode tx key", "err", err, "tx", txMetaData)
continue
}
tx, has := blockProp.mempool.GetTxByKey(txKey)
if !has {
continue
}

protoTxs := mempool.Txs{Txs: [][]byte{tx}}
marshalledTx, err := proto.Marshal(&protoTxs)
if err != nil {
blockProp.Logger.Error("failed to encode tx", "err", err, "tx", txMetaData)
continue
}

txsFound = append(txsFound, proptypes.UnmarshalledTx{MetaData: txMetaData, Key: txKey, TxBytes: marshalledTx})
}
if len(txsFound) == 0 {
// no compact block transaction was found locally
return nil
}

parts := proptypes.TxsToParts(txsFound)
if len(parts) > 0 {
blockProp.Logger.Info("recovered parts from the mempool", "number of parts", len(parts))
}

return parts
}

// broadcastProposal gossips the provided proposal to all peers. This should
67 changes: 67 additions & 0 deletions consensus/propagation/commitment_test.go
Original file line number Diff line number Diff line change
@@ -4,6 +4,10 @@ import (
"testing"
"time"

dbm "github.com/cometbft/cometbft-db"
"github.com/tendermint/tendermint/pkg/trace"
"github.com/tendermint/tendermint/store"

"github.com/stretchr/testify/assert"

"github.com/stretchr/testify/require"
@@ -71,3 +75,66 @@ func TestPropose(t *testing.T) {
// the parts == total because we only have 2 peers
assert.Equal(t, haves.Size(), int(partSet.Total()*2))
}

// TestRecoverPartsLocally provides a set of transactions to the mempool
// and attempts to build the block parts from them.
func TestRecoverPartsLocally(t *testing.T) {
cleanup, _, sm := state.SetupTestCase(t)
t.Cleanup(func() {
cleanup(t)
})

numberOfTxs := 10
txsMap := make(map[types.TxKey]types.Tx)
txs := make([]types.Tx, numberOfTxs)
for i := 0; i < numberOfTxs; i++ {
tx := types.Tx(cmtrand.Bytes(int(types.BlockPartSizeBytes / 3)))
txKey, err := types.TxKeyFromBytes(tx.Hash())
require.NoError(t, err)
txsMap[txKey] = tx
txs[i] = tx
}

blockStore := store.NewBlockStore(dbm.NewMemDB())
blockPropR := NewReactor("", trace.NoOpTracer(), blockStore, mockMempool{
txs: txsMap,
})

data := types.Data{Txs: txs}

block, partSet := sm.MakeBlock(1, data, types.RandCommit(time.Now()), []types.Evidence{}, cmtrand.Bytes(20))
id := types.BlockID{Hash: block.Hash(), PartSetHeader: partSet.Header()}
prop := types.NewProposal(block.Height, 0, 0, id)
prop.Signature = cmtrand.Bytes(64)

metaData := make([]proptypes.TxMetaData, len(partSet.TxPos))
for i, pos := range partSet.TxPos {
metaData[i] = proptypes.TxMetaData{
Start: uint32(pos.Start),
End: uint32(pos.End),
Hash: block.Txs[i].Hash(),
}
}

blockPropR.ProposeBlock(prop, partSet, metaData)

_, actualParts, _ := blockPropR.GetProposal(prop.Height, prop.Round)

// we should be able to recover all the parts after where the transactions
// are encoded
startingPartIndex := metaData[0].Start/types.BlockPartSizeBytes + 1

for i := startingPartIndex; i < partSet.Total()-1; i++ {
assert.Equal(t, partSet.GetPart(int(i)).Bytes, actualParts.GetPart(int(i)).Bytes)
}
}

var _ Mempool = &mockMempool{}

type mockMempool struct {
txs map[types.TxKey]types.Tx
}

func (m mockMempool) GetTxByKey(key types.TxKey) (types.Tx, bool) {
return m.txs[key], true
}
7 changes: 7 additions & 0 deletions consensus/propagation/mempool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package propagation

import "github.com/tendermint/tendermint/types"

type Mempool interface {
GetTxByKey(key types.TxKey) (types.Tx, bool)
}
7 changes: 6 additions & 1 deletion consensus/propagation/reactor.go
Original file line number Diff line number Diff line change
@@ -40,12 +40,16 @@ type Reactor struct {
// block data for gossiping.
*ProposalCache

// mempool access to read the transactions by hash from the mempool
// and eventually remove it.
mempool Mempool

mtx *sync.RWMutex
traceClient trace.Tracer
self p2p.ID
}

func NewReactor(self p2p.ID, tracer trace.Tracer, store *store.BlockStore, options ...ReactorOption) *Reactor {
func NewReactor(self p2p.ID, tracer trace.Tracer, store *store.BlockStore, mempool Mempool, options ...ReactorOption) *Reactor {
if tracer == nil {
// TODO not pass nil. instead, use a NOOP and allow the tracer to be passed as an option
tracer = trace.NoOpTracer()
@@ -56,6 +60,7 @@ func NewReactor(self p2p.ID, tracer trace.Tracer, store *store.BlockStore, optio
peerstate: make(map[p2p.ID]*PeerState),
mtx: &sync.RWMutex{},
ProposalCache: NewProposalCache(store),
mempool: mempool,
}
reactor.BaseReactor = *p2p.NewBaseReactor("BlockProp", reactor, p2p.WithIncomingQueueSize(ReactorIncomingMessageQueueSize))

2 changes: 1 addition & 1 deletion consensus/propagation/reactor_test.go
Original file line number Diff line number Diff line change
@@ -21,7 +21,7 @@ import (

func newPropagationReactor(s *p2p.Switch) *Reactor {
blockStore := store.NewBlockStore(dbm.NewMemDB())
blockPropR := NewReactor(s.NetAddress().ID, trace.NoOpTracer(), blockStore)
blockPropR := NewReactor(s.NetAddress().ID, trace.NoOpTracer(), blockStore, mockMempool{})
blockPropR.SetSwitch(s)

return blockPropR
101 changes: 101 additions & 0 deletions consensus/propagation/types/unmarshalling.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package types

import (
"sort"

"github.com/tendermint/tendermint/crypto/merkle"
"github.com/tendermint/tendermint/types"
)

// TxsToParts takes a set of mempool transactions and tries to create parts from them.
func TxsToParts(txsFound []UnmarshalledTx) []*types.Part {
// sort the txs found by start index
sort.Slice(txsFound, func(i, j int) bool {
return txsFound[i].MetaData.Start < txsFound[j].MetaData.Start
})

// the part slice we will return
parts := make([]*types.Part, 0)

// the cumulative bytes slice will contain the transaction bytes along with
// any left bytes from previous contiguous transactions
cumulativeBytes := make([]byte, 0)
// the start index of where the cumulative bytes start
cumulativeBytesStartIndex := -1

for index := 0; index < len(txsFound); index++ {
// the transaction we're parsing
currentTx := txsFound[index]
// the inclusive part index where the transaction starts
currentPartIndex := currentTx.MetaData.Start / types.BlockPartSizeBytes
currentPartStartIndex := currentPartIndex * types.BlockPartSizeBytes
// the exclusive index of the byte where the current part ends
currentPartEndIndex := (currentPartIndex + 1) * types.BlockPartSizeBytes

if len(cumulativeBytes) == 0 {
// an empty cumulative bytes means the current transaction start
// is where the cumulative bytes will start
cumulativeBytesStartIndex = int(currentTx.MetaData.Start)
}
// append the current transaction bytes to the cumulative bytes slice
cumulativeBytes = append(cumulativeBytes, currentTx.TxBytes...)

// This case checks whether the cumulative bytes start index
// starts at the current part.
// If not, this means the current part, even if we might have some of its data,
// is not recoverable, and we can truncate it.
if int(currentPartStartIndex) < cumulativeBytesStartIndex {
// relative part end index
relativePartEndIndex := int(currentPartEndIndex) - cumulativeBytesStartIndex
if relativePartEndIndex > len(cumulativeBytes) {
// case where the cumulative bytes length is small.
// this happens with small transactions.
cumulativeBytes = cumulativeBytes[:0]
cumulativeBytesStartIndex = -1
} else {
// slice the cumulative bytes to start at exactly the part end index
cumulativeBytes = cumulativeBytes[relativePartEndIndex:]
// set the cumulative bytes start index to the current part end index
cumulativeBytesStartIndex = int(currentPartEndIndex)
}
}

// parse the parts we gathered so far
for len(cumulativeBytes) >= int(types.BlockPartSizeBytes) {
// get the part's bytes
partBz := cumulativeBytes[:types.BlockPartSizeBytes]
// create the part
part := &types.Part{
Index: uint32(cumulativeBytesStartIndex) / types.BlockPartSizeBytes,
Bytes: partBz,
Proof: merkle.Proof{}, // empty proof because we don't have the other leaves to create a valid one
}
parts = append(parts, part)
// slice this part off the cumulative bytes
cumulativeBytes = cumulativeBytes[types.BlockPartSizeBytes:]
// set cumulative start index
cumulativeBytesStartIndex += int(types.BlockPartSizeBytes)
}

// check whether the next transaction is a contingent to the current one.
if index+1 < len(txsFound) {
nextTx := txsFound[index+1]
if currentTx.MetaData.End != nextTx.MetaData.Start {
// the next transaction is not contingent, we can reset the cumulative bytes.
cumulativeBytes = cumulativeBytes[:0]
cumulativeBytesStartIndex = -1
}
}
}

return parts
}

// UnmarshalledTx is an intermediary type that allows keeping the transaction
// metadata, its Key and the actual tx bytes. This will be used to create the
// parts from the local txs.
type UnmarshalledTx struct {
MetaData TxMetaData
Key types.TxKey
TxBytes []byte
}
126 changes: 126 additions & 0 deletions consensus/propagation/types/unmarshalling_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package types

import (
"fmt"
"testing"
"time"

"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
cmtrand "github.com/tendermint/tendermint/libs/rand"
"github.com/tendermint/tendermint/proto/tendermint/mempool"
"github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
)

// TestTxsToParts extensive testing of the txs to parts method
// that recovers the parts from mempool txs.
func TestTxsToParts(t *testing.T) {
cleanup, _, sm := state.SetupTestCase(t)
t.Cleanup(func() {
cleanup(t)
})
numberOfTxs := 16 // increasing the number of transactions increases the test time exponentially

tests := []struct {
name string
txs []types.Tx
}{
{
name: "txs size == types.BlockPartSizeBytes/3",
txs: func() []types.Tx {
txs := make([]types.Tx, 0, numberOfTxs)
for i := 0; i < numberOfTxs; i++ {
txs = append(txs, cmtrand.Bytes(int(types.BlockPartSizeBytes/3)))
}
return txs
}(),
},
{
name: "txs size == types.BlockPartSizeBytes",
txs: func() []types.Tx {
txs := make([]types.Tx, 0, numberOfTxs)
for i := 0; i < numberOfTxs; i++ {
txs = append(txs, cmtrand.Bytes(int(types.BlockPartSizeBytes)))
}
return txs
}(),
},
{
name: "txs size == types.BlockPartSizeBytes * 3",
txs: func() []types.Tx {
txs := make([]types.Tx, 0, numberOfTxs)
for i := 0; i < numberOfTxs; i++ {
txs = append(txs, cmtrand.Bytes(int(types.BlockPartSizeBytes)*3))
}
return txs
}(),
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
data := types.Data{Txs: test.txs}
block, partSet := sm.MakeBlock(1, data, types.RandCommit(time.Now()), []types.Evidence{}, cmtrand.Bytes(20))

txsFound := make([]UnmarshalledTx, len(partSet.TxPos))
for i, pos := range partSet.TxPos {
// calculate the protobuf overhead
protoTxs := mempool.Txs{Txs: [][]byte{data.Txs[i]}}
marshalledTx, err := proto.Marshal(&protoTxs)
require.NoError(t, err)

txKey, err := types.TxKeyFromBytes(block.Txs[i].Hash())
require.NoError(t, err)
txsFound[i] = UnmarshalledTx{
MetaData: TxMetaData{
Start: uint32(pos.Start),
End: uint32(pos.End),
Hash: block.Txs[i].Hash(),
},
Key: txKey,
TxBytes: marshalledTx,
}
}

// generate all the possible combinations for the provided number of transactions
txsCombinations := GenerateTxsCombinations(numberOfTxs)

for _, combination := range txsCombinations {
t.Run(fmt.Sprintf("%v", combination), func(t *testing.T) {
combinationTxs := make([]UnmarshalledTx, 0)
for index, val := range combination {
if val == 1 {
combinationTxs = append(combinationTxs, txsFound[index])
}
}

parts := TxsToParts(combinationTxs)

for _, part := range parts {
expectedPart := partSet.GetPart(int(part.Index))
assert.Equal(t, expectedPart.Bytes, part.Bytes)
}
})
}
})
}
}

// GenerateTxsCombinations generates all relevant transaction placements
// in a block given a number of transactions.
func GenerateTxsCombinations(n int) [][]int {
total := 1 << n // 2^n combinations
result := make([][]int, 0)

for i := 0; i < total; i++ {
bitArray := make([]int, n)
for j := 0; j < n; j++ {
// Extract the bit at position j
bitArray[j] = (i >> j) & 1
}
result = append(result, bitArray)
}
return result
}
2 changes: 1 addition & 1 deletion consensus/reactor_test.go
Original file line number Diff line number Diff line change
@@ -221,7 +221,7 @@ func TestReactorWithEvidence(t *testing.T) {
if err != nil {
panic(err)
}
propagator := propagation.NewReactor(key.ID(), nil, blockStore)
propagator := propagation.NewReactor(key.ID(), nil, blockStore, mempool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, propagator, mempool, evpool2)
cs.SetLogger(log.TestingLogger().With("module", "consensus"))
cs.SetPrivValidator(pv)
2 changes: 1 addition & 1 deletion consensus/replay_file.go
Original file line number Diff line number Diff line change
@@ -339,7 +339,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo
panic(err)
}
// TODO pass a tracer from here
propagator := propagation.NewReactor(key.ID(), nil, blockStore)
propagator := propagation.NewReactor(key.ID(), nil, blockStore, mempool)
consensusState := NewState(csConfig, state.Copy(), blockExec,
blockStore, propagator, mempool, evpool)

2 changes: 1 addition & 1 deletion consensus/wal_generator.go
Original file line number Diff line number Diff line change
@@ -92,7 +92,7 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) {
if err != nil {
panic(err)
}
propagator := propagation.NewReactor(key.ID(), nil, blockStore)
propagator := propagation.NewReactor(key.ID(), nil, blockStore, mempool)
consensusState := NewState(config.Consensus, state.Copy(), blockExec, blockStore, propagator, mempool, evpool)
consensusState.SetLogger(logger)
consensusState.SetEventBus(eventBus)
2 changes: 1 addition & 1 deletion node/node.go
Original file line number Diff line number Diff line change
@@ -926,7 +926,7 @@ func NewNodeWithContext(ctx context.Context,
return nil, fmt.Errorf("could not create blockchain reactor: %w", err)
}

propagationReactor := propagation.NewReactor(nodeKey.ID(), tracer, blockStore)
propagationReactor := propagation.NewReactor(nodeKey.ID(), tracer, blockStore, mempool)

// Make ConsensusReactor. Don't enable fully if doing a state sync and/or fast sync first.
// FIXME We need to update metrics here, since other reactors don't have access to them.
3 changes: 2 additions & 1 deletion types/marshalling.go
Original file line number Diff line number Diff line change
@@ -12,7 +12,8 @@ import (
// for a given txs field.
type TxPosition struct {
Start int
End int
// End exclusive position of the transaction
End int
}

// MarshalBlockWithTxPositions marshals the given Block message using protobuf
Loading