Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: celestiaorg/celestia-core
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: v1.46.0-tm-v0.34.35
Choose a base ref
...
head repository: celestiaorg/celestia-core
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: v1.47.0-tm-v0.34.35
Choose a head ref
  • 1 commit
  • 14 files changed
  • 1 contributor

Commits on Feb 3, 2025

  1. Verified

    This commit was signed with the committer’s verified signature.
    evan-forbes Evan Forbes
    Copy the full SHA
    aa60931 View commit details
5 changes: 1 addition & 4 deletions blockchain/v0/reactor.go
Original file line number Diff line number Diff line change
@@ -30,9 +30,6 @@ const (
statusUpdateIntervalSeconds = 10
// check if we should switch to consensus reactor
switchToConsensusIntervalSeconds = 1

// ReactorIncomingMessageQueueSize the size of the reactor's message queue.
ReactorIncomingMessageQueueSize = 10
)

type consensusReactor interface {
@@ -95,7 +92,7 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *st
requestsCh: requestsCh,
errorsCh: errorsCh,
}
bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR, p2p.WithIncomingQueueSize(ReactorIncomingMessageQueueSize))
bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
return bcR
}

5 changes: 1 addition & 4 deletions blockchain/v1/reactor.go
Original file line number Diff line number Diff line change
@@ -24,9 +24,6 @@ const (

// ask for best height every 10s
statusUpdateIntervalSeconds = 10

// ReactorIncomingMessageQueueSize the size of the reactor's message queue.
ReactorIncomingMessageQueueSize = 10
)

var (
@@ -103,7 +100,7 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *st
}
fsm := NewFSM(startHeight, bcR)
bcR.fsm = fsm
bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR, p2p.WithIncomingQueueSize(ReactorIncomingMessageQueueSize))
bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
// bcR.swReporter = behaviour.NewSwitchReporter(bcR.BaseReactor.Switch)

return bcR
4 changes: 0 additions & 4 deletions consensus/byzantine_test.go
Original file line number Diff line number Diff line change
@@ -610,7 +610,3 @@ func (br *ByzantineReactor) Receive(chID byte, p p2p.Peer, m []byte) {
br.reactor.Receive(chID, p, m)
}
func (br *ByzantineReactor) InitPeer(peer p2p.Peer) p2p.Peer { return peer }

func (br *ByzantineReactor) QueueUnprocessedEnvelope(e p2p.UnprocessedEnvelope) {
br.reactor.QueueUnprocessedEnvelope(e)
}
5 changes: 1 addition & 4 deletions consensus/reactor.go
Original file line number Diff line number Diff line change
@@ -34,9 +34,6 @@ const (

blocksToContributeToBecomeGoodPeer = 10000
votesToContributeToBecomeGoodPeer = 10000

// ReactorIncomingMessageQueueSize the size of the reactor's message queue.
ReactorIncomingMessageQueueSize = 1000
)

//-----------------------------------------------------------------------------
@@ -68,7 +65,7 @@ func NewReactor(consensusState *State, waitSync bool, options ...ReactorOption)
Metrics: NopMetrics(),
traceClient: trace.NoOpTracer(),
}
conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR, p2p.WithIncomingQueueSize(ReactorIncomingMessageQueueSize))
conR.BaseReactor = *p2p.NewBaseReactor("Consensus", conR)

