
Commit 382f0ce

rach-id and rootulp authored Jan 28, 2025

feat: support parallel processing of messages (celestiaorg#1595)

## Description

Closes celestiaorg#1534

Co-authored-by: Rootul P <[email protected]>

14 files changed: +337 -47 lines
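At a high level, the commit moves message decoding off the peer's connection goroutine: the switch now hands raw bytes to the reactor via `QueueUnprocessedEnvelope`, which buffers them in a per-reactor queue, and a dedicated processor goroutine unmarshals them and calls `Receive`/`ReceiveEnvelope`. The queue depth is tuned per reactor (blocksync 10, consensus 1000, evidence 1, statesync 100; the default is 100), and both the depth and the processor are configurable through new `p2p.NewBaseReactor` options. A minimal sketch of the new constructor surface, assuming a hypothetical reactor type and queue size (neither is from this diff):

```go
package myreactor

import "github.com/tendermint/tendermint/p2p"

// Reactor is a hypothetical reactor embedding p2p.BaseReactor.
type Reactor struct {
	p2p.BaseReactor
}

func NewReactor() *Reactor {
	r := &Reactor{}
	// Buffer up to 500 still-encoded envelopes before the switch blocks.
	// p2p.WithProcessor could be passed here as well, to replace the
	// order-preserving DefaultProcessor (see p2p/base_reactor.go below).
	r.BaseReactor = *p2p.NewBaseReactor("MyReactor", r, p2p.WithIncomingQueueSize(500))
	return r
}

// Receive handles a decoded message; required by the p2p.Reactor interface.
func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {}
```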
 

blockchain/v0/reactor.go (+4 -1)

@@ -30,6 +30,9 @@ const (
 	statusUpdateIntervalSeconds = 10
 	// check if we should switch to consensus reactor
 	switchToConsensusIntervalSeconds = 1
+
+	// ReactorIncomingMessageQueueSize is the size of the reactor's message queue.
+	ReactorIncomingMessageQueueSize = 10
 )

 type consensusReactor interface {
@@ -92,7 +95,7 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *st
 		requestsCh: requestsCh,
 		errorsCh:   errorsCh,
 	}
-	bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
+	bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR, p2p.WithIncomingQueueSize(ReactorIncomingMessageQueueSize))
 	return bcR
 }

blockchain/v1/reactor.go (+4 -1)

@@ -24,6 +24,9 @@ const (

 	// ask for best height every 10s
 	statusUpdateIntervalSeconds = 10
+
+	// ReactorIncomingMessageQueueSize is the size of the reactor's message queue.
+	ReactorIncomingMessageQueueSize = 10
 )

 var (
@@ -100,7 +103,7 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *st
 	}
 	fsm := NewFSM(startHeight, bcR)
 	bcR.fsm = fsm
-	bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
+	bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR, p2p.WithIncomingQueueSize(ReactorIncomingMessageQueueSize))
 	// bcR.swReporter = behaviour.NewSwitchReporter(bcR.BaseReactor.Switch)

 	return bcR

consensus/byzantine_test.go (+4)

@@ -610,3 +610,7 @@ func (br *ByzantineReactor) Receive(chID byte, p p2p.Peer, m []byte) {
 	br.reactor.Receive(chID, p, m)
 }
 func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer }
+
+func (br *ByzantineReactor) QueueUnprocessedEnvelope(e p2p.UnprocessedEnvelope) {
+	br.reactor.QueueUnprocessedEnvelope(e)
+}

consensus/reactor.go (+4 -1)

@@ -34,6 +34,9 @@ const (

 	blocksToContributeToBecomeGoodPeer = 10000
 	votesToContributeToBecomeGoodPeer  = 10000
+
+	// ReactorIncomingMessageQueueSize is the size of the reactor's message queue.
+	ReactorIncomingMessageQueueSize = 1000
 )

 //-----------------------------------------------------------------------------
@@ -65,7 +68,7 @@ func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption)
 		Metrics:     NopMetrics(),
 		traceClient: trace.NoOpTracer(),
 	}
-	conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR)
+	conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR, p2p.WithIncomingQueueSize(ReactorIncomingMessageQueueSize))

 	for _, option := range options {
 		option(conR)

evidence/reactor.go (+9 -2)

@@ -24,6 +24,9 @@ const (
 	broadcastEvidenceIntervalS = 10
 	// If a message fails wait this much before sending it again
 	peerRetryMessageIntervalMS = 100
+
+	// ReactorIncomingMessageQueueSize is the size of the reactor's message queue.
+	ReactorIncomingMessageQueueSize = 1
 )

 // Reactor handles evpool evidence broadcasting amongst peers.
@@ -38,7 +41,7 @@ func NewReactor(evpool *Pool) *Reactor {
 	evR := &Reactor{
 		evpool: evpool,
 	}
-	evR.BaseReactor = *p2p.NewBaseReactor("Evidence", evR)
+	evR.BaseReactor = *p2p.NewBaseReactor("Evidence", evR, p2p.WithIncomingQueueSize(ReactorIncomingMessageQueueSize))
 	return evR
 }

@@ -175,7 +178,7 @@ func (evR *Reactor) broadcastEvidenceRoutine(peer p2p.Peer) {

 // Returns the message to send to the peer, or nil if the evidence is invalid for the peer.
 // If message is nil, we should sleep and try again.
-func (evR Reactor) prepareEvidenceMessage(
+func (evR *Reactor) prepareEvidenceMessage(
 	peer p2p.Peer,
 	ev types.Evidence,
 ) (evis []types.Evidence) {
@@ -222,6 +225,10 @@ func (evR Reactor) prepareEvidenceMessage(
 	return []types.Evidence{ev}
 }

+func (evR *Reactor) OnStop() {
+	evR.BaseReactor.OnStop()
+}
+
 // PeerState describes the state of a peer.
 type PeerState interface {
 	GetHeight() int64

evidence/reactor_test.go (+1)

@@ -224,6 +224,7 @@ func TestReactorBroadcastEvidenceMemoryLeak(t *testing.T) {
 	r.AddPeer(p)

 	_ = sendEvidence(t, pool, val, 2)
+	r.OnStop()
 }

 // evidenceLogger is a TestingLogger which uses a different

mempool/v0/reactor_test.go (+2 -3)

@@ -88,6 +88,8 @@ func TestReactorConcurrency(t *testing.T) {

 	const numTxs = 5

+	reactors[0].mempool.config.Size = 10000
+	reactors[1].mempool.config.Size = 10000
 	for i := 0; i < 1000; i++ {
 		wg.Add(2)

@@ -119,9 +121,6 @@ func TestReactorConcurrency(t *testing.T) {
			err := reactors[1].mempool.Update(1, []types.Tx{}, make([]*abci.ResponseDeliverTx, 0), nil, nil)
			assert.NoError(t, err)
		}()
-
-		// 1. flush the mempool
-		reactors[1].mempool.Flush()
 	}

 	wg.Wait()

p2p/base_reactor.go (+149 -2)

@@ -1,10 +1,19 @@
 package p2p

 import (
+	"context"
+	"fmt"
+	"reflect"
+
+	"github.com/gogo/protobuf/proto"
 	"github.com/tendermint/tendermint/libs/service"
 	"github.com/tendermint/tendermint/p2p/conn"
+	"github.com/tendermint/tendermint/pkg/trace/schema"
 )

+// ProcessorFunc is the message processor function type.
+type ProcessorFunc func(context.Context, <-chan UnprocessedEnvelope) error
+
 // Reactor is responsible for handling incoming messages on one or more
 // Channel. Switch calls GetChannels when reactor is added to it. When a new
 // peer joins our node, InitPeer and AddPeer are called. RemovePeer is called
@@ -51,6 +60,15 @@ type Reactor interface {
 	// Deprecated: Reactors looking to receive data from a peer should implement ReceiveEnvelope.
 	// Receive will be deprecated in favor of ReceiveEnvelope in v0.37.
 	Receive(chID byte, peer Peer, msgBytes []byte)
+
+	// QueueUnprocessedEnvelope is called by the switch when an unprocessed
+	// envelope is received. Unprocessed envelopes are immediately buffered in a
+	// queue to avoid blocking. Incoming messages are then passed to a
+	// processing function. The default processing function unmarshals the
+	// messages in the order the sender sent them and then calls Receive on the
+	// reactor. The queue size and the processing function can be changed by
+	// passing options to the base reactor.
+	QueueUnprocessedEnvelope(e UnprocessedEnvelope)
 }

 type EnvelopeReceiver interface {
@@ -68,18 +86,147 @@ type EnvelopeReceiver interface {
 type BaseReactor struct {
 	service.BaseService // Provides Start, Stop, .Quit
 	Switch *Switch
+
+	incoming chan UnprocessedEnvelope
+
+	ctx    context.Context
+	cancel context.CancelFunc
+	// processor is called with the incoming channel and is responsible for
+	// unmarshalling the messages and calling Receive on the reactor.
+	processor ProcessorFunc
 }

-func NewBaseReactor(name string, impl Reactor) *BaseReactor {
-	return &BaseReactor{
+type ReactorOptions func(*BaseReactor)
+
+func NewBaseReactor(name string, impl Reactor, opts ...ReactorOptions) *BaseReactor {
+	ctx := context.Background()
+	ctx, cancel := context.WithCancel(ctx)
+	base := &BaseReactor{
+		ctx:         ctx,
+		cancel:      cancel,
 		BaseService: *service.NewBaseService(nil, name, impl),
 		Switch:      nil,
+		incoming:    make(chan UnprocessedEnvelope, 100),
+		processor:   DefaultProcessor(impl),
+	}
+	for _, opt := range opts {
+		opt(base)
+	}
+
+	go func() {
+		err := base.processor(ctx, base.incoming)
+		if err != nil {
+			err = base.Stop()
+			if err != nil {
+				panic(err)
+			}
+		}
+	}()
+
+	return base
+}
+
+// WithProcessor sets the processor function for the reactor. The processor
+// function is called with the incoming channel and is responsible for
+// unmarshalling the messages and calling Receive on the reactor.
+func WithProcessor(processor ProcessorFunc) ReactorOptions {
+	return func(br *BaseReactor) {
+		br.processor = processor
+	}
+}
+
+// WithIncomingQueueSize sets the size of the incoming message queue for a
+// reactor.
+func WithIncomingQueueSize(size int) ReactorOptions {
+	return func(br *BaseReactor) {
+		br.incoming = make(chan UnprocessedEnvelope, size)
 	}
 }

 func (br *BaseReactor) SetSwitch(sw *Switch) {
 	br.Switch = sw
 }
+
+// QueueUnprocessedEnvelope is called by the switch when an unprocessed
+// envelope is received. Unprocessed envelopes are immediately buffered in a
+// queue to avoid blocking. The size of the queue can be changed by passing
+// options to the base reactor.
+func (br *BaseReactor) QueueUnprocessedEnvelope(e UnprocessedEnvelope) {
+	select {
+	// if the context is done, do nothing.
+	case <-br.ctx.Done():
+	// if not, add the item to the channel.
+	case br.incoming <- e:
+	}
+}
+
+func (br *BaseReactor) OnStop() {
+	br.cancel()
+	close(br.incoming)
+}
+
+// DefaultProcessor unmarshals each message and calls Receive on the reactor.
+// This preserves the sender's original order for all messages.
+func DefaultProcessor(impl Reactor) func(context.Context, <-chan UnprocessedEnvelope) error {
+	implChannels := impl.GetChannels()
+
+	chIDs := make(map[byte]proto.Message, len(implChannels))
+	for _, chDesc := range implChannels {
+		chIDs[chDesc.ID] = chDesc.MessageType
+	}
+	return func(ctx context.Context, incoming <-chan UnprocessedEnvelope) error {
+		for {
+			select {
+			case <-ctx.Done():
+				return nil
+			case ue, ok := <-incoming:
+				if !ok {
+					// this means the channel was closed.
+					return nil
+				}
+				mt := chIDs[ue.ChannelID]
+
+				if mt == nil {
+					return fmt.Errorf("no message type registered for channel %d", ue.ChannelID)
+				}
+
+				msg := proto.Clone(mt)
+
+				err := proto.Unmarshal(ue.Message, msg)
+				if err != nil {
+					return fmt.Errorf("unmarshaling message: %v into type: %s resulted in error %w", msg, reflect.TypeOf(mt), err)
+				}
+
+				if w, ok := msg.(Unwrapper); ok {
+					msg, err = w.Unwrap()
+					if err != nil {
+						return fmt.Errorf("unwrapping message: %v", err)
+					}
+				}
+
+				labels := []string{
+					"peer_id", string(ue.Src.ID()),
+					"chID", fmt.Sprintf("%#x", ue.ChannelID),
+				}
+
+				ue.Src.Metrics().PeerReceiveBytesTotal.With(labels...).Add(float64(len(ue.Message)))
+				ue.Src.Metrics().MessageReceiveBytesTotal.With(append(labels, "message_type", ue.Src.ValueToMetricLabel(msg))...).Add(float64(len(ue.Message)))
+				schema.WriteReceivedBytes(ue.Src.TraceClient(), string(ue.Src.ID()), ue.ChannelID, len(ue.Message))
+
+				if nr, ok := impl.(EnvelopeReceiver); ok {
+					nr.ReceiveEnvelope(Envelope{
+						ChannelID: ue.ChannelID,
+						Src:       ue.Src,
+						Message:   msg,
+					})
+				} else {
+					impl.Receive(ue.ChannelID, ue.Src, ue.Message)
+				}
+			}
+		}
+	}
+}
+
 func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor   { return nil }
 func (*BaseReactor) AddPeer(peer Peer)                        {}
 func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {}
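
The commit title promises parallel processing, and `WithProcessor` is the hook for it: `DefaultProcessor` preserves the sender's ordering, but a reactor that does not need ordering could fan the same incoming channel out to several workers. A minimal sketch, not part of this commit: `ParallelProcessor` and the worker count are made up, and it assumes the real `golang.org/x/sync/errgroup` package.

```go
package p2p

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// ParallelProcessor is a hypothetical ProcessorFunc that runs `workers`
// copies of DefaultProcessor over the same incoming channel. Each queued
// envelope is decoded and delivered by exactly one worker, so throughput
// scales with the worker count, but the sender's ordering is no longer
// guaranteed.
func ParallelProcessor(impl Reactor, workers int) ProcessorFunc {
	return func(ctx context.Context, incoming <-chan UnprocessedEnvelope) error {
		g, gctx := errgroup.WithContext(ctx)
		for i := 0; i < workers; i++ {
			g.Go(func() error {
				// DefaultProcessor exits cleanly when gctx is canceled or
				// the incoming channel is closed.
				return DefaultProcessor(impl)(gctx, incoming)
			})
		}
		return g.Wait()
	}
}
```

A reactor would opt in with something like `p2p.NewBaseReactor("Mempool", r, p2p.WithProcessor(ParallelProcessor(r, 4)))`; the ordering test below shows exactly what such a processor would give up.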

p2p/base_reactor_test.go (+122, new file)

package p2p_test

import (
	"net"
	"sync"
	"testing"
	"time"

	"github.com/gogo/protobuf/proto"
	"github.com/stretchr/testify/require"
	"github.com/tendermint/tendermint/libs/service"
	"github.com/tendermint/tendermint/p2p"
	"github.com/tendermint/tendermint/p2p/conn"
	"github.com/tendermint/tendermint/pkg/trace"
	"github.com/tendermint/tendermint/proto/tendermint/mempool"
)

// TestBaseReactorProcessor tests the BaseReactor's message processing by
// queueing encoded messages and adding an artificial delay to the first
// message. Depending on the processor used, the sender's ordering could be
// lost.
func TestBaseReactorProcessor(t *testing.T) {
	// a reactor using the default processor should be able to queue
	// messages, and they get processed in order.
	or := NewOrderedReactor()

	msgs := []string{"msg1", "msg2", "msg3"}
	or.fillQueue(t, msgs...)

	time.Sleep(300 * time.Millisecond) // wait plenty of time for the processing to finish

	or.Lock()
	require.Equal(t, len(msgs), len(or.received))
	require.Equal(t, msgs, or.received)
	or.Unlock()
}

var _ p2p.Reactor = &orderedReactor{}

// orderedReactor is used for testing. It saves each envelope in the order it
// receives it.
type orderedReactor struct {
	p2p.BaseReactor

	sync.Mutex
	received      []string
	receivedFirst bool
}

func NewOrderedReactor() *orderedReactor {
	r := &orderedReactor{Mutex: sync.Mutex{}}
	r.BaseReactor = *p2p.NewBaseReactor("Ordered Reactor", r, p2p.WithIncomingQueueSize(10))
	return r
}

func (r *orderedReactor) GetChannels() []*conn.ChannelDescriptor {
	return []*conn.ChannelDescriptor{
		{
			ID:                  0x99,
			Priority:            1,
			RecvMessageCapacity: 10,
			MessageType:         &mempool.Txs{},
		},
	}
}

// ReceiveEnvelope adds a delay to the first processed envelope to test ordering.
func (r *orderedReactor) ReceiveEnvelope(e p2p.Envelope) {
	r.Lock()
	f := r.receivedFirst
	if !f {
		r.receivedFirst = true
		r.Unlock()
		time.Sleep(100 * time.Millisecond)
	} else {
		r.Unlock()
	}
	r.Lock()
	defer r.Unlock()

	envMsg := e.Message.(*mempool.Txs)
	r.received = append(r.received, string(envMsg.Txs[0]))
}

func (r *orderedReactor) fillQueue(t *testing.T, msgs ...string) {
	peer := &imaginaryPeer{}
	for _, msg := range msgs {
		s, err := proto.Marshal(&mempool.Txs{Txs: [][]byte{[]byte(msg)}})
		require.NoError(t, err)
		r.QueueUnprocessedEnvelope(p2p.UnprocessedEnvelope{
			Src:       peer,
			Message:   s,
			ChannelID: 0x99,
		})
	}
}

var _ p2p.IntrospectivePeer = &imaginaryPeer{}

type imaginaryPeer struct {
	service.BaseService
}

func (ip *imaginaryPeer) TraceClient() trace.Tracer       { return trace.NoOpTracer() }
func (ip *imaginaryPeer) HasIPChanged() bool              { return false }
func (ip *imaginaryPeer) FlushStop()                      {}
func (ip *imaginaryPeer) ID() p2p.ID                      { return "" }
func (ip *imaginaryPeer) RemoteIP() net.IP                { return []byte{} }
func (ip *imaginaryPeer) RemoteAddr() net.Addr            { return nil }
func (ip *imaginaryPeer) IsOutbound() bool                { return true }
func (ip *imaginaryPeer) CloseConn() error                { return nil }
func (ip *imaginaryPeer) IsPersistent() bool              { return false }
func (ip *imaginaryPeer) NodeInfo() p2p.NodeInfo          { return nil }
func (ip *imaginaryPeer) Status() conn.ConnectionStatus   { return conn.ConnectionStatus{} }
func (ip *imaginaryPeer) SocketAddr() *p2p.NetAddress     { return nil }
func (ip *imaginaryPeer) Send(byte, []byte) bool          { return true }
func (ip *imaginaryPeer) TrySend(byte, []byte) bool       { return true }
func (ip *imaginaryPeer) Set(key string, value any)       {}
func (ip *imaginaryPeer) Get(key string) any              { return nil }
func (ip *imaginaryPeer) SetRemovalFailed()               {}
func (ip *imaginaryPeer) GetRemovalFailed() bool          { return false }
func (ip *imaginaryPeer) Metrics() *p2p.Metrics           { return p2p.NopMetrics() }
func (ip *imaginaryPeer) ValueToMetricLabel(i any) string { return "" }

p2p/conn/connection.go (+2 -1)

@@ -865,7 +865,8 @@ func (ch *Channel) recvPacketMsg(packet tmp2p.PacketMsg) ([]byte, error) {
 	}
 	ch.recving = append(ch.recving, packet.Data...)
 	if packet.EOF {
-		msgBytes := ch.recving
+		msgBytes := make([]byte, len(ch.recving))
+		copy(msgBytes, ch.recving)

 		// clear the slice without re-allocating.
 		// http://stackoverflow.com/questions/16971741/how-do-you-clear-a-slice-in-go
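
This copy is load-bearing for the rest of the commit: previously Receive ran synchronously, so reusing the backing array of `ch.recving` was safe, but now the bytes sit in a reactor's queue while the next packet arrives. A minimal, self-contained illustration of the aliasing hazard the copy avoids (not from the commit; `buf` stands in for `ch.recving`):

```go
package main

import "fmt"

func main() {
	// buf is cleared with buf[:0] between messages, so the backing
	// array is reused rather than reallocated.
	buf := make([]byte, 0, 8)
	buf = append(buf, "first"...)

	queued := buf // a queued message that aliases buf's backing array

	buf = buf[:0]                 // "clear" without re-allocating
	buf = append(buf, "worst"...) // next message overwrites the array

	fmt.Println(string(queued)) // prints "worst": the queued bytes changed
}
```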

p2p/mock/reactor.go (+1 -1)

@@ -14,7 +14,7 @@ type Reactor struct {

 func NewReactor() *Reactor {
 	r := &Reactor{}
-	r.BaseReactor = *p2p.NewBaseReactor("Mock-PEX", r)
+	r.BaseReactor = *p2p.NewBaseReactor("Mock-PEX", r, p2p.WithIncomingQueueSize(1))
 	r.SetLogger(log.TestingLogger())
 	return r
 }

p2p/peer.go (+25 -34)

@@ -3,7 +3,6 @@ package p2p
 import (
 	"fmt"
 	"net"
-	"reflect"
 	"time"

 	"github.com/gogo/protobuf/proto"
@@ -55,6 +54,13 @@ type Peer interface {
 	GetRemovalFailed() bool
 }

+type IntrospectivePeer interface {
+	Peer
+	Metrics() *Metrics
+	ValueToMetricLabel(i any) string
+	TraceClient() trace.Tracer
+}
+
 type EnvelopeSender interface {
 	SendEnvelope(Envelope) bool
 	TrySendEnvelope(Envelope) bool
@@ -187,6 +193,10 @@ type peer struct {
 	removalAttemptFailed bool
 }

+func (p *peer) TraceClient() trace.Tracer {
+	return p.traceClient
+}
+
 type PeerOption func(*peer)

 func WithPeerTracer(t trace.Tracer) PeerOption {
@@ -200,7 +210,7 @@ func newPeer(
 	mConfig cmtconn.MConnConfig,
 	nodeInfo NodeInfo,
 	reactorsByCh map[byte]Reactor,
-	msgTypeByChID map[byte]proto.Message,
+	_ map[byte]proto.Message,
 	chDescs []*cmtconn.ChannelDescriptor,
 	onPeerError func(Peer, interface{}),
 	mlc *metricsLabelCache,
@@ -221,7 +231,6 @@ func newPeer(
 		pc.conn,
 		p,
 		reactorsByCh,
-		msgTypeByChID,
 		chDescs,
 		onPeerError,
 		mConfig,
@@ -246,6 +255,14 @@ func (p *peer) String() string {
 //---------------------------------------------------
 // Implements service.Service

+func (p *peer) Metrics() *Metrics {
+	return p.metrics
+}
+
+func (p *peer) ValueToMetricLabel(i any) string {
+	return p.mlc.ValueToMetricLabel(i)
+}
+
 // SetLogger implements BaseService.
 func (p *peer) SetLogger(l log.Logger) {
 	p.Logger = l
@@ -539,7 +556,6 @@ func createMConnection(
 	conn net.Conn,
 	p *peer,
 	reactorsByCh map[byte]Reactor,
-	msgTypeByChID map[byte]proto.Message,
 	chDescs []*cmtconn.ChannelDescriptor,
 	onPeerError func(Peer, interface{}),
 	config cmtconn.MConnConfig,
@@ -552,37 +568,12 @@ func createMConnection(
 			// which does onPeerError.
 			panic(fmt.Sprintf("Unknown channel %X", chID))
 		}
-		mt := msgTypeByChID[chID]
-		msg := proto.Clone(mt)
-		err := proto.Unmarshal(msgBytes, msg)
-		if err != nil {
-			panic(fmt.Errorf("unmarshaling message: %s into type: %s", err, reflect.TypeOf(mt)))
-		}

-		if w, ok := msg.(Unwrapper); ok {
-			msg, err = w.Unwrap()
-			if err != nil {
-				panic(fmt.Errorf("unwrapping message: %s", err))
-			}
-		}
-
-		labels := []string{
-			"peer_id", string(p.ID()),
-			"chID", fmt.Sprintf("%#x", chID),
-		}
-
-		p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes)))
-		p.metrics.MessageReceiveBytesTotal.With(append(labels, "message_type", p.mlc.ValueToMetricLabel(msg))...).Add(float64(len(msgBytes)))
-		schema.WriteReceivedBytes(p.traceClient, string(p.ID()), chID, len(msgBytes))
-		if nr, ok := reactor.(EnvelopeReceiver); ok {
-			nr.ReceiveEnvelope(Envelope{
-				ChannelID: chID,
-				Src:       p,
-				Message:   msg,
-			})
-		} else {
-			reactor.Receive(chID, p, msgBytes)
-		}
+		reactor.QueueUnprocessedEnvelope(UnprocessedEnvelope{
+			ChannelID: chID,
+			Src:       p,
+			Message:   msgBytes,
+		})
 	}

 	onError := func(r interface{}) {

p2p/types.go (+6)

@@ -16,6 +16,12 @@ type Envelope struct {
 	ChannelID byte
 }

+type UnprocessedEnvelope struct {
+	Src       IntrospectivePeer
+	Message   []byte
+	ChannelID byte
+}
+
 // Unwrapper is a Protobuf message that can contain a variety of inner messages
 // (e.g. via oneof fields). If a Channel's message type implements Unwrapper, the
 // p2p layer will automatically unwrap inbound messages so that reactors do not have to do this themselves.
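
UnprocessedEnvelope is the raw counterpart of Envelope: Message still holds encoded protobuf bytes, and Src is the wider IntrospectivePeer so the processor can record the metrics and traces that used to be emitted in peer.go. A hedged sketch of the hand-off; the `deliver` helper and channel ID are illustrative only, not part of this commit:

```go
package main

import "github.com/tendermint/tendermint/p2p"

// deliver is a hypothetical helper showing the hand-off: the switch side
// queues raw bytes and returns; decoding, metrics, and Receive all happen
// later on the reactor's processor goroutine.
func deliver(r p2p.Reactor, src p2p.IntrospectivePeer, chID byte, raw []byte) {
	r.QueueUnprocessedEnvelope(p2p.UnprocessedEnvelope{
		Src:       src,
		ChannelID: chID,
		Message:   raw, // still protobuf-encoded at this point
	})
}
```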

statesync/reactor.go (+4 -1)

@@ -24,6 +24,9 @@ const (
 	ChunkChannel = byte(0x61)
 	// recentSnapshots is the number of recent snapshots to send and receive per peer.
 	recentSnapshots = 10
+
+	// ReactorIncomingMessageQueueSize is the size of the reactor's message queue.
+	ReactorIncomingMessageQueueSize = 100
 )

 // Reactor handles state sync, both restoring snapshots for the local node and serving snapshots
@@ -55,7 +58,7 @@ func NewReactor(
 		conn:      conn,
 		connQuery: connQuery,
 	}
-	r.BaseReactor = *p2p.NewBaseReactor("StateSync", r)
+	r.BaseReactor = *p2p.NewBaseReactor("StateSync", r, p2p.WithIncomingQueueSize(ReactorIncomingMessageQueueSize))

 	return r
 }
