Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: refactor block input and data pipelines prior to block processing #7474

Draft
wants to merge 31 commits into
base: peerDAS
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
4b42c45
wip: rough out blockInput
matthewkeil Feb 12, 2025
182ff90
feat: rough out SeenBlockInputCache
matthewkeil Feb 13, 2025
7c5a30b
wip: move more logic into BlockInput
matthewkeil Feb 14, 2025
7d4e079
feat: rough out BlockInputSync
matthewkeil Feb 15, 2025
ccdaef5
feat: flesh out fetch of blobs during sync
matthewkeil Feb 16, 2025
5f4b84c
feat: rough out metrics for BlockInputSync
matthewkeil Feb 16, 2025
8802ba2
feat: pass custodyConfig to chain an network
matthewkeil Feb 17, 2025
f630a64
wip: cleaning up for temp review with wemeetagain
matthewkeil Feb 17, 2025
5e4e606
feat: rough out heuristic for fetching columns
matthewkeil Feb 18, 2025
dc3c9e3
feat: update publishBlockV2 to use BlockInput class
matthewkeil Feb 24, 2025
ac8e516
fix: move unknownBlockParent to new unknownParent event
matthewkeil Feb 24, 2025
8fcc0ce
feat: update verifyBlocksSanityChecks to use BlockInput class
matthewkeil Feb 24, 2025
cda5141
feat: update verifyBlocksStateTransitionOnly to use BlockInput class
matthewkeil Feb 24, 2025
dd54178
feat: update Chain.processBlock and Chain.processChainSegment to use …
matthewkeil Feb 24, 2025
d4a6738
feat: update writeBlockInputToDb and removeEagerlyPersistedBlockInput…
matthewkeil Feb 24, 2025
a9080a4
feat: update gossipHandlers to use BlockInput class
matthewkeil Feb 28, 2025
32bfc7e
feat: update searchUnknownSlotRoot to pass BlockInputSourceType
matthewkeil Feb 28, 2025
7096752
feat: clean up abstract BlockInput and BlockInputPreDeneb
matthewkeil Feb 28, 2025
f24a452
feat: clean up BlockInputBlobs, BlockInputColumns and BlockInputCache
matthewkeil Mar 1, 2025
ac46a38
refactor: remove TODO
matthewkeil Mar 1, 2025
783477a
refactor: update verifyBlocksExecutionPayload to use BlockInput class
matthewkeil Mar 2, 2025
1daca03
refactor: update verifyBlocksSanityChecks to use BlockInput class
matthewkeil Mar 2, 2025
bd5d005
refactor: update verifyBlocksStateTransitionOnly to use BlockInput class
matthewkeil Mar 2, 2025
06d56ec
refactor: update writeBlockInputToDb to use BlockInput class
matthewkeil Mar 2, 2025
6817a7e
refactor: update verifyBlocksDataAvailability to use BlockInput class
matthewkeil Mar 3, 2025
9bcb7aa
refactor: update verifyBlocksInEpoch to use BlockInput class
matthewkeil Mar 3, 2025
1da6521
refactor: update processBlocks to use BlockInput class
matthewkeil Mar 3, 2025
3ba14cf
refactor: update importBlock to use BlockInput class
matthewkeil Mar 3, 2025
e229365
wip: BlockInput and BlockInputCache for interim review
matthewkeil Mar 3, 2025
a22fd09
feat: combine DataAvailabilityStatus from fork-choice and state-trans…
matthewkeil Mar 4, 2025
32554a0
wip: update RangeSync to use BlockInput
matthewkeil Mar 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 104 additions & 81 deletions packages/beacon-node/src/api/impl/beacon/blocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ import {ApiError, ApplicationMethods} from "@lodestar/api/server";
import {
ForkName,
ForkPostBellatrix,
NUMBER_OF_COLUMNS,
SLOTS_PER_HISTORICAL_ROOT,
isForkBlobs,
isForkPostBellatrix,
isForkPostElectra,
isForkPostFulu,
Expand Down Expand Up @@ -36,6 +38,15 @@ import {
ImportBlockOpts,
getBlockInput,
} from "../../../../chain/blocks/types.js";
import {
BlockInput as BlockInputNew,
BlockInputBlobs as BlockInputBlobsNew,
BlockInputColumns as BlockInputColumnsNew,
BlockInputType,
BlockInputSourceType,
isBlockInputBlobs,
isBlockInputColumns,
} from "../../../../chain/blocks/utils/blockInput.js";
import {verifyBlocksInEpoch} from "../../../../chain/blocks/verifyBlock.js";
import {BeaconChain} from "../../../../chain/chain.js";
import {BlockError, BlockErrorCode, BlockGossipError} from "../../../../chain/errors/index.js";
Expand Down Expand Up @@ -77,66 +88,64 @@ export function getBeaconBlockApi({
opts: PublishBlockOpts = {}
) => {
const seenTimestampSec = Date.now() / 1000;
let blockForImport: BlockInput,
signedBlock: SignedBeaconBlock,
blobSidecars: deneb.BlobSidecars,
dataColumnSidecars: fulu.DataColumnSidecars;

if (isSignedBlockContents(signedBlockOrContents)) {
({signedBlock} = signedBlockOrContents);
const fork = config.getForkName(signedBlock.message.slot);
let blockData: BlockInputAvailableData;
if (isForkPostFulu(fork)) {
dataColumnSidecars = computeDataColumnSidecars(config, signedBlock, signedBlockOrContents);
blockData = {
fork,
dataColumnsLen: dataColumnSidecars.length,
// dataColumnsIndex is a 1 based index of ith column present in dataColumns[custodyColumns[i-1]]
dataColumnsIndex: new Uint8Array(Array.from({length: dataColumnSidecars.length}, (_, j) => 1 + j)),
dataColumns: dataColumnSidecars,
dataColumnsBytes: dataColumnSidecars.map(() => null),
dataColumnsSource: DataColumnsSource.api,
} as BlockInputDataColumns;
blobSidecars = [];
} else if (fork === ForkName.deneb || fork === ForkName.electra) {
blobSidecars = computeBlobSidecars(config, signedBlock, signedBlockOrContents);
blockData = {
fork,
blobs: blobSidecars,
blobsSource: BlobsSource.api,
} as BlockInputBlobs;
dataColumnSidecars = [];
} else {
throw Error(`Invalid data fork=${fork} for publish`);
}

blockForImport = getBlockInput.availableData(config, signedBlock, BlockSource.api, blockData);
let blockInput: BlockInputNew;
if (!isSignedBlockContents(signedBlockOrContents)) {
const blockRoot = this.config
.getForkTypes(signedBlockOrContents.message.slot)
.SignedBeaconBlock.hashTreeRoot(signedBlockOrContents.message);
blockInput = chain.blockInputCache.getBlockInputByBlock({blockRoot, block: signedBlockOrContents});
} else {
signedBlock = signedBlockOrContents;
blobSidecars = [];
dataColumnSidecars = [];
blockForImport = getBlockInput.preData(config, signedBlock, BlockSource.api);
const blockRoot = this.config
.getForkTypes(signedBlockOrContents.signedBlock.message.slot)
.SignedBeaconBlock.hashTreeRoot(signedBlockOrContents.signedBlock.message);
blockInput = chain.blockInputCache.getBlockInputByBlock({blockRoot, block: signedBlockOrContents.signedBlock});
switch (blockInput.type) {
case BlockInputType.PreDeneb:
throw new Error("SignedBlockContents were sent to publishBlockV2 but BlockInput is PreDeneb");
case BlockInputType.Blobs:
// TODO (@matthewkeil) Look at this function signature to see if we can simplify the second param of
// computeBlobSidecars to SignedBlockContents and get rid of the third
for (const blobSidecar of computeBlobSidecars(
config,
signedBlockOrContents.signedBlock,
signedBlockOrContents
)) {
(blockInput as BlockInputBlobsNew).addBlob(blobSidecar, BlockInputSourceType.api);
}
break;
case BlockInputType.Columns:
// TODO (@matthewkeil) Look at this function signature to see if we can simplify the second param of
// computeDataColumnSidecars to SignedBlockContents and get rid of the third
for (const columnSidecar of computeDataColumnSidecars(
config,
signedBlockOrContents.signedBlock,
signedBlockOrContents
)) {
(blockInput as BlockInputColumnsNew).addColumnSidecar(columnSidecar, BlockInputSourceType.api);
}
break;
}
}

// check what validations have been requested before broadcasting and publishing the block
// TODO: add validation time to metrics
broadcastValidation = broadcastValidation ?? routes.beacon.BroadcastValidation.gossip;
// if block is locally produced, full or blinded, it already is 'consensus' validated as it went through
// state transition to produce the stateRoot
const slot = signedBlock.message.slot;
const fork = config.getForkName(slot);
const blockRoot = toRootHex(chain.config.getForkTypes(slot).BeaconBlock.hashTreeRoot(signedBlock.message));
const slot = blockInput.getSlot();
const forkName = blockInput.getForkName();
const signedBlock = blockInput.getBlock();
const blockLocallyProduced =
chain.producedBlockRoot.has(blockInput.rootHex) || chain.producedBlindedBlockRoot.has(blockInput.rootHex);
// bodyRoot should be the same to produced block
const bodyRoot = toRootHex(chain.config.getForkTypes(slot).BeaconBlockBody.hashTreeRoot(signedBlock.message.body));
const blockLocallyProduced =
chain.producedBlockRoot.has(blockRoot) || chain.producedBlindedBlockRoot.has(blockRoot);
const valLogMeta = {slot, blockRoot, bodyRoot, broadcastValidation, blockLocallyProduced};

const valLogMeta = {slot, blockRoot: blockInput.rootHex, bodyRoot, broadcastValidation, blockLocallyProduced};
switch (broadcastValidation) {
case routes.beacon.BroadcastValidation.none: {
chain.logger.debug("Skipping broadcast validation", valLogMeta);
break;
}

case routes.beacon.BroadcastValidation.gossip: {
if (!blockLocallyProduced) {
try {
await validateGossipBlock(config, chain, signedBlock, fork);
await validateGossipBlock(config, chain, signedBlock, forkName);
} catch (error) {
if (error instanceof BlockGossipError && error.type.code === BlockErrorCode.ALREADY_KNOWN) {
chain.logger.debug("Ignoring known block during publishing", valLogMeta);
Expand Down Expand Up @@ -164,9 +173,12 @@ export function getBeaconBlockApi({
if (!blockLocallyProduced) {
const parentBlock = chain.forkChoice.getBlock(signedBlock.message.parentRoot);
if (parentBlock === null) {
network.events.emit(NetworkEvent.unknownBlockParent, {
blockInput: blockForImport,
peer: IDENTITY_PEER_ID,
// TODO (@matthewkeil) why do we try to sync the unknown parent? How did the block get built and published
// to the API if its parent is unknown? Seems like this is an invalid case or if its valid should
// the block be stored and published once its ancestry is known?
network.events.emit(NetworkEvent.unknownParent, {
blockInput: blockInput,
source: BlockInputSourceType.api,
});
chain.persistInvalidSszValue(
chain.config.getForkTypes(slot).SignedBeaconBlock,
Expand All @@ -180,7 +192,7 @@ export function getBeaconBlockApi({
}

try {
await verifyBlocksInEpoch.call(chain as BeaconChain, parentBlock, [blockForImport], {
await verifyBlocksInEpoch.call(chain as BeaconChain, parentBlock, [blockInput], {
...opts,
verifyOnly: true,
skipVerifyBlockSignatures: true,
Expand Down Expand Up @@ -210,11 +222,6 @@ export function getBeaconBlockApi({
break;
}

case routes.beacon.BroadcastValidation.none: {
chain.logger.debug("Skipping broadcast validation", valLogMeta);
break;
}

default: {
// error or log warning we do not support this validation
const message = `Broadcast validation of ${broadcastValidation} type not implemented yet`;
Expand All @@ -227,44 +234,60 @@ export function getBeaconBlockApi({

// Simple implementation of a pending block queue. Keeping the block here recycles the API logic, and keeps the
// REST request promise without any extra infrastructure.
const msToBlockSlot =
computeTimeAtSlot(config, blockForImport.block.message.slot, chain.genesisTime) * 1000 - Date.now();
const msToBlockSlot = computeTimeAtSlot(config, slot, chain.genesisTime) * 1000 - Date.now();
if (msToBlockSlot <= MAX_API_CLOCK_DISPARITY_MS && msToBlockSlot > 0) {
// If block is a bit early, hold it in a promise. Equivalent to a pending queue.
await sleep(msToBlockSlot);
}

chain.emitter.emit(routes.events.EventType.blockGossip, {slot, block: blockRoot});
chain.emitter.emit(routes.events.EventType.blockGossip, {slot, block: blockInput.rootHex});

// TODO: Validate block
metrics?.registerBeaconBlock(OpSource.api, seenTimestampSec, blockForImport.block.message);
metrics?.registerBeaconBlock(OpSource.api, seenTimestampSec, signedBlock.message);
chain.logger.info("Publishing block", valLogMeta);
const publishPromises = [
// Send the block, regardless of whether or not it is valid. The API
// specification is very clear that this is the desired behaviour.
//
// i) Publish blobs and block before importing so that network can see them asap
// ii) publish blobs first because
// a) by the times nodes see block, they might decide to pull blobs
// b) they might require more hops to reach recipients in peerDAS kind of setup where
// blobs might need to hop between nodes because of partial subnet subscription
() => network.publishBeaconBlock(signedBlock) as Promise<unknown>,
...dataColumnSidecars.map((dataColumnSidecar) => () => network.publishDataColumnSidecar(dataColumnSidecar)),
...blobSidecars.map((blobSidecar) => () => network.publishBlobSidecar(blobSidecar)),
// Send the block, regardless of whether or not it is valid. The API
// specification is very clear that this is the desired behavior.
//
// i) Publish blobs and block before importing so that network can see them asap
// ii) publish blobs first because
// a) by the times nodes see block, they might decide to pull blobs
// b) they might require more hops to reach recipients in peerDAS kind of setup where
// blobs might need to hop between nodes because of partial subnet subscription
const publishPromises: Array<() => Promise<unknown>> = [];
if (isBlockInputBlobs(blockInput)) {
publishPromises.push(...blockInput.getBlobs().map((blobSidecar) => network.publishBlobSidecar(blobSidecar)));
} else if (isBlockInputColumns(blockInput)) {
const dataColumnSidecars = blockInput.getAllColumns();
// TODO (@matthewkeil) not sure if this check is necessary because they should have all been added above.... hmmm
// if (dataColumnSidecars !== NUMBER_OF_COLUMNS) {
// chain.logger.warn(
// `Attempting to publish ${NUMBER_OF_COLUMNS} columns but ${NUMBER_OF_COLUMNS - dataColumnSidecars.length} are missing`
// );
// }
publishPromises.push(
...dataColumnSidecars.map((dataColumnSidecar) => () => network.publishDataColumnSidecar(dataColumnSidecar))
);
}
publishPromises.push(
() => network.publishBeaconBlock(signedBlock),
() =>
// there is no rush to persist block since we published it to gossip anyway
chain
.processBlock(blockForImport, {...opts, eagerPersistBlock: false})
.processBlock(blockInput, {...opts, eagerPersistBlock: false})
.catch((e) => {
if (e instanceof BlockError && e.type.code === BlockErrorCode.PARENT_UNKNOWN) {
network.events.emit(NetworkEvent.unknownBlockParent, {
blockInput: blockForImport,
peer: IDENTITY_PEER_ID,
// TODO (@matthewkeil) why do we try to sync the unknown parent? How did the block get built and published
// to the API if its parent is unknown? Seems like this is an invalid case or if its valid should
// the block be stored and published once its ancestry is known?
network.events.emit(NetworkEvent.unknownParent, {
blockInput: blockInput,
source: BlockInputSourceType.api,
});
}
throw e;
}),
];
})
);

await promiseAllMaybeAsync(publishPromises);
};

Expand Down
10 changes: 9 additions & 1 deletion packages/beacon-node/src/api/impl/beacon/pool/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {validateApiVoluntaryExit} from "../../../../chain/validation/voluntaryEx
import {validateGossipFnRetryUnknownRoot} from "../../../../network/processor/gossipHandlers.js";
import {ApiError, FailureList, IndexedError} from "../../errors.js";
import {ApiModules} from "../../types.js";
import {BlockInputSourceType} from "../../../../chain/blocks/utils/blockInput.js";

export function getBeaconPoolApi({
chain,
Expand Down Expand Up @@ -115,7 +116,14 @@ export function getBeaconPoolApi({
// and the block hasn't been in our forkchoice since we haven't seen / processing that block
// see https://github.com/ChainSafe/lodestar/issues/5098
const {indexedAttestation, subnet, attDataRootHex, committeeIndex, committeeValidatorIndex, committeeSize} =
await validateGossipFnRetryUnknownRoot(validateFn, network, chain, slot, beaconBlockRoot);
await validateGossipFnRetryUnknownRoot(
validateFn,
network,
chain,
slot,
beaconBlockRoot,
BlockInputSourceType.api
);

if (network.shouldAggregate(subnet, slot)) {
const insertOutcome = chain.attestationPool.add(
Expand Down
9 changes: 6 additions & 3 deletions packages/beacon-node/src/api/impl/validator/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {PubkeyIndexMap} from "@chainsafe/pubkey-index-map";
import {routes} from "@lodestar/api";
import {ApplicationMethods} from "@lodestar/api/server";
import {DataAvailabilityStatus, ExecutionStatus} from "@lodestar/fork-choice";
import {ExecutionStatus} from "@lodestar/fork-choice";
import {
ForkPostBellatrix,
ForkPostDeneb,
Expand All @@ -17,6 +17,7 @@ import {
} from "@lodestar/params";
import {
CachedBeaconStateAllForks,
DataAvailabilityStatus,
attesterShufflingDecisionRoot,
beaconBlockToBlinded,
calculateCommitteeAssignments,
Expand Down Expand Up @@ -81,6 +82,7 @@ import {getStateResponseWithRegen} from "../beacon/state/utils.js";
import {ApiError, NodeIsSyncing, OnlySupportedByDVT} from "../errors.js";
import {ApiModules} from "../types.js";
import {computeSubnetForCommitteesAtSlot, getPubkeysForIndices, selectBlockProductionSource} from "./utils.js";
import {BlockInputSourceType} from "../../../chain/blocks/utils/blockInput.js";

/**
* If the node is within this many epochs from the head, we declare it to be synced regardless of
Expand Down Expand Up @@ -978,7 +980,7 @@ export function getValidatorApi(
// see https://github.com/ChainSafe/lodestar/issues/5063
if (!chain.forkChoice.hasBlock(beaconBlockRoot)) {
const rootHex = toRootHex(beaconBlockRoot);
network.searchUnknownSlotRoot({slot, root: rootHex});
network.searchUnknownSlotRoot(slot, rootHex, BlockInputSourceType.api);
// if result of this call is false, i.e. block hasn't seen after 1 slot then the below notOnOptimisticBlockRoot call will throw error
await chain.waitForBlock(slot, rootHex);
}
Expand Down Expand Up @@ -1306,7 +1308,8 @@ export function getValidatorApi(
network,
chain,
slot,
beaconBlockRoot
beaconBlockRoot,
BlockInputSourceType.api
);

chain.aggregatedAttestationPool.add(
Expand Down
Loading
Loading