for _, option := range options {
option(conR)
11 changes: 2 additions & 9 deletions evidence/reactor.go
Original file line number Diff line number Diff line change
@@ -24,9 +24,6 @@ const (
broadcastEvidenceIntervalS = 10
// If a message fails wait this much before sending it again
peerRetryMessageIntervalMS = 100

// ReactorIncomingMessageQueueSize the size of the reactor's message queue.
ReactorIncomingMessageQueueSize = 1
)

// Reactor handles evpool evidence broadcasting amongst peers.
@@ -41,7 +38,7 @@ func NewReactor(evpool *Pool) *Reactor {
evR := &Reactor{
evpool: evpool,
}
evR.BaseReactor = *p2p.NewBaseReactor("Evidence", evR, p2p.WithIncomingQueueSize(ReactorIncomingMessageQueueSize))
evR.BaseReactor = *p2p.NewBaseReactor("Evidence", evR)
return evR
}

@@ -178,7 +175,7 @@ func (evR *Reactor) broadcastEvidenceRoutine(peer p2p.Peer) {

// Returns the message to send to the peer, or nil if the evidence is invalid for the peer.
// If message is nil, we should sleep and try again.
func (evR *Reactor) prepareEvidenceMessage(
func (evR Reactor) prepareEvidenceMessage(
peer p2p.Peer,
ev types.Evidence,
) (evis []types.Evidence) {
@@ -225,10 +222,6 @@ func (evR *Reactor) prepareEvidenceMessage(
return []types.Evidence{ev}
}

func (evR *Reactor) OnStop() {
evR.BaseReactor.OnStop()
}

// PeerState describes the state of a peer.
type PeerState interface {
GetHeight() int64
1 change: 0 additions & 1 deletion evidence/reactor_test.go
Original file line number Diff line number Diff line change
@@ -224,7 +224,6 @@ func TestReactorBroadcastEvidenceMemoryLeak(t *testing.T) {
r.AddPeer(p)

_ = sendEvidence(t, pool, val, 2)
r.OnStop()
}

// evidenceLogger is a TestingLogger which uses a different
5 changes: 3 additions & 2 deletions mempool/v0/reactor_test.go
Original file line number Diff line number Diff line change
@@ -88,8 +88,6 @@ func TestReactorConcurrency(t *testing.T) {

const numTxs = 5

reactors[0].mempool.config.Size = 10000
reactors[1].mempool.config.Size = 10000
for i := 0; i < 1000; i++ {
wg.Add(2)

@@ -121,6 +119,9 @@ func TestReactorConcurrency(t *testing.T) {
err := reactors[1].mempool.Update(1, []types.Tx{}, make([]*abci.ResponseDeliverTx, 0), nil, nil)
assert.NoError(t, err)
}()

// 1. flush the mempool
reactors[1].mempool.Flush()
}

wg.Wait()
151 changes: 2 additions & 149 deletions p2p/base_reactor.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,10 @@
package p2p

import (
"context"
"fmt"
"reflect"

"github.com/gogo/protobuf/proto"
"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/p2p/conn"
"github.com/tendermint/tendermint/pkg/trace/schema"
)

// ProcessorFunc is the message processor function type.
type ProcessorFunc func(context.Context, <-chan UnprocessedEnvelope) error

// Reactor is responsible for handling incoming messages on one or more
// Channel. Switch calls GetChannels when reactor is added to it. When a new
// peer joins our node, InitPeer and AddPeer are called. RemovePeer is called
@@ -60,15 +51,6 @@ type Reactor interface {
// Deprecated: Reactors looking to receive data from a peer should implement ReceiveEnvelope.
// Receive will be deprecated in favor of ReceiveEnvelope in v0.37.
Receive(chID byte, peer Peer, msgBytes []byte)

// QueueUnprocessedEnvelope is called by the switch when an unprocessed
// envelope is received. Unprocessed envelopes are immediately buffered in a
// queue to avoid blocking. Incoming messages are then passed to a
// processing function. The default processing function unmarshals the
// messages in the order the sender sent them and then calls Receive on the
// reactor. The queue size and the processing function can be changed via
// passing options to the base reactor.
QueueUnprocessedEnvelope(e UnprocessedEnvelope)
}

type EnvelopeReceiver interface {
@@ -86,147 +68,18 @@ type EnvelopeReceiver interface {
type BaseReactor struct {
service.BaseService // Provides Start, Stop, .Quit
Switch *Switch

incoming chan UnprocessedEnvelope

ctx context.Context
cancel context.CancelFunc
// processor is called with the incoming channel and is responsible for
// unmarshalling the messages and calling Receive on the reactor.
processor ProcessorFunc
}

type ReactorOptions func(*BaseReactor)

func NewBaseReactor(name string, impl Reactor, opts ...ReactorOptions) *BaseReactor {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
base := &BaseReactor{
ctx: ctx,
cancel: cancel,
func NewBaseReactor(name string, impl Reactor) *BaseReactor {
return &BaseReactor{
BaseService: *service.NewBaseService(nil, name, impl),
Switch: nil,
incoming: make(chan UnprocessedEnvelope, 100),
processor: DefaultProcessor(impl),
}
for _, opt := range opts {
opt(base)
}

go func() {
err := base.processor(ctx, base.incoming)
if err != nil {
err = base.Stop()
if err != nil {
panic(err)
}
}
}()

return base
}

// WithProcessor sets the processor function for the reactor. The processor
// function is called with the incoming channel and is responsible for
// unmarshalling the messages and calling Receive on the reactor.
func WithProcessor(processor ProcessorFunc) ReactorOptions {
return func(br *BaseReactor) {
br.processor = processor
}
}

// WithIncomingQueueSize sets the size of the incoming message queue for a
// reactor.
func WithIncomingQueueSize(size int) ReactorOptions {
return func(br *BaseReactor) {
br.incoming = make(chan UnprocessedEnvelope, size)
}
}

func (br *BaseReactor) SetSwitch(sw *Switch) {
br.Switch = sw
}

// QueueUnprocessedEnvelope is called by the switch when an unprocessed
// envelope is received. Unprocessed envelopes are immediately buffered in a
// queue to avoid blocking. The size of the queue can be changed by passing
// options to the base reactor.
func (br *BaseReactor) QueueUnprocessedEnvelope(e UnprocessedEnvelope) {
select {
// if the context is done, do nothing.
case <-br.ctx.Done():
// if not, add the item to the channel.
case br.incoming <- e:
}
}

func (br *BaseReactor) OnStop() {
br.cancel()
close(br.incoming)
}

// DefaultProcessor unmarshalls the message and calls Receive on the reactor.
// This preserves the sender's original order for all messages.
func DefaultProcessor(impl Reactor) func(context.Context, <-chan UnprocessedEnvelope) error {
implChannels := impl.GetChannels()

chIDs := make(map[byte]proto.Message, len(implChannels))
for _, chDesc := range implChannels {
chIDs[chDesc.ID] = chDesc.MessageType
}
return func(ctx context.Context, incoming <-chan UnprocessedEnvelope) error {
for {
select {
case <-ctx.Done():
return nil
case ue, ok := <-incoming:
if !ok {
// this means the channel was closed.
return nil
}
mt := chIDs[ue.ChannelID]

if mt == nil {
return fmt.Errorf("no message type registered for channel %d", ue.ChannelID)
}

msg := proto.Clone(mt)

err := proto.Unmarshal(ue.Message, msg)
if err != nil {
return fmt.Errorf("unmarshaling message: %v into type: %s resulted in error %w", msg, reflect.TypeOf(mt), err)
}

if w, ok := msg.(Unwrapper); ok {
msg, err = w.Unwrap()
if err != nil {
return fmt.Errorf("unwrapping message: %v", err)
}
}

labels := []string{
"peer_id", string(ue.Src.ID()),
"chID", fmt.Sprintf("%#x", ue.ChannelID),
}

ue.Src.Metrics().PeerReceiveBytesTotal.With(labels...).Add(float64(len(ue.Message)))
ue.Src.Metrics().MessageReceiveBytesTotal.With(append(labels, "message_type", ue.Src.ValueToMetricLabel(msg))...).Add(float64(len(ue.Message)))
schema.WriteReceivedBytes(ue.Src.TraceClient(), string(ue.Src.ID()), ue.ChannelID, len(ue.Message))

if nr, ok := impl.(EnvelopeReceiver); ok {
nr.ReceiveEnvelope(Envelope{
ChannelID: ue.ChannelID,
Src: ue.Src,
Message: msg,
})
} else {
impl.Receive(ue.ChannelID, ue.Src, ue.Message)
}
}
}
}
}

func (*BaseReactor) GetChannels() []*conn.ChannelDescriptor { return nil }
func (*BaseReactor) AddPeer(peer Peer) {}
func (*BaseReactor) RemovePeer(peer Peer, reason interface{}) {}
Loading