Stuck packet (#1296)
* Add stuck packet height hack

break out of query cycle if stuck packet height has been queried

Add debug logs

get hack working

* convert to feature with flags

* remove unused var

* allow stuck packet flags on flush

* Add docs

* Fix typo

* Update debug log
agouin authored Oct 11, 2023
1 parent 892e52c commit 6bd9c02
Showing 11 changed files with 143 additions and 13 deletions.
59 changes: 59 additions & 0 deletions cmd/flags.go
@@ -5,6 +5,7 @@ import (
"time"

"github.com/cosmos/relayer/v2/relayer"
"github.com/cosmos/relayer/v2/relayer/processor"

"github.com/spf13/cobra"
"github.com/spf13/viper"
@@ -57,6 +58,9 @@ const (
flagSrcConnID = "src-connection-id"
flagDstConnID = "dst-connection-id"
flagOutput = "output"
flagStuckPacketChainID = "stuck-packet-chain-id"
flagStuckPacketHeightStart = "stuck-packet-height-start"
flagStuckPacketHeightEnd = "stuck-packet-height-end"
)

const (
@@ -424,3 +428,58 @@ func addOutputFlag(v *viper.Viper, cmd *cobra.Command) *cobra.Command {
}
return cmd
}

func stuckPacketFlags(v *viper.Viper, cmd *cobra.Command) *cobra.Command {
cmd.Flags().String(flagStuckPacketChainID, "", "chain ID with the stuck packet(s)")
if err := v.BindPFlag(flagStuckPacketChainID, cmd.Flags().Lookup(flagStuckPacketChainID)); err != nil {
panic(err)
}
cmd.Flags().Uint64(flagStuckPacketHeightStart, 0, "height to start searching for the stuck packet(s)")
if err := v.BindPFlag(flagStuckPacketHeightStart, cmd.Flags().Lookup(flagStuckPacketHeightStart)); err != nil {
panic(err)
}
cmd.Flags().Uint64(flagStuckPacketHeightEnd, 0, "height to end searching for the stuck packet(s)")
if err := v.BindPFlag(flagStuckPacketHeightEnd, cmd.Flags().Lookup(flagStuckPacketHeightEnd)); err != nil {
panic(err)
}
return cmd
}

func parseStuckPacketFromFlags(cmd *cobra.Command) (*processor.StuckPacket, error) {
stuckPacketChainID, err := cmd.Flags().GetString(flagStuckPacketChainID)
if err != nil {
return nil, err
}

if stuckPacketChainID == "" {
return nil, nil
}

stuckPacketHeightStart, err := cmd.Flags().GetUint64(flagStuckPacketHeightStart)
if err != nil {
return nil, err
}

if stuckPacketHeightStart == 0 {
return nil, fmt.Errorf("stuck packet chain ID %s is set but start height is not", stuckPacketChainID)
}

stuckPacketHeightEnd, err := cmd.Flags().GetUint64(flagStuckPacketHeightEnd)
if err != nil {
return nil, err
}

if stuckPacketHeightEnd == 0 {
return nil, fmt.Errorf("stuck packet chain ID %s is set but end height is not", stuckPacketChainID)
}

if stuckPacketHeightEnd < stuckPacketHeightStart {
return nil, fmt.Errorf("stuck packet end height %d is less than start height %d", stuckPacketHeightEnd, stuckPacketHeightStart)
}

return &processor.StuckPacket{
ChainID: stuckPacketChainID,
StartHeight: stuckPacketHeightStart,
EndHeight: stuckPacketHeightEnd,
}, nil
}
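
Given the validation above, supplying the chain ID without the height flags should fail. A sketch of the expected failure mode (the path and chain ID are hypothetical, and the exact CLI output may vary):

```bash
$ rly start my-path --stuck-packet-chain-id chain-a
Error: stuck packet chain ID chain-a is set but start height is not
```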
7 changes: 7 additions & 0 deletions cmd/start.go
@@ -143,6 +143,11 @@ $ %s start demo-path2 --max-tx-size 10`, appName, appName, appName, appName)),
return err
}

stuckPacket, err := parseStuckPacketFromFlags(cmd)
if err != nil {
return err
}

rlyErrCh := relayer.StartRelayer(
cmd.Context(),
a.log,
@@ -156,6 +161,7 @@ $ %s start demo-path2 --max-tx-size 10`, appName, appName, appName, appName)),
processorType,
initialBlockHistory,
prometheusMetrics,
stuckPacket,
)

// Block until the error channel sends a message.
@@ -179,5 +185,6 @@ $ %s start demo-path2 --max-tx-size 10`, appName, appName, appName, appName)),
cmd = initBlockFlag(a.viper, cmd)
cmd = flushIntervalFlag(a.viper, cmd)
cmd = memoFlag(a.viper, cmd)
cmd = stuckPacketFlags(a.viper, cmd)
return cmd
}
8 changes: 8 additions & 0 deletions cmd/tx.go
@@ -795,6 +795,11 @@ $ %s tx flush demo-path channel-0`,
}
}

stuckPacket, err := parseStuckPacketFromFlags(cmd)
if err != nil {
return err
}

ctx, cancel := context.WithTimeout(cmd.Context(), flushTimeout)
defer cancel()

@@ -811,6 +816,7 @@ $ %s tx flush demo-path channel-0`,
relayer.ProcessorEvents,
0,
nil,
stuckPacket,
)

// Block until the error channel sends a message.
@@ -830,6 +836,8 @@ $ %s tx flush demo-path channel-0`,

cmd = strategyFlag(a.viper, cmd)
cmd = memoFlag(a.viper, cmd)
cmd = stuckPacketFlags(a.viper, cmd)

return cmd
}

18 changes: 18 additions & 0 deletions docs/advanced_usage.md
@@ -76,7 +76,25 @@ To remove the feegrant configuration:
- `rly chains configure feegrant basicallowance kujira --delete`


## Stuck Packet

There can be scenarios where a standard flush fails to clear a packet because of differences in how packets are observed. A standard flush depends on the packet queries working properly, and those queries can sometimes miss events that the relayer's block scanning during standard operation would catch. For affected packets emitted in recent blocks, the `--block-history` flag starts the relayer's block scanning that many blocks behind the current chain tip. However, if the stuck packet occurred at a height too old for a reasonable `--block-history` scan from that historical height to the current one, an additional set of flags can zoom in on the block heights where the stuck packet occurred.
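
For instance, if the packet was sent only a few thousand blocks ago, a large `--block-history` alone may be enough (the value below is illustrative):

```bash
rly start $PATH_NAME --block-history 5000 -d
```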

For example, say a relayer is configured between chains A and B, and it was not operational when a user on Chain A sent a packet to Chain B. Due to an issue in the queries to Chain A, a typical flush does not relay the packet. Suppose many days pass before the relayer operator recognizes the issue. The operator could start the relayer with a massive `--block-history` to query every block from the time of the stuck packet to the current block, but that could take many hours. Instead, the operator can flush out the packet by doing the following:

```bash
rly start $PATH_NAME --stuck-packet-chain-id $CHAIN_A_CHAIN_ID --stuck-packet-height-start $CHAIN_A_STUCK_PACKET_HEIGHT --stuck-packet-height-end $CHAIN_A_STUCK_PACKET_HEIGHT -d
```

Alternatively, a flush can be run with these flags so that the relayer exits once it is done:

```bash
rly tx flush $PATH_NAME --stuck-packet-chain-id $CHAIN_A_CHAIN_ID --stuck-packet-height-start $CHAIN_A_STUCK_PACKET_HEIGHT --stuck-packet-height-end $CHAIN_A_STUCK_PACKET_HEIGHT -d
```

If `CHAIN_A_STUCK_PACKET_HEIGHT` is not exactly known, the `--stuck-packet-height-start` and `--stuck-packet-height-end` flags can be set to heights surrounding the range where the stuck packet is expected, avoiding the need to dig through every block to determine the exact height.
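
For example, padding a suspected height by 100 blocks on each side (the `APPROX_HEIGHT` variable and the padding are illustrative):

```bash
rly tx flush $PATH_NAME --stuck-packet-chain-id $CHAIN_A_CHAIN_ID \
  --stuck-packet-height-start $((APPROX_HEIGHT - 100)) \
  --stuck-packet-height-end $((APPROX_HEIGHT + 100)) -d
```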

Note that this narrows the relayer's window of visibility into what has happened on the chain: it only sees what happened between `stuck-packet-height-start` and `stuck-packet-height-end`, and then starts observing the most recent blocks after that. If a packet was actually relayed properly between `stuck-packet-height-end` and the chain tip, the relayer will encounter errors trying to relay a packet that was already relayed. This feature should only be used by advanced users to zoom in on a troublesome packet.

---

29 changes: 22 additions & 7 deletions relayer/chains/cosmos/cosmos_chain_processor.go
@@ -56,7 +56,11 @@ type CosmosChainProcessor struct {
parsedGasPrices *sdk.DecCoins
}

func NewCosmosChainProcessor(log *zap.Logger, provider *CosmosProvider, metrics *processor.PrometheusMetrics) *CosmosChainProcessor {
func NewCosmosChainProcessor(
log *zap.Logger,
provider *CosmosProvider,
metrics *processor.PrometheusMetrics,
) *CosmosChainProcessor {
return &CosmosChainProcessor{
log: log.With(zap.String("chain_name", provider.ChainName()), zap.String("chain_id", provider.ChainId())),
chainProvider: provider,
@@ -208,7 +212,7 @@ type queryCyclePersistence struct {
// Run starts the query loop for the chain which will gather applicable ibc messages and push events out to the relevant PathProcessors.
// The initialBlockHistory parameter determines how many historical blocks should be fetched and processed before continuing with current blocks.
// ChainProcessors should obey the context and return upon context cancellation.
func (ccp *CosmosChainProcessor) Run(ctx context.Context, initialBlockHistory uint64) error {
func (ccp *CosmosChainProcessor) Run(ctx context.Context, initialBlockHistory uint64, stuckPacket *processor.StuckPacket) error {
minQueryLoopDuration := ccp.chainProvider.PCfg.MinLoopDuration
if minQueryLoopDuration == 0 {
minQueryLoopDuration = defaultMinQueryLoopDuration
@@ -247,6 +251,10 @@ func (ccp *CosmosChainProcessor) Run(ctx context.Context, initialBlockHistory ui
latestQueriedBlock = 0
}

if stuckPacket != nil && ccp.chainProvider.ChainId() == stuckPacket.ChainID {
latestQueriedBlock = int64(stuckPacket.StartHeight)
}

persistence.latestQueriedBlock = latestQueriedBlock

var eg errgroup.Group
@@ -266,7 +274,7 @@ func (ccp *CosmosChainProcessor) Run(ctx context.Context, initialBlockHistory ui
defer ticker.Stop()

for {
if err := ccp.queryCycle(ctx, &persistence); err != nil {
if err := ccp.queryCycle(ctx, &persistence, stuckPacket); err != nil {
return err
}
select {
@@ -327,7 +335,7 @@ func (ccp *CosmosChainProcessor) initializeChannelState(ctx context.Context) err
return nil
}

func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *queryCyclePersistence) error {
func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *queryCyclePersistence, stuckPacket *processor.StuckPacket) error {
status, err := ccp.nodeStatusWithRetry(ctx)
if err != nil {
// don't want to cause CosmosChainProcessor to quit here, can retry again next cycle.
@@ -383,11 +391,11 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
var eg errgroup.Group
var blockRes *ctypes.ResultBlockResults
var ibcHeader provider.IBCHeader
i := i
sI := i
eg.Go(func() (err error) {
queryCtx, cancelQueryCtx := context.WithTimeout(ctx, blockResultsQueryTimeout)
defer cancelQueryCtx()
blockRes, err = ccp.chainProvider.RPCClient.BlockResults(queryCtx, &i)
blockRes, err = ccp.chainProvider.RPCClient.BlockResults(queryCtx, &sI)
if err != nil && ccp.metrics != nil {
ccp.metrics.IncBlockQueryFailure(chainID, "RPC Client")
}
@@ -396,7 +404,7 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
eg.Go(func() (err error) {
queryCtx, cancelQueryCtx := context.WithTimeout(ctx, queryTimeout)
defer cancelQueryCtx()
ibcHeader, err = ccp.chainProvider.QueryIBCHeader(queryCtx, i)
ibcHeader, err = ccp.chainProvider.QueryIBCHeader(queryCtx, sI)
if err != nil && ccp.metrics != nil {
ccp.metrics.IncBlockQueryFailure(chainID, "IBC Header")
}
@@ -467,6 +475,13 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
}

newLatestQueriedBlock = i

if stuckPacket != nil &&
ccp.chainProvider.ChainId() == stuckPacket.ChainID &&
newLatestQueriedBlock == int64(stuckPacket.EndHeight) {
i = persistence.latestHeight
ccp.log.Debug("Parsed stuck packet height, skipping to current")
}
}

if newLatestQueriedBlock == persistence.latestQueriedBlock {
2 changes: 1 addition & 1 deletion relayer/chains/mock/mock_chain_processor.go
@@ -73,7 +73,7 @@ type queryCyclePersistence struct {
latestQueriedBlock int64
}

func (mcp *MockChainProcessor) Run(ctx context.Context, initialBlockHistory uint64) error {
func (mcp *MockChainProcessor) Run(ctx context.Context, initialBlockHistory uint64, _ *processor.StuckPacket) error {
// this will be used for persistence across query cycle loop executions
persistence := queryCyclePersistence{
// would be query of latest height, mocking 20
2 changes: 1 addition & 1 deletion relayer/chains/penumbra/penumbra_chain_processor.go
@@ -153,7 +153,7 @@ type queryCyclePersistence struct {
// Run starts the query loop for the chain which will gather applicable ibc messages and push events out to the relevant PathProcessors.
// The initialBlockHistory parameter determines how many historical blocks should be fetched and processed before continuing with current blocks.
// ChainProcessors should obey the context and return upon context cancellation.
func (pcp *PenumbraChainProcessor) Run(ctx context.Context, initialBlockHistory uint64) error {
func (pcp *PenumbraChainProcessor) Run(ctx context.Context, initialBlockHistory uint64, _ *processor.StuckPacket) error {
minQueryLoopDuration := pcp.chainProvider.PCfg.MinLoopDuration
if minQueryLoopDuration == 0 {
minQueryLoopDuration = defaultMinQueryLoopDuration
2 changes: 1 addition & 1 deletion relayer/processor/chain_processor.go
@@ -12,7 +12,7 @@ type ChainProcessor interface {
// Run starts the query loop for the chain which will gather applicable ibc messages and push events out to the relevant PathProcessors.
// The initialBlockHistory parameter determines how many historical blocks should be fetched and processed before continuing with current blocks.
// ChainProcessors should obey the context and return upon context cancellation.
Run(ctx context.Context, initialBlockHistory uint64) error
Run(ctx context.Context, initialBlockHistory uint64, stuckPacket *StuckPacket) error

// Provider returns the ChainProvider, which provides the methods for querying, assembling IBC messages, and sending transactions.
Provider() provider.ChainProvider
10 changes: 9 additions & 1 deletion relayer/processor/event_processor.go
@@ -12,6 +12,7 @@ type EventProcessorBuilder struct {
initialBlockHistory uint64
pathProcessors PathProcessors
messageLifecycle MessageLifecycle
stuckPacket *StuckPacket
}

// EventProcessor is a built instance that is ready to be executed with Run(ctx).
@@ -20,6 +21,7 @@ type EventProcessor struct {
initialBlockHistory uint64
pathProcessors PathProcessors
messageLifecycle MessageLifecycle
stuckPacket *StuckPacket
}

// NewEventProcessor creates a builder that can be used to construct a multi-ChainProcessor, multi-PathProcessor topology for the relayer.
@@ -61,6 +63,12 @@ func (ep EventProcessorBuilder) WithMessageLifecycle(messageLifecycle MessageLif
return ep
}

// WithStuckPacket sets the stuck packet configuration.
func (ep EventProcessorBuilder) WithStuckPacket(stuckPacket *StuckPacket) EventProcessorBuilder {
ep.stuckPacket = stuckPacket
return ep
}

// Build links the relevant ChainProcessors and PathProcessors, then returns an EventProcessor that can be used to run the ChainProcessors and PathProcessors.
func (ep EventProcessorBuilder) Build() EventProcessor {
for _, chainProcessor := range ep.chainProcessors {
@@ -95,7 +103,7 @@ func (ep EventProcessor) Run(ctx context.Context) error {
for _, chainProcessor := range ep.chainProcessors {
chainProcessor := chainProcessor
eg.Go(func() error {
err := chainProcessor.Run(runCtx, ep.initialBlockHistory)
err := chainProcessor.Run(runCtx, ep.initialBlockHistory, ep.stuckPacket)
// Signal the other chain processors to exit.
runCtxCancel()
return err
7 changes: 7 additions & 0 deletions relayer/processor/types.go
@@ -605,3 +605,10 @@ func ConnectionInfoConnectionKey(info provider.ConnectionInfo) ConnectionKey {
CounterpartyConnID: info.CounterpartyConnID,
}
}

// StuckPacket is used for narrowing block queries on packets that are stuck on a channel for a specific chain.
type StuckPacket struct {
ChainID string
StartHeight uint64
EndHeight uint64
}
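
For programmatic users of the `processor` package, a minimal sketch of wiring a `StuckPacket` through the new `WithStuckPacket` builder option (the chain ID, heights, and `chainProcessors` slice are hypothetical):

```go
package example

import (
	"context"

	"github.com/cosmos/relayer/v2/relayer/processor"
)

// runWithStuckPacket threads a StuckPacket down to each ChainProcessor's Run
// loop via the EventProcessor builder.
func runWithStuckPacket(ctx context.Context, chainProcessors []processor.ChainProcessor) error {
	sp := &processor.StuckPacket{
		ChainID:     "chain-a", // hypothetical chain with the stuck packet
		StartHeight: 1500000,   // hypothetical height to start searching
		EndHeight:   1500100,   // hypothetical height to end searching
	}

	return processor.NewEventProcessor().
		WithChainProcessors(chainProcessors...).
		WithStuckPacket(sp).
		Build().
		Run(ctx)
}
```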
12 changes: 10 additions & 2 deletions relayer/strategies.go
@@ -47,6 +47,7 @@ func StartRelayer(
processorType string,
initialBlockHistory uint64,
metrics *processor.PrometheusMetrics,
stuckPacket *processor.StuckPacket,
) chan error {
// prevent incorrectly bech32-prefixed addresses when calling AccAddress.String()
sdk.SetAddrCacheEnabled(false)
@@ -93,6 +94,7 @@ func StartRelayer(
flushInterval,
errorChan,
metrics,
stuckPacket,
)
return errorChan
case ProcessorLegacy:
@@ -118,7 +120,10 @@ type path struct {
}

// chainProcessor returns the corresponding ChainProcessor implementation instance for a pathChain.
func (chain *Chain) chainProcessor(log *zap.Logger, metrics *processor.PrometheusMetrics) processor.ChainProcessor {
func (chain *Chain) chainProcessor(
log *zap.Logger,
metrics *processor.PrometheusMetrics,
) processor.ChainProcessor {
// Handle new ChainProcessor implementations as cases here
switch p := chain.ChainProvider.(type) {
case *penumbraprocessor.PenumbraProvider:
@@ -144,10 +149,13 @@ func relayerStartEventProcessor(
flushInterval time.Duration,
errCh chan<- error,
metrics *processor.PrometheusMetrics,
stuckPacket *processor.StuckPacket,
) {
defer close(errCh)

epb := processor.NewEventProcessor().WithChainProcessors(chainProcessors...)
epb := processor.NewEventProcessor().
WithChainProcessors(chainProcessors...).
WithStuckPacket(stuckPacket)

for _, p := range paths {
epb = epb.
