Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

prototype: compact blocks #1191

Draft
wants to merge 75 commits into
base: v0.34.x-celestia
Choose a base branch
from
Draft
Changes from 14 commits
Commits
Show all changes
75 commits
Select commit Hold shift + click to select a range
bf6edc0
implement prototype for compact blocks
cmwaters Jan 20, 2024
211694b
add compact block messages to the reactor
cmwaters Jan 24, 2024
567cf3f
checkpoint
cmwaters Jan 24, 2024
22639ab
checkpoint
cmwaters Jan 25, 2024
d4f62fd
fix e2e tests
cmwaters Jan 25, 2024
27a6fe3
add has compact block message
cmwaters Jan 26, 2024
f0e972e
use go 1.19
cmwaters Jan 26, 2024
1cb6666
fix tests
cmwaters Jan 27, 2024
4874e77
fix consensus tests
cmwaters Jan 27, 2024
187d26c
fix test I just broke
cmwaters Jan 27, 2024
0f529f6
linting
cmwaters Jan 27, 2024
64d3f6d
lint
cmwaters Jan 27, 2024
c530d1e
fix data race
cmwaters Jan 27, 2024
3e6c075
go fmt
cmwaters Jan 27, 2024
507f3d2
Merge remote-tracking branch 'origin/v0.34.x-celestia' into callum/co…
cmwaters Jul 19, 2024
9ab18d4
gofmt
cmwaters Jul 19, 2024
8f63838
save metrics
cmwaters Jul 19, 2024
71b752e
enable metrics
cmwaters Jul 19, 2024
c2e6331
shift where we measure compact blocks
cmwaters Jul 19, 2024
1cbd361
fix: panic in prometheus
cmwaters Jul 24, 2024
1fe8522
add the square size to the constructed data
cmwaters Jul 29, 2024
1846523
add more metrics for compact blocks
cmwaters Jul 30, 2024
1c4f76e
fix usage of metrics
cmwaters Jul 30, 2024
9ad05ed
fix error handling in cat pool and consensus
cmwaters Jul 30, 2024
93774a7
checkpoint
cmwaters Jul 31, 2024
6f8f70a
print out account sequence
cmwaters Jul 31, 2024
82a8958
track if we see missing transcations which are invalid
cmwaters Aug 8, 2024
d1b3c4d
add more logs around the mempool
cmwaters Aug 9, 2024
8a5d797
remove jitter
cmwaters Aug 9, 2024
e937057
add inclusion delay
cmwaters Aug 9, 2024
74cac47
jump to 3 second delay
cmwaters Aug 9, 2024
dfa4382
improve logs
cmwaters Aug 9, 2024
a56bbb7
checkpoint
cmwaters Aug 9, 2024
905ba36
Revert "checkpoint"
cmwaters Aug 12, 2024
2273f83
update logger
cmwaters Aug 12, 2024
513949c
checkpoint
cmwaters Aug 13, 2024
567a281
checkpoint
cmwaters Aug 13, 2024
dcea04c
add more tracing data
cmwaters Aug 13, 2024
c5fd230
return some logs to debug
cmwaters Aug 13, 2024
b4ebb57
checkpoint
cmwaters Aug 14, 2024
33004a3
sneaky fix
cmwaters Aug 14, 2024
1bb12bf
fix
cmwaters Aug 14, 2024
b1086d2
checkpoint
cmwaters Aug 15, 2024
aab522a
fix nil pointer dereference
cmwaters Aug 15, 2024
d46ad2d
fix pruning and resetting
cmwaters Aug 16, 2024
5aefb63
define a send queue capacity
cmwaters Aug 16, 2024
178f81a
add more tracing data
cmwaters Aug 20, 2024
aaf89cf
register mempool recovery table
cmwaters Aug 23, 2024
c42c673
allow the fetching of compact blocks to last the entire round
cmwaters Aug 26, 2024
9ff3319
repropose the valid block if it exists
cmwaters Aug 26, 2024
7578e10
use updateRoundStep to track consensus state changes
cmwaters Aug 27, 2024
34774cd
add compact hash to the proposal type
cmwaters Aug 27, 2024
955714f
chore: remove maverick directory
cmwaters Aug 27, 2024
5ab5034
use compact hash
cmwaters Aug 27, 2024
ec3335c
remove cases in e2e test
cmwaters Aug 27, 2024
bc72dc4
cover up a few other missing cases
cmwaters Aug 27, 2024
5b25a7c
fix the fallback mechanism
cmwaters Aug 28, 2024
cf354c5
don't evict transcactions that have been proposed
cmwaters Aug 30, 2024
dddd61e
mark peer as having sent compact proposal
cmwaters Aug 30, 2024
92e20de
don't evict transactions in a proposal
cmwaters Aug 30, 2024
0a5adc0
make sure the lock is correctly used
cmwaters Aug 30, 2024
99b4727
play around with p2p settings
cmwaters Sep 2, 2024
7218a79
add a channel and modify some of the vote gossiping logic
cmwaters Sep 3, 2024
bc85fc2
implement priority based broadcast system
cmwaters Sep 5, 2024
79947c6
remove print statement
cmwaters Sep 5, 2024
a801fa3
checkpoint
cmwaters Sep 6, 2024
9f48af6
increase the send queue capacity
cmwaters Sep 6, 2024
594e303
make modifications to the consensus reactor
cmwaters Sep 6, 2024
7b76868
fix panic
cmwaters Sep 6, 2024
caeb9ef
checkpoint
cmwaters Sep 6, 2024
91f1703
Merge branch 'v0.34.x-celestia' into callum/compact-blocks
cmwaters Oct 28, 2024
a55e1c9
fix test
cmwaters Oct 28, 2024
77db525
revert some modifications to the consensus reactor logic
cmwaters Oct 28, 2024
f170aad
repurpose listen only in the mempool
cmwaters Oct 28, 2024
a31a8fe
lint
cmwaters Oct 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -692,9 +692,7 @@ func DefaultFuzzConnConfig() *FuzzConnConfig {
// MempoolConfig defines the configuration options for the CometBFT mempool
type MempoolConfig struct {
// Mempool version to use:
// 1) "v0" - FIFO mempool.
// 2) "v1" - (default) prioritized mempool.
// 3) "v2" - content addressable transaction pool
// 1) "v2" - (default) content addressable transaction pool
Version string `mapstructure:"version"`
// RootDir is the root directory for all data. This should be configured via
// the $CMTHOME env variable or --home cmd flag rather than overriding this
@@ -763,7 +761,7 @@ type MempoolConfig struct {
// DefaultMempoolConfig returns a default configuration for the CometBFT mempool
func DefaultMempoolConfig() *MempoolConfig {
return &MempoolConfig{
Version: MempoolV1,
Version: MempoolV2,
Recheck: true,
Broadcast: true,
WalPath: "",
@@ -798,6 +796,9 @@ func (cfg *MempoolConfig) WalEnabled() bool {
// ValidateBasic performs basic validation (checking param bounds, etc.) and
// returns an error if any check fails.
func (cfg *MempoolConfig) ValidateBasic() error {
if cfg.Version != MempoolV2 {
return errors.New("only v2 mempool is supported for compact blocks")
}
if cfg.Size < 0 {
return errors.New("size can't be negative")
}
4 changes: 1 addition & 3 deletions config/toml.go
Original file line number Diff line number Diff line change
@@ -343,9 +343,7 @@ dial_timeout = "{{ .P2P.DialTimeout }}"
[mempool]

# Mempool version to use:
# 1) "v0" - FIFO mempool.
# 2) "v1" - (default) prioritized mempool.
# 3) "v2" - content addressable transaction pool
# 1) "v2" - (default) content addressable transaction pool
version = "{{ .Mempool.Version }}"

# Recheck (default: true) defines whether CometBFT should recheck the
71 changes: 33 additions & 38 deletions consensus/byzantine_test.go
Original file line number Diff line number Diff line change
@@ -20,12 +20,8 @@ import (
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/service"
cmtsync "github.com/tendermint/tendermint/libs/sync"
mempl "github.com/tendermint/tendermint/mempool"

cfg "github.com/tendermint/tendermint/config"
mempoolv2 "github.com/tendermint/tendermint/mempool/cat"
mempoolv0 "github.com/tendermint/tendermint/mempool/v0"
mempoolv1 "github.com/tendermint/tendermint/mempool/v1"
"github.com/tendermint/tendermint/mempool/cat"
"github.com/tendermint/tendermint/p2p"
cmtcons "github.com/tendermint/tendermint/proto/tendermint/consensus"
cmtproto "github.com/tendermint/tendermint/proto/tendermint/types"
@@ -48,6 +44,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {

genDoc, privVals := randGenesisDoc(nValidators, false, 30)
css := make([]*State, nValidators)
catReactors := make([]*cat.Reactor, nValidators)

for i := 0; i < nValidators; i++ {
logger := consensusLogger().With("test", "byzantine", "validator", i)
@@ -72,33 +69,21 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
proxyAppConnConMem := abcicli.NewLocalClient(mtx, app)

// Make Mempool
var mempool mempl.Mempool

switch thisConfig.Mempool.Version {
case cfg.MempoolV0:
mempool = mempoolv0.NewCListMempool(config.Mempool,
proxyAppConnConMem,
state.LastBlockHeight,
mempoolv0.WithPreCheck(sm.TxPreCheck(state)),
mempoolv0.WithPostCheck(sm.TxPostCheck(state)))
case cfg.MempoolV1:
mempool = mempoolv1.NewTxMempool(logger,
config.Mempool,
proxyAppConnConMem,
state.LastBlockHeight,
mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
mempoolv1.WithPostCheck(sm.TxPostCheck(state)),
)
case cfg.MempoolV2:
mempool = mempoolv2.NewTxPool(
logger,
config.Mempool,
proxyAppConnConMem,
state.LastBlockHeight,
mempoolv2.WithPreCheck(sm.TxPreCheck(state)),
mempoolv2.WithPostCheck(sm.TxPostCheck(state)),
)
}
mempool := cat.NewTxPool(
logger,
config.Mempool,
proxyAppConnConMem,
state.LastBlockHeight,
cat.WithPreCheck(sm.TxPreCheck(state)),
cat.WithPostCheck(sm.TxPostCheck(state)),
)
var err error
catReactors[i], err = cat.NewReactor(mempool, &cat.ReactorOptions{
ListenOnly: !config.Mempool.Broadcast,
MaxTxSize: config.Mempool.MaxTxBytes,
MaxGossipDelay: config.Mempool.MaxGossipDelay,
})
require.NoError(t, err)

if thisConfig.Consensus.WaitForTxs() {
mempool.EnableTxsAvailable()
@@ -112,7 +97,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {

// Make State
blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, catReactors[i], evpool)
cs.SetLogger(cs.Logger)
// set private validator
pv := privVals[i]
@@ -154,6 +139,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
// make connected switches and start all reactors
p2p.MakeConnectedSwitches(config.P2P, nValidators, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("CONSENSUS", reactors[i])
s.AddReactor("MEMPOOL", catReactors[i])
s.SetLogger(reactors[i].conS.Logger.With("module", "p2p"))
return s
}, p2p.Connect2Switches)
@@ -230,9 +216,15 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
}
proposerAddr := lazyProposer.privValidatorPubKey.Address()

block, blockParts := lazyProposer.blockExec.CreateProposalBlock(
block := lazyProposer.blockExec.CreateProposalBlock(
lazyProposer.Height, lazyProposer.state, commit, proposerAddr,
)
blockHash := block.Hash()
blockParts := block.MakePartSet(types.BlockPartSizeBytes)

keys, err := lazyProposer.txFetcher.FetchKeysFromTxs(context.Background(), block.Txs.ToSliceOfBytes())
require.NoError(t, err)
block.Txs = types.ToTxs(keys)

// Flush the WAL. Otherwise, we may not recompute the same proposal to sign,
// and the privValidator will refuse to sign anything.
@@ -241,14 +233,15 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
}

// Make proposal
propBlockID := types.BlockID{Hash: block.Hash(), PartSetHeader: blockParts.Header()}
propBlockID := types.BlockID{Hash: blockHash, PartSetHeader: blockParts.Header()}
proposal := types.NewProposal(height, round, lazyProposer.TwoThirdPrevoteRound, propBlockID)
p := proposal.ToProto()
if err := lazyProposer.privValidator.SignProposal(lazyProposer.state.ChainID, p); err == nil {
proposal.Signature = p.Signature

// send proposal and block parts on internal msg queue
lazyProposer.sendInternalMessage(msgInfo{&ProposalMessage{proposal}, ""})
lazyProposer.sendInternalMessage(msgInfo{&CompactBlockMessage{block, proposal.Round}, ""})
for i := 0; i < int(blockParts.Total()); i++ {
part := blockParts.GetPart(i)
lazyProposer.sendInternalMessage(msgInfo{&BlockPartMessage{lazyProposer.Height, lazyProposer.Round, part}, ""})
@@ -307,7 +300,7 @@ func TestByzantinePrevoteEquivocation(t *testing.T) {
}
case <-time.After(20 * time.Second):
for i, reactor := range reactors {
t.Logf("Consensus Reactor %d\n%v", i, reactor)
t.Logf("Consensus Reactor %d\n%v", i, reactor.conS.GetRoundState())
}
t.Fatalf("Timed out waiting for validators to commit evidence")
}
@@ -480,7 +473,8 @@ func byzantineDecideProposalFunc(t *testing.T, height int64, round int32, cs *St
// Avoid sending on internalMsgQueue and running consensus state.

// Create a new proposal block from state/txs from the mempool.
block1, blockParts1 := cs.createProposalBlock()
block1 := cs.createProposalBlock()
blockParts1 := block1.MakePartSet(types.BlockPartSizeBytes)
polRound := cs.TwoThirdPrevoteRound
propBlockID := types.BlockID{Hash: block1.Hash(), PartSetHeader: blockParts1.Header()}
proposal1 := types.NewProposal(height, round, polRound, propBlockID)
@@ -495,7 +489,8 @@ func byzantineDecideProposalFunc(t *testing.T, height int64, round int32, cs *St
deliverTxsRange(cs, 0, 1)

// Create a new proposal block from state/txs from the mempool.
block2, blockParts2 := cs.createProposalBlock()
block2 := cs.createProposalBlock()
blockParts2 := block2.MakePartSet(types.BlockPartSizeBytes)
polRound = cs.TwoThirdPrevoteRound
propBlockID = types.BlockID{Hash: block2.Hash(), PartSetHeader: blockParts2.Header()}
proposal2 := types.NewProposal(height, round, polRound, propBlockID)
6 changes: 4 additions & 2 deletions consensus/common_test.go
Original file line number Diff line number Diff line change
@@ -207,7 +207,9 @@ func decideProposal(
round int32,
) (proposal *types.Proposal, block *types.Block) {
cs1.mtx.Lock()
block, blockParts := cs1.createProposalBlock()
block = cs1.createProposalBlock()
blockParts := block.MakePartSet(types.BlockPartSizeBytes)

validRound := cs1.TwoThirdPrevoteRound
chainID := cs1.state.ChainID
cs1.mtx.Unlock()
@@ -447,7 +449,7 @@ func newStateWithConfigAndBlockStore(
}

blockExec := sm.NewBlockExecutor(stateStore, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs := NewState(thisConfig.Consensus, state, blockExec, blockStore, mempool, nil, evpool)
cs.SetLogger(log.TestingLogger().With("module", "consensus"))
cs.SetPrivValidator(pv)

50 changes: 50 additions & 0 deletions consensus/metrics.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package consensus

import (
"encoding/json"
"fmt"
"path/filepath"
"strings"
"time"

"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/discard"
cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/libs/os"

prometheus "github.com/go-kit/kit/metrics/prometheus"
stdprometheus "github.com/prometheus/client_golang/prometheus"
@@ -299,3 +303,49 @@ func (m *Metrics) MarkStep(s cstypes.RoundStepType) {
}
m.stepStart = time.Now()
}

type JSONMetrics struct {
dir string
interval int
StartTime time.Time
EndTime time.Time
Blocks uint64
Rounds uint64
SentConsensusBytes uint64
SentCompactBlocks uint64
SentCompactBytes uint64
CompactBlockFailures uint64
SentBlockParts uint64
SentBlockPartsBytes uint64
}

func NewJSONMetrics(dir string) *JSONMetrics {
return &JSONMetrics{
dir: dir,
StartTime: time.Now().UTC(),
}
}

func (m *JSONMetrics) Save() {
m.EndTime = time.Now().UTC()
content, err := json.MarshalIndent(m, "", " ")
if err != nil {
panic(err)
}
path := filepath.Join(m.dir, fmt.Sprintf("metrics_%d.json", m.interval))
os.MustWriteFile(path, content, 0644)
m.StartTime = m.EndTime
m.interval++
m.reset()
}

func (m *JSONMetrics) reset() {
m.Blocks = 0
m.Rounds = 0
m.SentConsensusBytes = 0
m.SentBlockParts = 0
m.SentBlockPartsBytes = 0
m.SentCompactBlocks = 0
m.SentCompactBytes = 0
m.CompactBlockFailures = 0
}
34 changes: 34 additions & 0 deletions consensus/msgs.go
Original file line number Diff line number Diff line change
@@ -60,6 +60,24 @@ func MsgToProto(msg Message) (*cmtcons.Message, error) {
}
return m.Wrap().(*cmtcons.Message), nil

case *CompactBlockMessage:
block, err := msg.Block.ToProto()
if err != nil {
return nil, fmt.Errorf("msg to proto error: %w", err)
}
m := &cmtcons.CompactBlock{
Block: block,
Round: msg.Round,
}
return m.Wrap().(*cmtcons.Message), nil

case *HasBlockMessage:
m := &cmtcons.HasCompactBlock{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

presumably these names will be the same? when we send a HasCompactBlock, are we indicating that we have the block in its entirety or just he hashes?

Height: msg.Height,
Round: msg.Round,
}
return m.Wrap().(*cmtcons.Message), nil

case *BlockPartMessage:
parts, err := msg.Part.ToProto()
if err != nil {
@@ -188,6 +206,22 @@ func MsgFromProto(p *cmtcons.Message) (Message, error) {
Round: msg.Round,
Part: parts,
}
case *cmtcons.CompactBlock:
block, err := types.BlockFromProto(msg.Block)
if err != nil {
return nil, fmt.Errorf("compactBlock msg to proto error: %w", err)
}
pb = &CompactBlockMessage{
Block: block,
Round: msg.Round,
}

case *cmtcons.HasCompactBlock:
pb = &HasBlockMessage{
Height: msg.Height,
Round: msg.Round,
}

case *cmtcons.Vote:
vote, err := types.VoteFromProto(msg.Vote)
if err != nil {
Loading