Skip to content

Commit 4ac5b37

Browse files
TxCorpi0xoshorefueled
authored andcommitted
feat: copy cosmos message handler and assign to substrate (cosmos#1017)
1 parent 6ebb2c9 commit 4ac5b37

File tree

1 file changed

+171
-2
lines changed

1 file changed

+171
-2
lines changed
+171-2
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,179 @@
11
package substrate
22

33
import (
4+
conntypes "github.com/cosmos/ibc-go/v5/modules/core/03-connection/types"
5+
chantypes "github.com/cosmos/ibc-go/v5/modules/core/04-channel/types"
46
"github.com/cosmos/relayer/v2/relayer/processor"
7+
"github.com/cosmos/relayer/v2/relayer/provider"
8+
"go.uber.org/zap"
9+
"go.uber.org/zap/zapcore"
510
)
611

712
func (ccp *SubstrateChainProcessor) handleMessage(m ibcMessage, c processor.IBCMessagesCache) {
8-
//TODO implement me
9-
panic("implement me")
13+
switch t := m.info.(type) {
14+
case *packetInfo:
15+
ccp.handlePacketMessage(m.eventType, provider.PacketInfo(*t), c)
16+
case *channelInfo:
17+
ccp.handleChannelMessage(m.eventType, provider.ChannelInfo(*t), c)
18+
case *connectionInfo:
19+
ccp.handleConnectionMessage(m.eventType, provider.ConnectionInfo(*t), c)
20+
case *clientInfo:
21+
ccp.handleClientMessage(m.eventType, *t)
22+
}
23+
}
24+
25+
func (ccp *SubstrateChainProcessor) handlePacketMessage(eventType string, pi provider.PacketInfo, c processor.IBCMessagesCache) {
26+
k, err := processor.PacketInfoChannelKey(eventType, pi)
27+
if err != nil {
28+
ccp.log.Error("Unexpected error handling packet message",
29+
zap.String("event_type", eventType),
30+
zap.Uint64("sequence", pi.Sequence),
31+
zap.Inline(k),
32+
zap.Error(err),
33+
)
34+
return
35+
}
36+
37+
if eventType == chantypes.EventTypeRecvPacket && len(pi.Ack) == 0 {
38+
// ignore recv packet with empty ack bytes
39+
return
40+
}
41+
42+
if !c.PacketFlow.ShouldRetainSequence(ccp.pathProcessors, k, ccp.chainProvider.ChainId(), eventType, pi.Sequence) {
43+
ccp.log.Debug("Not retaining packet message",
44+
zap.String("event_type", eventType),
45+
zap.Uint64("sequence", pi.Sequence),
46+
zap.Inline(k),
47+
)
48+
return
49+
}
50+
51+
ccp.log.Debug("Retaining packet message",
52+
zap.String("event_type", eventType),
53+
zap.Uint64("sequence", pi.Sequence),
54+
zap.Inline(k),
55+
)
56+
57+
c.PacketFlow.Retain(k, eventType, pi)
58+
ccp.logPacketMessage(eventType, pi)
59+
}
60+
61+
func (ccp *SubstrateChainProcessor) handleChannelMessage(eventType string, ci provider.ChannelInfo, ibcMessagesCache processor.IBCMessagesCache) {
62+
ccp.channelConnections[ci.ChannelID] = ci.ConnID
63+
channelKey := processor.ChannelInfoChannelKey(ci)
64+
65+
if eventType == chantypes.EventTypeChannelOpenInit {
66+
found := false
67+
for k := range ccp.channelStateCache {
68+
// Don't add a channelKey to the channelStateCache without counterparty channel ID
69+
// since we already have the channelKey in the channelStateCache which includes the
70+
// counterparty channel ID.
71+
if k.MsgInitKey() == channelKey {
72+
found = true
73+
break
74+
}
75+
}
76+
if !found {
77+
ccp.channelStateCache[channelKey] = false
78+
}
79+
} else {
80+
switch eventType {
81+
case chantypes.EventTypeChannelOpenTry:
82+
ccp.channelStateCache[channelKey] = false
83+
case chantypes.EventTypeChannelOpenAck, chantypes.EventTypeChannelOpenConfirm:
84+
ccp.channelStateCache[channelKey] = true
85+
case chantypes.EventTypeChannelCloseConfirm:
86+
for k := range ccp.channelStateCache {
87+
if k.PortID == ci.PortID && k.ChannelID == ci.ChannelID {
88+
ccp.channelStateCache[k] = false
89+
break
90+
}
91+
}
92+
}
93+
// Clear out MsgInitKeys once we have the counterparty channel ID
94+
delete(ccp.channelStateCache, channelKey.MsgInitKey())
95+
}
96+
97+
ibcMessagesCache.ChannelHandshake.Retain(channelKey, eventType, ci)
98+
99+
ccp.logChannelMessage(eventType, ci)
100+
}
101+
102+
func (ccp *SubstrateChainProcessor) handleConnectionMessage(eventType string, ci provider.ConnectionInfo, ibcMessagesCache processor.IBCMessagesCache) {
103+
ccp.connectionClients[ci.ConnID] = ci.ClientID
104+
connectionKey := processor.ConnectionInfoConnectionKey(ci)
105+
if eventType == conntypes.EventTypeConnectionOpenInit {
106+
found := false
107+
for k := range ccp.connectionStateCache {
108+
// Don't add a connectionKey to the connectionStateCache without counterparty connection ID
109+
// since we already have the connectionKey in the connectionStateCache which includes the
110+
// counterparty connection ID.
111+
if k.MsgInitKey() == connectionKey {
112+
found = true
113+
break
114+
}
115+
}
116+
if !found {
117+
ccp.connectionStateCache[connectionKey] = false
118+
}
119+
} else {
120+
// Clear out MsgInitKeys once we have the counterparty connection ID
121+
delete(ccp.connectionStateCache, connectionKey.MsgInitKey())
122+
open := (eventType == conntypes.EventTypeConnectionOpenAck || eventType == conntypes.EventTypeConnectionOpenConfirm)
123+
ccp.connectionStateCache[connectionKey] = open
124+
}
125+
ibcMessagesCache.ConnectionHandshake.Retain(connectionKey, eventType, ci)
126+
127+
ccp.logConnectionMessage(eventType, ci)
128+
}
129+
130+
func (ccp *SubstrateChainProcessor) handleClientMessage(eventType string, ci clientInfo) {
131+
ccp.latestClientState.update(ci)
132+
ccp.logObservedIBCMessage(eventType, zap.String("client_id", ci.ClientID))
133+
}
134+
135+
func (ccp *SubstrateChainProcessor) logObservedIBCMessage(m string, fields ...zap.Field) {
136+
ccp.log.With(zap.String("event_type", m)).Debug("Observed IBC message", fields...)
137+
}
138+
139+
func (ccp *SubstrateChainProcessor) logPacketMessage(message string, pi provider.PacketInfo) {
140+
if !ccp.log.Core().Enabled(zapcore.DebugLevel) {
141+
return
142+
}
143+
fields := []zap.Field{
144+
zap.Uint64("sequence", pi.Sequence),
145+
zap.String("src_channel", pi.SourceChannel),
146+
zap.String("src_port", pi.SourcePort),
147+
zap.String("dst_channel", pi.DestChannel),
148+
zap.String("dst_port", pi.DestPort),
149+
}
150+
if pi.TimeoutHeight.RevisionHeight > 0 {
151+
fields = append(fields, zap.Uint64("timeout_height", pi.TimeoutHeight.RevisionHeight))
152+
}
153+
if pi.TimeoutHeight.RevisionNumber > 0 {
154+
fields = append(fields, zap.Uint64("timeout_height_revision", pi.TimeoutHeight.RevisionNumber))
155+
}
156+
if pi.TimeoutTimestamp > 0 {
157+
fields = append(fields, zap.Uint64("timeout_timestamp", pi.TimeoutTimestamp))
158+
}
159+
ccp.logObservedIBCMessage(message, fields...)
160+
}
161+
162+
func (ccp *SubstrateChainProcessor) logChannelMessage(message string, ci provider.ChannelInfo) {
163+
ccp.logObservedIBCMessage(message,
164+
zap.String("channel_id", ci.ChannelID),
165+
zap.String("port_id", ci.PortID),
166+
zap.String("counterparty_channel_id", ci.CounterpartyChannelID),
167+
zap.String("counterparty_port_id", ci.CounterpartyPortID),
168+
zap.String("connection_id", ci.ConnID),
169+
)
170+
}
171+
172+
func (ccp *SubstrateChainProcessor) logConnectionMessage(message string, ci provider.ConnectionInfo) {
173+
ccp.logObservedIBCMessage(message,
174+
zap.String("client_id", ci.ClientID),
175+
zap.String("connection_id", ci.ConnID),
176+
zap.String("counterparty_client_id", ci.CounterpartyClientID),
177+
zap.String("counterparty_connection_id", ci.CounterpartyConnID),
178+
)
10179
}

0 commit comments

Comments
 (0)