@@ -19,6 +19,7 @@ import {
19
19
NullBlockInput ,
20
20
getBlockInput ,
21
21
getBlockInputBlobs ,
22
+ getBlockInputDataColumns ,
22
23
} from "../../chain/blocks/types.js" ;
23
24
import { BlockInputAvailabilitySource } from "../../chain/seenCache/seenGossipBlockInput.js" ;
24
25
import { IExecutionEngine } from "../../execution/index.js" ;
@@ -150,6 +151,9 @@ export async function beaconBlocksMaybeBlobsByRoot(
150
151
151
152
// The last arg is to provide slot to which all blobs should be exausted in matching
152
153
// and here it should be infinity since all bobs should match
154
+ // TODO: should not call matchBlockWithDataColumns() because it's supposed for range sync
155
+ // in that function, peers should return all requested data columns, this function runs at gossip time
156
+ // and it should not expect that
153
157
const blockInputWithBlobs = matchBlockWithDataColumns (
154
158
network ,
155
159
peerId ,
@@ -434,12 +438,20 @@ export async function unavailableBeaconBlobsByRootPreFulu(
434
438
return getBlockInput . availableData ( config , block , BlockSource . byRoot , blockData ) ;
435
439
}
436
440
441
+
442
+ /**
443
+ * Download more columns for a BlockInput
444
+ * - unavailableBlockInput should have block, but not enough blobs (deneb) or data columns (fulu)
445
+ *
446
+ * This function may return data promise, and consumer should continue with fetching more blobs or columns from other peers
447
+ * see UnknownBlockSync.fetchUnavailableBlockInput()
448
+ */
437
449
export async function unavailableBeaconBlobsByRootPostFulu (
438
450
config : ChainForkConfig ,
439
451
network : INetwork ,
440
452
peerId : PeerIdStr ,
441
453
peerClient : string ,
442
- unavailableBlockInput : BlockInput | NullBlockInput ,
454
+ unavailableBlockInput : BlockInput ,
443
455
block : SignedBeaconBlock ,
444
456
cachedData : NullBlockInput [ "cachedData" ] ,
445
457
opts : {
@@ -451,7 +463,6 @@ export async function unavailableBeaconBlobsByRootPostFulu(
451
463
return unavailableBlockInput ;
452
464
}
453
465
454
- let availableBlockInput : BlockInput ;
455
466
if ( cachedData . fork === ForkName . deneb || cachedData . fork === ForkName . electra ) {
456
467
const { blobsCache, resolveAvailability} = cachedData ;
457
468
@@ -489,113 +500,108 @@ export async function unavailableBeaconBlobsByRootPostFulu(
489
500
const blockData = { fork : cachedData . fork , ...allBlobs , blobsSource : BlobsSource . byRoot } as BlockInputBlobs ;
490
501
resolveAvailability ( blockData ) ;
491
502
opts . metrics ?. syncUnknownBlock . resolveAvailabilitySource . inc ( { source : BlockInputAvailabilitySource . UNKNOWN_SYNC } ) ;
492
- availableBlockInput = getBlockInput . availableData ( config , block , BlockSource . byRoot , blockData ) ;
493
- } else if ( cachedData . fork === ForkName . fulu ) {
494
- const { dataColumnsCache, resolveAvailability} = cachedData as CachedDataColumns ;
503
+ return getBlockInput . availableData ( config , block , BlockSource . byRoot , blockData ) ;
504
+ }
495
505
496
- // resolve missing blobs
497
- const dataColumnIdentifiers : fulu . DataColumnIdentifier [ ] = [ ] ;
498
- const slot = block . message . slot ;
499
- const blockRoot = config . getForkTypes ( slot ) . BeaconBlock . hashTreeRoot ( block . message ) ;
506
+ // fulu fork
507
+ const { dataColumnsCache, resolveAvailability} = cachedData as CachedDataColumns ;
500
508
501
- const blobKzgCommitmentsLen = ( block . message . body as deneb . BeaconBlockBody ) . blobKzgCommitments . length ;
502
- if ( blobKzgCommitmentsLen === 0 ) {
503
- const blockData = {
504
- fork : cachedData . fork ,
505
- dataColumns : [ ] ,
506
- dataColumnsBytes : [ ] ,
507
- dataColumnsSource : DataColumnsSource . gossip ,
508
- } as BlockInputDataColumns ;
509
+ // resolve missing blobs
510
+ const dataColumnIdentifiers : fulu . DataColumnIdentifier [ ] = [ ] ;
511
+ const slot = block . message . slot ;
512
+ const blockRoot = config . getForkTypes ( slot ) . BeaconBlock . hashTreeRoot ( block . message ) ;
509
513
510
- resolveAvailability ( blockData ) ;
511
- opts . metrics ?. syncUnknownBlock . resolveAvailabilitySource . inc ( { source : BlockInputAvailabilitySource . UNKNOWN_SYNC } ) ;
512
- availableBlockInput = getBlockInput . availableData ( config , block , BlockSource . byRoot , blockData ) ;
513
- } else {
514
- const { custodyConfig} = network ;
515
- const neededColumns = custodyConfig . sampledColumns . reduce ( ( acc , elem ) => {
516
- if ( dataColumnsCache . get ( elem ) === undefined ) {
517
- acc . push ( elem ) ;
518
- }
519
- return acc ;
520
- } , [ ] as number [ ] ) ;
514
+ const blobKzgCommitmentsLen = ( block . message . body as deneb . BeaconBlockBody ) . blobKzgCommitments . length ;
515
+ if ( blobKzgCommitmentsLen === 0 ) {
516
+ const blockData = {
517
+ fork : cachedData . fork ,
518
+ dataColumns : [ ] ,
519
+ dataColumnsBytes : [ ] ,
520
+ dataColumnsSource : DataColumnsSource . gossip ,
521
+ } as BlockInputDataColumns ;
521
522
522
- const peerColumns = network . getConnectedPeerCustody ( peerId ) ;
523
+ resolveAvailability ( blockData ) ;
524
+ opts . metrics ?. syncUnknownBlock . resolveAvailabilitySource . inc ( { source : BlockInputAvailabilitySource . UNKNOWN_SYNC } ) ;
525
+ return getBlockInput . availableData ( config , block , BlockSource . byRoot , blockData ) ;
526
+ } else {
527
+ const { custodyConfig} = network ;
528
+ let neededColumns = custodyConfig . sampledColumns . reduce ( ( acc , elem ) => {
529
+ if ( dataColumnsCache . get ( elem ) === undefined ) {
530
+ acc . push ( elem ) ;
531
+ }
532
+ return acc ;
533
+ } , [ ] as number [ ] ) ;
523
534
524
- // get match
525
- const columns = peerColumns . reduce ( ( acc , elem ) => {
526
- if ( neededColumns . includes ( elem ) ) {
527
- acc . push ( elem ) ;
528
- }
529
- return acc ;
530
- } , [ ] as number [ ] ) ;
535
+ const peerColumns = network . getConnectedPeerCustody ( peerId ) ;
531
536
532
- // this peer can't help fetching columns for this block
533
- if ( unavailableBlockInput . block !== null && columns . length === 0 && neededColumns . length > 0 ) {
534
- return unavailableBlockInput ;
537
+ // get match
538
+ const columns = peerColumns . reduce ( ( acc , elem ) => {
539
+ if ( neededColumns . includes ( elem ) ) {
540
+ acc . push ( elem ) ;
535
541
}
542
+ return acc ;
543
+ } , [ ] as number [ ] ) ;
536
544
537
- for ( const columnIndex of columns ) {
538
- dataColumnIdentifiers . push ( { blockRoot, index : columnIndex } ) ;
539
- }
545
+ // this peer can't help fetching columns for this block
546
+ if ( unavailableBlockInput . block !== null && columns . length === 0 && neededColumns . length > 0 ) {
547
+ return unavailableBlockInput ;
548
+ }
540
549
541
- // console.log("unavailableBlockInput fetching", {
542
- // neededColumns: neededColumns.length,
543
- // peerColumns: peerColumns.length,
544
- // intersectingColumns: columns.length,
545
- // dataColumnIdentifiers: dataColumnIdentifiers.length,
546
- // cacheId,
547
- // dataColumnsCache: dataColumnsCache.size,
548
- // blockRoot: toHexString(blockRoot),
549
- // });
550
-
551
- let allDataColumnSidecars : fulu . DataColumnSidecar [ ] ;
552
- if ( dataColumnIdentifiers . length > 0 ) {
553
- allDataColumnSidecars = await network . sendDataColumnSidecarsByRoot ( peerId , dataColumnIdentifiers ) ;
554
- } else {
555
- allDataColumnSidecars = [ ] ;
550
+ for ( const columnIndex of columns ) {
551
+ dataColumnIdentifiers . push ( { blockRoot, index : columnIndex } ) ;
552
+ }
553
+
554
+ let allDataColumnSidecars : fulu . DataColumnSidecar [ ] ;
555
+ if ( dataColumnIdentifiers . length > 0 ) {
556
+ allDataColumnSidecars = await network . sendDataColumnSidecarsByRoot ( peerId , dataColumnIdentifiers ) ;
557
+ } else {
558
+ allDataColumnSidecars = [ ] ;
559
+ }
560
+
561
+ const logCtx = {
562
+ slot : block . message . slot ,
563
+ requestedColumns : columns . join ( "," ) ,
564
+ respondedColumns : allDataColumnSidecars . map ( ( dcs ) => dcs . index ) . join ( "," ) ,
565
+ peerClient,
566
+ } ;
567
+
568
+ // the same to matchBlockWithDataColumns() without expecting requested data columns = responded data columns
569
+ // because at gossip time peer may not have enough column to return
570
+ for ( const dataColumnSidecar of allDataColumnSidecars ) {
571
+ dataColumnsCache . set ( dataColumnSidecar . index , {
572
+ dataColumn : dataColumnSidecar ,
573
+ // TODO: req/resp should return bytes here
574
+ dataColumnBytes : null ,
575
+ } ) ;
576
+ }
577
+
578
+ // reevaluate needeColumns and resolve availability if possible
579
+ neededColumns = custodyConfig . sampledColumns . reduce ( ( acc , elem ) => {
580
+ if ( dataColumnsCache . get ( elem ) === undefined ) {
581
+ acc . push ( elem ) ;
556
582
}
583
+ return acc ;
584
+ } , [ ] as number [ ] ) ;
557
585
558
- // console.log("unavailableBlockInput fetched", {
559
- // neededColumns: neededColumns.length,
560
- // peerColumns: peerColumns.length,
561
- // intersectingColumns: columns.length,
562
- // dataColumnIdentifiers: dataColumnIdentifiers.length,
563
- // allDataColumnSidecars: allDataColumnSidecars.length,
564
- // cacheId,
565
- // dataColumnsCache: dataColumnsCache.size,
566
- // blockRoot: toHexString(blockRoot),
567
- // });
568
-
569
- [ availableBlockInput ] = matchBlockWithDataColumns (
570
- network ,
571
- peerId ,
572
- config ,
573
- custodyConfig ,
574
- columns ,
575
- [ { data : block } ] ,
576
- allDataColumnSidecars ,
577
- block . message . slot ,
578
- BlockSource . byRoot ,
579
- DataColumnsSource . byRoot ,
580
- unavailableBlockInput . block !== null
581
- ? { blocks : [ unavailableBlockInput ] , pendingDataColumns : neededColumns }
582
- : null ,
583
- peerClient ,
584
- opts . logger
586
+ if ( neededColumns . length === 0 ) {
587
+ const { dataColumns, dataColumnsBytes} = getBlockInputDataColumns (
588
+ ( cachedData as CachedDataColumns ) . dataColumnsCache ,
589
+ custodyConfig . sampledColumns
585
590
) ;
586
591
587
592
// don't forget to resolve availability as the block may be stuck in availability wait
588
- if ( availableBlockInput !== undefined && availableBlockInput . type === BlockInputType . availableData ) {
589
- const { blockData} = availableBlockInput ;
590
- if ( blockData . fork !== ForkName . fulu ) {
591
- throw Error ( `unexpected blockData fork=${ blockData . fork } returned by matchBlockWithDataColumns` ) ;
592
- }
593
- resolveAvailability ( blockData as BlockInputDataColumns ) ;
594
- }
593
+ const blockData = {
594
+ fork : config . getForkName ( block . message . slot ) ,
595
+ dataColumns,
596
+ dataColumnsBytes,
597
+ dataColumnsSource : DataColumnsSource . byRoot ,
598
+ } as BlockInputDataColumns ;
599
+ resolveAvailability ( blockData ) ;
600
+ opts . logger ?. verbose ( "unavailableBeaconBlobsByRootPostFulu: Resolved availability for block with all data columns" , logCtx ) ;
601
+ return getBlockInput . availableData ( config , block , BlockSource . byRoot , blockData ) ;
602
+ } else {
603
+ opts . logger ?. verbose ( "unavailableBeaconBlobsByRootPostFulu: Still missing data columns for block" , logCtx ) ;
604
+ return getBlockInput . dataPromise ( config , block , BlockSource . byRoot , cachedData ) ;
595
605
}
596
- } else {
597
- throw Error ( "Invalid cachedData for unavailableBeaconBlobsByRoot" ) ;
598
606
}
599
-
600
- return availableBlockInput ;
601
607
}
0 commit comments