@@ -21,20 +21,15 @@ import (
 	logging "github.com/ipfs/go-log/v2"
 	"github.com/ipfs/go-metrics-interface"
 	process "github.com/jbenet/goprocess"
-	procctx "github.com/jbenet/goprocess/context"
 	"github.com/libp2p/go-libp2p/core/peer"
 	"go.uber.org/zap"
 )
 
-var provideKeysBufferSize = 2048
-
 var (
 	log   = logging.Logger("bitswap-server")
 	sflog = log.Desugar()
 )
 
-const provideWorkerMax = 6
-
 type Option func(*Server)
 
 type Server struct {
@@ -59,20 +54,8 @@ type Server struct {
 
 	process process.Process
 
-	// newBlocks is a channel for newly added blocks to be provided to the
-	// network. blocks pushed down this channel get buffered and fed to the
-	// provideKeys channel later on to avoid too much network activity
-	newBlocks chan cid.Cid
-	// provideKeys directly feeds provide workers
-	provideKeys chan cid.Cid
-
 	// Extra options to pass to the decision manager
 	engineOptions []decision.Option
-
-	// the size of channel buffer to use
-	hasBlockBufferSize int
-	// whether or not to make provide announcements
-	provideEnabled bool
 }
 
 func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Blockstore, options ...Option) *Server {
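
Note: the struct fields removed above implemented a two-stage provide pipeline: NotifyNewBlocks pushed CIDs into the buffered newBlocks channel, provideCollector (removed further down in this diff) drained that buffer into an in-memory queue, and provideKeys fed the announcement workers. The standalone sketch below illustrates the queuing technique the removed collector relied on, a select with a nil-able send channel so the send case is only enabled while a key is pending. It uses plain string keys instead of cid.Cid and is illustrative only; it is not part of this change.

package main

import "fmt"

// collector drains `in` into an in-memory backlog and forwards keys on
// `out` one at a time. outCh stays nil while nothing is pending, which
// disables the send case of the select until there is a key to offer.
func collector(in <-chan string, out chan<- string) {
	defer close(out)
	var backlog []string    // keys waiting to be forwarded
	var next string         // key currently being offered on out
	var outCh chan<- string // nil means "nothing to send yet"

	for {
		select {
		case k, ok := <-in:
			if !ok {
				return
			}
			if outCh == nil {
				next = k
				outCh = out // enable the send case
			} else {
				backlog = append(backlog, k)
			}
		case outCh <- next:
			if len(backlog) > 0 {
				next, backlog = backlog[0], backlog[1:]
			} else {
				outCh = nil // disable the send case again
			}
		}
	}
}

func main() {
	in := make(chan string)
	out := make(chan string)
	go collector(in, out)

	go func() {
		for _, k := range []string{"k1", "k2", "k3"} {
			in <- k
		}
	}()

	// Read back the three keys in order; the collector goroutine simply
	// exits with the program.
	for i := 0; i < 3; i++ {
		fmt.Println("provide", <-out)
	}
}
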
@@ -87,16 +70,12 @@ func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Bl
 	}()
 
 	s := &Server{
-		sentHistogram:      bmetrics.SentHist(ctx),
-		sendTimeHistogram:  bmetrics.SendTimeHist(ctx),
-		taskWorkerCount:    defaults.BitswapTaskWorkerCount,
-		network:            network,
-		process:            px,
-		provideEnabled:     true,
-		hasBlockBufferSize: defaults.HasBlockBufferSize,
-		provideKeys:        make(chan cid.Cid, provideKeysBufferSize),
+		sentHistogram:     bmetrics.SentHist(ctx),
+		sendTimeHistogram: bmetrics.SendTimeHist(ctx),
+		taskWorkerCount:   defaults.BitswapTaskWorkerCount,
+		network:           network,
+		process:           px,
 	}
-	s.newBlocks = make(chan cid.Cid, s.hasBlockBufferSize)
 
 	for _, o := range options {
 		o(s)
@@ -131,13 +110,6 @@ func WithTracer(tap tracer.Tracer) Option {
 	}
 }
 
-// ProvideEnabled is an option for enabling/disabling provide announcements
-func ProvideEnabled(enabled bool) Option {
-	return func(bs *Server) {
-		bs.provideEnabled = enabled
-	}
-}
-
 func WithPeerBlockRequestFilter(pbrf decision.PeerBlockRequestFilter) Option {
 	o := decision.WithPeerBlockRequestFilter(pbrf)
 	return func(bs *Server) {
@@ -233,16 +205,6 @@ func MaxCidSize(n uint) Option {
 	}
 }
 
-// HasBlockBufferSize configure how big the new blocks buffer should be.
-func HasBlockBufferSize(count int) Option {
-	if count < 0 {
-		panic("cannot have negative buffer size")
-	}
-	return func(bs *Server) {
-		bs.hasBlockBufferSize = count
-	}
-}
-
 // WantlistForPeer returns the currently understood list of blocks requested by a
 // given peer.
 func (bs *Server) WantlistForPeer(p peer.ID) []cid.Cid {
@@ -263,18 +225,6 @@ func (bs *Server) startWorkers(ctx context.Context, px process.Process) {
 			bs.taskWorker(ctx, i)
 		})
 	}
-
-	if bs.provideEnabled {
-		// Start up a worker to manage sending out provides messages
-		px.Go(func(px process.Process) {
-			bs.provideCollector(ctx)
-		})
-
-		// Spawn up multiple workers to handle incoming blocks
-		// consider increasing number if providing blocks bottlenecks
-		// file transfers
-		px.Go(bs.provideWorker)
-	}
 }
 
 func (bs *Server) taskWorker(ctx context.Context, id int) {
@@ -382,18 +332,16 @@ func (bs *Server) sendBlocks(ctx context.Context, env *decision.Envelope) {
 }
 
 type Stat struct {
-	Peers         []string
-	ProvideBufLen int
-	BlocksSent    uint64
-	DataSent      uint64
+	Peers      []string
+	BlocksSent uint64
+	DataSent   uint64
 }
 
 // Stat returns aggregated statistics about bitswap operations
 func (bs *Server) Stat() (Stat, error) {
 	bs.counterLk.Lock()
 	s := bs.counters
 	bs.counterLk.Unlock()
-	s.ProvideBufLen = len(bs.newBlocks)
 
 	peers := bs.engine.Peers()
 	peersStr := make([]string, len(peers))
@@ -420,107 +368,9 @@ func (bs *Server) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) err
 	// Send wanted blocks to decision engine
 	bs.engine.NotifyNewBlocks(blks)
 
-	// If the reprovider is enabled, send block to reprovider
-	if bs.provideEnabled {
-		for _, blk := range blks {
-			select {
-			case bs.newBlocks <- blk.Cid():
-				// send block off to be reprovided
-			case <-bs.process.Closing():
-				return bs.process.Close()
-			}
-		}
-	}
-
 	return nil
 }
 
-func (bs *Server) provideCollector(ctx context.Context) {
-	defer close(bs.provideKeys)
-	var toProvide []cid.Cid
-	var nextKey cid.Cid
-	var keysOut chan cid.Cid
-
-	for {
-		select {
-		case blkey, ok := <-bs.newBlocks:
-			if !ok {
-				log.Debug("newBlocks channel closed")
-				return
-			}
-
-			if keysOut == nil {
-				nextKey = blkey
-				keysOut = bs.provideKeys
-			} else {
-				toProvide = append(toProvide, blkey)
-			}
-		case keysOut <- nextKey:
-			if len(toProvide) > 0 {
-				nextKey = toProvide[0]
-				toProvide = toProvide[1:]
-			} else {
-				keysOut = nil
-			}
-		case <-ctx.Done():
-			return
-		}
-	}
-}
-
-func (bs *Server) provideWorker(px process.Process) {
-	// FIXME: OnClosingContext returns a _custom_ context type.
-	// Unfortunately, deriving a new cancelable context from this custom
-	// type fires off a goroutine. To work around this, we create a single
-	// cancelable context up-front and derive all sub-contexts from that.
-	//
-	// See: https://github.com/ipfs/go-ipfs/issues/5810
-	ctx := procctx.OnClosingContext(px)
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-
-	limit := make(chan struct{}, provideWorkerMax)
-
-	limitedGoProvide := func(k cid.Cid, wid int) {
-		defer func() {
-			// replace token when done
-			<-limit
-		}()
-
-		log.Debugw("Bitswap.ProvideWorker.Start", "ID", wid, "cid", k)
-		defer log.Debugw("Bitswap.ProvideWorker.End", "ID", wid, "cid", k)
-
-		ctx, cancel := context.WithTimeout(ctx, defaults.ProvideTimeout) // timeout ctx
-		defer cancel()
-
-		if err := bs.network.Provide(ctx, k); err != nil {
-			log.Warn(err)
-		}
-	}
-
-	// worker spawner, reads from bs.provideKeys until it closes, spawning a
-	// _ratelimited_ number of workers to handle each key.
-	for wid := 2; ; wid++ {
-		log.Debug("Bitswap.ProvideWorker.Loop")
-
-		select {
-		case <-px.Closing():
-			return
-		case k, ok := <-bs.provideKeys:
-			if !ok {
-				log.Debug("provideKeys channel closed")
-				return
-			}
-			select {
-			case <-px.Closing():
-				return
-			case limit <- struct{}{}:
-				go limitedGoProvide(k, wid)
-			}
-		}
-	}
-}
-
 func (bs *Server) ReceiveMessage(ctx context.Context, p peer.ID, incoming message.BitSwapMessage) {
 	// This call records changes to wantlists, blocks received,
 	// and number of bytes transfered.
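
Note: the removed provideWorker bounded concurrent provide announcements with a token channel, a buffered channel of empty structs whose capacity (provideWorkerMax) caps how many announcements run at once. The standalone sketch below illustrates that rate-limiting pattern; the sleep-and-print body is a stand-in for the real bs.network.Provide call and is not part of this change.

package main

import (
	"fmt"
	"sync"
	"time"
)

// maxConcurrent mirrors the removed provideWorkerMax constant.
const maxConcurrent = 6

func main() {
	keys := []string{"k1", "k2", "k3", "k4", "k5", "k6", "k7", "k8"}

	// A buffered channel of empty structs acts as a semaphore: sends
	// acquire a token, receives release it, and the buffer size caps
	// the number of concurrent workers.
	limit := make(chan struct{}, maxConcurrent)
	var wg sync.WaitGroup

	for _, k := range keys {
		limit <- struct{}{} // acquire a token; blocks when saturated
		wg.Add(1)
		go func(k string) {
			defer wg.Done()
			defer func() { <-limit }() // release the token when done

			// Stand-in for announcing the key to the network, which the
			// removed worker did under a per-call timeout.
			time.Sleep(10 * time.Millisecond)
			fmt.Println("provided", k)
		}(k)
	}
	wg.Wait()
}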