@@ -144,29 +144,19 @@ func (s *blockService) Allowlist() verifcid.Allowlist {
144
144
// If the current exchange is a SessionExchange, a new exchange
145
145
// session will be created. Otherwise, the current exchange will be used
146
146
// directly.
147
+ // Sessions are lazily setup, this is cheap.
147
148
func NewSession (ctx context.Context , bs BlockService ) * Session {
148
- allowlist := verifcid .Allowlist (verifcid .DefaultAllowlist )
149
+ ses := grabSessionFromContext (ctx , bs )
150
+ if ses != nil {
151
+ return ses
152
+ }
153
+
154
+ var allowlist verifcid.Allowlist = verifcid .DefaultAllowlist
149
155
if bbs , ok := bs .(BoundedBlockService ); ok {
150
156
allowlist = bbs .Allowlist ()
151
157
}
152
- exch := bs .Exchange ()
153
- if sessEx , ok := exch .(exchange.SessionExchange ); ok {
154
- return & Session {
155
- allowlist : allowlist ,
156
- sessCtx : ctx ,
157
- ses : nil ,
158
- sessEx : sessEx ,
159
- bs : bs .Blockstore (),
160
- notifier : exch ,
161
- }
162
- }
163
- return & Session {
164
- allowlist : allowlist ,
165
- ses : exch ,
166
- sessCtx : ctx ,
167
- bs : bs .Blockstore (),
168
- notifier : exch ,
169
- }
158
+
159
+ return & Session {bs : bs , allowlist : allowlist , sesctx : ctx }
170
160
}
171
161
172
162
// AddBlock adds a particular block to the service, Putting it into the datastore.
@@ -248,75 +238,80 @@ func (s *blockService) AddBlocks(ctx context.Context, bs []blocks.Block) error {
248
238
// GetBlock retrieves a particular block from the service,
249
239
// Getting it from the datastore using the key (hash).
250
240
func (s * blockService ) GetBlock (ctx context.Context , c cid.Cid ) (blocks.Block , error ) {
241
+ if ses := grabSessionFromContext (ctx , s ); ses != nil {
242
+ return ses .GetBlock (ctx , c )
243
+ }
244
+
251
245
ctx , span := internal .StartSpan (ctx , "blockService.GetBlock" , trace .WithAttributes (attribute .Stringer ("CID" , c )))
252
246
defer span .End ()
253
247
254
- var f func () notifiableFetcher
255
- if s .exchange != nil {
256
- f = s .getExchange
257
- }
258
-
259
- return getBlock (ctx , c , s .blockstore , s .allowlist , f )
248
+ return getBlock (ctx , c , s , s .allowlist , s .getExchangeFetcher )
260
249
}
261
250
262
- func (s * blockService ) getExchange () notifiableFetcher {
251
+ // Look at what I have to do, no interface covariance :'(
252
+ func (s * blockService ) getExchangeFetcher () exchange.Fetcher {
263
253
return s .exchange
264
254
}
265
255
266
- func getBlock (ctx context.Context , c cid.Cid , bs blockstore. Blockstore , allowlist verifcid.Allowlist , fget func () notifiableFetcher ) (blocks.Block , error ) {
256
+ func getBlock (ctx context.Context , c cid.Cid , bs BlockService , allowlist verifcid.Allowlist , fetchFactory func () exchange. Fetcher ) (blocks.Block , error ) {
267
257
err := verifcid .ValidateCid (allowlist , c ) // hash security
268
258
if err != nil {
269
259
return nil , err
270
260
}
271
261
272
- block , err := bs .Get (ctx , c )
273
- if err == nil {
262
+ blockstore := bs .Blockstore ()
263
+
264
+ block , err := blockstore .Get (ctx , c )
265
+ switch {
266
+ case err == nil :
274
267
return block , nil
268
+ case ipld .IsNotFound (err ):
269
+ break
270
+ default :
271
+ return nil , err
275
272
}
276
273
277
- if ipld .IsNotFound (err ) && fget != nil {
278
- f := fget () // Don't load the exchange until we have to
274
+ fetch := fetchFactory () // lazily create session if needed
275
+ if fetch == nil {
276
+ logger .Debug ("BlockService GetBlock: Not found" )
277
+ return nil , err
278
+ }
279
279
280
- // TODO be careful checking ErrNotFound. If the underlying
281
- // implementation changes, this will break.
282
- logger .Debug ("BlockService: Searching" )
283
- blk , err := f .GetBlock (ctx , c )
284
- if err != nil {
285
- return nil , err
286
- }
287
- // also write in the blockstore for caching, inform the exchange that the block is available
288
- err = bs .Put (ctx , blk )
289
- if err != nil {
290
- return nil , err
291
- }
292
- err = f .NotifyNewBlocks (ctx , blk )
280
+ logger .Debug ("BlockService: Searching" )
281
+ blk , err := fetch .GetBlock (ctx , c )
282
+ if err != nil {
283
+ return nil , err
284
+ }
285
+ // also write in the blockstore for caching, inform the exchange that the block is available
286
+ err = blockstore .Put (ctx , blk )
287
+ if err != nil {
288
+ return nil , err
289
+ }
290
+ if ex := bs .Exchange (); ex != nil {
291
+ err = ex .NotifyNewBlocks (ctx , blk )
293
292
if err != nil {
294
293
return nil , err
295
294
}
296
- logger .Debugf ("BlockService.BlockFetched %s" , c )
297
- return blk , nil
298
295
}
299
-
300
- logger .Debug ("BlockService GetBlock: Not found" )
301
- return nil , err
296
+ logger .Debugf ("BlockService.BlockFetched %s" , c )
297
+ return blk , nil
302
298
}
303
299
304
300
// GetBlocks gets a list of blocks asynchronously and returns through
305
301
// the returned channel.
306
302
// NB: No guarantees are made about order.
307
303
func (s * blockService ) GetBlocks (ctx context.Context , ks []cid.Cid ) <- chan blocks.Block {
304
+ if ses := grabSessionFromContext (ctx , s ); ses != nil {
305
+ return ses .GetBlocks (ctx , ks )
306
+ }
307
+
308
308
ctx , span := internal .StartSpan (ctx , "blockService.GetBlocks" )
309
309
defer span .End ()
310
310
311
- var f func () notifiableFetcher
312
- if s .exchange != nil {
313
- f = s .getExchange
314
- }
315
-
316
- return getBlocks (ctx , ks , s .blockstore , s .allowlist , f )
311
+ return getBlocks (ctx , ks , s , s .allowlist , s .getExchangeFetcher )
317
312
}
318
313
319
- func getBlocks (ctx context.Context , ks []cid.Cid , bs blockstore. Blockstore , allowlist verifcid.Allowlist , fget func () notifiableFetcher ) <- chan blocks.Block {
314
+ func getBlocks (ctx context.Context , ks []cid.Cid , blockservice BlockService , allowlist verifcid.Allowlist , fetchFactory func () exchange. Fetcher ) <- chan blocks.Block {
320
315
out := make (chan blocks.Block )
321
316
322
317
go func () {
@@ -344,6 +339,8 @@ func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, allo
344
339
ks = ks2
345
340
}
346
341
342
+ bs := blockservice .Blockstore ()
343
+
347
344
var misses []cid.Cid
348
345
for _ , c := range ks {
349
346
hit , err := bs .Get (ctx , c )
@@ -358,17 +355,18 @@ func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, allo
358
355
}
359
356
}
360
357
361
- if len (misses ) == 0 || fget == nil {
358
+ fetch := fetchFactory () // don't load exchange unless we have to
359
+ if len (misses ) == 0 || fetch == nil {
362
360
return
363
361
}
364
362
365
- f := fget () // don't load exchange unless we have to
366
- rblocks , err := f .GetBlocks (ctx , misses )
363
+ rblocks , err := fetch .GetBlocks (ctx , misses )
367
364
if err != nil {
368
365
logger .Debugf ("Error with GetBlocks: %s" , err )
369
366
return
370
367
}
371
368
369
+ ex := blockservice .Exchange ()
372
370
var cache [1 ]blocks.Block // preallocate once for all iterations
373
371
for {
374
372
var b blocks.Block
@@ -389,14 +387,16 @@ func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, allo
389
387
return
390
388
}
391
389
392
- // inform the exchange that the blocks are available
393
- cache [0 ] = b
394
- err = f .NotifyNewBlocks (ctx , cache [:]... )
395
- if err != nil {
396
- logger .Errorf ("could not tell the exchange about new blocks: %s" , err )
397
- return
390
+ if ex != nil {
391
+ // inform the exchange that the blocks are available
392
+ cache [0 ] = b
393
+ err = ex .NotifyNewBlocks (ctx , cache [:]... )
394
+ if err != nil {
395
+ logger .Errorf ("could not tell the exchange about new blocks: %s" , err )
396
+ return
397
+ }
398
+ cache [0 ] = nil // early gc
398
399
}
399
- cache [0 ] = nil // early gc
400
400
401
401
select {
402
402
case out <- b :
@@ -428,70 +428,89 @@ func (s *blockService) Close() error {
428
428
return s .exchange .Close ()
429
429
}
430
430
431
- type notifier interface {
432
- NotifyNewBlocks (context.Context , ... blocks.Block ) error
433
- }
434
-
435
431
// Session is a helper type to provide higher level access to bitswap sessions
436
432
type Session struct {
437
- allowlist verifcid.Allowlist
438
- bs blockstore.Blockstore
439
- ses exchange.Fetcher
440
- sessEx exchange.SessionExchange
441
- sessCtx context.Context
442
- notifier notifier
443
- lk sync.Mutex
433
+ createSession sync.Once
434
+ bs BlockService
435
+ ses exchange.Fetcher
436
+ sesctx context.Context
437
+ allowlist verifcid.Allowlist
444
438
}
445
439
446
- type notifiableFetcher interface {
447
- exchange.Fetcher
448
- notifier
449
- }
440
+ // grabSession is used to lazily create sessions.
441
+ func (s * Session ) grabSession () exchange.Fetcher {
442
+ s .createSession .Do (func () {
443
+ defer func () {
444
+ s .sesctx = nil // early gc
445
+ }()
450
446
451
- type notifiableFetcherWrapper struct {
452
- exchange.Fetcher
453
- notifier
454
- }
455
-
456
- func (s * Session ) getSession () notifiableFetcher {
457
- s .lk .Lock ()
458
- defer s .lk .Unlock ()
459
- if s .ses == nil {
460
- s .ses = s .sessEx .NewSession (s .sessCtx )
461
- }
462
-
463
- return notifiableFetcherWrapper {s .ses , s .notifier }
464
- }
447
+ ex := s .bs .Exchange ()
448
+ if ex == nil {
449
+ return
450
+ }
451
+ s .ses = ex // always fallback to non session fetches
465
452
466
- func (s * Session ) getExchange () notifiableFetcher {
467
- return notifiableFetcherWrapper {s .ses , s .notifier }
468
- }
453
+ sesEx , ok := ex .(exchange.SessionExchange )
454
+ if ! ok {
455
+ return
456
+ }
457
+ s .ses = sesEx .NewSession (s .sesctx )
458
+ })
469
459
470
- func (s * Session ) getFetcherFactory () func () notifiableFetcher {
471
- if s .sessEx != nil {
472
- return s .getSession
473
- }
474
- if s .ses != nil {
475
- // Our exchange isn't session compatible, let's fallback to non sessions fetches
476
- return s .getExchange
477
- }
478
- return nil
460
+ return s .ses
479
461
}
480
462
481
463
// GetBlock gets a block in the context of a request session
482
464
func (s * Session ) GetBlock (ctx context.Context , c cid.Cid ) (blocks.Block , error ) {
483
465
ctx , span := internal .StartSpan (ctx , "Session.GetBlock" , trace .WithAttributes (attribute .Stringer ("CID" , c )))
484
466
defer span .End ()
485
467
486
- return getBlock (ctx , c , s .bs , s .allowlist , s .getFetcherFactory () )
468
+ return getBlock (ctx , c , s .bs , s .allowlist , s .grabSession )
487
469
}
488
470
489
471
// GetBlocks gets blocks in the context of a request session
490
472
func (s * Session ) GetBlocks (ctx context.Context , ks []cid.Cid ) <- chan blocks.Block {
491
473
ctx , span := internal .StartSpan (ctx , "Session.GetBlocks" )
492
474
defer span .End ()
493
475
494
- return getBlocks (ctx , ks , s .bs , s .allowlist , s .getFetcherFactory () )
476
+ return getBlocks (ctx , ks , s .bs , s .allowlist , s .grabSession )
495
477
}
496
478
497
479
var _ BlockGetter = (* Session )(nil )
480
+
481
+ // ContextWithSession is a helper which creates a context with an embded session,
482
+ // future calls to [BlockGetter.GetBlock], [BlockGetter.GetBlocks] and [NewSession] with the same [BlockService]
483
+ // will be redirected to this same session instead.
484
+ // Sessions are lazily setup, this is cheap.
485
+ // It wont make a new session if one exists already in the context.
486
+ func ContextWithSession (ctx context.Context , bs BlockService ) context.Context {
487
+ if grabSessionFromContext (ctx , bs ) != nil {
488
+ return ctx
489
+ }
490
+ return EmbedSessionInContext (ctx , NewSession (ctx , bs ))
491
+ }
492
+
493
+ // EmbedSessionInContext is like [NewSessionContext] but it allows to embed an existing session.
494
+ func EmbedSessionInContext (ctx context.Context , ses * Session ) context.Context {
495
+ // use ses.bs as a key, so if multiple blockservices use embeded sessions it gets dispatched to the matching blockservice.
496
+ return context .WithValue (ctx , ses .bs , ses )
497
+ }
498
+
499
+ // grabSessionFromContext returns nil if the session was not found
500
+ // This is a private API on purposes, I dislike when consumers tradeoff compiletime typesafety with runtime typesafety,
501
+ // if this API is public it is too easy to forget to pass a [BlockService] or [Session] object around in your app.
502
+ // By having this private we allow consumers to follow the trace of where the blockservice is passed and used.
503
+ func grabSessionFromContext (ctx context.Context , bs BlockService ) * Session {
504
+ s := ctx .Value (bs )
505
+ if s == nil {
506
+ return nil
507
+ }
508
+
509
+ ss , ok := s .(* Session )
510
+ if ! ok {
511
+ // idk what to do here, that kinda sucks, giveup
512
+ return nil
513
+ }
514
+
515
+ return ss
516
+ }
0 commit comments