Skip to content

Commit bdf96fd

Browse files
committed
[CLOB-930] Add ctx.Done() support to shutdown, make start-up wait time configurable, and add support for domain sockets for gRPC server and gRPC web server.
ctx.Done() support and was added in 0.50 with cosmos#15041 server start up time was removed in 0.50 with cosmos#15041
1 parent b95c66d commit bdf96fd

File tree

6 files changed

+112
-64
lines changed

6 files changed

+112
-64
lines changed

server/grpc/grpc_web.go

+19-3
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package grpc
22

33
import (
44
"fmt"
5+
"net"
56
"net/http"
7+
"strings"
68
"time"
79

810
"github.com/improbable-eng/grpc-web/go/grpcweb"
@@ -23,24 +25,38 @@ func StartGRPCWeb(grpcSrv *grpc.Server, config config.Config) (*http.Server, err
2325
)
2426
}
2527

28+
var proto, addr string
29+
parts := strings.SplitN(config.GRPCWeb.Address, "://", 2)
30+
// Default to using 'tcp' to maintain backwards compatibility with configurations that don't specify
31+
// the network to use.
32+
if len(parts) != 2 {
33+
proto = "tcp"
34+
addr = config.GRPCWeb.Address
35+
} else {
36+
proto, addr = parts[0], parts[1]
37+
}
38+
listener, err := net.Listen(proto, addr)
39+
if err != nil {
40+
return nil, err
41+
}
42+
2643
wrappedServer := grpcweb.WrapServer(grpcSrv, options...)
2744
grpcWebSrv := &http.Server{
28-
Addr: config.GRPCWeb.Address,
2945
Handler: wrappedServer,
3046
ReadHeaderTimeout: 500 * time.Millisecond,
3147
}
3248

3349
errCh := make(chan error)
3450
go func() {
35-
if err := grpcWebSrv.ListenAndServe(); err != nil {
51+
if err := grpcWebSrv.Serve(listener); err != nil {
3652
errCh <- fmt.Errorf("[grpc] failed to serve: %w", err)
3753
}
3854
}()
3955

4056
select {
4157
case err := <-errCh:
4258
return nil, err
43-
case <-time.After(types.ServerStartTime): // assume server started successfully
59+
case <-time.After(time.Duration(types.ServerStartTime.Load())): // assume server started successfully
4460
return grpcWebSrv, nil
4561
}
4662
}

server/grpc/server.go

+18-7
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package grpc
33
import (
44
"fmt"
55
"net"
6+
"strings"
67
"time"
78

89
"google.golang.org/grpc"
@@ -18,7 +19,7 @@ import (
1819
)
1920

2021
// StartGRPCServer starts a gRPC server on the given address.
21-
func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, error) {
22+
func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, net.Addr, error) {
2223
maxSendMsgSize := cfg.MaxSendMsgSize
2324
if maxSendMsgSize == 0 {
2425
maxSendMsgSize = config.DefaultGRPCMaxSendMsgSize
@@ -53,16 +54,26 @@ func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config
5354
InterfaceRegistry: clientCtx.InterfaceRegistry,
5455
})
5556
if err != nil {
56-
return nil, err
57+
return nil, nil, err
5758
}
5859

5960
// Reflection allows external clients to see what services and methods
6061
// the gRPC server exposes.
6162
gogoreflection.Register(grpcSrv)
6263

63-
listener, err := net.Listen("tcp", cfg.Address)
64+
var proto, addr string
65+
parts := strings.SplitN(cfg.Address, "://", 2)
66+
// Default to using 'tcp' to maintain backwards compatibility with configurations that don't specify
67+
// the network to use.
68+
if len(parts) != 2 {
69+
proto = "tcp"
70+
addr = cfg.Address
71+
} else {
72+
proto, addr = parts[0], parts[1]
73+
}
74+
listener, err := net.Listen(proto, addr)
6475
if err != nil {
65-
return nil, err
76+
return nil, nil, err
6677
}
6778

