Skip to content

Commit 1b4d37d

Browse files
authored
Publish Tx Events (cosmos#193)
1 parent f64d8e4 commit 1b4d37d

File tree

5 files changed

+104
-6
lines changed

5 files changed

+104
-6
lines changed

block/manager.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ func NewManager(
6565
mempool mempool.Mempool,
6666
proxyApp proxy.AppConnConsensus,
6767
dalc da.DataAvailabilityLayerClient,
68+
eventBus *tmtypes.EventBus,
6869
logger log.Logger,
6970
) (*Manager, error) {
7071
s, err := getInitialState(store, genesis)
@@ -77,7 +78,7 @@ func NewManager(
7778
return nil, err
7879
}
7980

80-
exec := state.NewBlockExecutor(proposerAddress, conf.NamespaceID, genesis.ChainID, mempool, proxyApp, logger)
81+
exec := state.NewBlockExecutor(proposerAddress, conf.NamespaceID, genesis.ChainID, mempool, proxyApp, eventBus, logger)
8182
if s.LastBlockHeight+1 == genesis.InitialHeight {
8283
res, err := exec.InitChain(genesis)
8384
if err != nil {

block/manager_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func TestInitialState(t *testing.T) {
7272
assert := assert.New(t)
7373
logger := log.TestingLogger()
7474
dalc := getMockDALC(logger)
75-
agg, err := NewManager(key, conf, c.genesis, c.store, nil, nil, dalc, logger)
75+
agg, err := NewManager(key, conf, c.genesis, c.store, nil, nil, dalc, nil, logger)
7676
assert.NoError(err)
7777
assert.NotNil(agg)
7878
assert.Equal(c.expectedChainID, agg.lastState.ChainID)

node/node.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func NewNode(ctx context.Context, conf config.NodeConfig, nodeKey crypto.PrivKey
101101
mp := mempool.NewCListMempool(llcfg.DefaultMempoolConfig(), proxyApp.Mempool(), 0)
102102
mpIDs := newMempoolIDs()
103103

104-
blockManager, err := block.NewManager(nodeKey, conf.BlockManagerConfig, genesis, s, mp, proxyApp.Consensus(), dalc, logger.With("module", "BlockManager"))
104+
blockManager, err := block.NewManager(nodeKey, conf.BlockManagerConfig, genesis, s, mp, proxyApp.Consensus(), dalc, eventBus, logger.With("module", "BlockManager"))
105105
if err != nil {
106106
return nil, fmt.Errorf("BlockManager initialization error: %w", err)
107107
}

state/executor.go

+50-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
1212
"github.com/tendermint/tendermint/proxy"
1313
tmtypes "github.com/tendermint/tendermint/types"
14+
"go.uber.org/multierr"
1415

1516
abciconv "github.com/celestiaorg/optimint/conv/abci"
1617
"github.com/celestiaorg/optimint/log"
@@ -26,18 +27,21 @@ type BlockExecutor struct {
2627
proxyApp proxy.AppConnConsensus
2728
mempool mempool.Mempool
2829

30+
eventBus *tmtypes.EventBus
31+
2932
logger log.Logger
3033
}
3134

3235
// NewBlockExecutor creates new instance of BlockExecutor.
3336
// Proposer address and namespace ID will be used in all newly created blocks.
34-
func NewBlockExecutor(proposerAddress []byte, namespaceID [8]byte, chainID string, mempool mempool.Mempool, proxyApp proxy.AppConnConsensus, logger log.Logger) *BlockExecutor {
37+
func NewBlockExecutor(proposerAddress []byte, namespaceID [8]byte, chainID string, mempool mempool.Mempool, proxyApp proxy.AppConnConsensus, eventBus *tmtypes.EventBus, logger log.Logger) *BlockExecutor {
3538
return &BlockExecutor{
3639
proposerAddress: proposerAddress,
3740
namespaceID: namespaceID,
3841
chainID: chainID,
3942
proxyApp: proxyApp,
4043
mempool: mempool,
44+
eventBus: eventBus,
4145
logger: logger,
4246
}
4347
}
@@ -129,6 +133,11 @@ func (e *BlockExecutor) ApplyBlock(ctx context.Context, state State, block *type
129133

130134
copy(state.AppHash[:], appHash[:])
131135

136+
err = e.publishEvents(resp, block)
137+
if err != nil {
138+
e.logger.Error("failed to fire block events", "error", err)
139+
}
140+
132141
return state, retainHeight, nil
133142
}
134143

@@ -271,6 +280,46 @@ func (e *BlockExecutor) getLastCommitHash(lastCommit *types.Commit, header *type
271280
return lastABCICommit.Hash()
272281
}
273282

283+
func (e *BlockExecutor) publishEvents(resp *tmstate.ABCIResponses, block *types.Block) error {
284+
if e.eventBus == nil {
285+
return nil
286+
}
287+
288+
abciBlock, err := abciconv.ToABCIBlock(block)
289+
if err != nil {
290+
return err
291+
}
292+
293+
err = multierr.Append(err, e.eventBus.PublishEventNewBlock(tmtypes.EventDataNewBlock{
294+
Block: abciBlock,
295+
ResultBeginBlock: *resp.BeginBlock,
296+
ResultEndBlock: *resp.EndBlock,
297+
}))
298+
err = multierr.Append(err, e.eventBus.PublishEventNewBlockHeader(tmtypes.EventDataNewBlockHeader{
299+
Header: abciBlock.Header,
300+
NumTxs: int64(len(abciBlock.Txs)),
301+
ResultBeginBlock: *resp.BeginBlock,
302+
ResultEndBlock: *resp.EndBlock,
303+
}))
304+
for _, ev := range abciBlock.Evidence.Evidence {
305+
err = multierr.Append(err, e.eventBus.PublishEventNewEvidence(tmtypes.EventDataNewEvidence{
306+
Evidence: ev,
307+
Height: int64(block.Header.Height),
308+
}))
309+
}
310+
for i, dtx := range resp.DeliverTxs {
311+
err = multierr.Append(err, e.eventBus.PublishEventTx(tmtypes.EventDataTx{
312+
TxResult: abci.TxResult{
313+
Height: int64(block.Header.Height),
314+
Index: uint32(i),
315+
Tx: abciBlock.Data.Txs[i],
316+
Result: *dtx,
317+
},
318+
}))
319+
}
320+
return err
321+
}
322+
274323
func toOptimintTxs(txs tmtypes.Txs) types.Txs {
275324
optiTxs := make(types.Txs, len(txs))
276325
for i := range txs {

state/executor_test.go

+50-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/rand"
66
"testing"
7+
"time"
78

89
"github.com/stretchr/testify/assert"
910
"github.com/stretchr/testify/mock"
@@ -12,7 +13,9 @@ import (
1213
abci "github.com/tendermint/tendermint/abci/types"
1314
cfg "github.com/tendermint/tendermint/config"
1415
"github.com/tendermint/tendermint/libs/log"
16+
"github.com/tendermint/tendermint/libs/pubsub/query"
1517
"github.com/tendermint/tendermint/proxy"
18+
tmtypes "github.com/tendermint/tendermint/types"
1619

1720
"github.com/celestiaorg/optimint/mempool"
1821
"github.com/celestiaorg/optimint/mocks"
@@ -35,7 +38,7 @@ func TestCreateBlock(t *testing.T) {
3538
nsID := [8]byte{1, 2, 3, 4, 5, 6, 7, 8}
3639

3740
mpool := mempool.NewCListMempool(cfg.DefaultMempoolConfig(), proxy.NewAppConnMempool(client), 0)
38-
executor := NewBlockExecutor([]byte("test address"), nsID, "test", mpool, proxy.NewAppConnConsensus(client), logger)
41+
executor := NewBlockExecutor([]byte("test address"), nsID, "test", mpool, proxy.NewAppConnConsensus(client), nil, logger)
3942

4043
state := State{}
4144
state.ConsensusParams.Block.MaxBytes = 100
@@ -91,7 +94,21 @@ func TestApplyBlock(t *testing.T) {
9194
chainID := "test"
9295

9396
mpool := mempool.NewCListMempool(cfg.DefaultMempoolConfig(), proxy.NewAppConnMempool(client), 0)
94-
executor := NewBlockExecutor([]byte("test address"), nsID, chainID, mpool, proxy.NewAppConnConsensus(client), logger)
97+
eventBus := tmtypes.NewEventBus()
98+
require.NoError(eventBus.Start())
99+
executor := NewBlockExecutor([]byte("test address"), nsID, chainID, mpool, proxy.NewAppConnConsensus(client), eventBus, logger)
100+
101+
txQuery, err := query.New("tm.event='Tx'")
102+
require.NoError(err)
103+
txSub, err := eventBus.Subscribe(context.Background(), "test", txQuery, 1000)
104+
require.NoError(err)
105+
require.NotNil(txSub)
106+
107+
headerQuery, err := query.New("tm.event='NewBlockHeader'")
108+
require.NoError(err)
109+
headerSub, err := eventBus.Subscribe(context.Background(), "test", headerQuery, 100)
110+
require.NoError(err)
111+
require.NotNil(headerSub)
95112

96113
state := State{}
97114
state.InitialHeight = 1
@@ -125,4 +142,35 @@ func TestApplyBlock(t *testing.T) {
125142
require.NoError(err)
126143
require.NotNil(newState)
127144
assert.Equal(int64(2), newState.LastBlockHeight)
145+
146+
// wait for at least 4 Tx events, for up to 3 second.
147+
// 3 seconds is a fail-scenario only
148+
timer := time.NewTimer(3 * time.Second)
149+
txs := make(map[int64]int)
150+
cnt := 0
151+
for cnt != 4 {
152+
select {
153+
case evt := <-txSub.Out():
154+
cnt++
155+
data, ok := evt.Data().(tmtypes.EventDataTx)
156+
assert.True(ok)
157+
assert.NotEmpty(data.Tx)
158+
txs[data.Height]++
159+
case <-timer.C:
160+
t.FailNow()
161+
}
162+
}
163+
assert.Zero(len(txSub.Out())) // expected exactly 4 Txs - channel should be empty
164+
assert.EqualValues(1, txs[1])
165+
assert.EqualValues(3, txs[2])
166+
167+
require.EqualValues(2, len(headerSub.Out()))
168+
for h := 1; h <= 2; h++ {
169+
evt := <-headerSub.Out()
170+
data, ok := evt.Data().(tmtypes.EventDataNewBlockHeader)
171+
assert.True(ok)
172+
if data.Header.Height == 2 {
173+
assert.EqualValues(3, data.NumTxs)
174+
}
175+
}
128176
}

0 commit comments

Comments
 (0)