Skip to content

Commit 94be73e

Browse files
committed
bitswap/server: remove provide
We always had a very weird relationship between bitswap and providing. Bitswap took care of doing the initial provide and then reprovider did it later. The Bitswap server had a complicated providing workflow where it slurped thing into memory. Reprovide accepts provides and is able to queue them in a database, such as on disk, this is much better. I'll add options to hook initial provide logic from the blockservice to the reprovider queue so consumers don't have to do this themselves.
1 parent 507356b commit 94be73e

File tree

7 files changed

+17
-178
lines changed

7 files changed

+17
-178
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ The following emojis are used to highlight certain changes:
2323

2424
### Removed
2525

26+
- 🛠 `bitswap` & `bitswap/server` no longer provide to content routers, instead you can use the `provider` package because it uses a datastore queue and batches calls to ProvideMany.
27+
2628
### Security
2729

2830
## [v0.17.0]

bitswap/bitswap.go

+2-10
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66

77
"github.com/ipfs/boxo/bitswap/client"
8-
"github.com/ipfs/boxo/bitswap/internal/defaults"
98
"github.com/ipfs/boxo/bitswap/message"
109
"github.com/ipfs/boxo/bitswap/network"
1110
"github.com/ipfs/boxo/bitswap/server"
@@ -45,9 +44,8 @@ type bitswap interface {
4544
}
4645

4746
var (
48-
_ exchange.SessionExchange = (*Bitswap)(nil)
49-
_ bitswap = (*Bitswap)(nil)
50-
HasBlockBufferSize = defaults.HasBlockBufferSize
47+
_ exchange.SessionExchange = (*Bitswap)(nil)
48+
_ bitswap = (*Bitswap)(nil)
5149
)
5250

5351
type Bitswap struct {
@@ -85,10 +83,6 @@ func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Bloc
8583
serverOptions = append(serverOptions, server.WithTracer(tracer))
8684
}
8785

88-
if HasBlockBufferSize != defaults.HasBlockBufferSize {
89-
serverOptions = append(serverOptions, server.HasBlockBufferSize(HasBlockBufferSize))
90-
}
91-
9286
ctx = metrics.CtxSubScope(ctx, "bitswap")
9387

9488
bs.Server = server.New(ctx, net, bstore, serverOptions...)
@@ -115,7 +109,6 @@ type Stat struct {
115109
MessagesReceived uint64
116110
BlocksSent uint64
117111
DataSent uint64
118-
ProvideBufLen int
119112
}
120113

121114
func (bs *Bitswap) Stat() (*Stat, error) {
@@ -138,7 +131,6 @@ func (bs *Bitswap) Stat() (*Stat, error) {
138131
Peers: ss.Peers,
139132
BlocksSent: ss.BlocksSent,
140133
DataSent: ss.DataSent,
141-
ProvideBufLen: ss.ProvideBufLen,
142134
}, nil
143135
}
144136

bitswap/bitswap_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
120120
func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
121121
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
122122
block := blocks.NewBlock([]byte("block"))
123-
bsOpts := []bitswap.Option{bitswap.ProvideEnabled(false), bitswap.ProviderSearchDelay(50 * time.Millisecond)}
123+
bsOpts := []bitswap.Option{bitswap.ProviderSearchDelay(50 * time.Millisecond)}
124124
ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts)
125125
defer ig.Close()
126126

bitswap/client/bitswap_with_sessions_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk
3737
if err != nil {
3838
t.Fatal(err)
3939
}
40+
err = inst.Adapter.Provide(ctx, blk.Cid())
41+
if err != nil {
42+
t.Fatal(err)
43+
}
4044
}
4145