6879
errCh := make(chan error)
@@ -75,10 +86,10 @@ func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config
7586

7687
select {
7788
case err := <-errCh:
78-
return nil, err
89+
return nil, nil, err
7990

80-
case <-time.After(types.ServerStartTime):
91+
case <-time.After(time.Duration(types.ServerStartTime.Load())):
8192
// assume server started successfully
82-
return grpcSrv, nil
93+
return grpcSrv, listener.Addr(), nil
8394
}
8495
}

server/start.go

+59-48
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package server
33
// DONTCOVER
44

55
import (
6+
"context"
67
"errors"
78
"fmt"
89
"net"
@@ -141,14 +142,14 @@ is performed. Note, when enabled, gRPC will also be automatically enabled.
141142
withTM, _ := cmd.Flags().GetBool(flagWithTendermint)
142143
if !withTM {
143144
serverCtx.Logger.Info("starting ABCI without Tendermint")
144-
return wrapCPUProfile(serverCtx, func() error {
145-
return startStandAlone(serverCtx, appCreator)
145+
return wrapCPUProfile(cmd.Context(), serverCtx, func() error {
146+
return startStandAlone(cmd.Context(), serverCtx, appCreator)
146147
})
147148
}
148149

149150
// amino is needed here for backwards compatibility of REST routes
150-
err = wrapCPUProfile(serverCtx, func() error {
151-
return startInProcess(serverCtx, clientCtx, appCreator)
151+
err = wrapCPUProfile(cmd.Context(), serverCtx, func() error {
152+
return startInProcess(cmd.Context(), serverCtx, clientCtx, appCreator)
152153
})
153154
errCode, ok := err.(ErrorCode)
154155
if !ok {
@@ -206,7 +207,7 @@ is performed. Note, when enabled, gRPC will also be automatically enabled.
206207
return cmd
207208
}
208209

209-
func startStandAlone(ctx *Context, appCreator types.AppCreator) error {
210+
func startStandAlone(parentCtx context.Context, ctx *Context, appCreator types.AppCreator) error {
210211
addr := ctx.Viper.GetString(flagAddress)
211212
transport := ctx.Viper.GetString(flagTransport)
212213
home := ctx.Viper.GetString(flags.FlagHome)
@@ -260,10 +261,10 @@ func startStandAlone(ctx *Context, appCreator types.AppCreator) error {
260261
}()
261262

262263
// Wait for SIGINT or SIGTERM signal
263-
return WaitForQuitSignals()
264+
return WaitForQuitSignals(parentCtx)
264265
}
265266

266-
func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.AppCreator) error {
267+
func startInProcess(parentCtx context.Context, ctx *Context, clientCtx client.Context, appCreator types.AppCreator) error {
267268
cfg := ctx.Config
268269
home := cfg.RootDir
269270

@@ -354,6 +355,32 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
354355
return err
355356
}
356357

358+
var (
359+
grpcSrv *grpc.Server
360+
grpcSrvAddr net.Addr
361+
grpcWebSrv *http.Server
362+
)
363+
364+
if config.GRPC.Enable {
365+
grpcSrv, grpcSrvAddr, err = servergrpc.StartGRPCServer(clientCtx, app, config.GRPC)
366+
if err != nil {
367+
return err
368+
}
369+
defer grpcSrv.Stop()
370+
if config.GRPCWeb.Enable {
371+
grpcWebSrv, err = servergrpc.StartGRPCWeb(grpcSrv, config)
372+
if err != nil {
373+
ctx.Logger.Error("failed to start grpc-web http server: ", err)
374+
return err
375+
}
376+
defer func() {
377+
if err := grpcWebSrv.Close(); err != nil {
378+
ctx.Logger.Error("failed to close grpc-web http server: ", err)
379+
}
380+
}()
381+
}
382+
}
383+
357384
var apiSrv *api.Server
358385
if config.API.Enable {
359386
genDoc, err := genDocProvider()
@@ -364,11 +391,6 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
364391
clientCtx := clientCtx.WithHomeDir(home).WithChainID(genDoc.ChainID)
365392

366393
if config.GRPC.Enable {
367-
_, port, err := net.SplitHostPort(config.GRPC.Address)
368-
if err != nil {
369-
return err
370-
}
371-
372394
maxSendMsgSize := config.GRPC.MaxSendMsgSize
373395
if maxSendMsgSize == 0 {
374396
maxSendMsgSize = serverconfig.DefaultGRPCMaxSendMsgSize
@@ -379,11 +401,10 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
379401
maxRecvMsgSize = serverconfig.DefaultGRPCMaxRecvMsgSize
380402
}
381403

382-
grpcAddress := fmt.Sprintf("127.0.0.1:%s", port)
383-
404+
grpcSrvAddrString := fmt.Sprintf("%s://%s", grpcSrvAddr.Network(), grpcSrvAddr.String())
384405
// If grpc is enabled, configure grpc client for grpc gateway.
385406
grpcClient, err := grpc.Dial(
386-
grpcAddress,
407+
grpcSrvAddrString,
387408
grpc.WithTransportCredentials(insecure.NewCredentials()),
388409
grpc.WithDefaultCallOptions(
389410
grpc.ForceCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()),
@@ -396,7 +417,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
396417
}
397418

398419
clientCtx = clientCtx.WithGRPCClient(grpcClient)
399-
ctx.Logger.Debug("grpc client assigned to client context", "target", grpcAddress)
420+
ctx.Logger.Debug("grpc client assigned to client context", "target", grpcSrvAddrString)
400421
}
401422

402423
apiSrv = api.New(clientCtx, ctx.Logger.With("module", "api-server"))
@@ -416,40 +437,30 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
416437
case err := <-errCh:
417438
return err
418439

419-
case <-time.After(types.ServerStartTime): // assume server started successfully
420-
}
421-
}
422-
423-
var (
424-
grpcSrv *grpc.Server
425-
grpcWebSrv *http.Server
426-
)
427-
428-
if config.GRPC.Enable {
429-
grpcSrv, err = servergrpc.StartGRPCServer(clientCtx, app, config.GRPC)
430-
if err != nil {
431-
return err
432-
}
433-
defer grpcSrv.Stop()
434-
if config.GRPCWeb.Enable {
435-
grpcWebSrv, err = servergrpc.StartGRPCWeb(grpcSrv, config)
436-
if err != nil {
437-
ctx.Logger.Error("failed to start grpc-web http server: ", err)
438-
return err
439-
}
440-
defer func() {
441-
if err := grpcWebSrv.Close(); err != nil {
442-
ctx.Logger.Error("failed to close grpc-web http server: ", err)
443-
}
444-
}()
440+
case <-time.After(time.Duration(types.ServerStartTime.Load())): // assume server started successfully
445441
}
446442
}
447443

448444
// At this point it is safe to block the process if we're in gRPC only mode as
449445
// we do not need to start Rosetta or handle any Tendermint related processes.
450446
if gRPCOnly {
447+
// Fix application shutdown
448+
defer func() {
449+
_ = app.Close()
450+
451+
if traceWriterCleanup != nil {
452+
traceWriterCleanup()
453+
}
454+
455+
if apiSrv != nil {
456+
_ = apiSrv.Close()
457+
}
458+
459+
ctx.Logger.Info("exiting...")
460+
}()
461+
451462
// wait for signal capture and gracefully return
452-
return WaitForQuitSignals()
463+
return WaitForQuitSignals(parentCtx)
453464
}
454465

455466
var rosettaSrv crgserver.Server
@@ -498,7 +509,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
498509
case err := <-errCh:
499510
return err
500511

501-
case <-time.After(types.ServerStartTime): // assume server started successfully
512+
case <-time.After(time.Duration(types.ServerStartTime.Load())): // assume server started successfully
502513
}
503514
}
504515

@@ -520,7 +531,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
520531
}()
521532

522533
// wait for signal capture and gracefully return
523-
return WaitForQuitSignals()
534+
return WaitForQuitSignals(parentCtx)
524535
}
525536

526537
func startTelemetry(cfg serverconfig.Config) (*telemetry.Metrics, error) {
@@ -531,7 +542,7 @@ func startTelemetry(cfg serverconfig.Config) (*telemetry.Metrics, error) {
531542
}
532543

533544
// wrapCPUProfile runs callback in a goroutine, then wait for quit signals.
534-
func wrapCPUProfile(ctx *Context, callback func() error) error {
545+
func wrapCPUProfile(parentCtx context.Context, ctx *Context, callback func() error) error {
535546
if cpuProfile := ctx.Viper.GetString(flagCPUProfile); cpuProfile != "" {
536547
f, err := os.Create(cpuProfile)
537548
if err != nil {
@@ -561,8 +572,8 @@ func wrapCPUProfile(ctx *Context, callback func() error) error {
561572
case err := <-errCh:
562573
return err
563574

564-
case <-time.After(types.ServerStartTime):
575+
case <-time.After(time.Duration(types.ServerStartTime.Load())):
565576
}
566577

567-
return WaitForQuitSignals()
578+
return WaitForQuitSignals(parentCtx)
568579
}

server/types/app.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package types
33
import (
44
"encoding/json"
55
"io"
6+
"sync/atomic"
67
"time"
78

89
dbm "github.com/cometbft/cometbft-db"
@@ -22,7 +23,11 @@ import (
2223

2324
// ServerStartTime defines the time duration that the server need to stay running after startup
2425
// for the startup be considered successful
25-
const ServerStartTime = 5 * time.Second
26+
var ServerStartTime = atomic.Int64{}
27+
28+
func init() {
29+
ServerStartTime.Add(int64(5 * time.Second))
30+
}
2631

2732
type (
2833
// AppOptions defines an interface that is passed into an application

server/util.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package server
22

33
import (
4+
"context"
45
"errors"
56
"fmt"
67
"io"
@@ -381,11 +382,15 @@ func TrapSignal(cleanupFunc func()) {
381382
}
382383

383384
// WaitForQuitSignals waits for SIGINT and SIGTERM and returns.
384-
func WaitForQuitSignals() ErrorCode {
385+
func WaitForQuitSignals(ctx context.Context) error {
385386
sigs := make(chan os.Signal, 1)
386387
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
387-
sig := <-sigs
388-
return ErrorCode{Code: int(sig.(syscall.Signal)) + 128}
388+
select {
389+
case sig := <-sigs:
390+
return ErrorCode{Code: int(sig.(syscall.Signal)) + 128}
391+
case <-ctx.Done():
392+
return nil
393+
}
389394
}
390395

391396
// GetAppDBBackend gets the backend type to use for the application DBs.

testutil/network/util.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -89,14 +89,14 @@ func startInProcess(cfg Config, val *Validator) error {
8989
select {
9090
case err := <-errCh:
9191
return err
92-
case <-time.After(srvtypes.ServerStartTime): // assume server started successfully
92+
case <-time.After(time.Duration(srvtypes.ServerStartTime.Load())): // assume server started successfully
9393
}
9494

9595
val.api = apiSrv
9696
}
9797

9898
if val.AppConfig.GRPC.Enable {
99-
grpcSrv, err := servergrpc.StartGRPCServer(val.ClientCtx, app, val.AppConfig.GRPC)
99+
grpcSrv, _, err := servergrpc.StartGRPCServer(val.ClientCtx, app, val.AppConfig.GRPC)
100100
if err != nil {
101101
return err
102102
}

0 commit comments

Comments
 (0)