Skip to content

Commit a057c75

Browse files
MechanicalTylerTyler Ruppert
and
Tyler Ruppert
authored
Support for parallel processing (#182)
Co-authored-by: Tyler Ruppert <{ID}+{username}@users.noreply.github.com>
1 parent fe88669 commit a057c75

16 files changed

+1148
-404
lines changed

relayer/build_processors.go

+43-10
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package relayer
33
import (
44
"context"
55
"math/big"
6+
"sync"
67

78
"github.com/VolumeFi/whoops"
89
"github.com/ethereum/go-ethereum/common"
@@ -12,28 +13,60 @@ import (
1213
log "github.com/sirupsen/logrus"
1314
)
1415

15-
func (r *Relayer) buildProcessors(ctx context.Context) ([]chain.Processor, error) {
16-
chainsInfos, err := r.palomaClient.QueryGetEVMChainInfos(ctx)
16+
func (r *Relayer) buildProcessors(ctx context.Context, locker sync.Locker) error {
17+
locker.Lock()
18+
defer locker.Unlock()
19+
queriedChainsInfos, err := r.palomaClient.QueryGetEVMChainInfos(ctx)
1720
if err != nil {
18-
return nil, err
21+
return err
1922
}
20-
log.WithField("chains-infos", chainsInfos).Trace("got chain infos")
23+
log.WithField("chains-infos", queriedChainsInfos).Trace("got chain infos")
2124

22-
processors := []chain.Processor{}
23-
for _, chainInfo := range chainsInfos {
25+
// See if we need to update
26+
if (r.processors != nil) && (r.chainsInfos != nil) && (len(r.chainsInfos) == len(queriedChainsInfos)) {
27+
chainsChanged := false
28+
for k, c := range r.chainsInfos {
29+
if c.Id != queriedChainsInfos[k].Id ||
30+
c.ChainReferenceID != queriedChainsInfos[k].ChainReferenceID ||
31+
c.ChainID != queriedChainsInfos[k].ChainID ||
32+
string(c.SmartContractUniqueID) != string(queriedChainsInfos[k].SmartContractUniqueID) ||
33+
c.SmartContractAddr != queriedChainsInfos[k].SmartContractAddr ||
34+
c.ReferenceBlockHeight != queriedChainsInfos[k].ReferenceBlockHeight ||
35+
c.ReferenceBlockHash != queriedChainsInfos[k].ReferenceBlockHash ||
36+
c.Abi != queriedChainsInfos[k].Abi ||
37+
string(c.Bytecode) != string(queriedChainsInfos[k].Bytecode) ||
38+
string(c.ConstructorInput) != string(queriedChainsInfos[k].ConstructorInput) ||
39+
c.Status != queriedChainsInfos[k].Status ||
40+
c.ActiveSmartContractID != queriedChainsInfos[k].ActiveSmartContractID ||
41+
c.MinOnChainBalance != queriedChainsInfos[k].MinOnChainBalance {
42+
chainsChanged = true
43+
}
44+
}
45+
if !chainsChanged {
46+
log.Debug("chain infos unchanged since last tick")
47+
return nil
48+
}
49+
}
50+
51+
log.Debug("chain infos changed. building processors")
52+
53+
r.processors = []chain.Processor{}
54+
r.chainsInfos = []evmtypes.ChainInfo{}
55+
for _, chainInfo := range queriedChainsInfos {
2456
processor, err := r.processorFactory(chainInfo)
2557
if errors.IsUnrecoverable(err) {
26-
return nil, err
58+
return err
2759
}
2860

2961
if err := processor.IsRightChain(ctx); err != nil {
30-
return nil, err
62+
return err
3163
}
3264

33-
processors = append(processors, processor)
65+
r.processors = append(r.processors, processor)
66+
r.chainsInfos = append(r.chainsInfos, *chainInfo)
3467
}
3568

36-
return processors, nil
69+
return nil
3770
}
3871

3972
func (r *Relayer) processorFactory(chainInfo *evmtypes.ChainInfo) (chain.Processor, error) {

relayer/build_processors_test.go

+311
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,311 @@
1+
package relayer
2+
3+
import (
4+
"context"
5+
"testing"
6+
7+
"github.com/palomachain/paloma/x/evm/types"
8+
"github.com/palomachain/pigeon/chain"
9+
chainmocks "github.com/palomachain/pigeon/chain/mocks"
10+
"github.com/palomachain/pigeon/config"
11+
"github.com/palomachain/pigeon/relayer/mocks"
12+
"github.com/palomachain/pigeon/testutil"
13+
timemocks "github.com/palomachain/pigeon/util/time/mocks"
14+
"github.com/stretchr/testify/assert"
15+
"github.com/stretchr/testify/mock"
16+
)
17+
18+
func TestBuildProcessors(t *testing.T) {
19+
testcases := []struct {
20+
name string
21+
setup func(t *testing.T) (*Relayer, []chain.Processor, []types.ChainInfo)
22+
expectedErr error
23+
}{
24+
{
25+
name: "when there are no processors on relayer yet it builds processors",
26+
setup: func(t *testing.T) (*Relayer, []chain.Processor, []types.ChainInfo) {
27+
chain1Info := types.ChainInfo{
28+
Id: 1,
29+
ChainReferenceID: "chain-1",
30+
MinOnChainBalance: "5",
31+
}
32+
pc := mocks.NewPalomaClienter(t)
33+
pc.On(
34+
"QueryGetEVMChainInfos",
35+
mock.Anything,
36+
mock.Anything,
37+
).Return(
38+
[]*types.ChainInfo{
39+
&chain1Info,
40+
},
41+
nil,
42+
)
43+
44+
processorMock := chainmocks.NewProcessor(t)
45+
processorMock.On("IsRightChain", mock.Anything).Return(nil)
46+
47+
evmFactoryMock := mocks.NewEvmFactorier(t)
48+
evmFactoryMock.On("Build", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(processorMock, nil)
49+
50+
r := New(
51+
config.Root{
52+
EVM: map[string]config.EVM{
53+
"chain-1": {},
54+
},
55+
},
56+
pc,
57+
evmFactoryMock,
58+
timemocks.NewTime(t),
59+
Config{},
60+
)
61+
r.chainsInfos = []types.ChainInfo{
62+
chain1Info,
63+
}
64+
65+
return r,
66+
[]chain.Processor{
67+
processorMock,
68+
},
69+
[]types.ChainInfo{
70+
chain1Info,
71+
}
72+
},
73+
},
74+
{
75+
name: "when there are no chainsInfos on relayer yet it builds processors",
76+
setup: func(t *testing.T) (*Relayer, []chain.Processor, []types.ChainInfo) {
77+
chain1Info := types.ChainInfo{
78+
Id: 1,
79+
ChainReferenceID: "chain-1",
80+
MinOnChainBalance: "5",
81+
}
82+
pc := mocks.NewPalomaClienter(t)
83+
pc.On(
84+
"QueryGetEVMChainInfos",
85+
mock.Anything,
86+
mock.Anything,
87+
).Return(
88+
[]*types.ChainInfo{
89+
&chain1Info,
90+
},
91+
nil,
92+
)
93+
94+
processorMock := chainmocks.NewProcessor(t)
95+
processorMock.On("IsRightChain", mock.Anything).Return(nil)
96+
97+
evmFactoryMock := mocks.NewEvmFactorier(t)
98+
evmFactoryMock.On("Build", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(processorMock, nil)
99+
100+
r := New(
101+
config.Root{
102+
EVM: map[string]config.EVM{
103+
"chain-1": {},
104+
},
105+
},
106+
pc,
107+
evmFactoryMock,
108+
timemocks.NewTime(t),
109+
Config{},
110+
)
111+
r.processors = []chain.Processor{
112+
chainmocks.NewProcessor(t),
113+
}
114+
115+
return r,
116+
[]chain.Processor{
117+
processorMock,
118+
},
119+
[]types.ChainInfo{
120+
chain1Info,
121+
}
122+
},
123+
},
124+
{
125+
name: "when the chains lengths are different it builds processors",
126+
setup: func(t *testing.T) (*Relayer, []chain.Processor, []types.ChainInfo) {
127+
chain1Info := types.ChainInfo{
128+
Id: 1,
129+
ChainReferenceID: "chain-1",
130+
MinOnChainBalance: "5",
131+
}
132+
chain2Info := types.ChainInfo{
133+
Id: 2,
134+
ChainReferenceID: "chain-2",
135+
MinOnChainBalance: "5",
136+
}
137+
138+
pc := mocks.NewPalomaClienter(t)
139+
pc.On(
140+
"QueryGetEVMChainInfos",
141+
mock.Anything,
142+
mock.Anything,
143+
).Return(
144+
[]*types.ChainInfo{
145+
&chain1Info,
146+
},
147+
nil,
148+
)
149+
150+
processorMock := chainmocks.NewProcessor(t)
151+
processorMock.On("IsRightChain", mock.Anything).Return(nil)
152+
153+
evmFactoryMock := mocks.NewEvmFactorier(t)
154+
evmFactoryMock.On("Build", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(processorMock, nil)
155+
156+
r := New(
157+
config.Root{
158+
EVM: map[string]config.EVM{
159+
"chain-1": {},
160+
},
161+
},
162+
pc,
163+
evmFactoryMock,
164+
timemocks.NewTime(t),
165+
Config{},
166+
)
167+
r.processors = []chain.Processor{
168+
chainmocks.NewProcessor(t),
169+
}
170+
171+
r.chainsInfos = []types.ChainInfo{
172+
chain1Info,
173+
chain2Info,
174+
}
175+
176+
return r,
177+
[]chain.Processor{
178+
processorMock,
179+
},
180+
[]types.ChainInfo{
181+
chain1Info,
182+
}
183+
},
184+
},
185+
{
186+
name: "when there is a difference in the chain data it builds processors",
187+
setup: func(t *testing.T) (*Relayer, []chain.Processor, []types.ChainInfo) {
188+
chain1Info := types.ChainInfo{
189+
Id: 1,
190+
ChainReferenceID: "chain-1",
191+
MinOnChainBalance: "5",
192+
}
193+
194+
chain1NewInfo := types.ChainInfo{
195+
Id: 1,
196+
ChainReferenceID: "chain-1",
197+
MinOnChainBalance: "50",
198+
}
199+
pc := mocks.NewPalomaClienter(t)
200+
pc.On(
201+
"QueryGetEVMChainInfos",
202+
mock.Anything,
203+
mock.Anything,
204+
).Return(
205+
[]*types.ChainInfo{
206+
&chain1NewInfo,
207+
},
208+
nil,
209+
)
210+
211+
processorMock := chainmocks.NewProcessor(t)
212+
processorMock.On("IsRightChain", mock.Anything).Return(nil)
213+
214+
evmFactoryMock := mocks.NewEvmFactorier(t)
215+
evmFactoryMock.On("Build", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(processorMock, nil)
216+
217+
r := New(
218+
config.Root{
219+
EVM: map[string]config.EVM{
220+
"chain-1": {},
221+
},
222+
},
223+
pc,
224+
evmFactoryMock,
225+
timemocks.NewTime(t),
226+
Config{},
227+
)
228+
r.processors = []chain.Processor{
229+
chainmocks.NewProcessor(t),
230+
}
231+
232+
r.chainsInfos = []types.ChainInfo{
233+
chain1Info,
234+
}
235+
236+
return r,
237+
[]chain.Processor{
238+
processorMock,
239+
},
240+
[]types.ChainInfo{
241+
chain1NewInfo,
242+
}
243+
},
244+
},
245+
{
246+
name: "when the chains are the same it doesn't build processors",
247+
setup: func(t *testing.T) (*Relayer, []chain.Processor, []types.ChainInfo) {
248+
chain1Info := types.ChainInfo{
249+
Id: 1,
250+
ChainReferenceID: "chain-1",
251+
MinOnChainBalance: "5",
252+
}
253+
254+
pc := mocks.NewPalomaClienter(t)
255+
pc.On(
256+
"QueryGetEVMChainInfos",
257+
mock.Anything,
258+
mock.Anything,
259+
).Return(
260+
[]*types.ChainInfo{
261+
&chain1Info,
262+
},
263+
nil,
264+
)
265+
266+
r := New(
267+
config.Root{
268+
EVM: map[string]config.EVM{
269+
"chain-1": {},
270+
},
271+
},
272+
pc,
273+
mocks.NewEvmFactorier(t),
274+
timemocks.NewTime(t),
275+
Config{},
276+
)
277+
278+
origProcessor := chainmocks.NewProcessor(t)
279+
r.processors = []chain.Processor{
280+
origProcessor,
281+
}
282+
283+
r.chainsInfos = []types.ChainInfo{
284+
chain1Info,
285+
}
286+
287+
return r,
288+
[]chain.Processor{
289+
origProcessor,
290+
},
291+
[]types.ChainInfo{
292+
chain1Info,
293+
}
294+
},
295+
},
296+
}
297+
298+
asserter := assert.New(t)
299+
ctx := context.Background()
300+
for _, tt := range testcases {
301+
t.Run(tt.name, func(t *testing.T) {
302+
relayer, expectedProcessors, expectedChainsInfos := tt.setup(t)
303+
var locker testutil.FakeMutex
304+
305+
actualErr := relayer.buildProcessors(ctx, locker)
306+
asserter.Equal(tt.expectedErr, actualErr)
307+
asserter.Equal(expectedProcessors, relayer.processors)
308+
asserter.Equal(expectedChainsInfos, relayer.chainsInfos)
309+
})
310+
}
311+
}

0 commit comments

Comments
 (0)