Skip to content

Commit 6c590b8

Browse files
committed
blockservice: add NewSessionContext and EmbedSessionInContext
This also include cleanup for session code.
1 parent 76d9292 commit 6c590b8

File tree

3 files changed

+195
-109
lines changed

3 files changed

+195
-109
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ The following emojis are used to highlight certain changes:
1616

1717
### Added
1818

19+
- `blockservice` now have `ContextWithSession` and `EmbedSessionInContext` functions, which allows to embed a session in a context, future calls to `BlockGetter.GetBlock`, `BlockGetter.GetBlocks` and `NewSession` will then use the session in the context.
20+
1921
### Changed
2022

2123
### Removed

blockservice/blockservice.go

+128-109
Original file line numberDiff line numberDiff line change
@@ -144,29 +144,19 @@ func (s *blockService) Allowlist() verifcid.Allowlist {
144144
// If the current exchange is a SessionExchange, a new exchange
145145
// session will be created. Otherwise, the current exchange will be used
146146
// directly.
147+
// Sessions are lazily setup, this is cheap.
147148
func NewSession(ctx context.Context, bs BlockService) *Session {
148-
allowlist := verifcid.Allowlist(verifcid.DefaultAllowlist)
149+
ses := grabSessionFromContext(ctx, bs)
150+
if ses != nil {
151+
return ses
152+
}
153+
154+
var allowlist verifcid.Allowlist = verifcid.DefaultAllowlist
149155
if bbs, ok := bs.(BoundedBlockService); ok {
150156
allowlist = bbs.Allowlist()
151157
}
152-
exch := bs.Exchange()
153-
if sessEx, ok := exch.(exchange.SessionExchange); ok {
154-
return &Session{
155-
allowlist: allowlist,
156-
sessCtx: ctx,
157-
ses: nil,
158-
sessEx: sessEx,
159-
bs: bs.Blockstore(),
160-
notifier: exch,
161-
}
162-
}
163-
return &Session{
164-
allowlist: allowlist,
165-
ses: exch,
166-
sessCtx: ctx,
167-
bs: bs.Blockstore(),
168-
notifier: exch,
169-
}
158+
159+
return &Session{bs: bs, allowlist: allowlist, sesctx: ctx}
170160
}
171161

172162
// AddBlock adds a particular block to the service, Putting it into the datastore.
@@ -248,75 +238,80 @@ func (s *blockService) AddBlocks(ctx context.Context, bs []blocks.Block) error {
248238
// GetBlock retrieves a particular block from the service,
249239
// Getting it from the datastore using the key (hash).
250240
func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
241+
if ses := grabSessionFromContext(ctx, s); ses != nil {
242+
return ses.GetBlock(ctx, c)
243+
}
244+
251245
ctx, span := internal.StartSpan(ctx, "blockService.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c)))
252246
defer span.End()
253247

254-
var f func() notifiableFetcher
255-
if s.exchange != nil {
256-
f = s.getExchange
257-
}
258-
259-
return getBlock(ctx, c, s.blockstore, s.allowlist, f)
248+
return getBlock(ctx, c, s, s.allowlist, s.getExchangeFetcher)
260249
}
261250

262-
func (s *blockService) getExchange() notifiableFetcher {
251+
// Look at what I have to do, no interface covariance :'(
252+
func (s *blockService) getExchangeFetcher() exchange.Fetcher {
263253
return s.exchange
264254
}
265255

266-
func getBlock(ctx context.Context, c cid.Cid, bs blockstore.Blockstore, allowlist verifcid.Allowlist, fget func() notifiableFetcher) (blocks.Block, error) {
256+
func getBlock(ctx context.Context, c cid.Cid, bs BlockService, allowlist verifcid.Allowlist, fetchFactory func() exchange.Fetcher) (blocks.Block, error) {
267257
err := verifcid.ValidateCid(allowlist, c) // hash security
268258
if err != nil {
269259
return nil, err
270260
}
271261

272-
block, err := bs.Get(ctx, c)
273-
if err == nil {
262+
blockstore := bs.Blockstore()
263+
264+
block, err := blockstore.Get(ctx, c)
265+
switch {
266+
case err == nil:
274267
return block, nil
268+
case ipld.IsNotFound(err):
269+
break
270+
default:
271+
return nil, err
275272
}
276273

277-
if ipld.IsNotFound(err) && fget != nil {
278-
f := fget() // Don't load the exchange until we have to
274+
fetch := fetchFactory() // lazily create session if needed
275+
if fetch == nil {
276+
logger.Debug("BlockService GetBlock: Not found")
277+
return nil, err
278+
}
279279

280-
// TODO be careful checking ErrNotFound. If the underlying
281-
// implementation changes, this will break.
282-
logger.Debug("BlockService: Searching")
283-
blk, err := f.GetBlock(ctx, c)
284-
if err != nil {
285-
return nil, err
286-
}
287-
// also write in the blockstore for caching, inform the exchange that the block is available
288-
err = bs.Put(ctx, blk)
289-
if err != nil {
290-
return nil, err
291-
}
292-
err = f.NotifyNewBlocks(ctx, blk)
280+
logger.Debug("BlockService: Searching")
281+
blk, err := fetch.GetBlock(ctx, c)
282+
if err != nil {
283+
return nil, err
284+
}
285+
// also write in the blockstore for caching, inform the exchange that the block is available
286+
err = blockstore.Put(ctx, blk)
287+
if err != nil {
288+
return nil, err
289+
}
290+
if ex := bs.Exchange(); ex != nil {
291+
err = ex.NotifyNewBlocks(ctx, blk)
293292
if err != nil {
294293
return nil, err
295294
}
296-
logger.Debugf("BlockService.BlockFetched %s", c)
297-
return blk, nil
298295
}
299-
300-
logger.Debug("BlockService GetBlock: Not found")
301-
return nil, err
296+
logger.Debugf("BlockService.BlockFetched %s", c)
297+
return blk, nil
302298
}
303299

304300
// GetBlocks gets a list of blocks asynchronously and returns through
305301
// the returned channel.
306302
// NB: No guarantees are made about order.
307303
func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block {
304+
if ses := grabSessionFromContext(ctx, s); ses != nil {
305+
return ses.GetBlocks(ctx, ks)
306+
}
307+
308308
ctx, span := internal.StartSpan(ctx, "blockService.GetBlocks")
309309
defer span.End()
310310

311-
var f func() notifiableFetcher
312-
if s.exchange != nil {
313-
f = s.getExchange
314-
}
315-
316-
return getBlocks(ctx, ks, s.blockstore, s.allowlist, f)
311+
return getBlocks(ctx, ks, s, s.allowlist, s.getExchangeFetcher)
317312
}
318313

319-
func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, allowlist verifcid.Allowlist, fget func() notifiableFetcher) <-chan blocks.Block {
314+
func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, allowlist verifcid.Allowlist, fetchFactory func() exchange.Fetcher) <-chan blocks.Block {
320315
out := make(chan blocks.Block)
321316

322317
go func() {
@@ -344,6 +339,8 @@ func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, allo
344339
ks = ks2
345340
}
346341

342+
bs := blockservice.Blockstore()
343+
347344
var misses []cid.Cid
348345
for _, c := range ks {
349346
hit, err := bs.Get(ctx, c)
@@ -358,17 +355,18 @@ func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, allo
358355
}
359356
}
360357

361-
if len(misses) == 0 || fget == nil {
358+
fetch := fetchFactory() // don't load exchange unless we have to
359+
if len(misses) == 0 || fetch == nil {
362360
return
363361
}
364362

365-
f := fget() // don't load exchange unless we have to
366-
rblocks, err := f.GetBlocks(ctx, misses)
363+
rblocks, err := fetch.GetBlocks(ctx, misses)
367364
if err != nil {
368365
logger.Debugf("Error with GetBlocks: %s", err)
369366
return
370367
}
371368

369+
ex := blockservice.Exchange()
372370
var cache [1]blocks.Block // preallocate once for all iterations
373371
for {
374372
var b blocks.Block
@@ -389,14 +387,16 @@ func getBlocks(ctx context.Context, ks []cid.Cid, bs blockstore.Blockstore, allo
389387
return
390388
}
391389

392-
// inform the exchange that the blocks are available
393-
cache[0] = b
394-
err = f.NotifyNewBlocks(ctx, cache[:]...)
395-
if err != nil {
396-
logger.Errorf("could not tell the exchange about new blocks: %s", err)
397-
return
390+
if ex != nil {
391+
// inform the exchange that the blocks are available
392+
cache[0] = b
393+
err = ex.NotifyNewBlocks(ctx, cache[:]...)
394+
if err != nil {
395+
logger.Errorf("could not tell the exchange about new blocks: %s", err)
396+
return
397+
}
398+
cache[0] = nil // early gc
398399
}
399-
cache[0] = nil // early gc
400400

401401
select {
402402
case out <- b:
@@ -428,70 +428,89 @@ func (s *blockService) Close() error {
428428
return s.exchange.Close()
429429
}
430430

431-
type notifier interface {
432-
NotifyNewBlocks(context.Context, ...blocks.Block) error
433-
}
434-
435431
// Session is a helper type to provide higher level access to bitswap sessions
436432
type Session struct {
437-
allowlist verifcid.Allowlist
438-
bs blockstore.Blockstore
439-
ses exchange.Fetcher
440-
sessEx exchange.SessionExchange
441-
sessCtx context.Context
442-
notifier notifier
443-
lk sync.Mutex
433+
createSession sync.Once
434+
bs BlockService
435+
ses exchange.Fetcher
436+
sesctx context.Context
437+
allowlist verifcid.Allowlist
444438
}
445439

446-
type notifiableFetcher interface {
447-
exchange.Fetcher
448-
notifier
449-
}
440+
// grabSession is used to lazily create sessions.
441+
func (s *Session) grabSession() exchange.Fetcher {
442+
s.createSession.Do(func() {
443+
defer func() {
444+
s.sesctx = nil // early gc
445+
}()
450446

451-
type notifiableFetcherWrapper struct {
452-
exchange.Fetcher
453-
notifier
454-
}
455-
456-
func (s *Session) getSession() notifiableFetcher {
457-
s.lk.Lock()
458-
defer s.lk.Unlock()
459-
if s.ses == nil {
460-
s.ses = s.sessEx.NewSession(s.sessCtx)
461-
}
462-
463-
return notifiableFetcherWrapper{s.ses, s.notifier}
464-
}
447+
ex := s.bs.Exchange()
448+
if ex == nil {
449+
return
450+
}
451+
s.ses = ex // always fallback to non session fetches
465452

466-
func (s *Session) getExchange() notifiableFetcher {
467-
return notifiableFetcherWrapper{s.ses, s.notifier}
468-
}
453+
sesEx, ok := ex.(exchange.SessionExchange)
454+
if !ok {
455+
return
456+
}
457+
s.ses = sesEx.NewSession(s.sesctx)
458+
})
469459

470-
func (s *Session) getFetcherFactory() func() notifiableFetcher {
471-
if s.sessEx != nil {
472-
return s.getSession
473-
}
474-
if s.ses != nil {
475-
// Our exchange isn't session compatible, let's fallback to non sessions fetches
476-
return s.getExchange
477-
}
478-
return nil
460+
return s.ses
479461
}
480462

481463
// GetBlock gets a block in the context of a request session
482464
func (s *Session) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
483465
ctx, span := internal.StartSpan(ctx, "Session.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c)))
484466
defer span.End()
485467

486-
return getBlock(ctx, c, s.bs, s.allowlist, s.getFetcherFactory())
468+
return getBlock(ctx, c, s.bs, s.allowlist, s.grabSession)
487469
}
488470

489471
// GetBlocks gets blocks in the context of a request session
490472
func (s *Session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block {
491473
ctx, span := internal.StartSpan(ctx, "Session.GetBlocks")
492474
defer span.End()
493475

494-
return getBlocks(ctx, ks, s.bs, s.allowlist, s.getFetcherFactory())
476+
return getBlocks(ctx, ks, s.bs, s.allowlist, s.grabSession)
495477
}
496478

497479
var _ BlockGetter = (*Session)(nil)
480+
481+
// ContextWithSession is a helper which creates a context with an embded session,
482+
// future calls to [BlockGetter.GetBlock], [BlockGetter.GetBlocks] and [NewSession] with the same [BlockService]
483+
// will be redirected to this same session instead.
484+
// Sessions are lazily setup, this is cheap.
485+
// It wont make a new session if one exists already in the context.
486+
func ContextWithSession(ctx context.Context, bs BlockService) context.Context {
487+
if grabSessionFromContext(ctx, bs) != nil {
488+
return ctx
489+
}
490+
return EmbedSessionInContext(ctx, NewSession(ctx, bs))
491+
}
492+
493+
// EmbedSessionInContext is like [NewSessionContext] but it allows to embed an existing session.
494+
func EmbedSessionInContext(ctx context.Context, ses *Session) context.Context {
495+
// use ses.bs as a key, so if multiple blockservices use embeded sessions it gets dispatched to the matching blockservice.
496+
return context.WithValue(ctx, ses.bs, ses)
497+
}
498+
499+
// grabSessionFromContext returns nil if the session was not found
500+
// This is a private API on purposes, I dislike when consumers tradeoff compiletime typesafety with runtime typesafety,
501+
// if this API is public it is too easy to forget to pass a [BlockService] or [Session] object around in your app.
502+
// By having this private we allow consumers to follow the trace of where the blockservice is passed and used.
503+
func grabSessionFromContext(ctx context.Context, bs BlockService) *Session {
504+
s := ctx.Value(bs)
505+
if s == nil {
506+
return nil
507+
}
508+
509+
ss, ok := s.(*Session)
510+
if !ok {
511+
// idk what to do here, that kinda sucks, giveup
512+
return nil
513+
}
514+
515+
return ss
516+
}

0 commit comments

Comments
 (0)