Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: pipeline loop #10

Merged
merged 12 commits into from
Oct 6, 2022
Merged

feat: pipeline loop #10

merged 12 commits into from
Oct 6, 2022

Conversation

onbjerg
Copy link
Collaborator

@onbjerg onbjerg commented Oct 3, 2022

Implements the execution and unwinding loops of the pipeline. The saving/loading of stage progress is also implemented, although it's not nice to look at w/o db abstractions

///
/// The unwind priority is set with [Pipeline::push_with_unwind_priority]. Stages with higher unwind
/// priorities are unwound first.
#[derive(Default)]
pub struct Pipeline {
stages: Vec<QueuedStage>,
unwind_to: Option<U64>,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can just call unwind directly

max_block: Option<U64>,
exit_after_sync: bool,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Essentially the same as max_block so I just deduplicated

@onbjerg onbjerg changed the title wip: pipeline loop feat: pipeline loop Oct 4, 2022
@onbjerg onbjerg requested review from mattsse and rkrasiuk and removed request for mattsse October 4, 2022 22:41
Copy link
Member

@rkrasiuk rkrasiuk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left some comments

Comment on lines +272 to +283
let mut unwind_pipeline = {
let mut stages: Vec<_> = self.stages.iter_mut().enumerate().collect();
stages.sort_by_key(|(id, stage)| {
if stage.unwind_priority > 0 {
(id - stage.unwind_priority, 0)
} else {
(*id, 1)
}
});
stages.reverse();
stages
};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is a possible overflow since id & unwind_priority are usizes. in general, the sorting here is a bit confusing. let's say, we have the queue of [stage1, stage2, stage3, stage4] and we want the resulting unwind queue to be [stage4, stage1, stage2, stage3], so we assign priorities stage3 - 1, stage2 - 2, stage1 - 3. if we were to add stage5 at arbitrary place, wouldn't we need to modify all of the existing priorities?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's correct. I've been thinking about a better way to do this as well

rx.send(PipelineEvent::Ran { stage_id, result: None }).await?
}

return Err(e)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, any additional error handling we'd need to do here? just making sure that if it's a fatal error, on next restart the pipeline should recover the previous state from the db

Copy link
Collaborator Author

@onbjerg onbjerg Oct 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should never encounter fatal errors that result in unwinds, those are bugs that we should fix as it alludes to something major has gone wrong. If an unwind is necessary the stage should specify that

If we encounter this branch then the current transaction is aborted, so it will sync from the point where the transaction was committed last. If we want to go further back than that then I'm not sure how we would determine that without the stage telling us since we don't track a history of stage progress

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perfect, ty 👌

Copy link
Collaborator

@mattsse mattsse left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm lacking a bit of context here and it's not obvious what the pipeline does.