4246
func TestBasicSessions(t *testing.T) {

bitswap/internal/defaults/defaults.go

-5
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,6 @@ const (
2020
BitswapMaxOutstandingBytesPerPeer = 1 << 20
2121
// the number of bytes we attempt to make each outgoing bitswap message
2222
BitswapEngineTargetMessageSize = 16 * 1024
23-
// HasBlockBufferSize is the buffer size of the channel for new blocks
24-
// that need to be provided. They should get pulled over by the
25-
// provideCollector even before they are actually provided.
26-
// TODO: Does this need to be this large givent that?
27-
HasBlockBufferSize = 256
2823

2924
// Maximum size of the wantlist we are willing to keep in memory.
3025
MaxQueuedWantlistEntiresPerPeer = 1024

bitswap/options.go

-4
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,6 @@ func TaskWorkerCount(count int) Option {
4343
return Option{server.TaskWorkerCount(count)}
4444
}
4545

46-
func ProvideEnabled(enabled bool) Option {
47-
return Option{server.ProvideEnabled(enabled)}
48-
}
49-
5046
func SetSendDontHaves(send bool) Option {
5147
return Option{server.SetSendDontHaves(send)}
5248
}

bitswap/server/server.go

+8-158
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,15 @@ import (
2121
logging "github.com/ipfs/go-log/v2"
2222
"github.com/ipfs/go-metrics-interface"
2323
process "github.com/jbenet/goprocess"
24-
procctx "github.com/jbenet/goprocess/context"
2524
"github.com/libp2p/go-libp2p/core/peer"
2625
"go.uber.org/zap"
2726
)
2827

29-
var provideKeysBufferSize = 2048
30-
3128
var (
3229
log = logging.Logger("bitswap-server")
3330
sflog = log.Desugar()
3431
)
3532

36-
const provideWorkerMax = 6
37-
3833
type Option func(*Server)
3934

4035
type Server struct {
@@ -59,20 +54,8 @@ type Server struct {
5954

6055
process process.Process
6156

62-
// newBlocks is a channel for newly added blocks to be provided to the
63-
// network. blocks pushed down this channel get buffered and fed to the
64-
// provideKeys channel later on to avoid too much network activity
65-
newBlocks chan cid.Cid
66-
// provideKeys directly feeds provide workers
67-
provideKeys chan cid.Cid
68-
6957
// Extra options to pass to the decision manager
7058
engineOptions []decision.Option
71-
72-
// the size of channel buffer to use
73-
hasBlockBufferSize int
74-
// whether or not to make provide announcements
75-
provideEnabled bool
7659
}
7760

7861
func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Blockstore, options ...Option) *Server {
@@ -87,16 +70,12 @@ func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Bl
8770
}()
8871

8972
s := &Server{
90-
sentHistogram: bmetrics.SentHist(ctx),
91-
sendTimeHistogram: bmetrics.SendTimeHist(ctx),
92-
taskWorkerCount: defaults.BitswapTaskWorkerCount,
93-
network: network,
94-
process: px,
95-
provideEnabled: true,
96-
hasBlockBufferSize: defaults.HasBlockBufferSize,
97-
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
73+
sentHistogram: bmetrics.SentHist(ctx),
74+
sendTimeHistogram: bmetrics.SendTimeHist(ctx),
75+
taskWorkerCount: defaults.BitswapTaskWorkerCount,
76+
network: network,
77+
process: px,
9878
}
99-
s.newBlocks = make(chan cid.Cid, s.hasBlockBufferSize)
10079

10180
for _, o := range options {
10281
o(s)
@@ -131,13 +110,6 @@ func WithTracer(tap tracer.Tracer) Option {
131110
}
132111
}
133112

134-
// ProvideEnabled is an option for enabling/disabling provide announcements
135-
func ProvideEnabled(enabled bool) Option {
136-
return func(bs *Server) {
137-
bs.provideEnabled = enabled
138-
}
139-
}
140-
141113
func WithPeerBlockRequestFilter(pbrf decision.PeerBlockRequestFilter) Option {
142114
o := decision.WithPeerBlockRequestFilter(pbrf)
143115
return func(bs *Server) {
@@ -233,16 +205,6 @@ func MaxCidSize(n uint) Option {
233205
}
234206
}
235207

236-
// HasBlockBufferSize configure how big the new blocks buffer should be.
237-
func HasBlockBufferSize(count int) Option {
238-
if count < 0 {
239-
panic("cannot have negative buffer size")
240-
}
241-
return func(bs *Server) {
242-
bs.hasBlockBufferSize = count
243-
}
244-
}
245-
246208
// WantlistForPeer returns the currently understood list of blocks requested by a
247209
// given peer.
248210
func (bs *Server) WantlistForPeer(p peer.ID) []cid.Cid {
@@ -263,18 +225,6 @@ func (bs *Server) startWorkers(ctx context.Context, px process.Process) {
263225
bs.taskWorker(ctx, i)
264226
})
265227
}
266-
267-
if bs.provideEnabled {
268-
// Start up a worker to manage sending out provides messages
269-
px.Go(func(px process.Process) {
270-
bs.provideCollector(ctx)
271-
})
272-
273-
// Spawn up multiple workers to handle incoming blocks
274-
// consider increasing number if providing blocks bottlenecks
275-
// file transfers
276-
px.Go(bs.provideWorker)
277-
}
278228
}
279229

280230
func (bs *Server) taskWorker(ctx context.Context, id int) {
@@ -382,18 +332,16 @@ func (bs *Server) sendBlocks(ctx context.Context, env *decision.Envelope) {
382332
}
383333

384334
type Stat struct {
385-
Peers []string
386-
ProvideBufLen int
387-
BlocksSent uint64
388-
DataSent uint64
335+
Peers []string
336+
BlocksSent uint64
337+
DataSent uint64
389338
}
390339

391340
// Stat returns aggregated statistics about bitswap operations
392341
func (bs *Server) Stat() (Stat, error) {
393342
bs.counterLk.Lock()
394343
s := bs.counters
395344
bs.counterLk.Unlock()
396-
s.ProvideBufLen = len(bs.newBlocks)
397345

398346
peers := bs.engine.Peers()
399347
peersStr := make([]string, len(peers))
@@ -420,107 +368,9 @@ func (bs *Server) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) err
420368
// Send wanted blocks to decision engine
421369
bs.engine.NotifyNewBlocks(blks)
422370

423-
// If the reprovider is enabled, send block to reprovider
424-
if bs.provideEnabled {
425-
for _, blk := range blks {
426-
select {
427-
case bs.newBlocks <- blk.Cid():
428-
// send block off to be reprovided
429-
case <-bs.process.Closing():
430-
return bs.process.Close()
431-
}
432-
}
433-
}
434-
435371
return nil
436372
}
437373

438-
func (bs *Server) provideCollector(ctx context.Context) {
439-
defer close(bs.provideKeys)
440-
var toProvide []cid.Cid
441-
var nextKey cid.Cid
442-
var keysOut chan cid.Cid
443-
444-
for {
445-
select {
446-
case blkey, ok := <-bs.newBlocks:
447-
if !ok {
448-
log.Debug("newBlocks channel closed")
449-
return
450-
}
451-
452-
if keysOut == nil {
453-
nextKey = blkey
454-
keysOut = bs.provideKeys
455-
} else {
456-
toProvide = append(toProvide, blkey)
457-
}
458-
case keysOut <- nextKey:
459-
if len(toProvide) > 0 {
460-
nextKey = toProvide[0]
461-
toProvide = toProvide[1:]
462-
} else {
463-
keysOut = nil
464-
}
465-
case <-ctx.Done():
466-
return
467-
}
468-
}
469-
}
470-
471-
func (bs *Server) provideWorker(px process.Process) {
472-
// FIXME: OnClosingContext returns a _custom_ context type.
473-
// Unfortunately, deriving a new cancelable context from this custom
474-
// type fires off a goroutine. To work around this, we create a single
475-
// cancelable context up-front and derive all sub-contexts from that.
476-
//
477-
// See: https://github.com/ipfs/go-ipfs/issues/5810
478-
ctx := procctx.OnClosingContext(px)
479-
ctx, cancel := context.WithCancel(ctx)
480-
defer cancel()
481-
482-
limit := make(chan struct{}, provideWorkerMax)
483-
484-
limitedGoProvide := func(k cid.Cid, wid int) {
485-
defer func() {
486-
// replace token when done
487-
<-limit
488-
}()
489-
490-
log.Debugw("Bitswap.ProvideWorker.Start", "ID", wid, "cid", k)
491-
defer log.Debugw("Bitswap.ProvideWorker.End", "ID", wid, "cid", k)
492-
493-
ctx, cancel := context.WithTimeout(ctx, defaults.ProvideTimeout) // timeout ctx
494-
defer cancel()
495-
496-
if err := bs.network.Provide(ctx, k); err != nil {
497-
log.Warn(err)
498-
}
499-
}
500-
501-
// worker spawner, reads from bs.provideKeys until it closes, spawning a
502-
// _ratelimited_ number of workers to handle each key.
503-
for wid := 2; ; wid++ {
504-
log.Debug("Bitswap.ProvideWorker.Loop")
505-
506-
select {
507-
case <-px.Closing():
508-
return
509-
case k, ok := <-bs.provideKeys:
510-
if !ok {
511-
log.Debug("provideKeys channel closed")
512-
return
513-
}
514-
select {
515-
case <-px.Closing():
516-
return
517-
case limit <- struct{}{}:
518-
go limitedGoProvide(k, wid)
519-
}
520-
}
521-
}
522-
}
523-
524374
func (bs *Server) ReceiveMessage(ctx context.Context, p peer.ID, incoming message.BitSwapMessage) {
525375
// This call records changes to wantlists, blocks received,
526376
// and number of bytes transfered.

0 commit comments

Comments
 (0)