5
5
"github.com/tendermint/tendermint/p2p"
6
6
"github.com/tendermint/tendermint/pkg/trace/schema"
7
7
propproto "github.com/tendermint/tendermint/proto/tendermint/propagation"
8
- "github.com/tendermint/tendermint/types"
9
8
)
10
9
11
10
// handleHaves is called when a peer sends a have message. This is used to
@@ -37,25 +36,28 @@ func (blockProp *Reactor) handleHaves(peer p2p.ID, haves *proptypes.HaveParts, b
37
36
blockProp .Logger .Error ("peer not found" , "peer" , peer )
38
37
return
39
38
}
39
+
40
40
_ , parts , fullReqs , has := blockProp .getAllState (height , round )
41
41
if ! has {
42
42
// TODO disconnect from the peer
43
43
blockProp .Logger .Error ("received part state for unknown proposal" , "peer" , peer , "height" , height , "round" , round )
44
44
return
45
45
}
46
46
47
- blockProp .mtx .RLock ()
48
- defer blockProp .mtx .RUnlock ()
47
+ p .Initialize (height , round , int (parts .Total ()))
49
48
50
- // Update the peer's haves.
51
- p .SetHaves (height , round , haves )
49
+ bm , _ := p .GetHaves (height , round )
52
50
53
- if parts .IsComplete () {
51
+ for _ , pmd := range haves .Parts {
52
+ bm .SetIndex (int (pmd .Index ), true )
53
+ }
54
+
55
+ if parts .Original ().IsComplete () {
54
56
return
55
57
}
56
58
57
59
// Check if the sender has parts that we don't have.
58
- hc := haves .Copy ( )
60
+ hc := haves .BitArray ( int ( parts . Total ()) )
59
61
hc .Sub (parts .BitArray ())
60
62
61
63
// remove any parts that we have already requested sufficient times.
@@ -74,7 +76,7 @@ func (blockProp *Reactor) handleHaves(peer p2p.ID, haves *proptypes.HaveParts, b
74
76
reqs := blockProp .countRequests (height , round , partIndex )
75
77
if len (reqs ) >= reqLimit {
76
78
// TODO unify the types for the indexes and similar
77
- hc .RemoveIndex ( uint32 ( partIndex ) )
79
+ hc .SetIndex ( partIndex , false )
78
80
// mark the part as fully requested.
79
81
fullReqs .SetIndex (partIndex , true )
80
82
}
@@ -83,7 +85,7 @@ func (blockProp *Reactor) handleHaves(peer p2p.ID, haves *proptypes.HaveParts, b
83
85
for _ , p := range reqs {
84
86
// p == peer means we have already requested the part from this peer.
85
87
if p == peer {
86
- hc .RemoveIndex ( uint32 ( partIndex ) )
88
+ hc .SetIndex ( partIndex , false )
87
89
}
88
90
}
89
91
}
@@ -100,7 +102,7 @@ func (blockProp *Reactor) handleHaves(peer p2p.ID, haves *proptypes.HaveParts, b
100
102
Message : & propproto.WantParts {
101
103
Height : height ,
102
104
Round : round ,
103
- Parts : * hc .ToBitArray (). ToProto (),
105
+ Parts : * hc .ToProto (),
104
106
},
105
107
}
106
108
@@ -121,8 +123,8 @@ func (blockProp *Reactor) handleHaves(peer p2p.ID, haves *proptypes.HaveParts, b
121
123
122
124
// keep track of the parts that this node has requested.
123
125
// TODO check if we need to persist the have parts or just their bitarray
124
- p .SetRequests (height , round , hc . ToBitArray () )
125
- blockProp .broadcastHaves (hc , peer )
126
+ p .AddRequests (height , round , hc )
127
+ blockProp .broadcastHaves (haves , peer , int ( parts . Total ()) )
126
128
}
127
129
128
130
// todo(evan): refactor to not iterate so often and just store which peers
@@ -143,46 +145,35 @@ func (blockProp *Reactor) countRequests(height int64, round int32, part int) []p
143
145
// broadcastHaves gossips the provided have msg to all peers except to the
144
146
// original sender. This should only be called upon receiving a new have for the
145
147
// first time.
146
- func (blockProp * Reactor ) broadcastHaves (haves * proptypes.HaveParts , from p2p.ID ) {
147
- e := p2p.Envelope {
148
- ChannelID : DataChannel ,
149
- Message : & propproto.HaveParts {
150
- Height : haves .Height ,
151
- Round : haves .Round ,
152
- Parts : haves .ToProto ().Parts ,
153
- },
154
- }
148
+ //
149
+ // todo: add a test to ensure that we don't send the same haves to the same
150
+ // peers more than once.
151
+ func (blockProp * Reactor ) broadcastHaves (haves * proptypes.HaveParts , from p2p.ID , partSetSize int ) {
155
152
for _ , peer := range blockProp .getPeers () {
156
153
if peer .peer .ID () == from {
157
154
continue
158
155
}
159
156
160
- // skip sending anything to this peer if they already have all the
161
- // parts.
162
- ph , has := peer . GetHaves ( haves . Height , haves . Round )
163
- if has {
164
- havesCopy := haves . Copy ()
165
- havesCopy . Sub ( ph . ToBitArray ())
166
- if havesCopy . IsEmpty () {
167
- continue
168
- }
157
+ // todo: don't re-send haves to peers that already have it.
158
+
159
+ e := p2p. Envelope {
160
+ ChannelID : DataChannel ,
161
+ Message : & propproto. HaveParts {
162
+ Height : haves . Height ,
163
+ Round : haves . Round ,
164
+ Parts : haves . ToProto (). Parts ,
165
+ },
169
166
}
170
167
171
168
// todo(evan): don't rely strictly on try, however since we're using
172
169
// pull based gossip, this isn't as big as a deal since if someone asks
173
170
// for data, they must already have the proposal.
174
171
// TODO: use retry and logs
175
- if p2p .SendEnvelopeShim (peer .peer , e , blockProp .Logger ) { //nolint:staticcheck
176
- schema .WriteBlockPartState (
177
- blockProp .traceClient ,
178
- haves .Height ,
179
- haves .Round ,
180
- haves .GetTrueIndices (),
181
- true ,
182
- string (peer .peer .ID ()),
183
- schema .Upload ,
184
- )
172
+ if ! p2p .TrySendEnvelopeShim (peer .peer , e , blockProp .Logger ) { //nolint:staticcheck
173
+ blockProp .Logger .Debug ("failed to send haves to peer" , "peer" , peer .peer .ID ())
174
+ continue
185
175
}
176
+ peer .AddHaves (haves .Height , haves .Round , haves .BitArray (partSetSize ))
186
177
}
187
178
}
188
179
@@ -268,11 +259,7 @@ func (blockProp *Reactor) handleWants(peer p2p.ID, wants *proptypes.WantParts) {
268
259
// for parts that we don't have, but they still want, store the wants.
269
260
stillMissing := wants .Parts .Sub (canSend )
270
261
if ! stillMissing .IsEmpty () {
271
- p .SetWants (& proptypes.WantParts {
272
- Parts : stillMissing ,
273
- Height : height ,
274
- Round : round ,
275
- })
262
+ p .AddWants (height , round , stillMissing )
276
263
}
277
264
}
278
265
@@ -319,8 +306,8 @@ func (blockProp *Reactor) handleRecoveryPart(peer p2p.ID, part *proptypes.Recove
319
306
}
320
307
// the peer must always send the proposal before sending parts, if they did
321
308
// not this node must disconnect from them.
322
- _ , parts , has := blockProp .GetProposal (part .Height , part .Round )
323
- if ! has { // fmt.Println("unknown proposal")
309
+ _ , parts , _ , has := blockProp .getAllState (part .Height , part .Round )
310
+ if ! has {
324
311
blockProp .Logger .Error ("received part for unknown proposal" , "peer" , peer , "height" , part .Height , "round" , part .Round )
325
312
// d.pswitch.StopPeerForError(p.peer, fmt.Errorf("received part for unknown proposal"))
326
313
return
@@ -330,8 +317,9 @@ func (blockProp *Reactor) handleRecoveryPart(peer p2p.ID, part *proptypes.Recove
330
317
return
331
318
}
332
319
333
- // TODO this is not verifying the proof. make it verify it
334
- added , err := parts .AddPartWithoutProof (& types.Part {Index : part .Index , Bytes : part .Data })
320
+ // TODO: to verify, compare the hash with that of the have that was sent for
321
+ // this part and verified.
322
+ added , err := parts .AddPart (part )
335
323
if err != nil {
336
324
blockProp .Logger .Error ("failed to add part to part set" , "peer" , peer , "height" , part .Height , "round" , part .Round , "part" , part .Index , "error" , err )
337
325
return
@@ -345,13 +333,35 @@ func (blockProp *Reactor) handleRecoveryPart(peer p2p.ID, part *proptypes.Recove
345
333
346
334
// attempt to decode the remaining block parts. If they are decoded, then
347
335
// this node should send all the wanted parts that nodes have requested.
348
- if parts .IsReadyForDecoding () {
349
- // TODO decode once we have parity data support
336
+ if parts .CanDecode () {
337
+ err := parts .Decode ()
338
+ if err != nil {
339
+ blockProp .Logger .Error ("failed to decode parts" , "peer" , peer , "height" , part .Height , "round" , part .Round , "error" , err )
340
+ return
341
+ }
342
+
343
+ // broadcast haves for all parts since we've decoded the entire block.
344
+ // rely on the broadcast method to ensure that parts are only sent once.
345
+ haves := & proptypes.HaveParts {
346
+ Height : part .Height ,
347
+ Round : part .Round ,
348
+ }
349
+
350
+ for i := uint32 (0 ); i < parts .Total (); i ++ {
351
+ p , has := parts .GetPart (i )
352
+ if ! has {
353
+ blockProp .Logger .Error ("failed to get decoded part" , "peer" , peer , "height" , part .Height , "round" , part .Round , "part" , i )
354
+ continue
355
+ }
356
+ haves .Parts = append (haves .Parts , proptypes.PartMetaData {Index : i , Proof : p .Proof , Hash : p .Proof .LeafHash })
357
+ }
358
+
359
+ blockProp .broadcastHaves (haves , peer , int (parts .Total ()))
350
360
351
361
// clear all the wants if they exist
352
- go func (height int64 , round int32 , parts * types. PartSet ) {
362
+ go func (height int64 , round int32 , parts * proptypes. CombinedPartSet ) {
353
363
for i := uint32 (0 ); i < parts .Total (); i ++ {
354
- p := parts .GetPart (int ( i ) )
364
+ p , _ := parts .GetPart (i )
355
365
msg := & proptypes.RecoveryPart {
356
366
Height : height ,
357
367
Round : round ,
@@ -365,8 +375,6 @@ func (blockProp *Reactor) handleRecoveryPart(peer p2p.ID, part *proptypes.Recove
365
375
return
366
376
}
367
377
368
- // todo(evan): temporarily disabling
369
- // go d.broadcastHaves(part.Height, part.Round, parts.BitArray(), peer)
370
378
// TODO better go routines management
371
379
go blockProp .clearWants (part )
372
380
}
@@ -385,17 +393,19 @@ func (blockProp *Reactor) clearWants(part *proptypes.RecoveryPart) {
385
393
ChannelID : DataChannel ,
386
394
Message : & propproto.RecoveryPart {Height : part .Height , Round : part .Round , Index : part .Index , Data : part .Data },
387
395
}
388
- if p2p .SendEnvelopeShim (peer .peer , e , blockProp .Logger ) { //nolint:staticcheck
389
- peer . SetHave ( part .Height , part .Round , int ( part .Index ) )
390
- peer . SetWant ( part . Height , part . Round , int ( part . Index ), false )
391
- catchup := false
392
- blockProp . pmtx . RLock ( )
393
- if part .Height < blockProp . currentHeight {
394
- catchup = true
395
- }
396
- blockProp .pmtx . RUnlock ()
397
- schema . WriteBlockPart ( blockProp . traceClient , part . Height , part . Round , part . Index , catchup , string ( peer . peer . ID ()), schema . Upload )
396
+ if ! p2p .TrySendEnvelopeShim (peer .peer , e , blockProp .Logger ) { //nolint:staticcheck
397
+ blockProp . Logger . Error ( "failed to send part" , " peer" , peer . peer . ID (), "height" , part .Height , "round" , part .Round , " part" , part .Index )
398
+ continue
399
+ }
400
+ peer . SetHave ( part . Height , part . Round , int ( part . Index ) )
401
+ peer . SetWant ( part .Height , part . Round , int ( part . Index ), false )
402
+ catchup := false
403
+ blockProp . pmtx . Lock ()
404
+ if part . Height < blockProp .currentHeight {
405
+ catchup = true
398
406
}
407
+ blockProp .pmtx .Unlock ()
408
+ schema .WriteBlockPart (blockProp .traceClient , part .Height , part .Round , part .Index , catchup , string (peer .peer .ID ()), schema .Upload )
399
409
}
400
410
}
401
411
}
0 commit comments