Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(abci): ignore comet ctx and use app ctx #2568

Merged
merged 71 commits into from
Mar 6, 2025
Merged
Show file tree
Hide file tree
Changes from 68 commits
Commits
Show all changes
71 commits
Select commit Hold shift + click to select a range
e7f0b79
Enforce synchronicity of final FCU and always return error on engine …
shotes Feb 6, 2025
a3796f5
nit
abi87 Feb 6, 2025
da52ea0
extended error
abi87 Feb 7, 2025
dbd75e1
Add error to force FCU
shotes Feb 6, 2025
3dc865d
Add retryWithTimeout
shotes Feb 7, 2025
26f4c7d
retry on unknown
shotes Feb 7, 2025
8bc54e9
comments around engine API error handling
shotes Feb 7, 2025
3b91793
Add lib for backoff func
shotes Feb 7, 2025
d20bd0d
shorten retry initial interval
shotes Feb 7, 2025
35296a1
lint and remove unused return value
shotes Feb 7, 2025
4687f00
Undo extra changes
shotes Feb 19, 2025
00ef155
remove extra return
shotes Feb 19, 2025
db423b2
Merge branch 'main' into engine-api-retry
calbera Feb 20, 2025
b58fced
Merge branch 'main' into engine-api-retry
calbera Feb 25, 2025
62b5838
Merge branch 'main' into engine-api-retry
shotes Feb 25, 2025
015698c
Merge branch 'main' into engine-api-retry
shotes Feb 28, 2025
194ba50
Merge branch 'main' into engine-api-retry
abi87 Feb 28, 2025
4e49106
Use configured values instead of consts
shotes Feb 28, 2025
2639b6b
lint
shotes Feb 28, 2025
8ec64f3
Add RPCRetryInterval flag
shotes Feb 28, 2025
f9096cb
remove debug print
shotes Feb 28, 2025
9b1255f
add to config and reorder
shotes Feb 28, 2025
b6e92ab
Add max interval and infinite retries
shotes Feb 28, 2025
5303fd5
Organize comments and increase max interval
shotes Feb 28, 2025
dc34dc9
More explicit comments
shotes Feb 28, 2025
d51d636
Condense NotifyNewPayload for funlen lint
shotes Feb 28, 2025
2069930
Allow new payload during beginning sync:
shotes Feb 28, 2025
2798898
lint
shotes Feb 28, 2025
48f6f7d
remove extraneous test files
shotes Feb 28, 2025
8dc24a5
Fix mock
shotes Feb 28, 2025
ab124a9
Add infinite duration
shotes Mar 1, 2025
5f9fe91
Make syncing status WARN
shotes Mar 1, 2025
5df3171
Merge branch 'main' of github.com:berachain/beacon-kit into engine-ap…
shotes Mar 4, 2025
69d67fd
remove extraneous file and generate mock
shotes Mar 4, 2025
156ba5f
Fix test unit cover
shotes Mar 4, 2025
0144133
Add cancellable ctx to cometbft used for stopping the service
fridrik01 Feb 8, 2025
a871e7b
Reuse cancellable context
shotes Mar 4, 2025
8a86207
Clean up comments and logging
shotes Mar 4, 2025
dcf36f0
undo IDE auto correct change
shotes Mar 4, 2025
42fa262
Make comments better
shotes Mar 4, 2025
aa7b9f7
lint
shotes Mar 4, 2025
0265130
combine ctxs
shotes Mar 4, 2025
a7b367b
reduce diff
shotes Mar 4, 2025
adc58d0
reduce diff more
shotes Mar 4, 2025
f8032c4
set max elapsed time to 0 and nits
shotes Mar 4, 2025
347cf4b
Add nil check for combineContexts for tests
shotes Mar 4, 2025
b0da96e
Merge branch 'main' into engine-api-retry
shotes Mar 4, 2025
7c22253
Merge branch 'main' into engine-api-retry
abi87 Mar 5, 2025
7c39a3f
Fixes for context cancellation:
shotes Mar 5, 2025
d422a6d
fix linter?
shotes Mar 5, 2025
73b0444
Merge branch 'engine-api-retry' into abci_context_cancel
calbera Mar 5, 2025
d375414
reorg context defining
shotes Mar 5, 2025
b225f6f
Merge branch 'abci_context_cancel' of github.com:berachain/beacon-kit…
shotes Mar 5, 2025
a1cf15a
Merge branch 'main' into engine-api-retry
rezbera Mar 5, 2025
16feb52
Merge branch 'engine-api-retry' into abci_context_cancel
rezbera Mar 5, 2025
5899cb5
Add chan to kill goroutine
shotes Mar 5, 2025
8ff93eb
lint and fix done case
shotes Mar 5, 2025
17e2094
delete extra file
shotes Mar 5, 2025
a0658c3
Add fatal errors
shotes Mar 5, 2025
d7d7449
Add IsNonFatalError
shotes Mar 5, 2025
38d5b09
Merge branch 'engine-api-retry' into abci_context_cancel
shotes Mar 5, 2025
75b5ad6
add more timeout for test and ignore temp-test-simulated.txt
shotes Mar 5, 2025
b26a276
Remove combineContext
shotes Mar 5, 2025
f8b3cbe
Check context and error in commit
shotes Mar 6, 2025
6416fb7
move ctx checks to higher level service
shotes Mar 6, 2025
994b472
lint
shotes Mar 6, 2025
a4e630d
logs and nits
abi87 Mar 6, 2025
b387d0b
Merge branch 'engine-api-retry' into abci_context_cancel
abi87 Mar 6, 2025
aea72aa
Merge branch 'main' into abci_context_cancel
abi87 Mar 6, 2025
b25d97d
check s.Ctx.Err on IniChain as well
abi87 Mar 6, 2025
f1c22ab
simplified consensus functions to avoid passing s.ctx as parameter
abi87 Mar 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ coverage-test-unit-cover.txt
coverage-merged.txt
test-simulated.txt
test-unit-cover.txt
temp-test-simulated.txt
.vercel

