Skip to content

Commit c3e436d

Browse files
committed Nov 18, 2024
chore: initialise the mock reactor as part of the node setup
1 parent eb6b72f commit c3e436d

File tree

5 files changed

+81
-31
lines changed

5 files changed

+81
-31
lines changed
 

‎node/node.go

+59-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"context"
66
"errors"
77
"fmt"
8+
"github.com/tendermint/tendermint/p2p/load"
9+
"github.com/tendermint/tendermint/p2p/mock"
810
"net"
911
"net/http"
1012
"strings"
@@ -237,6 +239,7 @@ type Node struct {
237239
tracer trace.Tracer
238240
pyroscopeProfiler *pyroscope.Profiler
239241
pyroscopeTracer *sdktrace.TracerProvider
242+
mockReactor *mock.Reactor
240243
}
241244

242245
func initDBs(config *cfg.Config, dbProvider DBProvider) (blockStore *store.BlockStore, stateDB dbm.DB, err error) {
@@ -623,6 +626,7 @@ func createSwitch(config *cfg.Config,
623626
stateSyncReactor *statesync.Reactor,
624627
consensusReactor *cs.Reactor,
625628
evidenceReactor *evidence.Reactor,
629+
mockReactor *load.MockReactor,
626630
nodeInfo p2p.NodeInfo,
627631
nodeKey *p2p.NodeKey,
628632
p2pLogger log.Logger,
@@ -641,6 +645,7 @@ func createSwitch(config *cfg.Config,
641645
sw.AddReactor("CONSENSUS", consensusReactor)
642646
sw.AddReactor("EVIDENCE", evidenceReactor)
643647
sw.AddReactor("STATESYNC", stateSyncReactor)
648+
sw.AddReactor("MOCK", mockReactor)
644649

645650
sw.SetNodeInfo(nodeInfo)
646651
sw.SetNodeKey(nodeKey)
@@ -927,11 +932,59 @@ func NewNode(config *cfg.Config,
927932
// Setup Transport.
928933
transport, peerFilters := createTransport(config, nodeInfo, nodeKey, proxyApp, tracer)
929934

935+
mockReactor := load.NewMockReactor(load.DefaultTestChannels, 100)
936+
937+
go func() {
938+
for {
939+
time.Sleep(10 * time.Second)
940+
mockReactor.PrintReceiveSpeed()
941+
}
942+
}()
943+
944+
go func() {
945+
mockReactor.FloodAllPeers(10*time.Minute,
946+
load.FirstChannel,
947+
load.SecondChannel,
948+
load.ThirdChannel,
949+
load.FourthChannel,
950+
load.FifthChannel,
951+
load.SixthChannel,
952+
load.SeventhChannel,
953+
load.EighthChannel,
954+
load.NinthChannel,
955+
load.TenthChannel,
956+
)
957+
}()
958+
959+
go func() {
960+
for _, size := range []int{
961+
500,
962+
1_000,
963+
5_000,
964+
10_000,
965+
50_000,
966+
100_000,
967+
500_000,
968+
1_000_000,
969+
5_000_000,
970+
10_000_000,
971+
20_000_000,
972+
30_000_000,
973+
50_000_000,
974+
100_000_000,
975+
200_000_000,
976+
} {
977+
time.Sleep(30 * time.Second)
978+
mockReactor.IncreaseSize(int64(size))
979+
logger.Error("======> increased flood size", "size", size)
980+
}
981+
}()
982+
930983
// Setup Switch.
931984
p2pLogger := logger.With("module", "p2p")
932985
sw := createSwitch(
933986
config, transport, p2pMetrics, peerFilters, mempoolReactor, bcReactor,
934-
stateSyncReactor, consensusReactor, evidenceReactor, nodeInfo, nodeKey, p2pLogger, tracer,
987+
stateSyncReactor, consensusReactor, evidenceReactor, mockReactor, nodeInfo, nodeKey, p2pLogger, tracer,
935988
)
936989

937990
err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
@@ -1368,6 +1421,11 @@ func (n *Node) ConsensusReactor() *cs.Reactor {
13681421
return n.consensusReactor
13691422
}
13701423

1424+
// MockReactor returns the Node's mock reactor.
1425+
func (n *Node) MockReactor() *mock.Reactor {
1426+
return n.mockReactor
1427+
}
1428+
13711429
// MempoolReactor returns the Node's mempool reactor.
13721430
func (n *Node) MempoolReactor() p2p.Reactor {
13731431
return n.mempoolReactor

‎p2p/conn/connection.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ const (
4141
defaultFlushThrottle = 100 * time.Millisecond
4242

4343
defaultSendQueueCapacity = 1
44-
defaultRecvBufferCapacity = 4096
45-
defaultRecvMessageCapacity = 22020096 // 21MB
44+
defaultRecvBufferCapacity = 40960000
45+
defaultRecvMessageCapacity = 22020096000
4646
defaultSendRate = int64(512000) // 500KB/s
4747
defaultRecvRate = int64(512000) // 500KB/s
4848
defaultSendTimeout = 10 * time.Second

‎p2p/load/mock_reactor.go

+9-17
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,12 @@ const (
3030
var priorities = make(map[byte]int)
3131

3232
func init() {
33-
for _, ch := range defaultTestChannels {
33+
for _, ch := range DefaultTestChannels {
3434
priorities[ch.ID] = ch.Priority
3535
}
3636
}
3737

38-
var defaultTestChannels = []*p2p.ChannelDescriptor{
38+
var DefaultTestChannels = []*p2p.ChannelDescriptor{
3939
{
4040
ID: FirstChannel,
4141
Priority: 1,
@@ -135,7 +135,6 @@ var defaultMsgSizes = []int{
135135
type MockReactor struct {
136136
p2p.BaseReactor
137137
channels []*conn.ChannelDescriptor
138-
sizes map[byte]int
139138

140139
mtx sync.Mutex
141140
peers map[p2p.ID]p2p.Peer
@@ -149,21 +148,17 @@ type MockReactor struct {
149148
}
150149

151150
// NewMockReactor creates a new mock reactor.
152-
func NewMockReactor(channels []*conn.ChannelDescriptor, msgSizes []int) *MockReactor {
151+
func NewMockReactor(channels []*conn.ChannelDescriptor, msgSize int) *MockReactor {
153152
s := atomic.Int64{}
154-
s.Store(200)
153+
s.Store(int64(msgSize))
155154
mr := &MockReactor{
156155
channels: channels,
157156
peers: make(map[p2p.ID]p2p.Peer),
158-
sizes: make(map[byte]int),
159157
startTime: map[string]time.Time{},
160158
speed: map[string]float64{},
161159
cumulativeReceivedBytes: map[string]int{},
162160
size: s,
163161
}
164-
for i, ch := range channels {
165-
mr.sizes[ch.ID] = msgSizes[i]
166-
}
167162
mr.BaseReactor = *p2p.NewBaseReactor("MockReactor", mr)
168163
return mr
169164
}
@@ -292,22 +287,19 @@ func (mr *MockReactor) FillChannel(id p2p.ID, chID byte, count, msgSize int) (bo
292287
return true, count, end.Sub(start)
293288
}
294289

295-
func (mr *MockReactor) FloodChannel(wg *sync.WaitGroup, id p2p.ID, d time.Duration, chIDs ...byte) {
290+
func (mr *MockReactor) FloodChannel(id p2p.ID, d time.Duration, chIDs ...byte) {
296291
for _, chID := range chIDs {
297-
wg.Add(1)
298-
size := mr.sizes[chID]
299-
go func(d time.Duration, chID byte, size int) {
292+
go func(d time.Duration, chID byte) {
300293
start := time.Now()
301-
defer wg.Done()
302294
for time.Since(start) < d {
303295
mr.SendBytes(id, chID)
304296
}
305-
}(d, chID, size)
297+
}(d, chID)
306298
}
307299
}
308300

309-
func (mr *MockReactor) FloodAllPeers(wg *sync.WaitGroup, d time.Duration, chIDs ...byte) {
301+
func (mr *MockReactor) FloodAllPeers(d time.Duration, chIDs ...byte) {
310302
for _, peer := range mr.peers {
311-
mr.FloodChannel(wg, peer.ID(), d, chIDs...)
303+
mr.FloodChannel(peer.ID(), d, chIDs...)
312304
}
313305
}

‎p2p/load/multi_conn_test.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func TestMultipleConnections(t *testing.T) {
2727
chainID := "base-30"
2828

2929
for i := 0; i < peerCount; i++ {
30-
reactor := NewMockReactor(defaultTestChannels, defaultMsgSizes)
30+
reactor := NewMockReactor(DefaultTestChannels, defaultMsgSizes)
3131
node, err := newnode(*cfg, mcfg, chainID, reactor)
3232
require.NoError(t, err)
3333

@@ -84,14 +84,14 @@ func TestMultipleConnections(t *testing.T) {
8484
}
8585

8686
for _, size := range []int64{
87-
500,
88-
1_000,
89-
2_000,
90-
5_000,
91-
10_000,
92-
50_000,
93-
100_000,
94-
500_000,
87+
//500,
88+
//1_000,
89+
//2_000,
90+
//5_000,
91+
//10_000,
92+
//50_000,
93+
//100_000,
94+
//500_000,
9595
1_000_000,
9696
10_000_000,
9797
100_000_000,

‎p2p/load/transport_bench_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,11 @@ func TestTransportBench(t *testing.T) {
3636
mcfg.SendRate = 50000
3737
mcfg.RecvRate = 50000
3838

39-
reactor1 := NewMockReactor(defaultTestChannels, defaultMsgSizes)
39+
reactor1 := NewMockReactor(DefaultTestChannels, defaultMsgSizes)
4040
node1, err := newnode(*cfg, mcfg, "test", reactor1)
4141
require.NoError(t, err)
4242

43-
reactor2 := NewMockReactor(defaultTestChannels, defaultMsgSizes)
43+
reactor2 := NewMockReactor(DefaultTestChannels, defaultMsgSizes)
4444
node2, err := newnode(*cfg, mcfg, "test", reactor2)
4545
require.NoError(t, err)
4646

0 commit comments

Comments
 (0)
Please sign in to comment.