@@ -2,6 +2,7 @@ package stages
2
2
3
3
import (
4
4
"context"
5
+ "sync"
5
6
6
7
"github.com/ledgerwatch/erigon-lib/common"
7
8
"github.com/ledgerwatch/erigon-lib/kv"
@@ -90,35 +91,69 @@ func extractTransactionsFromSlot(slot *types2.TxsRlp, currentHeight uint64, cfg
90
91
transactions := make ([]types.Transaction , 0 , len (slot .Txs ))
91
92
toRemove := make ([]common.Hash , 0 )
92
93
signer := types .MakeSigner (cfg .chainConfig , currentHeight , 0 )
93
- cryptoContext := secp256k1 .ContextForThread (1 )
94
+ type result struct {
95
+ index int
96
+ transaction types.Transaction
97
+ id common.Hash
98
+ toRemove bool
99
+ err error
100
+ }
101
+
102
+ results := make ([]* result , len (slot .Txs ))
103
+ var wg sync.WaitGroup
104
+
94
105
for idx , txBytes := range slot .Txs {
95
- transaction , err := types .DecodeTransaction (txBytes )
96
- if err == io .EOF {
97
- continue
98
- }
99
- if err != nil {
100
- // we have a transaction that cannot be decoded or a similar issue. We don't want to handle
101
- // this tx so just WARN about it and remove it from the pool and continue
102
- log .Warn ("[extractTransaction] Failed to decode transaction from pool, skipping and removing from pool" ,
103
- "error" , err ,
104
- "id" , slot .TxIds [idx ])
105
- toRemove = append (toRemove , slot .TxIds [idx ])
106
- continue
107
- }
106
+ wg .Add (1 )
107
+ go func (idx int , txBytes []byte ) {
108
+ defer wg .Done ()
108
109
109
- // now attempt to recover the sender
110
- sender , err := signer .SenderWithContext (cryptoContext , transaction )
111
- if err != nil {
112
- log .Warn ("[extractTransaction] Failed to recover sender from transaction, skipping and removing from pool" ,
113
- "error" , err ,
114
- "hash" , transaction .Hash ())
115
- toRemove = append (toRemove , slot .TxIds [idx ])
110
+ cryptoContext := secp256k1 .ContextForThread (1 )
111
+
112
+ res := & result {index : idx }
113
+
114
+ transaction , err := types .DecodeTransaction (txBytes )
115
+ if err == io .EOF {
116
+ res .toRemove = true
117
+ results [idx ] = res
118
+ return
119
+ }
120
+ if err != nil {
121
+ res .toRemove = true
122
+ res .id = slot .TxIds [idx ]
123
+ res .err = err
124
+ results [idx ] = res
125
+ return
126
+ }
127
+
128
+ sender , err := signer .SenderWithContext (cryptoContext , transaction )
129
+ if err != nil {
130
+ res .toRemove = true
131
+ res .id = slot .TxIds [idx ]
132
+ res .err = err
133
+ results [idx ] = res
134
+ return
135
+ }
136
+
137
+ transaction .SetSender (sender )
138
+ res .transaction = transaction
139
+ res .id = slot .TxIds [idx ]
140
+ results [idx ] = res
141
+ }(idx , txBytes )
142
+ }
143
+
144
+ wg .Wait ()
145
+
146
+ for _ , res := range results {
147
+ if res .toRemove {
148
+ toRemove = append (toRemove , res .id )
149
+ if res .err != nil {
150
+ log .Warn ("[extractTransaction] Failed to process transaction" ,
151
+ "error" , res .err , "id" , res .id )
152
+ }
116
153
continue
117
154
}
118
-
119
- transaction .SetSender (sender )
120
- transactions = append (transactions , transaction )
121
- ids = append (ids , slot .TxIds [idx ])
155
+ transactions = append (transactions , res .transaction )
156
+ ids = append (ids , res .id )
122
157
}
123
158
return transactions , ids , toRemove , nil
124
159
}
0 commit comments