# server dev env for Air
Expand Down
21 changes: 5 additions & 16 deletions beacon/blockchain/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,21 +103,10 @@ func (s *Service) forceSyncUponFinalize(
return err
}

switch err := s.executionEngine.NotifyNewPayload(ctx, payloadReq); {
case err == nil:
// Do nothing and move on to NotifyForkchoiceUpdate.

case errors.IsAny(err,
engineerrors.ErrSyncingPayloadStatus,
engineerrors.ErrAcceptedPayloadStatus):
// Don't return error here, because we want to send the forkchoice update regardless.
s.logger.Warn("pushed new payload to SYNCING node during force startup",
"error", err,
"blockNum", executionPayload.GetNumber(),
"blockHash", executionPayload.GetBlockHash(),
)

default:
// We set retryOnSyncingStatus to false here. We can ignore SYNCING status and proceed
// to the FCU.
err := s.executionEngine.NotifyNewPayload(ctx, payloadReq, false)
if err != nil {
return fmt.Errorf("startSyncUponFinalize NotifyNewPayload failed: %w", err)
}

Expand All @@ -132,7 +121,7 @@ func (s *Service) forceSyncUponFinalize(
s.chainSpec.ActiveForkVersionForSlot(beaconBlock.GetSlot()),
)

switch _, err := s.executionEngine.NotifyForkchoiceUpdate(ctx, req); {
switch _, err = s.executionEngine.NotifyForkchoiceUpdate(ctx, req); {
case err == nil:
return nil

Expand Down
3 changes: 2 additions & 1 deletion beacon/blockchain/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,11 @@ type BlobSidecars[T any] interface {

// ExecutionEngine is the interface for the execution engine.
type ExecutionEngine interface {
// NotifyNewPayload notifies the execution client of new payload
// NotifyNewPayload notifies the execution client of new payload.
NotifyNewPayload(
ctx context.Context,
req *ctypes.NewPayloadRequest,
retryOnSyncingStatus bool,
) error
// NotifyForkchoiceUpdate notifies the execution client of a forkchoice
// update.
Expand Down
8 changes: 8 additions & 0 deletions cli/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const (
engineRoot = beaconKitRoot + "engine."
RPCDialURL = engineRoot + "rpc-dial-url"
RPCRetries = engineRoot + "rpc-retries"
RPCRetryInterval = engineRoot + "rpc-retry-interval"
RPCMaxRetryInterval = engineRoot + "rpc-max-retry-interval"
RPCTimeout = engineRoot + "rpc-timeout"
RPCStartupCheckInterval = engineRoot + "rpc-startup-check-interval"
RPCHealthCheckInteval = engineRoot + "rpc-health-check-interval"
Expand Down Expand Up @@ -92,6 +94,12 @@ func AddBeaconKitFlags(startCmd *cobra.Command) {
startCmd.Flags().Uint64(
RPCRetries, defaultCfg.Engine.RPCRetries, "rpc retries",
)
startCmd.Flags().Duration(
RPCRetryInterval, defaultCfg.Engine.RPCRetryInterval, "initial rpc retry interval",
)
startCmd.Flags().Duration(
RPCMaxRetryInterval, defaultCfg.Engine.RPCMaxRetryInterval, "max rpc retry interval",
)
startCmd.Flags().Duration(
RPCTimeout, defaultCfg.Engine.RPCTimeout, "rpc timeout",
)
Expand Down
52 changes: 42 additions & 10 deletions consensus/cometbft/service/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,28 @@ var (
)

func (s *Service) InitChain(
ctx context.Context,
_ context.Context,
req *cmtabci.InitChainRequest,
) (*cmtabci.InitChainResponse, error) {
return s.initChain(ctx, req)
//nolint:contextcheck // see s.ctx comment for more details
return s.initChain(s.ctx, req)
}

// PrepareProposal implements the PrepareProposal ABCI method and returns a
// ResponsePrepareProposal object to the client.
func (s *Service) PrepareProposal(
ctx context.Context,
_ context.Context,
req *cmtabci.PrepareProposalRequest,
) (*cmtabci.PrepareProposalResponse, error) {
return s.prepareProposal(ctx, req)
// Check if ctx is still good. CometBFT does not check this.
if s.ctx.Err() != nil {
// If the context is getting cancelled, we are shutting down.
// It is ok returning an empty proposal.
//nolint:nilerr // explicitly allowing this case
return &cmtabci.PrepareProposalResponse{Txs: req.Txs}, nil
}
//nolint:contextcheck // see s.ctx comment for more details
return s.prepareProposal(s.ctx, req)
}

func (s *Service) Info(context.Context,
Expand Down Expand Up @@ -77,17 +86,33 @@ func (s *Service) Info(context.Context,
// ProcessProposal implements the ProcessProposal ABCI method and returns a
// ResponseProcessProposal object to the client.
func (s *Service) ProcessProposal(
ctx context.Context,
_ context.Context,
req *cmtabci.ProcessProposalRequest,
) (*cmtabci.ProcessProposalResponse, error) {
return s.processProposal(ctx, req)
// Check if ctx is still good. CometBFT does not check this.
if s.ctx.Err() != nil {
// Node will panic on context cancel with "CONSENSUS FAILURE!!!" due to
// returning an error. This is expected. We do not want to accept or
// reject a proposal based on incomplete data.
// Returning PROCESS_PROPOSAL_STATUS_UNKNOWN will also result in comet panic.
return nil, s.ctx.Err()
}
//nolint:contextcheck // see s.ctx comment for more details
return s.processProposal(s.ctx, req)
}

func (s *Service) FinalizeBlock(
ctx context.Context,
_ context.Context,
req *cmtabci.FinalizeBlockRequest,
) (*cmtabci.FinalizeBlockResponse, error) {
return s.finalizeBlock(ctx, req)
// Check if ctx is still good. CometBFT does not check this.
if s.ctx.Err() != nil {
// Node will panic on context cancel with "CONSENSUS FAILURE!!!" due to error.
// We expect this to happen and do not want to finalize any incomplete or invalid state.
return nil, s.ctx.Err()
}
//nolint:contextcheck // see s.ctx comment for more details
return s.finalizeBlock(s.ctx, req)
}

// Commit implements the ABCI interface. It will commit all state that exists in
Expand All @@ -98,9 +123,16 @@ func (s *Service) FinalizeBlock(
// against that height and gracefully halt if it matches the latest committed
// height.
func (s *Service) Commit(
ctx context.Context, req *cmtabci.CommitRequest,
_ context.Context, req *cmtabci.CommitRequest,
) (*cmtabci.CommitResponse, error) {
return s.commit(ctx, req)
// Check if ctx is still good. CometBFT does not check this.
if s.ctx.Err() != nil {
// Node will panic on context cancel with "CONSENSUS FAILURE!!!" due to error.
// We expect this to happen and do not want to commit any incomplete or invalid state.
return nil, s.ctx.Err()
}
//nolint:contextcheck // see s.ctx comment for more details
return s.commit(s.ctx, req)
}

//
Expand Down
3 changes: 3 additions & 0 deletions consensus/cometbft/service/finalize_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ func (s *Service) finalizeBlockInternal(
// CometBFT.
if s.finalizeBlockState == nil {
s.finalizeBlockState = s.resetState(ctx)
} else {
// Preserve the CosmosSDK context while using the correct base ctx.
s.finalizeBlockState.SetContext(s.finalizeBlockState.Context().WithContext(ctx))
Copy link
Contributor

@rezbera rezbera Mar 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bit unsure on this. Why was introducing this needed?

Copy link
Contributor Author

@shotes shotes Mar 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the cosmos SDK context, which I will refer to as cosmosCtx, contains a reference to the baseCtx, which is the regular golang context.Context. This cosmosCtx also stores a bunch of data in it for our state and commit multi store and whatnot, but we don't want to affect any of that.

When we resetState(ctx), we are creating a new cosmosCtx, where baseCtx = ctx.

If we do not resetState(ctx), then the already existing finalizeBlockState is used. This state has the old cosmosCtx, which has the old ctx from whichever other function is was set in, likely the previous ProcessProposal. We don't mind using the old cosmosCtx, but we want to make sure we are using the new ctx that is passed in for this function. So we can simply set baseCtx = ctx here.

Indeed, this is not as big of a problem now, since all of the contexts are the same and the context isn't getting canceled inbetween each ABCI++ call like it was in a previous iteration of this PR. However, purely for correctness, we want to make sure our s.Blockchain.FinalizeBlock(s.finalizeBlockState.Context()) is using the ctx passed in via finalizeBlockInteral(ctx, ...) and NOT the ctx from some other function call.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, it is still not accurately using the correct ctx unless we make this change.

I had assumed it was always going to be using this context.

For my understanding, is it fair to say that we don't need to change the context here given we have this universal base context which currently never changes?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rezbera if you check the call hierarchy you will see that FinalizeBlock does use s.ctx

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually @rezbera we can solve the issue by just not passing ctx as parameter, but reusing s.ctx everywhere, which this PR basically does. So no need to look up the call hierarchy

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you make a helper function on state to reset/update the base ctx

}

// Iterate over all raw transactions in the proposal and attempt to execute
Expand Down
1 change: 1 addition & 0 deletions consensus/cometbft/service/prepare_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (s *Service) prepareProposal(
// Always reset state given that PrepareProposal can timeout
// and be called again in a subsequent round.
s.prepareProposalState = s.resetState(ctx)
//nolint:contextcheck // ctx already passed via resetState
s.prepareProposalState.SetContext(
s.getContextForProposal(
s.prepareProposalState.Context(),
Expand Down
1 change: 1 addition & 0 deletions consensus/cometbft/service/process_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func (s *Service) processProposal(
s.finalizeBlockState = s.resetState(ctx)
}

//nolint:contextcheck // ctx already passed via resetState
s.processProposalState.SetContext(
s.getContextForProposal(
s.processProposalState.Context(),
Expand Down
21 changes: 19 additions & 2 deletions consensus/cometbft/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ type Service struct {
minRetainBlocks uint64

chainID string

// ctx is the context passed in for the service. CometBFT currently does
// not support context usage. It passes "context.TODO()" to apps that
// implement the ABCI++ interface, and does not provide a context that is
// a child context of the one the node originally provides to comet.
// Thus the app cannot tell when the context as been cancelled or not.
// TODO: We must use this as a workaround for now until CometBFT properly
// generates contexts that inherit from the parent context we provide.
ctx context.Context
}

func NewService(
Expand Down Expand Up @@ -165,6 +174,7 @@ func (s *Service) Start(
return err
}

s.ctx = ctx
s.node, err = node.NewNode(
ctx,
cfg,
Expand Down Expand Up @@ -220,6 +230,12 @@ func (s *Service) Stop() error {
return errors.Join(errs...)
}

// ResetAppCtx sets the app ctx for the service. This is used
// primarily for the mock service.
func (s *Service) ResetAppCtx(ctx context.Context) {
s.ctx = ctx
}

// Name returns the name of the cometbft.
func (s *Service) Name() string {
return AppName
Expand Down Expand Up @@ -320,8 +336,9 @@ func (s *Service) getContextForProposal(
// on initialHeight. Panic appeases nilaway.
panic(fmt.Errorf("getContextForProposal: %w", errNilFinalizeBlockState))
}
ctx, _ = s.finalizeBlockState.Context().CacheContext()
return ctx
newCtx, _ := s.finalizeBlockState.Context().CacheContext()
// Preserve the CosmosSDK context while using the correct base ctx.
return newCtx.WithContext(ctx.Context())
}

// CreateQueryContext creates a new sdk.Context for a query, taking as args
Expand Down
16 changes: 16 additions & 0 deletions execution/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,3 +211,19 @@ func (s *EngineClient) verifyChainIDAndConnection(
}
return nil
}

/* -------------------------------------------------------------------------- */
/* Getters */
/* -------------------------------------------------------------------------- */

func (s *EngineClient) GetRPCRetries() uint64 {
return s.cfg.RPCRetries
}

func (s *EngineClient) GetRPCRetryInterval() time.Duration {
return s.cfg.RPCRetryInterval
}

func (s *EngineClient) GetRPCMaxRetryInterval() time.Duration {
return s.cfg.RPCMaxRetryInterval
}
14 changes: 11 additions & 3 deletions execution/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import (

const (
defaultDialURL = "http://localhost:8551"
defaultRPCRetries = 3
defaultRPCRetries = 0
defaultRPCRetryInterval = 100 * time.Millisecond
defaultRPCMaxRetryInterval = 10 * time.Second
defaultRPCTimeout = 900 * time.Millisecond
defaultRPCStartupCheckInterval = 3 * time.Second
defaultRPCJWTRefreshInterval = 30 * time.Second
Expand All @@ -43,6 +45,8 @@ func DefaultConfig() Config {
return Config{
RPCDialURL: dialURL,
RPCRetries: defaultRPCRetries,
RPCRetryInterval: defaultRPCRetryInterval,
RPCMaxRetryInterval: defaultRPCMaxRetryInterval,
RPCTimeout: defaultRPCTimeout,
RPCStartupCheckInterval: defaultRPCStartupCheckInterval,
RPCJWTRefreshInterval: defaultRPCJWTRefreshInterval,
Expand All @@ -55,9 +59,13 @@ type Config struct {
// RPCDialURL is the HTTP url of the execution client JSON-RPC endpoint.
RPCDialURL *url.ConnectionURL `mapstructure:"rpc-dial-url"`
// RPCRetries is the number of retries before shutting down consensus
// client.
// client. A value of 0 will retry infinitely.
RPCRetries uint64 `mapstructure:"rpc-retries"`
// RPCTimeout is the RPC timeout for execution client calls.
// RPCRetryInterval is the initial RPC backoff for repeated execution client calls.
RPCRetryInterval time.Duration `mapstructure:"rpc-retry-interval"`
// MaxRPCRetryInterval is the maximum RPC backoff for repeated execution client calls.
RPCMaxRetryInterval time.Duration `mapstructure:"rpc-max-retry-interval"`
// RPCTimeout is the RPC timeout for individual execution client calls.
RPCTimeout time.Duration `mapstructure:"rpc-timeout"`
// RPCStartupCheckInterval is the Interval for the startup check.
RPCStartupCheckInterval time.Duration `mapstructure:"rpc-startup-check-interval"`
Expand Down
29 changes: 24 additions & 5 deletions execution/client/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ var (
// ErrMismatchedEth1ChainID is returned when the chainID does not
// match the expected chain ID.
ErrMismatchedEth1ChainID = errors.New("mismatched chain ID")

// ErrBadConnection indicates that the http.Client was unable to
// establish a connection.
ErrBadConnection = errors.New("connection error")
)

// Handles errors received from the RPC server according to the specification.
Expand Down Expand Up @@ -66,11 +70,7 @@ func (s *EngineClient) handleRPCError(
var e jsonrpc.Error
ok := errors.As(err, &e)
if !ok || e == nil {
return errors.Wrapf(
err,
"got an unexpected server error in JSON-RPC response "+
"failed to convert from jsonrpc.Error",
)
return errors.Join(ErrBadConnection, err)
}

// Otherwise check for our engine errors.
Expand Down Expand Up @@ -119,3 +119,22 @@ func (s *EngineClient) handleRPCError(
return err
}
}

// IsFatalError defines errors that indicate a bad request or an otherwise
// unusable client.
func IsFatalError(err error) bool {
return jsonrpc.IsPreDefinedError(err) || errors.IsAny(
err,
ErrBadConnection,
)
}

// IsNonFatalError defines errors that should be ephemeral and can be
// recovered from simply by retrying.
func IsNonFatalError(err error) bool {
return errors.IsAny(
err,
engineerrors.ErrEngineAPITimeout,
http.ErrTimeout,
)
}
Loading
Loading