
Commit f5b67be

rach-id and evan-forbes authored Mar 12, 2025
feat: support recovering parts from the mempool (#1664)
## Description

Closes #1661

#### PR checklist

- [ ] Tests written/updated
- [ ] Changelog entry added in `.changelog` (we use [unclog](https://github.com/informalsystems/unclog) to manage our changelog)
- [ ] Updated relevant documentation (`docs/` or `spec/`) and code comments

---------

Co-authored-by: Evan Forbes <[email protected]>
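At a high level, the change wires the mempool into the block-propagation reactor: when a compact block arrives, the reactor looks up each listed transaction hash in its local mempool, re-marshals any hits with the same protobuf framing the block encoder uses, stitches contiguous byte ranges back into whole block parts, and adds those parts without Merkle proofs. The sketch below condenses that flow into a single hypothetical helper (`recoverFromMempool` is not a real function in this PR; in the diffs the logic lives in `handleCompactBlock` and `compactBlockToParts`):

```go
// Sketch only, condensed from consensus/propagation/commitment.go below.
func (blockProp *Reactor) recoverFromMempool(cb *proptypes.CompactBlock) {
    _, partSet, found := blockProp.GetProposal(cb.Proposal.Height, cb.Proposal.Round)
    if !found {
        return
    }
    // Resolve the compact block's tx hashes against the local mempool.
    txsFound := make([]proptypes.UnmarshalledTx, 0, len(cb.Blobs))
    for _, meta := range cb.Blobs {
        key, err := types.TxKeyFromBytes(meta.Hash)
        if err != nil {
            continue
        }
        tx, has := blockProp.mempool.GetTxByKey(key)
        if !has {
            continue
        }
        // Re-apply the same protobuf framing the block encoder uses.
        bz, err := proto.Marshal(&mempool.Txs{Txs: [][]byte{tx}})
        if err != nil {
            continue
        }
        txsFound = append(txsFound, proptypes.UnmarshalledTx{MetaData: meta, Key: key, TxBytes: bz})
    }
    // Rebuild whole parts from contiguous byte ranges and add them proof-less.
    for _, part := range proptypes.TxsToParts(txsFound) {
        _, _ = partSet.AddPartWithoutProof(part)
    }
}
```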

14 files changed (+373 -9 lines)

‎consensus/byzantine_test.go

+1 -1

@@ -116,7 +116,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
     // Make State
     blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
 
-    propagationReactor := propagation.NewReactor(nodeKey.ID(), nil, blockStore)
+    propagationReactor := propagation.NewReactor(nodeKey.ID(), nil, blockStore, mempool)
     cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, propagationReactor, mempool, evpool)
     cs.SetLogger(cs.Logger)
     // set private validator

‎consensus/common_test.go

+1 -1

@@ -453,7 +453,7 @@ func newStateWithConfigAndBlockStore(
     if err != nil {
         panic(err)
     }
-    propagator := propagation.NewReactor(key.ID(), nil, blockStore)
+    propagator := propagation.NewReactor(key.ID(), nil, blockStore, mempool)
     cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, propagator, mempool, evpool)
     cs.SetLogger(log.TestingLogger().With("module", "consensus"))
     cs.SetPrivValidator(pv)

‎consensus/propagation/commitment.go

+57

@@ -3,6 +3,8 @@ package propagation
 import (
     "fmt"
 
+    "github.com/gogo/protobuf/proto"
+    "github.com/tendermint/tendermint/proto/tendermint/mempool"
     "github.com/tendermint/tendermint/proto/tendermint/propagation"
 
     proptypes "github.com/tendermint/tendermint/consensus/propagation/types"
@@ -88,6 +90,61 @@ func (blockProp *Reactor) handleCompactBlock(cb *proptypes.CompactBlock, peer p2
     if added {
         blockProp.broadcastCompactBlock(cb, peer)
     }
+
+    // check if we have any transactions that are in the compact block
+    parts := blockProp.compactBlockToParts(cb)
+    _, partSet, found := blockProp.GetProposal(cb.Proposal.Height, cb.Proposal.Round)
+    if !found {
+        return
+    }
+    for _, part := range parts {
+        added, err := partSet.AddPartWithoutProof(part)
+        if err != nil {
+            blockProp.Logger.Error("failed to add locally recovered part", "err", err)
+            continue
+        }
+        if !added {
+            blockProp.Logger.Error("failed to add locally recovered part", "part", part.Index)
+            continue
+        }
+    }
+}
+
+// compactBlockToParts queries the mempool to see if we can recover any block parts locally.
+func (blockProp *Reactor) compactBlockToParts(cb *proptypes.CompactBlock) []*types.Part {
+    // find the compact block transactions that exist in our mempool
+    txsFound := make([]proptypes.UnmarshalledTx, 0)
+    for _, txMetaData := range cb.Blobs {
+        txKey, err := types.TxKeyFromBytes(txMetaData.Hash)
+        if err != nil {
+            blockProp.Logger.Error("failed to decode tx key", "err", err, "tx", txMetaData)
+            continue
+        }
+        tx, has := blockProp.mempool.GetTxByKey(txKey)
+        if !has {
+            continue
+        }
+
+        protoTxs := mempool.Txs{Txs: [][]byte{tx}}
+        marshalledTx, err := proto.Marshal(&protoTxs)
+        if err != nil {
+            blockProp.Logger.Error("failed to encode tx", "err", err, "tx", txMetaData)
+            continue
+        }
+
+        txsFound = append(txsFound, proptypes.UnmarshalledTx{MetaData: txMetaData, Key: txKey, TxBytes: marshalledTx})
+    }
+    if len(txsFound) == 0 {
+        // no compact block transaction was found locally
+        return nil
+    }
+
+    parts := proptypes.TxsToParts(txsFound)
+    if len(parts) > 0 {
+        blockProp.Logger.Info("recovered parts from the mempool", "number of parts", len(parts))
+    }
+
+    return parts
 }
 
 // broadcastProposal gossips the provided proposal to all peers. This should

‎consensus/propagation/commitment_test.go

+67

@@ -4,6 +4,10 @@ import (
     "testing"
     "time"
 
+    dbm "github.com/cometbft/cometbft-db"
+    "github.com/tendermint/tendermint/pkg/trace"
+    "github.com/tendermint/tendermint/store"
+
     "github.com/stretchr/testify/assert"
 
     "github.com/stretchr/testify/require"
@@ -71,3 +75,66 @@ func TestPropose(t *testing.T) {
     // the parts == total because we only have 2 peers
     assert.Equal(t, haves.Size(), int(partSet.Total()*2))
 }
+
+// TestRecoverPartsLocally provides a set of transactions to the mempool
+// and attempts to build the block parts from them.
+func TestRecoverPartsLocally(t *testing.T) {
+    cleanup, _, sm := state.SetupTestCase(t)
+    t.Cleanup(func() {
+        cleanup(t)
+    })
+
+    numberOfTxs := 10
+    txsMap := make(map[types.TxKey]types.Tx)
+    txs := make([]types.Tx, numberOfTxs)
+    for i := 0; i < numberOfTxs; i++ {
+        tx := types.Tx(cmtrand.Bytes(int(types.BlockPartSizeBytes / 3)))
+        txKey, err := types.TxKeyFromBytes(tx.Hash())
+        require.NoError(t, err)
+        txsMap[txKey] = tx
+        txs[i] = tx
+    }
+
+    blockStore := store.NewBlockStore(dbm.NewMemDB())
+    blockPropR := NewReactor("", trace.NoOpTracer(), blockStore, mockMempool{
+        txs: txsMap,
+    })
+
+    data := types.Data{Txs: txs}
+
+    block, partSet := sm.MakeBlock(1, data, types.RandCommit(time.Now()), []types.Evidence{}, cmtrand.Bytes(20))
+    id := types.BlockID{Hash: block.Hash(), PartSetHeader: partSet.Header()}
+    prop := types.NewProposal(block.Height, 0, 0, id)
+    prop.Signature = cmtrand.Bytes(64)
+
+    metaData := make([]proptypes.TxMetaData, len(partSet.TxPos))
+    for i, pos := range partSet.TxPos {
+        metaData[i] = proptypes.TxMetaData{
+            Start: uint32(pos.Start),
+            End:   uint32(pos.End),
+            Hash:  block.Txs[i].Hash(),
+        }
+    }
+
+    blockPropR.ProposeBlock(prop, partSet, metaData)
+
+    _, actualParts, _ := blockPropR.GetProposal(prop.Height, prop.Round)
+
+    // we should be able to recover all the parts after where the transactions
+    // are encoded
+    startingPartIndex := metaData[0].Start/types.BlockPartSizeBytes + 1
+
+    for i := startingPartIndex; i < partSet.Total()-1; i++ {
+        assert.Equal(t, partSet.GetPart(int(i)).Bytes, actualParts.GetPart(int(i)).Bytes)
+    }
+}
+
+var _ Mempool = &mockMempool{}
+
+type mockMempool struct {
+    txs map[types.TxKey]types.Tx
+}
+
+func (m mockMempool) GetTxByKey(key types.TxKey) (types.Tx, bool) {
+    return m.txs[key], true
+}

‎consensus/propagation/mempool.go

+7

@@ -0,0 +1,7 @@
+package propagation
+
+import "github.com/tendermint/tendermint/types"
+
+type Mempool interface {
+    GetTxByKey(key types.TxKey) (types.Tx, bool)
+}
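The reactor only needs hash-keyed lookup from this interface, so any map-backed stub satisfies it. A minimal, illustration-only implementation (close to the `mockMempool` defined in `commitment_test.go` above, but reporting lookup misses honestly) might look like:

```go
// Sketch: a hypothetical map-backed Mempool for illustration only.
package propagation

import "github.com/tendermint/tendermint/types"

type mapMempool struct {
    txs map[types.TxKey]types.Tx
}

func (m mapMempool) GetTxByKey(key types.TxKey) (types.Tx, bool) {
    tx, ok := m.txs[key]
    return tx, ok
}
```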

‎consensus/propagation/reactor.go

+6 -1

@@ -40,12 +40,16 @@ type Reactor struct {
     // block data for gossiping.
     *ProposalCache
 
+    // mempool access to read the transactions by hash from the mempool
+    // and eventually remove it.
+    mempool Mempool
+
     mtx         *sync.RWMutex
     traceClient trace.Tracer
     self        p2p.ID
 }
 
-func NewReactor(self p2p.ID, tracer trace.Tracer, store *store.BlockStore, options ...ReactorOption) *Reactor {
+func NewReactor(self p2p.ID, tracer trace.Tracer, store *store.BlockStore, mempool Mempool, options ...ReactorOption) *Reactor {
     if tracer == nil {
         // TODO not pass nil. instead, use a NOOP and allow the tracer to be passed as an option
         tracer = trace.NoOpTracer()
@@ -56,6 +60,7 @@ func NewReactor(self p2p.ID, tracer trace.Tracer, store *store.BlockStore, optio
         peerstate:     make(map[p2p.ID]*PeerState),
         mtx:           &sync.RWMutex{},
         ProposalCache: NewProposalCache(store),
+        mempool:       mempool,
     }
     reactor.BaseReactor = *p2p.NewBaseReactor("BlockProp", reactor, p2p.WithIncomingQueueSize(ReactorIncomingMessageQueueSize))
 
‎consensus/propagation/reactor_test.go

+1 -1

@@ -21,7 +21,7 @@ import (
 
 func newPropagationReactor(s *p2p.Switch) *Reactor {
     blockStore := store.NewBlockStore(dbm.NewMemDB())
-    blockPropR := NewReactor(s.NetAddress().ID, trace.NoOpTracer(), blockStore)
+    blockPropR := NewReactor(s.NetAddress().ID, trace.NoOpTracer(), blockStore, mockMempool{})
     blockPropR.SetSwitch(s)
 
     return blockPropR

@@ -0,0 +1,101 @@
+package types
+
+import (
+    "sort"
+
+    "github.com/tendermint/tendermint/crypto/merkle"
+    "github.com/tendermint/tendermint/types"
+)
+
+// TxsToParts takes a set of mempool transactions and tries to create parts from them.
+func TxsToParts(txsFound []UnmarshalledTx) []*types.Part {
+    // sort the txs found by start index
+    sort.Slice(txsFound, func(i, j int) bool {
+        return txsFound[i].MetaData.Start < txsFound[j].MetaData.Start
+    })
+
+    // the part slice we will return
+    parts := make([]*types.Part, 0)
+
+    // the cumulative bytes slice will contain the transaction bytes along with
+    // any leftover bytes from previous contiguous transactions
+    cumulativeBytes := make([]byte, 0)
+    // the start index of where the cumulative bytes start
+    cumulativeBytesStartIndex := -1
+
+    for index := 0; index < len(txsFound); index++ {
+        // the transaction we're parsing
+        currentTx := txsFound[index]
+        // the inclusive part index where the transaction starts
+        currentPartIndex := currentTx.MetaData.Start / types.BlockPartSizeBytes
+        currentPartStartIndex := currentPartIndex * types.BlockPartSizeBytes
+        // the exclusive index of the byte where the current part ends
+        currentPartEndIndex := (currentPartIndex + 1) * types.BlockPartSizeBytes
+
+        if len(cumulativeBytes) == 0 {
+            // an empty cumulative bytes slice means the current transaction start
+            // is where the cumulative bytes will start
+            cumulativeBytesStartIndex = int(currentTx.MetaData.Start)
+        }
+        // append the current transaction bytes to the cumulative bytes slice
+        cumulativeBytes = append(cumulativeBytes, currentTx.TxBytes...)
+
+        // This case checks whether the cumulative bytes start index
+        // starts at the current part.
+        // If not, this means the current part, even if we might have some of its data,
+        // is not recoverable, and we can truncate it.
+        if int(currentPartStartIndex) < cumulativeBytesStartIndex {
+            // relative part end index
+            relativePartEndIndex := int(currentPartEndIndex) - cumulativeBytesStartIndex
+            if relativePartEndIndex > len(cumulativeBytes) {
+                // case where the cumulative bytes length is small.
+                // this happens with small transactions.
+                cumulativeBytes = cumulativeBytes[:0]
+                cumulativeBytesStartIndex = -1
+            } else {
+                // slice the cumulative bytes to start at exactly the part end index
+                cumulativeBytes = cumulativeBytes[relativePartEndIndex:]
+                // set the cumulative bytes start index to the current part end index
+                cumulativeBytesStartIndex = int(currentPartEndIndex)
+            }
+        }
+
+        // parse the parts we gathered so far
+        for len(cumulativeBytes) >= int(types.BlockPartSizeBytes) {
+            // get the part's bytes
+            partBz := cumulativeBytes[:types.BlockPartSizeBytes]
+            // create the part
+            part := &types.Part{
+                Index: uint32(cumulativeBytesStartIndex) / types.BlockPartSizeBytes,
+                Bytes: partBz,
+                Proof: merkle.Proof{}, // empty proof because we don't have the other leaves to create a valid one
+            }
+            parts = append(parts, part)
+            // slice this part off the cumulative bytes
+            cumulativeBytes = cumulativeBytes[types.BlockPartSizeBytes:]
+            // advance the cumulative start index
+            cumulativeBytesStartIndex += int(types.BlockPartSizeBytes)
+        }
+
+        // check whether the next transaction is contiguous with the current one.
+        if index+1 < len(txsFound) {
+            nextTx := txsFound[index+1]
+            if currentTx.MetaData.End != nextTx.MetaData.Start {
+                // the next transaction is not contiguous, so we can reset the cumulative bytes.
+                cumulativeBytes = cumulativeBytes[:0]
+                cumulativeBytesStartIndex = -1
+            }
+        }
+    }
+
+    return parts
+}
+
+// UnmarshalledTx is an intermediary type that keeps a transaction's
+// metadata, its key, and the actual tx bytes. It is used to create
+// parts from the locally available txs.
+type UnmarshalledTx struct {
+    MetaData TxMetaData
+    Key      types.TxKey
+    TxBytes  []byte
+}
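To make the index arithmetic concrete, here is a small worked example. It assumes `types.BlockPartSizeBytes` is 65536 (its value in this codebase at the time of writing) and uses made-up offsets; it is an illustration, not part of the change:

```go
package main

import "fmt"

func main() {
    const blockPartSize = 65536 // assumed value of types.BlockPartSizeBytes

    // A recovered tx whose protobuf-framed bytes occupy [131000, 131700) in the block.
    start, end := uint32(131000), uint32(131700)

    partIndex := start / blockPartSize         // 1: the part containing the tx start
    partStart := partIndex * blockPartSize     // 65536
    partEnd := (partIndex + 1) * blockPartSize // 131072: first byte of the next part

    // The cumulative buffer starts at 131000, not at partStart, so part 1 cannot
    // be rebuilt from this tx alone: TxsToParts drops the 72 bytes before the
    // boundary and keeps the trailing end-partEnd = 628 bytes starting at 131072.
    // If the next recovered tx starts exactly at 131700 (its Start == this End),
    // those bytes are appended; once 65536 bytes accumulate from offset 131072,
    // part index 131072/65536 = 2 is emitted with an empty Merkle proof.
    fmt.Println(partIndex, partStart, partEnd, end-partEnd) // 1 65536 131072 628
}
```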

@@ -0,0 +1,126 @@
+package types
+
+import (
+    "fmt"
+    "testing"
+    "time"
+
+    "github.com/gogo/protobuf/proto"
+    "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
+    cmtrand "github.com/tendermint/tendermint/libs/rand"
+    "github.com/tendermint/tendermint/proto/tendermint/mempool"
+    "github.com/tendermint/tendermint/state"
+    "github.com/tendermint/tendermint/types"
+)
+
+// TestTxsToParts extensively tests the TxsToParts method
+// that recovers parts from mempool txs.
+func TestTxsToParts(t *testing.T) {
+    cleanup, _, sm := state.SetupTestCase(t)
+    t.Cleanup(func() {
+        cleanup(t)
+    })
+    numberOfTxs := 16 // increasing the number of transactions increases the test time exponentially
+
+    tests := []struct {
+        name string
+        txs  []types.Tx
+    }{
+        {
+            name: "txs size == types.BlockPartSizeBytes/3",
+            txs: func() []types.Tx {
+                txs := make([]types.Tx, 0, numberOfTxs)
+                for i := 0; i < numberOfTxs; i++ {
+                    txs = append(txs, cmtrand.Bytes(int(types.BlockPartSizeBytes/3)))
+                }
+                return txs
+            }(),
+        },
+        {
+            name: "txs size == types.BlockPartSizeBytes",
+            txs: func() []types.Tx {
+                txs := make([]types.Tx, 0, numberOfTxs)
+                for i := 0; i < numberOfTxs; i++ {
+                    txs = append(txs, cmtrand.Bytes(int(types.BlockPartSizeBytes)))
+                }
+                return txs
+            }(),
+        },
+        {
+            name: "txs size == types.BlockPartSizeBytes * 3",
+            txs: func() []types.Tx {
+                txs := make([]types.Tx, 0, numberOfTxs)
+                for i := 0; i < numberOfTxs; i++ {
+                    txs = append(txs, cmtrand.Bytes(int(types.BlockPartSizeBytes)*3))
+                }
+                return txs
+            }(),
+        },
+    }
+
+    for _, test := range tests {
+        t.Run(test.name, func(t *testing.T) {
+            data := types.Data{Txs: test.txs}
+            block, partSet := sm.MakeBlock(1, data, types.RandCommit(time.Now()), []types.Evidence{}, cmtrand.Bytes(20))
+
+            txsFound := make([]UnmarshalledTx, len(partSet.TxPos))
+            for i, pos := range partSet.TxPos {
+                // calculate the protobuf overhead
+                protoTxs := mempool.Txs{Txs: [][]byte{data.Txs[i]}}
+                marshalledTx, err := proto.Marshal(&protoTxs)
+                require.NoError(t, err)
+
+                txKey, err := types.TxKeyFromBytes(block.Txs[i].Hash())
+                require.NoError(t, err)
+                txsFound[i] = UnmarshalledTx{
+                    MetaData: TxMetaData{
+                        Start: uint32(pos.Start),
+                        End:   uint32(pos.End),
+                        Hash:  block.Txs[i].Hash(),
+                    },
+                    Key:     txKey,
+                    TxBytes: marshalledTx,
+                }
+            }
+
+            // generate all the possible combinations for the provided number of transactions
+            txsCombinations := GenerateTxsCombinations(numberOfTxs)
+
+            for _, combination := range txsCombinations {
+                t.Run(fmt.Sprintf("%v", combination), func(t *testing.T) {
+                    combinationTxs := make([]UnmarshalledTx, 0)
+                    for index, val := range combination {
+                        if val == 1 {
+                            combinationTxs = append(combinationTxs, txsFound[index])
+                        }
+                    }
+
+                    parts := TxsToParts(combinationTxs)
+
+                    for _, part := range parts {
+                        expectedPart := partSet.GetPart(int(part.Index))
+                        assert.Equal(t, expectedPart.Bytes, part.Bytes)
+                    }
+                })
+            }
+        })
+    }
+}
+
+// GenerateTxsCombinations generates all relevant transaction placements
+// in a block given a number of transactions.
+func GenerateTxsCombinations(n int) [][]int {
+    total := 1 << n // 2^n combinations
+    result := make([][]int, 0)
+
+    for i := 0; i < total; i++ {
+        bitArray := make([]int, n)
+        for j := 0; j < n; j++ {
+            // Extract the bit at position j
+            bitArray[j] = (i >> j) & 1
+        }
+        result = append(result, bitArray)
+    }
+    return result
+}
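For a sense of what `GenerateTxsCombinations` enumerates (and why the comment above warns that test time grows exponentially), `n = 2` produces the four inclusion masks shown below; with the test's `numberOfTxs = 16` this becomes 2^16 = 65536 sub-cases. The fragment assumes it runs in the same package as `GenerateTxsCombinations`:

```go
combos := GenerateTxsCombinations(2)
fmt.Println(combos) // [[0 0] [1 0] [0 1] [1 1]]
// Each mask selects which of the block's txs are treated as present in the
// mempool before calling TxsToParts on just that subset.
```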

‎consensus/reactor_test.go

+1 -1

@@ -221,7 +221,7 @@ func TestReactorWithEvidence(t *testing.T) {
     if err != nil {
         panic(err)
     }
-    propagator := propagation.NewReactor(key.ID(), nil, blockStore)
+    propagator := propagation.NewReactor(key.ID(), nil, blockStore, mempool)
     cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, propagator, mempool, evpool2)
     cs.SetLogger(log.TestingLogger().With("module", "consensus"))
     cs.SetPrivValidator(pv)

‎consensus/replay_file.go

+1 -1

@@ -339,7 +339,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo
         panic(err)
     }
     // TODO pass a tracer from here
-    propagator := propagation.NewReactor(key.ID(), nil, blockStore)
+    propagator := propagation.NewReactor(key.ID(), nil, blockStore, mempool)
     consensusState := NewState(csConfig, state.Copy(), blockExec,
         blockStore, propagator, mempool, evpool)
 

‎consensus/wal_generator.go

+1 -1

@@ -92,7 +92,7 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) {
     if err != nil {
         panic(err)
     }
-    propagator := propagation.NewReactor(key.ID(), nil, blockStore)
+    propagator := propagation.NewReactor(key.ID(), nil, blockStore, mempool)
     consensusState := NewState(config.Consensus, state.Copy(), blockExec, blockStore, propagator, mempool, evpool)
     consensusState.SetLogger(logger)
     consensusState.SetEventBus(eventBus)

‎node/node.go

+1 -1

@@ -926,7 +926,7 @@ func NewNodeWithContext(ctx context.Context,
         return nil, fmt.Errorf("could not create blockchain reactor: %w", err)
     }
 
-    propagationReactor := propagation.NewReactor(nodeKey.ID(), tracer, blockStore)
+    propagationReactor := propagation.NewReactor(nodeKey.ID(), tracer, blockStore, mempool)
 
     // Make ConsensusReactor. Don't enable fully if doing a state sync and/or fast sync first.
     // FIXME We need to update metrics here, since other reactors don't have access to them.

‎types/marshalling.go

+2 -1

@@ -12,7 +12,8 @@ import (
 // for a given txs field.
 type TxPosition struct {
     Start int
-    End   int
+    // End is the exclusive end position of the transaction
+    End int
 }
 
 // MarshalBlockWithTxPositions marshals the given Block message using protobuf
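Since `End` is exclusive, a position describes the half-open byte range `[Start, End)`. A tiny illustration with made-up numbers, assumed to run alongside the `TxPosition` type:

```go
// Illustration only: a tx written at block offsets [100, 250).
pos := TxPosition{Start: 100, End: 250}
length := pos.End - pos.Start // 150 bytes
_ = length
// A following tx is contiguous exactly when its Start equals this End (250),
// which is the check TxsToParts relies on when accumulating bytes across txs.
```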
