Skip to content

Commit 6799f25

Browse files
feature: claim everything w/ concurrency (#514)
* wip * wip * logging * comment * wip * comment * concurrency flag * logging to figure out which bridge url is bad * wait group for last claims to process * fix wait group race * doc gen --------- Co-authored-by: John Hilliard <[email protected]>
1 parent bcb3a64 commit 6799f25

File tree

2 files changed

+130
-29
lines changed

2 files changed

+130
-29
lines changed

cmd/ulxly/ulxly.go

+129-29
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"os"
1717
"strconv"
1818
"strings"
19+
"sync"
1920
"time"
2021

2122
"github.com/ethereum/go-ethereum"
@@ -646,16 +647,19 @@ func claimEverything(cmd *cobra.Command) error {
646647
RPCURL := *inputUlxlyArgs.rpcURL
647648
limit := *inputUlxlyArgs.bridgeLimit
648649
offset := *inputUlxlyArgs.bridgeOffset
650+
concurrency := *inputUlxlyArgs.concurrency
649651
urls, err := getBridgeServiceURLs()
650652
if err != nil {
651653
return err
652654
}
653655

654656
depositMap := make(map[DepositID]*BridgeDeposit)
655657

656-
for _, bridgeServiceUrl := range urls {
657-
deposits, bErr := getDepositsForAddress(fmt.Sprintf("%s/bridges/%s?offset=%d&limit=%d", bridgeServiceUrl, destinationAddress, offset, limit))
658+
for bridgeServiceId, bridgeServiceUrl := range urls {
659+
url := fmt.Sprintf("%s/bridges/%s?offset=%d&limit=%d", bridgeServiceUrl, destinationAddress, offset, limit)
660+
deposits, bErr := getDepositsForAddress(url)
658661
if bErr != nil {
662+
log.Err(bErr).Uint32("id", bridgeServiceId).Str("url", url).Msgf("Error getting deposits for bridge: %s", bErr.Error())
659663
return bErr
660664
}
661665
for idx, deposit := range deposits {
@@ -670,7 +674,7 @@ func claimEverything(cmd *cobra.Command) error {
670674
continue
671675
}
672676

673-
// if this new deposit is ready for claim OR it has already been claimed we should over ride the exisitng value
677+
// if this new deposit is ready for claim OR it has already been claimed we should override the existing value
674678
if deposit.ReadyForClaim || deposit.ClaimTxHash != "" {
675679
depositMap[depId] = &deposits[idx]
676680
}
@@ -692,36 +696,128 @@ func claimEverything(cmd *cobra.Command) error {
692696
if err != nil {
693697
return err
694698
}
695-
log.Info().Uint32("networkID", currentNetworkID).Msg("detected current networkid")
696-
for _, deposit := range depositMap {
697-
if deposit.DestNet != currentNetworkID {
698-
log.Debug().Uint32("destination_network", deposit.DestNet).Msg("discarding deposit for different network")
699-
continue
700-
}
701-
if deposit.ClaimTxHash != "" {
702-
log.Info().Str("txhash", deposit.ClaimTxHash).Msg("It looks like this tx was already claimed")
703-
continue
704-
}
705-
claimTx, dErr := claimSingleDeposit(cmd, client, bridgeContract, opts, *deposit, urls, currentNetworkID)
706-
if dErr != nil {
707-
log.Warn().Err(dErr).Uint32("DepositCnt", deposit.DepositCnt).
708-
Uint32("OrigNet", deposit.OrigNet).
709-
Uint32("DestNet", deposit.DestNet).
710-
Uint32("NetworkID", deposit.NetworkID).
711-
Str("OrigAddr", deposit.OrigAddr).
712-
Str("DestAddr", deposit.DestAddr).
713-
Msg("There was an error claiming")
714-
continue
715-
}
716-
dErr = WaitMineTransaction(cmd.Context(), client, claimTx, timeoutTxnReceipt)
717-
if dErr != nil {
718-
log.Error().Err(dErr).Msg("error while waiting for tx to main")
719-
}
699+
log.Info().Uint32("networkID", currentNetworkID).Msg("current network")
700+
701+
workPool := make(chan *BridgeDeposit, concurrency) // bounded chan for controlled concurrency
702+
703+
nonceCounter, err := currentNonce(cmd.Context(), client, destinationAddress)
704+
if err != nil {
705+
return err
706+
}
707+
log.Info().Int64("nonce", nonceCounter.Int64()).Msg("starting nonce")
708+
nonceMutex := sync.Mutex{}
709+
nonceIncrement := big.NewInt(1)
710+
retryNonces := make(chan *big.Int, concurrency) // bounded same as workPool
711+
712+
wg := sync.WaitGroup{} // wg so the last ones can get processed
713+
714+
for _, d := range depositMap {
715+
wg.Add(1)
716+
workPool <- d // block until a slot is available
717+
go func(deposit *BridgeDeposit) {
718+
defer func() {
719+
<-workPool // release work slot
720+
}()
721+
defer wg.Done()
722+
723+
if deposit.DestNet != currentNetworkID {
724+
log.Debug().Uint32("destination_network", deposit.DestNet).Msg("discarding deposit for different network")
725+
return
726+
}
727+
if deposit.ClaimTxHash != "" {
728+
log.Info().Str("txhash", deposit.ClaimTxHash).Msg("It looks like this tx was already claimed")
729+
return
730+
}
731+
// Either use the next retry nonce, or set and increment the next one
732+
var nextNonce *big.Int
733+
select {
734+
case n := <-retryNonces:
735+
nextNonce = n
736+
default:
737+
nonceMutex.Lock()
738+
nextNonce = big.NewInt(nonceCounter.Int64())
739+
nonceCounter = nonceCounter.Add(nonceCounter, nonceIncrement)
740+
nonceMutex.Unlock()
741+
}
742+
log.Info().Int64("nonce", nextNonce.Int64()).Msg("Next nonce")
743+
744+
claimTx, dErr := claimSingleDeposit(cmd, client, bridgeContract, withNonce(opts, nextNonce), *deposit, urls, currentNetworkID)
745+
if dErr != nil {
746+
log.Warn().Err(dErr).Uint32("DepositCnt", deposit.DepositCnt).
747+
Uint32("OrigNet", deposit.OrigNet).
748+
Uint32("DestNet", deposit.DestNet).
749+
Uint32("NetworkID", deposit.NetworkID).
750+
Str("OrigAddr", deposit.OrigAddr).
751+
Str("DestAddr", deposit.DestAddr).
752+
Int64("nonce", nextNonce.Int64()).
753+
Msg("There was an error claiming")
754+
755+
// Some nonces should not be reused
756+
if strings.Contains(dErr.Error(), "could not replace existing") {
757+
return
758+
}
759+
if strings.Contains(dErr.Error(), "already known") {
760+
return
761+
}
762+
if strings.Contains(dErr.Error(), "nonce is too low") {
763+
return
764+
}
765+
// are there other cases?
766+
retryNonces <- nextNonce
767+
return
768+
}
769+
dErr = WaitMineTransaction(cmd.Context(), client, claimTx, timeoutTxnReceipt)
770+
if dErr != nil {
771+
log.Error().Err(dErr).Msg("error while waiting for tx to mine")
772+
}
773+
}(d)
720774
}
721775

776+
wg.Wait()
722777
return nil
723778
}
724779

780+
func currentNonce(ctx context.Context, client *ethclient.Client, address string) (*big.Int, error) {
781+
addr := common.HexToAddress(address)
782+
nonce, err := client.NonceAt(ctx, addr, nil)
783+
if err != nil {
784+
log.Error().Err(err).Str("address", addr.Hex()).Msg("Failed to get nonce")
785+
return nil, err
786+
}
787+
n := int64(nonce)
788+
return big.NewInt(n), nil
789+
}
790+
791+
// todo: implement for other fields in library, or find a library that does this
792+
func withNonce(opts *bind.TransactOpts, newNonce *big.Int) *bind.TransactOpts {
793+
if opts == nil {
794+
return nil
795+
}
796+
clone := &bind.TransactOpts{
797+
From: opts.From,
798+
Signer: opts.Signer,
799+
GasLimit: opts.GasLimit,
800+
Context: opts.Context, // Usually OK to share, unless you need a separate context
801+
NoSend: opts.NoSend,
802+
}
803+
// Deep-copy big.Int fields
804+
if opts.Value != nil {
805+
clone.Value = new(big.Int).Set(opts.Value)
806+
}
807+
if opts.GasFeeCap != nil {
808+
clone.GasFeeCap = new(big.Int).Set(opts.GasFeeCap)
809+
}
810+
if opts.GasTipCap != nil {
811+
clone.GasTipCap = new(big.Int).Set(opts.GasTipCap)
812+
}
813+
// Set the new nonce
814+
if newNonce != nil {
815+
clone.Nonce = new(big.Int).Set(newNonce)
816+
}
817+
818+
return clone
819+
}
820+
725821
func claimSingleDeposit(cmd *cobra.Command, client *ethclient.Client, bridgeContract *ulxly.Ulxly, opts *bind.TransactOpts, deposit BridgeDeposit, bridgeURLs map[uint32]string, currentNetworkID uint32) (*types.Transaction, error) {
726822
networkIDForBridgeService := deposit.NetworkID
727823
if deposit.NetworkID == 0 {
@@ -807,7 +903,7 @@ func WaitMineTransaction(ctx context.Context, client *ethclient.Client, tx *type
807903
continue
808904
}
809905
if r.Status != 0 {
810-
log.Info().Interface("txHash", r.TxHash).Msg("Deposit transaction successful")
906+
log.Info().Interface("txHash", r.TxHash).Msg("transaction successful")
811907
return nil
812908
} else if r.Status == 0 {
813909
log.Error().Interface("txHash", r.TxHash).Msg("Deposit transaction failed")
@@ -1486,6 +1582,7 @@ type ulxlyArgs struct {
14861582
bridgeLimit *int
14871583
bridgeOffset *int
14881584
wait *time.Duration
1585+
concurrency *uint
14891586
}
14901587

14911588
var inputUlxlyArgs = ulxlyArgs{}
@@ -1543,6 +1640,7 @@ const (
15431640
ArgBridgeLimit = "bridge-limit"
15441641
ArgBridgeOffset = "bridge-offset"
15451642
ArgWait = "wait"
1643+
ArgConcurrency = "concurrency"
15461644
)
15471645

15481646
func prepInputs(cmd *cobra.Command, args []string) error {
@@ -1821,6 +1919,8 @@ or if it's actually an intermediate hash.`,
18211919
inputUlxlyArgs.bridgeServiceURLs = claimEverythingCommand.Flags().StringSlice(ArgBridgeMappings, nil, "Mappings between network ids and bridge service urls. E.g. '1=http://network-1-bridgeurl,7=http://network-2-bridgeurl'")
18221920
inputUlxlyArgs.bridgeLimit = claimEverythingCommand.Flags().Int(ArgBridgeLimit, 25, "Limit the number or responses returned by the bridge service when claiming")
18231921
inputUlxlyArgs.bridgeOffset = claimEverythingCommand.Flags().Int(ArgBridgeOffset, 0, "The offset to specify for pagination of the underlying bridge service deposits")
1922+
inputUlxlyArgs.concurrency = claimEverythingCommand.Flags().Uint(ArgConcurrency, 1, "The worker pool size for claims")
1923+
18241924
fatalIfError(claimEverythingCommand.MarkFlagRequired(ArgBridgeMappings))
18251925

18261926
// Top Level

doc/polycli_ulxly__claim-everything.md

+1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ polycli ulxly claim-everything [flags]
2323
--bridge-limit int Limit the number or responses returned by the bridge service when claiming (default 25)
2424
--bridge-offset int The offset to specify for pagination of the underlying bridge service deposits
2525
--bridge-service-map strings Mappings between network ids and bridge service urls. E.g. '1=http://network-1-bridgeurl,7=http://network-2-bridgeurl'
26+
--concurrency uint The worker pool size for claims (default 1)
2627
-h, --help help for claim-everything
2728
```
2829

0 commit comments

Comments
 (0)