fix: peerDAS - fix mismatch error when downloading columns at synced time #7467

Merged
@@ -19,6 +19,7 @@ import {
NullBlockInput,
getBlockInput,
getBlockInputBlobs,
getBlockInputDataColumns,
} from "../../chain/blocks/types.js";
import {BlockInputAvailabilitySource} from "../../chain/seenCache/seenGossipBlockInput.js";
import {IExecutionEngine} from "../../execution/index.js";
@@ -150,6 +151,9 @@ export async function beaconBlocksMaybeBlobsByRoot(

// The last arg provides the slot up to which all blobs should be exhausted in matching,
// and here it should be infinity since all blobs should match
// TODO: should not call matchBlockWithDataColumns() because it's intended for range sync;
// in that function peers are expected to return all requested data columns. This function runs at gossip time
// and should not expect that
const blockInputWithBlobs = matchBlockWithDataColumns(
network,
peerId,
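The TODO above argues that matchBlockWithDataColumns() assumes a range-sync contract, where a peer must return every requested column, which does not hold for by-root requests at gossip time. A minimal sketch of the more lenient matching the comment asks for, using hypothetical stand-in names (ColumnSidecarLike, matchColumnsLeniently) rather than the real Lodestar interfaces:

// Sketch only: ColumnSidecarLike and matchColumnsLeniently are hypothetical names,
// not part of the Lodestar codebase.
interface ColumnSidecarLike {
  index: number;
}

// Accept whatever columns the peer returned and report the remainder,
// instead of erroring when responded !== requested (the range-sync expectation).
function matchColumnsLeniently(
  requestedColumns: number[],
  responded: ColumnSidecarLike[]
): {matched: ColumnSidecarLike[]; stillMissing: number[]} {
  const respondedIndices = new Set(responded.map((sidecar) => sidecar.index));
  return {
    matched: responded.filter((sidecar) => requestedColumns.includes(sidecar.index)),
    stillMissing: requestedColumns.filter((index) => !respondedIndices.has(index)),
  };
}

The point is simply that a partial response becomes a "still missing" list to request from other peers, rather than an error.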
@@ -434,12 +438,20 @@ export async function unavailableBeaconBlobsByRootPreFulu(
return getBlockInput.availableData(config, block, BlockSource.byRoot, blockData);
}


/**
* Download more columns for a BlockInput
* - unavailableBlockInput should have the block, but not enough blobs (deneb) or data columns (fulu)
*
* This function may return a data promise, and the consumer should continue fetching more blobs or columns from other peers,
* see UnknownBlockSync.fetchUnavailableBlockInput()
*/
export async function unavailableBeaconBlobsByRootPostFulu(
config: ChainForkConfig,
network: INetwork,
peerId: PeerIdStr,
peerClient: string,
unavailableBlockInput: BlockInput | NullBlockInput,
unavailableBlockInput: BlockInput,
block: SignedBeaconBlock,
cachedData: NullBlockInput["cachedData"],
opts: {
@@ -451,7 +463,6 @@ export async function unavailableBeaconBlobsByRootPostFulu(
return unavailableBlockInput;
}

let availableBlockInput: BlockInput;
if (cachedData.fork === ForkName.deneb || cachedData.fork === ForkName.electra) {
const {blobsCache, resolveAvailability} = cachedData;

@@ -489,113 +500,108 @@ export async function unavailableBeaconBlobsByRootPostFulu(
const blockData = {fork: cachedData.fork, ...allBlobs, blobsSource: BlobsSource.byRoot} as BlockInputBlobs;
resolveAvailability(blockData);
opts.metrics?.syncUnknownBlock.resolveAvailabilitySource.inc({source: BlockInputAvailabilitySource.UNKNOWN_SYNC});
availableBlockInput = getBlockInput.availableData(config, block, BlockSource.byRoot, blockData);
} else if (cachedData.fork === ForkName.fulu) {
const {dataColumnsCache, resolveAvailability} = cachedData as CachedDataColumns;
return getBlockInput.availableData(config, block, BlockSource.byRoot, blockData);
}

// resolve missing data columns
const dataColumnIdentifiers: fulu.DataColumnIdentifier[] = [];
const slot = block.message.slot;
const blockRoot = config.getForkTypes(slot).BeaconBlock.hashTreeRoot(block.message);
// fulu fork
const {dataColumnsCache, resolveAvailability} = cachedData as CachedDataColumns;

const blobKzgCommitmentsLen = (block.message.body as deneb.BeaconBlockBody).blobKzgCommitments.length;
if (blobKzgCommitmentsLen === 0) {
const blockData = {
fork: cachedData.fork,
dataColumns: [],
dataColumnsBytes: [],
dataColumnsSource: DataColumnsSource.gossip,
} as BlockInputDataColumns;
// resolve missing data columns
const dataColumnIdentifiers: fulu.DataColumnIdentifier[] = [];
const slot = block.message.slot;
const blockRoot = config.getForkTypes(slot).BeaconBlock.hashTreeRoot(block.message);

resolveAvailability(blockData);
opts.metrics?.syncUnknownBlock.resolveAvailabilitySource.inc({source: BlockInputAvailabilitySource.UNKNOWN_SYNC});
availableBlockInput = getBlockInput.availableData(config, block, BlockSource.byRoot, blockData);
} else {
const {custodyConfig} = network;
const neededColumns = custodyConfig.sampledColumns.reduce((acc, elem) => {
if (dataColumnsCache.get(elem) === undefined) {
acc.push(elem);
}
return acc;
}, [] as number[]);
const blobKzgCommitmentsLen = (block.message.body as deneb.BeaconBlockBody).blobKzgCommitments.length;
if (blobKzgCommitmentsLen === 0) {
const blockData = {
fork: cachedData.fork,
dataColumns: [],
dataColumnsBytes: [],
dataColumnsSource: DataColumnsSource.gossip,
} as BlockInputDataColumns;

const peerColumns = network.getConnectedPeerCustody(peerId);
resolveAvailability(blockData);
opts.metrics?.syncUnknownBlock.resolveAvailabilitySource.inc({source: BlockInputAvailabilitySource.UNKNOWN_SYNC});
return getBlockInput.availableData(config, block, BlockSource.byRoot, blockData);
} else {
const {custodyConfig} = network;
let neededColumns = custodyConfig.sampledColumns.reduce((acc, elem) => {
if (dataColumnsCache.get(elem) === undefined) {
acc.push(elem);
}
return acc;
}, [] as number[]);

// get match
const columns = peerColumns.reduce((acc, elem) => {
if (neededColumns.includes(elem)) {
acc.push(elem);
}
return acc;
}, [] as number[]);
const peerColumns = network.getConnectedPeerCustody(peerId);

// this peer can't help fetching columns for this block
if (unavailableBlockInput.block !== null && columns.length === 0 && neededColumns.length > 0) {
return unavailableBlockInput;
// get match
const columns = peerColumns.reduce((acc, elem) => {
if (neededColumns.includes(elem)) {
acc.push(elem);
}
return acc;
}, [] as number[]);

for (const columnIndex of columns) {
dataColumnIdentifiers.push({blockRoot, index: columnIndex});
}
// this peer can't help fetching columns for this block
if (unavailableBlockInput.block !== null && columns.length === 0 && neededColumns.length > 0) {
return unavailableBlockInput;
}

// console.log("unavailableBlockInput fetching", {
// neededColumns: neededColumns.length,
// peerColumns: peerColumns.length,
// intersectingColumns: columns.length,
// dataColumnIdentifiers: dataColumnIdentifiers.length,
// cacheId,
// dataColumnsCache: dataColumnsCache.size,
// blockRoot: toHexString(blockRoot),
// });

let allDataColumnSidecars: fulu.DataColumnSidecar[];
if (dataColumnIdentifiers.length > 0) {
allDataColumnSidecars = await network.sendDataColumnSidecarsByRoot(peerId, dataColumnIdentifiers);
} else {
allDataColumnSidecars = [];
for (const columnIndex of columns) {
dataColumnIdentifiers.push({blockRoot, index: columnIndex});
}

let allDataColumnSidecars: fulu.DataColumnSidecar[];
if (dataColumnIdentifiers.length > 0) {
allDataColumnSidecars = await network.sendDataColumnSidecarsByRoot(peerId, dataColumnIdentifiers);
} else {
allDataColumnSidecars = [];
}

const logCtx = {
slot: block.message.slot,
requestedColumns: columns.join(","),
respondedColumns: allDataColumnSidecars.map((dcs) => dcs.index).join(","),
peerClient,
};

// same as matchBlockWithDataColumns() but without expecting requested data columns = responded data columns,
// because at gossip time the peer may not have enough columns to return
for (const dataColumnSidecar of allDataColumnSidecars) {
dataColumnsCache.set(dataColumnSidecar.index, {
dataColumn: dataColumnSidecar,
// TODO: req/resp should return bytes here
dataColumnBytes: null,
});
}

// re-evaluate neededColumns and resolve availability if possible
neededColumns = custodyConfig.sampledColumns.reduce((acc, elem) => {
if (dataColumnsCache.get(elem) === undefined) {
acc.push(elem);
}
return acc;
}, [] as number[]);

// console.log("unavailableBlockInput fetched", {
// neededColumns: neededColumns.length,
// peerColumns: peerColumns.length,
// intersectingColumns: columns.length,
// dataColumnIdentifiers: dataColumnIdentifiers.length,
// allDataColumnSidecars: allDataColumnSidecars.length,
// cacheId,
// dataColumnsCache: dataColumnsCache.size,
// blockRoot: toHexString(blockRoot),
// });

[availableBlockInput] = matchBlockWithDataColumns(
network,
peerId,
config,
custodyConfig,
columns,
[{data: block}],
allDataColumnSidecars,
block.message.slot,
BlockSource.byRoot,
DataColumnsSource.byRoot,
unavailableBlockInput.block !== null
? {blocks: [unavailableBlockInput], pendingDataColumns: neededColumns}
: null,
peerClient,
opts.logger
if (neededColumns.length === 0) {
const {dataColumns, dataColumnsBytes} = getBlockInputDataColumns(
(cachedData as CachedDataColumns).dataColumnsCache,
custodyConfig.sampledColumns
);

// don't forget to resolve availability as the block may be stuck in availability wait
if (availableBlockInput !== undefined && availableBlockInput.type === BlockInputType.availableData) {
const {blockData} = availableBlockInput;
if (blockData.fork !== ForkName.fulu) {
throw Error(`unexpected blockData fork=${blockData.fork} returned by matchBlockWithDataColumns`);
}
resolveAvailability(blockData as BlockInputDataColumns);
}
const blockData = {
fork: config.getForkName(block.message.slot),
dataColumns,
dataColumnsBytes,
dataColumnsSource: DataColumnsSource.byRoot,
} as BlockInputDataColumns;
resolveAvailability(blockData);
opts.logger?.verbose("unavailableBeaconBlobsByRootPostFulu: Resolved availability for block with all data columns", logCtx);
return getBlockInput.availableData(config, block, BlockSource.byRoot, blockData);
} else {
opts.logger?.verbose("unavailableBeaconBlobsByRootPostFulu: Still missing data columns for block", logCtx);
return getBlockInput.dataPromise(config, block, BlockSource.byRoot, cachedData);
}
} else {
throw Error("Invalid cachedData for unavailableBeaconBlobsByRoot");
}

return availableBlockInput;
}
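As the doc comment on unavailableBeaconBlobsByRootPostFulu notes, the function may return a data promise, and the consumer (UnknownBlockSync.fetchUnavailableBlockInput()) should keep fetching the remaining columns from other peers. A hypothetical caller-side sketch of that retry loop, with invented helper names (fetchFromPeer, isAvailable) that are not the real implementation:

// Sketch only: the peer iteration and the isAvailable() check are placeholders,
// not the real UnknownBlockSync.fetchUnavailableBlockInput() implementation.
async function fetchFromPeersUntilAvailable<TBlockInput>(
  peers: string[],
  fetchFromPeer: (peerId: string) => Promise<TBlockInput>,
  isAvailable: (blockInput: TBlockInput) => boolean
): Promise<TBlockInput> {
  let latest: TBlockInput | undefined;
  for (const peerId of peers) {
    // Each call may only fill the columns this peer custodies; the result stays
    // a data promise until the full sampled column set is in the cache.
    latest = await fetchFromPeer(peerId);
    if (isAvailable(latest)) {
      return latest;
    }
  }
  if (latest === undefined) {
    throw Error("No connected peers to fetch data columns from");
  }
  return latest; // still missing columns; caller keeps the block pending
}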