Skip to content

Commit 18a7372

Browse files
rach-idrootulp
andauthoredJan 14, 2025··
feat: support gRPC endpoints in core (#1513)
This is an implementation of a streaming API for blocks in core. Helps close celestiaorg/celestia-app#3421 but not sure it entirely closes it. It can easily be used: ```go package main import ( "context" "fmt" coregrpc "github.com/tendermint/tendermint/rpc/grpc" ) func main() { client := coregrpc.StartBlockAPIGRPCClient("tcp://localhost:9090") blockStreamer, err := client.BlockByHeight(context.Background(), &coregrpc.BlockByHeightRequest{Height: 2}) if err != nil { panic(err) } blockMeta, err := client.BlockMetaByHeight(context.Background(), &coregrpc.BlockMetaByHeightRequest{Height: 2}) if err != nil { panic(err) } parts := make([]*core.Part, 0) for i := 0; i < int(blockMeta.BlockMeta.BlockID.PartSetHeader.Total); i++ { resp, err := blockStreamer.Recv() if err != nil { panic(err) } parts = append(parts, resp.BlockPart) if resp.IsLast && i < int(blockMeta.BlockMeta.BlockID.PartSetHeader.Total)-1 { panic("couldn't get all parts") } else if resp.IsLast { break } } h := types.NewPartSetFromHeader(types.PartSetHeader{ Total: blockMeta.BlockMeta.BlockID.PartSetHeader.Total, Hash: blockMeta.BlockMeta.BlockID.PartSetHeader.Hash, }) for _, part := range parts { ok, err := h.AddPart(&types.Part{ Index: part.Index, Bytes: part.Bytes, Proof: merkle.Proof{ Total: part.Proof.Total, Index: part.Proof.Index, LeafHash: part.Proof.LeafHash, Aunts: part.Proof.Aunts, }, }) if err != nil { panic(err) } if !ok { panic("not okey") } } pbb := new(core.Block) bz, err := io.ReadAll(h.GetReader()) if err != nil { panic(err) } err = proto.Unmarshal(bz, pbb) if err != nil { panic(err) } block, err := types.BlockFromProto(pbb) if err != nil { panic(err) } fmt.Println(block) // get a commit commit, err := client.Commit(context.Background(), &coregrpc.CommitRequest{Height: 10}) if err != nil { panic(err) } fmt.Println(commit) // listen for new heights streamer, err := client.SubscribeNewHeights(context.Background(), &coregrpc.SubscribeNewHeightsRequest{}) if err != nil { panic(err) } for { resp, err := streamer.Recv() if err != nil { panic(err) } fmt.Println(resp) } } ``` Ps: I didn't add the tests because I didn't find a direct way of mocking the environment without polluting the rest of the repo (exporting some methods, adding new helpers, etc). And I think since the implementation is simple, just querying the block/state stores for results, it's fine to leave it untested. --------- Co-authored-by: Rootul P <[email protected]>
1 parent 1d9821a commit 18a7372

File tree

11 files changed

+9214
-777
lines changed

11 files changed

+9214
-777
lines changed
 

‎buf.yaml

+2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ lint:
99
- BASIC
1010
- FILE_LOWER_SNAKE_CASE
1111
- UNARY_RPC
12+
except:
13+
- RPC_NO_SERVER_STREAMING
1214
ignore:
1315
- gogoproto
1416
breaking:

‎proto/buf.yaml

+2
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,5 @@ lint:
1010
- BASIC
1111
- FILE_LOWER_SNAKE_CASE
1212
- UNARY_RPC
13+
except:
14+
- RPC_NO_SERVER_STREAMING

‎proto/tendermint/rpc/grpc/types.pb.go

+4,138-382
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎proto/tendermint/rpc/grpc/types.proto

+114
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,12 @@ package tendermint.rpc.grpc;
33
option go_package = "github.com/tendermint/tendermint/rpc/grpc;coregrpc";
44

55
import "tendermint/abci/types.proto";
6+
import "tendermint/types/types.proto";
7+
import "tendermint/p2p/types.proto";
8+
import "tendermint/crypto/keys.proto";
9+
import "tendermint/types/validator.proto";
10+
import "google/protobuf/timestamp.proto";
11+
import "gogoproto/gogo.proto";
612

713
//----------------------------------------
814
// Request types
@@ -13,6 +19,38 @@ message RequestBroadcastTx {
1319
bytes tx = 1;
1420
}
1521

22+
message BlockByHashRequest {
23+
bytes hash = 1;
24+
bool prove = 2;
25+
}
26+
27+
message BlockByHeightRequest {
28+
// Height the requested block height.
29+
// If height is equal to 0, the latest height stored in the block store
30+
// will be used.
31+
int64 height = 1;
32+
// Prove set to true to return the parts proofs.
33+
bool prove = 2;
34+
}
35+
36+
message CommitRequest {
37+
// Height the requested block commit height.
38+
// If height is equal to 0, the latest height stored in the block store
39+
// will be used.
40+
int64 height = 1;
41+
}
42+
43+
message ValidatorSetRequest {
44+
// Height the requested validator set height.
45+
// If height is equal to 0, the latest height stored in the block store
46+
// will be used.
47+
int64 height = 1;
48+
}
49+
50+
message SubscribeNewHeightsRequest {}
51+
52+
message StatusRequest {}
53+
1654
//----------------------------------------
1755
// Response types
1856

@@ -23,10 +61,86 @@ message ResponseBroadcastTx {
2361
tendermint.abci.ResponseDeliverTx deliver_tx = 2;
2462
}
2563

64+
message StreamedBlockByHashResponse {
65+
tendermint.types.Part block_part = 1;
66+
// Commit is only set in the first part, and
67+
// it stays nil in the remaining ones.
68+
tendermint.types.Commit commit = 2;
69+
// ValidatorSet is only set in the first part, and
70+
// it stays nil in the remaining ones.
71+
tendermint.types.ValidatorSet validator_set = 3;
72+
bool is_last = 4;
73+
}
74+
75+
message StreamedBlockByHeightResponse {
76+
tendermint.types.Part block_part = 1;
77+
// Commit is only set in the first part, and
78+
// it stays nil in the remaining ones.
79+
tendermint.types.Commit commit = 2;
80+
// ValidatorSet is only set in the first part, and
81+
// it stays nil in the remaining ones.
82+
tendermint.types.ValidatorSet validator_set = 3;
83+
bool is_last = 4;
84+
}
85+
86+
message CommitResponse {
87+
tendermint.types.Commit commit = 1;
88+
}
89+
90+
message ValidatorSetResponse {
91+
// ValidatorSet the requested validator set.
92+
tendermint.types.ValidatorSet validator_set = 1;
93+
// Height the height corresponding to the returned
94+
// validator set.
95+
int64 height = 2;
96+
}
97+
98+
message NewHeightEvent {
99+
int64 height = 1;
100+
bytes hash = 2;
101+
}
102+
103+
message StatusResponse {
104+
tendermint.p2p.DefaultNodeInfo node_info = 1;
105+
SyncInfo sync_info = 2;
106+
ValidatorInfo validator_info = 3;
107+
}
108+
109+
message SyncInfo {
110+
bytes latest_block_hash = 1;
111+
bytes latest_app_hash = 2;
112+
int64 latest_block_height = 3;
113+
google.protobuf.Timestamp latest_block_time = 4
114+
[(gogoproto.nullable) = false, (gogoproto.stdtime) = true];
115+
116+
bytes earliest_block_hash = 5;
117+
bytes earliest_app_hash = 6;
118+
int64 earliest_block_height = 7;
119+
google.protobuf.Timestamp earliest_block_time = 8
120+
[(gogoproto.nullable) = false, (gogoproto.stdtime) = true];
121+
122+
bool catching_up = 9;
123+
}
124+
125+
message ValidatorInfo {
126+
bytes address = 1;
127+
tendermint.crypto.PublicKey pub_key = 2;
128+
int64 voting_power = 3;
129+
}
130+
26131
//----------------------------------------
27132
// Service Definition
28133

29134
service BroadcastAPI {
30135
rpc Ping(RequestPing) returns (ResponsePing);
31136
rpc BroadcastTx(RequestBroadcastTx) returns (ResponseBroadcastTx);
32137
}
138+
139+
service BlockAPI {
140+
rpc BlockByHash(BlockByHashRequest) returns (stream StreamedBlockByHashResponse);
141+
rpc BlockByHeight(BlockByHeightRequest) returns (stream StreamedBlockByHeightResponse);
142+
rpc Commit(CommitRequest) returns (CommitResponse);
143+
rpc ValidatorSet(ValidatorSetRequest) returns (ValidatorSetResponse);
144+
rpc SubscribeNewHeights(SubscribeNewHeightsRequest) returns (stream NewHeightEvent);
145+
rpc Status(StatusRequest) returns (StatusResponse);
146+
}

‎rpc/core/status.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
// Status returns CometBFT status including node info, pubkey, latest block
1414
// hash, app hash, block height and time.
1515
// More: https://docs.cometbft.com/v0.34/rpc/#/Info/status
16-
func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) {
16+
func Status(_ *rpctypes.Context) (*ctypes.ResultStatus, error) {
1717
var (
1818
earliestBlockHeight int64
1919
earliestBlockHash cmtbytes.HexBytes

‎rpc/grpc/api.go

+384-1
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,24 @@ package coregrpc
22

33
import (
44
"context"
5+
"errors"
6+
"fmt"
7+
"sync"
8+
"time"
9+
10+
"github.com/tendermint/tendermint/libs/pubsub"
11+
12+
"github.com/tendermint/tendermint/crypto/encoding"
13+
14+
"github.com/tendermint/tendermint/proto/tendermint/crypto"
15+
16+
"github.com/tendermint/tendermint/libs/rand"
517

618
abci "github.com/tendermint/tendermint/abci/types"
7-
core "github.com/tendermint/tendermint/rpc/core"
19+
"github.com/tendermint/tendermint/proto/tendermint/types"
20+
"github.com/tendermint/tendermint/rpc/core"
821
rpctypes "github.com/tendermint/tendermint/rpc/jsonrpc/types"
22+
eventstypes "github.com/tendermint/tendermint/types"
923
)
1024

1125
type broadcastAPI struct {
@@ -37,3 +51,372 @@ func (bapi *broadcastAPI) BroadcastTx(ctx context.Context, req *RequestBroadcast
3751
},
3852
}, nil
3953
}
54+
55+
type BlockAPI struct {
56+
sync.Mutex
57+
heightListeners map[chan NewHeightEvent]struct{}
58+
newBlockSubscription eventstypes.Subscription
59+
subscriptionID string
60+
subscriptionQuery pubsub.Query
61+
}
62+
63+
func NewBlockAPI() *BlockAPI {
64+
return &BlockAPI{
65+
heightListeners: make(map[chan NewHeightEvent]struct{}, 1000),
66+
subscriptionID: fmt.Sprintf("block-api-subscription-%s", rand.Str(6)),
67+
subscriptionQuery: eventstypes.EventQueryNewBlock,
68+
}
69+
}
70+
71+
func (blockAPI *BlockAPI) StartNewBlockEventListener(ctx context.Context) error {
72+
env := core.GetEnvironment()
73+
if blockAPI.newBlockSubscription == nil {
74+
var err error
75+
blockAPI.newBlockSubscription, err = env.EventBus.Subscribe(
76+
ctx,
77+
blockAPI.subscriptionID,
78+
blockAPI.subscriptionQuery,
79+
500,
80+
)
81+
if err != nil {
82+
env.Logger.Error("Failed to subscribe to new blocks", "err", err)
83+
return err
84+
}
85+
}
86+
for {
87+
select {
88+
case <-ctx.Done():
89+
return nil
90+
case <-blockAPI.newBlockSubscription.Cancelled():
91+
env.Logger.Error("cancelled grpc subscription. retrying")
92+
ok, err := blockAPI.retryNewBlocksSubscription(ctx)
93+
if err != nil {
94+
return err
95+
}
96+
if !ok {
97+
// this will happen when the context is done. we can stop here
98+
return nil
99+
}
100+
case event, ok := <-blockAPI.newBlockSubscription.Out():
101+
if !ok {
102+
env.Logger.Error("new blocks subscription closed. re-subscribing")
103+
ok, err := blockAPI.retryNewBlocksSubscription(ctx)
104+
if err != nil {
105+
return err
106+
}
107+
if !ok {
108+
// this will happen when the context is done. we can stop here
109+
return nil
110+
}
111+
continue
112+
}
113+
newBlockEvent, ok := event.Events()[eventstypes.EventTypeKey]
114+
if !ok || len(newBlockEvent) == 0 || newBlockEvent[0] != eventstypes.EventNewBlock {
115+
continue
116+
}
117+
data, ok := event.Data().(eventstypes.EventDataNewBlock)
118+
if !ok {
119+
env.Logger.Error("couldn't cast event data to new block")
120+
return fmt.Errorf("couldn't cast event data to new block. Events: %s", event.Events())
121+
}
122+
blockAPI.broadcastToListeners(ctx, data.Block.Height, data.Block.Hash())
123+
}
124+
}
125+
}
126+
127+
// RetryAttempts the number of retry times when the subscription is closed.
128+
const RetryAttempts = 6
129+
130+
// SubscriptionCapacity the maximum number of pending blocks in the subscription.
131+
const SubscriptionCapacity = 500
132+
133+
func (blockAPI *BlockAPI) retryNewBlocksSubscription(ctx context.Context) (bool, error) {
134+
env := core.GetEnvironment()
135+
ticker := time.NewTicker(time.Second)
136+
defer ticker.Stop()
137+
blockAPI.Lock()
138+
defer blockAPI.Unlock()
139+
for i := 1; i < RetryAttempts; i++ {
140+
select {
141+
case <-ctx.Done():
142+
return false, nil
143+
case <-ticker.C:
144+
var err error
145+
blockAPI.newBlockSubscription, err = env.EventBus.Subscribe(
146+
ctx,
147+
fmt.Sprintf("block-api-subscription-%s", rand.Str(6)),
148+
blockAPI.subscriptionQuery,
149+
SubscriptionCapacity,
150+
)
151+
if err != nil {
152+
env.Logger.Error("Failed to subscribe to new blocks. retrying", "err", err, "retry_number", i)
153+
} else {
154+
return true, nil
155+
}
156+
}
157+
}
158+
return false, errors.New("couldn't recover from failed blocks subscription. stopping listeners")
159+
}
160+
161+
func (blockAPI *BlockAPI) broadcastToListeners(ctx context.Context, height int64, hash []byte) {
162+
blockAPI.Lock()
163+
defer blockAPI.Unlock()
164+
for ch := range blockAPI.heightListeners {
165+
func() {
166+
defer func() {
167+
if r := recover(); r != nil {
168+
// logging the error then removing the heights listener
169+
core.GetEnvironment().Logger.Debug("failed to write to heights listener", "err", r)
170+
blockAPI.removeHeightListener(ch)
171+
}
172+
}()
173+
select {
174+
case <-ctx.Done():
175+
return
176+
case ch <- NewHeightEvent{Height: height, Hash: hash}:
177+
}
178+
}()
179+
}
180+
}
181+
182+
func (blockAPI *BlockAPI) addHeightListener() chan NewHeightEvent {
183+
blockAPI.Lock()
184+
defer blockAPI.Unlock()
185+
ch := make(chan NewHeightEvent, 50)
186+
blockAPI.heightListeners[ch] = struct{}{}
187+
return ch
188+
}
189+
190+
func (blockAPI *BlockAPI) removeHeightListener(ch chan NewHeightEvent) {
191+
blockAPI.Lock()
192+
defer blockAPI.Unlock()
193+
delete(blockAPI.heightListeners, ch)
194+
}
195+
196+
func (blockAPI *BlockAPI) closeAllListeners() {
197+
blockAPI.Lock()
198+
defer blockAPI.Unlock()
199+
if blockAPI.heightListeners == nil {
200+
// if this is nil, then there is no need to close anything
201+
return
202+
}
203+
for channel := range blockAPI.heightListeners {
204+
delete(blockAPI.heightListeners, channel)
205+
}
206+
}
207+
208+
// Stop cleans up the BlockAPI instance by closing all listeners
209+
// and ensuring no further events are processed.
210+
func (blockAPI *BlockAPI) Stop(ctx context.Context) error {
211+
blockAPI.Lock()
212+
defer blockAPI.Unlock()
213+
214+
// close all height listeners
215+
blockAPI.closeAllListeners()
216+
217+
var err error
218+
// stop the events subscription
219+
if blockAPI.newBlockSubscription != nil {
220+
err = core.GetEnvironment().EventBus.Unsubscribe(ctx, blockAPI.subscriptionID, blockAPI.subscriptionQuery)
221+
blockAPI.newBlockSubscription = nil
222+
}
223+
224+
core.GetEnvironment().Logger.Info("gRPC streaming API has been stopped")
225+
return err
226+
}
227+
228+
func (blockAPI *BlockAPI) BlockByHash(req *BlockByHashRequest, stream BlockAPI_BlockByHashServer) error {
229+
blockStore := core.GetEnvironment().BlockStore
230+
blockMeta := blockStore.LoadBlockMetaByHash(req.Hash)
231+
if blockMeta == nil {
232+
return fmt.Errorf("nil block meta for block hash %d", req.Hash)
233+
}
234+
commit := blockStore.LoadBlockCommit(blockMeta.Header.Height)
235+
if commit == nil {
236+
return fmt.Errorf("nil commit for block hash %d", req.Hash)
237+
}
238+
protoCommit := commit.ToProto()
239+
240+
validatorSet, err := core.GetEnvironment().StateStore.LoadValidators(blockMeta.Header.Height)
241+
if err != nil {
242+
return err
243+
}
244+
protoValidatorSet, err := validatorSet.ToProto()
245+
if err != nil {
246+
return err
247+
}
248+
249+
for i := 0; i < int(blockMeta.BlockID.PartSetHeader.Total); i++ {
250+
part, err := blockStore.LoadBlockPart(blockMeta.Header.Height, i).ToProto()
251+
if err != nil {
252+
return err
253+
}
254+
if part == nil {
255+
return fmt.Errorf("nil block part %d for block hash %d", i, req.Hash)
256+
}
257+
if !req.Prove {
258+
part.Proof = crypto.Proof{}
259+
}
260+
isLastPart := i == int(blockMeta.BlockID.PartSetHeader.Total)-1
261+
resp := StreamedBlockByHashResponse{
262+
BlockPart: part,
263+
IsLast: isLastPart,
264+
}
265+
if i == 0 {
266+
resp.ValidatorSet = protoValidatorSet
267+
resp.Commit = protoCommit
268+
}
269+
err = stream.Send(&resp)
270+
if err != nil {
271+
return err
272+
}
273+
}
274+
return nil
275+
}
276+
277+
func (blockAPI *BlockAPI) BlockByHeight(req *BlockByHeightRequest, stream BlockAPI_BlockByHeightServer) error {
278+
blockStore := core.GetEnvironment().BlockStore
279+
height := req.Height
280+
if height == 0 {
281+
height = blockStore.Height()
282+
}
283+
284+
blockMeta := blockStore.LoadBlockMeta(height)
285+
if blockMeta == nil {
286+
return fmt.Errorf("nil block meta for height %d", height)
287+
}
288+
289+
commit := blockStore.LoadSeenCommit(height)
290+
if commit == nil {
291+
return fmt.Errorf("nil block commit for height %d", height)
292+
}
293+
protoCommit := commit.ToProto()
294+
295+
validatorSet, err := core.GetEnvironment().StateStore.LoadValidators(height)
296+
if err != nil {
297+
return err
298+
}
299+
protoValidatorSet, err := validatorSet.ToProto()
300+
if err != nil {
301+
return err
302+
}
303+
304+
for i := 0; i < int(blockMeta.BlockID.PartSetHeader.Total); i++ {
305+
part, err := blockStore.LoadBlockPart(height, i).ToProto()
306+
if err != nil {
307+
return err
308+
}
309+
if part == nil {
310+
return fmt.Errorf("nil block part %d for height %d", i, height)
311+
}
312+
if !req.Prove {
313+
part.Proof = crypto.Proof{}
314+
}
315+
isLastPart := i == int(blockMeta.BlockID.PartSetHeader.Total)-1
316+
resp := StreamedBlockByHeightResponse{
317+
BlockPart: part,
318+
IsLast: isLastPart,
319+
}
320+
if i == 0 {
321+
resp.ValidatorSet = protoValidatorSet
322+
resp.Commit = protoCommit
323+
}
324+
err = stream.Send(&resp)
325+
if err != nil {
326+
return err
327+
}
328+
}
329+
return nil
330+
}
331+
332+
func (blockAPI *BlockAPI) Status(_ context.Context, _ *StatusRequest) (*StatusResponse, error) {
333+
status, err := core.Status(nil)
334+
if err != nil {
335+
return nil, err
336+
}
337+
338+
protoPubKey, err := encoding.PubKeyToProto(status.ValidatorInfo.PubKey)
339+
if err != nil {
340+
return nil, err
341+
}
342+
return &StatusResponse{
343+
NodeInfo: status.NodeInfo.ToProto(),
344+
SyncInfo: &SyncInfo{
345+
LatestBlockHash: status.SyncInfo.LatestBlockHash,
346+
LatestAppHash: status.SyncInfo.LatestAppHash,
347+
LatestBlockHeight: status.SyncInfo.LatestBlockHeight,
348+
LatestBlockTime: status.SyncInfo.LatestBlockTime,
349+
EarliestBlockHash: status.SyncInfo.EarliestBlockHash,
350+
EarliestAppHash: status.SyncInfo.EarliestAppHash,
351+
EarliestBlockHeight: status.SyncInfo.EarliestBlockHeight,
352+
EarliestBlockTime: status.SyncInfo.EarliestBlockTime,
353+
CatchingUp: status.SyncInfo.CatchingUp,
354+
},
355+
ValidatorInfo: &ValidatorInfo{
356+
Address: status.ValidatorInfo.Address,
357+
PubKey: &protoPubKey,
358+
VotingPower: status.ValidatorInfo.VotingPower,
359+
},
360+
}, nil
361+
}
362+
363+
func (blockAPI *BlockAPI) Commit(_ context.Context, req *CommitRequest) (*CommitResponse, error) {
364+
blockStore := core.GetEnvironment().BlockStore
365+
height := req.Height
366+
if height == 0 {
367+
height = blockStore.Height()
368+
}
369+
commit := blockStore.LoadSeenCommit(height)
370+
if commit == nil {
371+
return nil, fmt.Errorf("nil block commit for height %d", height)
372+
}
373+
protoCommit := commit.ToProto()
374+
375+
return &CommitResponse{
376+
Commit: &types.Commit{
377+
Height: protoCommit.Height,
378+
Round: protoCommit.Round,
379+
BlockID: protoCommit.BlockID,
380+
Signatures: protoCommit.Signatures,
381+
},
382+
}, nil
383+
}
384+
385+
func (blockAPI *BlockAPI) ValidatorSet(_ context.Context, req *ValidatorSetRequest) (*ValidatorSetResponse, error) {
386+
blockStore := core.GetEnvironment().BlockStore
387+
height := req.Height
388+
if height == 0 {
389+
height = blockStore.Height()
390+
}
391+
validatorSet, err := core.GetEnvironment().StateStore.LoadValidators(height)
392+
if err != nil {
393+
return nil, err
394+
}
395+
protoValidatorSet, err := validatorSet.ToProto()
396+
if err != nil {
397+
return nil, err
398+
}
399+
return &ValidatorSetResponse{
400+
ValidatorSet: protoValidatorSet,
401+
Height: height,
402+
}, nil
403+
}
404+
405+
func (blockAPI *BlockAPI) SubscribeNewHeights(_ *SubscribeNewHeightsRequest, stream BlockAPI_SubscribeNewHeightsServer) error {
406+
heightListener := blockAPI.addHeightListener()
407+
defer blockAPI.removeHeightListener(heightListener)
408+
409+
for {
410+
select {
411+
case event, ok := <-heightListener:
412+
if !ok {
413+
return errors.New("blocks subscription closed from the service side")
414+
}
415+
if err := stream.Send(&event); err != nil {
416+
return err
417+
}
418+
case <-stream.Context().Done():
419+
return nil
420+
}
421+
}
422+
}

‎rpc/grpc/client_server.go

+39-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package coregrpc
33
import (
44
"net"
55

6+
"github.com/tendermint/tendermint/rpc/core"
7+
68
"golang.org/x/net/context"
79
"google.golang.org/grpc"
810
"google.golang.org/grpc/credentials/insecure"
@@ -21,7 +23,26 @@ type Config struct {
2123
func StartGRPCServer(ln net.Listener) error {
2224
grpcServer := grpc.NewServer()
2325
RegisterBroadcastAPIServer(grpcServer, &broadcastAPI{})
24-
return grpcServer.Serve(ln)
26+
api := NewBlockAPI()
27+
RegisterBlockAPIServer(grpcServer, api)
28+
errCh := make(chan error, 2)
29+
ctx, cancel := context.WithCancel(context.Background())
30+
defer cancel()
31+
go func() {
32+
errCh <- api.StartNewBlockEventListener(ctx)
33+
}()
34+
go func() {
35+
errCh <- grpcServer.Serve(ln)
36+
}()
37+
defer grpcServer.GracefulStop()
38+
defer func(api *BlockAPI, ctx context.Context) {
39+
err := api.Stop(ctx)
40+
if err != nil {
41+
core.GetEnvironment().Logger.Error("error stopping block api", "err", err)
42+
}
43+
}(api, ctx)
44+
// blocks until one errors or returns nil
45+
return <-errCh
2546
}
2647

2748
// StartGRPCClient dials the gRPC server using protoAddr and returns a new
@@ -34,6 +55,23 @@ func StartGRPCClient(protoAddr string) BroadcastAPIClient {
3455
return NewBroadcastAPIClient(conn)
3556
}
3657

58+
// StartBlockAPIGRPCClient dials the gRPC server using protoAddr and returns a new
59+
// BlockAPIClient.
60+
func StartBlockAPIGRPCClient(protoAddr string, opts ...grpc.DialOption) (BlockAPIClient, error) {
61+
if len(opts) == 0 {
62+
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
63+
}
64+
opts = append(opts, grpc.WithContextDialer(dialerFunc))
65+
conn, err := grpc.Dial( //nolint:staticcheck
66+
protoAddr,
67+
opts...,
68+
)
69+
if err != nil {
70+
return nil, err
71+
}
72+
return NewBlockAPIClient(conn), nil
73+
}
74+
3775
func dialerFunc(ctx context.Context, addr string) (net.Conn, error) {
3876
return cmtnet.Connect(addr)
3977
}

‎rpc/grpc/grpc_test.go

+366
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,19 @@
1+
//nolint:dupl
12
package coregrpc_test
23

34
import (
45
"context"
56
"os"
67
"testing"
8+
"time"
9+
10+
"github.com/stretchr/testify/assert"
11+
"github.com/tendermint/tendermint/libs/rand"
12+
"github.com/tendermint/tendermint/proto/tendermint/crypto"
13+
rpcclient "github.com/tendermint/tendermint/rpc/client"
14+
rpchttp "github.com/tendermint/tendermint/rpc/client/http"
15+
"github.com/tendermint/tendermint/rpc/core"
16+
"github.com/tendermint/tendermint/types"
717

818
"github.com/stretchr/testify/require"
919

@@ -33,3 +43,359 @@ func TestBroadcastTx(t *testing.T) {
3343
require.EqualValues(t, 0, res.CheckTx.Code)
3444
require.EqualValues(t, 0, res.DeliverTx.Code)
3545
}
46+
47+
func setupClient(t *testing.T) core_grpc.BlockAPIClient {
48+
client, err := rpctest.GetBlockAPIClient()
49+
require.NoError(t, err)
50+
return client
51+
}
52+
53+
func TestBlockByHash(t *testing.T) {
54+
client := setupClient(t)
55+
waitForHeight(t, 2)
56+
expectedBlockMeta := core.GetEnvironment().BlockStore.LoadBlockMeta(1)
57+
require.NotNil(t, expectedBlockMeta)
58+
59+
// query the block along with the part proofs
60+
res, err := client.BlockByHash(context.Background(), &core_grpc.BlockByHashRequest{
61+
Hash: expectedBlockMeta.BlockID.Hash,
62+
Prove: true,
63+
})
64+
require.NoError(t, err)
65+
66+
part, err := res.Recv()
67+
require.NoError(t, err)
68+
69+
require.NotNil(t, part.BlockPart)
70+
require.NotNil(t, part.ValidatorSet)
71+
require.NotNil(t, part.Commit)
72+
73+
assert.NotEqual(t, part.BlockPart.Proof, crypto.Proof{})
74+
assert.Equal(t, part.Commit.Height, expectedBlockMeta.Header.Height)
75+
76+
// query the block along without the part proofs
77+
res, err = client.BlockByHash(context.Background(), &core_grpc.BlockByHashRequest{
78+
Hash: expectedBlockMeta.BlockID.Hash,
79+
Prove: false,
80+
})
81+
require.NoError(t, err)
82+
83+
part, err = res.Recv()
84+
require.NoError(t, err)
85+
86+
require.NotNil(t, part.BlockPart)
87+
require.NotNil(t, part.ValidatorSet)
88+
require.NotNil(t, part.Commit)
89+
90+
assert.Equal(t, part.BlockPart.Proof, crypto.Proof{})
91+
assert.Equal(t, part.Commit.Height, expectedBlockMeta.Header.Height)
92+
}
93+
94+
func TestCommit(t *testing.T) {
95+
client := setupClient(t)
96+
waitForHeight(t, 2)
97+
expectedBlockCommit := core.GetEnvironment().BlockStore.LoadSeenCommit(1)
98+
99+
res, err := client.Commit(context.Background(), &core_grpc.CommitRequest{
100+
Height: 1,
101+
})
102+
require.NoError(t, err)
103+
104+
assert.Equal(t, expectedBlockCommit.BlockID.Hash.Bytes(), res.Commit.BlockID.Hash)
105+
}
106+
107+
func TestLatestCommit(t *testing.T) {
108+
client := setupClient(t)
109+
waitForHeight(t, 3)
110+
111+
res, err := client.Commit(context.Background(), &core_grpc.CommitRequest{
112+
Height: 0,
113+
})
114+
require.NoError(t, err)
115+
116+
assert.Greater(t, res.Commit.Height, int64(2))
117+
}
118+
119+
func TestValidatorSet(t *testing.T) {
120+
client := setupClient(t)
121+
waitForHeight(t, 2)
122+
expectedValidatorSet, err := core.GetEnvironment().StateStore.LoadValidators(1)
123+
require.NoError(t, err)
124+
125+
res, err := client.ValidatorSet(context.Background(), &core_grpc.ValidatorSetRequest{
126+
Height: 1,
127+
})
128+
require.NoError(t, err)
129+
130+
assert.Equal(t, len(expectedValidatorSet.Validators), len(res.ValidatorSet.Validators))
131+
}
132+
133+
func TestLatestValidatorSet(t *testing.T) {
134+
client := setupClient(t)
135+
waitForHeight(t, 3)
136+
137+
res, err := client.ValidatorSet(context.Background(), &core_grpc.ValidatorSetRequest{
138+
Height: 0,
139+
})
140+
require.NoError(t, err)
141+
142+
assert.Greater(t, res.Height, int64(2))
143+
}
144+
145+
func TestStatus(t *testing.T) {
146+
client := setupClient(t)
147+
expectedStatus, err := core.Status(nil)
148+
require.NoError(t, err)
149+
150+
res, err := client.Status(context.Background(), &core_grpc.StatusRequest{})
151+
require.NoError(t, err)
152+
assert.Equal(t, string(expectedStatus.NodeInfo.DefaultNodeID), res.NodeInfo.DefaultNodeID)
153+
}
154+
155+
func TestSubscribeNewHeights(t *testing.T) {
156+
client := setupClient(t)
157+
ctx, cancel := context.WithCancel(context.Background())
158+
defer cancel()
159+
stream, err := client.SubscribeNewHeights(ctx, &core_grpc.SubscribeNewHeightsRequest{})
160+
require.NoError(t, err)
161+
store := core.GetEnvironment().BlockStore
162+
163+
go func() {
164+
listenedHeightsCount := 0
165+
defer func() {
166+
assert.Greater(t, listenedHeightsCount, 0)
167+
}()
168+
for {
169+
res, err := stream.Recv()
170+
if ctx.Err() != nil {
171+
return
172+
}
173+
require.NoError(t, err)
174+
require.Greater(t, res.Height, int64(0))
175+
assert.Equal(t, store.LoadBlockMeta(res.Height).BlockID.Hash.Bytes(), res.Hash)
176+
listenedHeightsCount++
177+
}
178+
}()
179+
180+
time.Sleep(5 * time.Second)
181+
}
182+
183+
func TestBlockByHash_Streaming(t *testing.T) {
184+
client := setupClient(t)
185+
186+
// send a big transaction that would result in a block
187+
// containing multiple block parts
188+
txRes, err := rpctest.GetGRPCClient().BroadcastTx(
189+
context.Background(),
190+
&core_grpc.RequestBroadcastTx{Tx: rand.NewRand().Bytes(1000000)},
191+
)
192+
require.NoError(t, err)
193+
require.EqualValues(t, 0, txRes.CheckTx.Code)
194+
require.EqualValues(t, 0, txRes.DeliverTx.Code)
195+
196+
var expectedBlockMeta types.BlockMeta
197+
for i := int64(1); i < 500; i++ {
198+
waitForHeight(t, i+1)
199+
blockMeta := core.GetEnvironment().BlockStore.LoadBlockMeta(i)
200+
if blockMeta.BlockID.PartSetHeader.Total > 1 {
201+
expectedBlockMeta = *blockMeta
202+
break
203+
}
204+
}
205+
206+
// query the block without the part proofs
207+
res, err := client.BlockByHash(context.Background(), &core_grpc.BlockByHashRequest{
208+
Hash: expectedBlockMeta.BlockID.Hash,
209+
Prove: false,
210+
})
211+
require.NoError(t, err)
212+
213+
part1, err := res.Recv()
214+
require.NoError(t, err)
215+
216+
require.NotNil(t, part1.BlockPart)
217+
require.NotNil(t, part1.ValidatorSet)
218+
require.NotNil(t, part1.Commit)
219+
220+
assert.Equal(t, part1.BlockPart.Proof, crypto.Proof{})
221+
assert.Equal(t, part1.Commit.Height, expectedBlockMeta.Header.Height)
222+
223+
part2, err := res.Recv()
224+
require.NoError(t, err)
225+
226+
require.NotNil(t, part2.BlockPart)
227+
require.Nil(t, part2.ValidatorSet)
228+
require.Nil(t, part2.Commit)
229+
230+
assert.Equal(t, part2.BlockPart.Proof, crypto.Proof{})
231+
232+
// query the block along with the part proofs
233+
res, err = client.BlockByHash(context.Background(), &core_grpc.BlockByHashRequest{
234+
Hash: expectedBlockMeta.BlockID.Hash,
235+
Prove: true,
236+
})
237+
require.NoError(t, err)
238+
239+
part1, err = res.Recv()
240+
require.NoError(t, err)
241+
242+
require.NotNil(t, part1.BlockPart)
243+
require.NotNil(t, part1.ValidatorSet)
244+
require.NotNil(t, part1.Commit)
245+
246+
assert.NotEqual(t, part1.BlockPart.Proof, crypto.Proof{})
247+
assert.Equal(t, part1.Commit.Height, expectedBlockMeta.Header.Height)
248+
249+
part2, err = res.Recv()
250+
require.NoError(t, err)
251+
252+
require.NotNil(t, part2.BlockPart)
253+
require.Nil(t, part2.ValidatorSet)
254+
require.Nil(t, part2.Commit)
255+
256+
assert.NotEqual(t, part2.BlockPart.Proof, crypto.Proof{})
257+
}
258+
259+
func TestBlockByHeight(t *testing.T) {
260+
client := setupClient(t)
261+
waitForHeight(t, 2)
262+
expectedBlockMeta := core.GetEnvironment().BlockStore.LoadBlockMeta(1)
263+
264+
// query the block along with the part proofs
265+
res, err := client.BlockByHeight(context.Background(), &core_grpc.BlockByHeightRequest{
266+
Height: expectedBlockMeta.Header.Height,
267+
Prove: true,
268+
})
269+
require.NoError(t, err)
270+
271+
part, err := res.Recv()
272+
require.NoError(t, err)
273+
274+
require.NotNil(t, part.BlockPart)
275+
require.NotNil(t, part.ValidatorSet)
276+
require.NotNil(t, part.Commit)
277+
278+
assert.NotEqual(t, part.BlockPart.Proof, crypto.Proof{})
279+
assert.Equal(t, part.Commit.Height, expectedBlockMeta.Header.Height)
280+
281+
// query the block along without the part proofs
282+
res, err = client.BlockByHeight(context.Background(), &core_grpc.BlockByHeightRequest{
283+
Height: expectedBlockMeta.Header.Height,
284+
Prove: false,
285+
})
286+
require.NoError(t, err)
287+
288+
part, err = res.Recv()
289+
require.NoError(t, err)
290+
291+
require.NotNil(t, part.BlockPart)
292+
require.NotNil(t, part.ValidatorSet)
293+
require.NotNil(t, part.Commit)
294+
295+
assert.Equal(t, part.BlockPart.Proof, crypto.Proof{})
296+
assert.Equal(t, part.Commit.Height, expectedBlockMeta.Header.Height)
297+
}
298+
299+
func TestLatestBlockByHeight(t *testing.T) {
300+
client := setupClient(t)
301+
waitForHeight(t, 2)
302+
303+
// query the block along with the part proofs
304+
res, err := client.BlockByHeight(context.Background(), &core_grpc.BlockByHeightRequest{
305+
Height: 0,
306+
})
307+
require.NoError(t, err)
308+
309+
part, err := res.Recv()
310+
require.NoError(t, err)
311+
312+
require.NotNil(t, part.BlockPart)
313+
require.NotNil(t, part.ValidatorSet)
314+
require.NotNil(t, part.Commit)
315+
316+
assert.Greater(t, part.Commit.Height, int64(2))
317+
}
318+
319+
func TestBlockQuery_Streaming(t *testing.T) {
320+
client := setupClient(t)
321+
322+
// send a big transaction that would result in a block
323+
// containing multiple block parts
324+
txRes, err := rpctest.GetGRPCClient().BroadcastTx(
325+
context.Background(),
326+
&core_grpc.RequestBroadcastTx{Tx: rand.NewRand().Bytes(1000000)},
327+
)
328+
require.NoError(t, err)
329+
require.EqualValues(t, 0, txRes.CheckTx.Code)
330+
require.EqualValues(t, 0, txRes.DeliverTx.Code)
331+
332+
var expectedBlockMeta types.BlockMeta
333+
for i := int64(1); i < 500; i++ {
334+
waitForHeight(t, i+1)
335+
blockMeta := core.GetEnvironment().BlockStore.LoadBlockMeta(i)
336+
if blockMeta.BlockID.PartSetHeader.Total > 1 {
337+
expectedBlockMeta = *blockMeta
338+
break
339+
}
340+
}
341+
342+
// query the block without the part proofs
343+
res, err := client.BlockByHeight(context.Background(), &core_grpc.BlockByHeightRequest{
344+
Height: expectedBlockMeta.Header.Height,
345+
Prove: false,
346+
})
347+
require.NoError(t, err)
348+
349+
part1, err := res.Recv()
350+
require.NoError(t, err)
351+
352+
require.NotNil(t, part1.BlockPart)
353+
require.NotNil(t, part1.ValidatorSet)
354+
require.NotNil(t, part1.Commit)
355+
356+
assert.Equal(t, part1.BlockPart.Proof, crypto.Proof{})
357+
assert.Equal(t, part1.Commit.Height, expectedBlockMeta.Header.Height)
358+
359+
part2, err := res.Recv()
360+
require.NoError(t, err)
361+
362+
require.NotNil(t, part2.BlockPart)
363+
require.Nil(t, part2.ValidatorSet)
364+
require.Nil(t, part2.Commit)
365+
366+
assert.Equal(t, part2.BlockPart.Proof, crypto.Proof{})
367+
368+
// query the block along with the part proofs
369+
res, err = client.BlockByHeight(context.Background(), &core_grpc.BlockByHeightRequest{
370+
Height: expectedBlockMeta.Header.Height,
371+
Prove: true,
372+
})
373+
require.NoError(t, err)
374+
375+
part1, err = res.Recv()
376+
require.NoError(t, err)
377+
378+
require.NotNil(t, part1.BlockPart)
379+
require.NotNil(t, part1.ValidatorSet)
380+
require.NotNil(t, part1.Commit)
381+
382+
assert.NotEqual(t, part1.BlockPart.Proof, crypto.Proof{})
383+
assert.Equal(t, part1.Commit.Height, expectedBlockMeta.Header.Height)
384+
385+
part2, err = res.Recv()
386+
require.NoError(t, err)
387+
388+
require.NotNil(t, part2.BlockPart)
389+
require.Nil(t, part2.ValidatorSet)
390+
require.Nil(t, part2.Commit)
391+
392+
assert.NotEqual(t, part2.BlockPart.Proof, crypto.Proof{})
393+
}
394+
395+
func waitForHeight(t *testing.T, height int64) {
396+
rpcAddr := rpctest.GetConfig().RPC.ListenAddress
397+
c, err := rpchttp.New(rpcAddr, "/websocket")
398+
require.NoError(t, err)
399+
err = rpcclient.WaitForHeight(c, height, nil)
400+
require.NoError(t, err)
401+
}

‎rpc/grpc/types.pb.go

+4,138-382
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎rpc/test/helpers.go

+9
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,15 @@ func GetGRPCClient() core_grpc.BroadcastAPIClient {
117117
return core_grpc.StartGRPCClient(grpcAddr)
118118
}
119119

120+
func GetBlockAPIClient() (core_grpc.BlockAPIClient, error) {
121+
grpcAddr := globalConfig.RPC.GRPCListenAddress
122+
client, err := core_grpc.StartBlockAPIGRPCClient(grpcAddr)
123+
if err != nil {
124+
return nil, err
125+
}
126+
return client, nil
127+
}
128+
120129
// StartTendermint starts a test CometBFT server in a go routine and returns when it is initialized
121130
func StartTendermint(app abci.Application, opts ...func(*Options)) *nm.Node {
122131
nodeOpts := defaultOptions

‎types/part_set.go

+21-10
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,27 @@ func (ps *PartSet) Total() uint32 {
271271
}
272272

273273
func (ps *PartSet) AddPart(part *Part) (bool, error) {
274+
if part == nil {
275+
return false, fmt.Errorf("nil part")
276+
}
277+
278+
// The proof should be compatible with the number of parts.
279+
if part.Proof.Total != int64(ps.total) {
280+
return false, ErrPartSetInvalidProof
281+
}
282+
283+
// Check hash proof
284+
if part.Proof.Verify(ps.Hash(), part.Bytes) != nil {
285+
return false, ErrPartSetInvalidProof
286+
}
287+
288+
return ps.AddPartWithoutProof(part)
289+
}
290+
291+
func (ps *PartSet) AddPartWithoutProof(part *Part) (bool, error) {
292+
if part == nil {
293+
return false, fmt.Errorf("nil part")
294+
}
274295
if ps == nil {
275296
return false, nil
276297
}
@@ -287,16 +308,6 @@ func (ps *PartSet) AddPart(part *Part) (bool, error) {
287308
return false, nil
288309
}
289310

290-
// The proof should be compatible with the number of parts.
291-
if part.Proof.Total != int64(ps.total) {
292-
return false, ErrPartSetInvalidProof
293-
}
294-
295-
// Check hash proof
296-
if part.Proof.Verify(ps.Hash(), part.Bytes) != nil {
297-
return false, ErrPartSetInvalidProof
298-
}
299-
300311
// Add part
301312
ps.parts[part.Index] = part
302313
ps.partsBitArray.SetIndex(int(part.Index), true)

0 commit comments

Comments
 (0)
Please sign in to comment.