@@ -60,18 +62,15 @@ pub enum StageError {
///
/// TODO: This depends on the consensus engine and should include the validation failure reason
#[error("Stage encountered a validation error.")]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

include block number in error?

/// The actual stage to execute.
stage: Box<dyn Stage>,
stage: Box<dyn Stage<'db, E>>,
/// The unwind priority of the stage.
unwind_priority: usize,
/// Whether or not this stage can only execute when we reach what we believe to be the tip of
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tip of tip?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unsure what this refers to, it doesn't say tip of tip?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's another tip on the next line -.-
github annotations for unchanged lines could use some work

Comment on lines +70 to +73
/// Create a new pipeline.
pub fn new() -> Self {
Default::default()
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redundant new, can just use default

.map_or(false, |(progress, target)| progress >= target);

// Execute stage
let output = async {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it make sense to move this to a separate function?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unsure, it won't really be used outside of this loop. If you just want to execute a stage directly you would call Stage::execute

@onbjerg
Copy link
Collaborator Author

onbjerg commented Oct 5, 2022

I'm lacking a bit of context here and it's not obvious what the pipeline does.

Do you have ideas for how to improve the rustdoc then? It just executes the stages serially, and once it has looped through all of them, it executes them again. It's the main driver in staged syncing. If a stage encounters a validation error, it unwinds the stages in reverse order and syncs again. That's all it does

Copy link
Collaborator

@mattsse mattsse left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, was just lacking the context on Pipeline/Stage in general.

should merge and iterate over with follow ups?

@onbjerg
Copy link
Collaborator Author

onbjerg commented Oct 6, 2022

Yeah let's merge and think of a better unwind priority system

@onbjerg onbjerg merged commit c749658 into master Oct 6, 2022
@onbjerg onbjerg deleted the onbjerg/pipeline-loop branch October 6, 2022 16:24
BrianBland pushed a commit to BrianBland/reth that referenced this pull request Jul 13, 2023
…ve-import

chore(executor): removed useless import
clabby added a commit to clabby/reth that referenced this pull request Aug 13, 2023
Resolution checkpoint

Resolution checkpoint paradigmxyz#2

Resolution checkpoint paradigmxyz#3

x

Resolution checkpoint paradigmxyz#4

Resolution checkpoint paradigmxyz#5

Resolution checkpoint paradigmxyz#6

Resolution checkpoint paradigmxyz#7

Resolution checkpoint paradigmxyz#8

Resolve checkpoint paradigmxyz#9 (transaction primitive)

Resolve checkpoint paradigmxyz#10 (rpc api transactions)

Resolve checkpoint paradigmxyz#11 (building w/o feature flag)

Start review

Compiling with and without `optimism` feature flag

Remove `DepositTx` from txpool mock tests, they never go into the txpool

fmt

code lint

fix signature tests

Co-authored-by: nicolas <[email protected]>

Use free CI runners (revert before upstream)

Co-authored-by: refcell <[email protected]>

Signature test fixes

Co-authored-by refcell <[email protected]>

Fix Receipt proptest

Co-authored-by BB <[email protected]>

lint

Fix variable-length compact for txtype/transaction

Co-authored-by: Brian Bland <[email protected]>

Fix basefee tests

Remove unnecessary rpc deps

Co-authored-by: Brian Bland <[email protected]>
Co-authored-by: refcell <[email protected]>
Co-authored-by: nicolas <[email protected]>
Co-authored-by: Roberto <[email protected]>
clabby added a commit to clabby/reth that referenced this pull request Aug 13, 2023
Resolution checkpoint

Resolution checkpoint paradigmxyz#2

Resolution checkpoint paradigmxyz#3

x

Resolution checkpoint paradigmxyz#4

Resolution checkpoint paradigmxyz#5

Resolution checkpoint paradigmxyz#6

Resolution checkpoint paradigmxyz#7

Resolution checkpoint paradigmxyz#8

Resolve checkpoint paradigmxyz#9 (transaction primitive)

Resolve checkpoint paradigmxyz#10 (rpc api transactions)

Resolve checkpoint paradigmxyz#11 (building w/o feature flag)

Start review

Compiling with and without `optimism` feature flag

Remove `DepositTx` from txpool mock tests, they never go into the txpool

fmt

code lint

fix signature tests

Co-authored-by: nicolas <[email protected]>

Use free CI runners (revert before upstream)

Co-authored-by: refcell <[email protected]>

Signature test fixes

Co-authored-by refcell <[email protected]>

Fix Receipt proptest

Co-authored-by BB <[email protected]>

lint

Fix variable-length compact for txtype/transaction

Co-authored-by: Brian Bland <[email protected]>

Fix basefee tests

Remove unnecessary rpc deps

Co-authored-by: Brian Bland <[email protected]>
Co-authored-by: refcell <[email protected]>
Co-authored-by: nicolas <[email protected]>
Co-authored-by: Roberto <[email protected]>
anonymousGiga added a commit to anonymousGiga/reth that referenced this pull request Feb 20, 2024
yutianwu pushed a commit to yutianwu/reth that referenced this pull request Jun 5, 2024
* feat: support opBNB

* chore: update revm

* fix: merge conflicts

* feat: add opbnb features

* chore: fix fmt

* fixup! chore: fix fmt

* fix: add opbnb features to more code spaces

* feat: add mgasps log

* ci: change ut runner to github official provided

* fix: mgas ps log

* ci: fix unit, integration test job

* fix: revm spec dispatcher

* chore: update revm

* fix: add fermat fork timestamp to fix revm init

* update revm and alloy-chain version

---------

Co-authored-by: j75689 <[email protected]>
Co-authored-by: Keefe Liu <[email protected]>
AshinGau added a commit to AshinGau/reth that referenced this pull request Sep 20, 2024
Add pre-round context to next round execution
AshinGau added a commit to AshinGau/reth that referenced this pull request Oct 13, 2024
Add pre-round context to next round execution
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants