-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: pipeline loop #10
Conversation
/// | ||
/// The unwind priority is set with [Pipeline::push_with_unwind_priority]. Stages with higher unwind | ||
/// priorities are unwound first. | ||
#[derive(Default)] | ||
pub struct Pipeline { | ||
stages: Vec<QueuedStage>, | ||
unwind_to: Option<U64>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can just call unwind
directly
max_block: Option<U64>, | ||
exit_after_sync: bool, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Essentially the same as max_block
so I just deduplicated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
left some comments
let mut unwind_pipeline = { | ||
let mut stages: Vec<_> = self.stages.iter_mut().enumerate().collect(); | ||
stages.sort_by_key(|(id, stage)| { | ||
if stage.unwind_priority > 0 { | ||
(id - stage.unwind_priority, 0) | ||
} else { | ||
(*id, 1) | ||
} | ||
}); | ||
stages.reverse(); | ||
stages | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there is a possible overflow since id
& unwind_priority
are usize
s. in general, the sorting here is a bit confusing. let's say, we have the queue of [stage1, stage2, stage3, stage4]
and we want the resulting unwind queue to be [stage4, stage1, stage2, stage3]
, so we assign priorities stage3
- 1
, stage2
- 2
, stage1
- 3
. if we were to add stage5
at arbitrary place, wouldn't we need to modify all of the existing priorities?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's correct. I've been thinking about a better way to do this as well
rx.send(PipelineEvent::Ran { stage_id, result: None }).await? | ||
} | ||
|
||
return Err(e) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm, any additional error handling we'd need to do here? just making sure that if it's a fatal error, on next restart the pipeline should recover the previous state from the db
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should never encounter fatal errors that result in unwinds, those are bugs that we should fix as it alludes to something major has gone wrong. If an unwind is necessary the stage should specify that
If we encounter this branch then the current transaction is aborted, so it will sync from the point where the transaction was committed last. If we want to go further back than that then I'm not sure how we would determine that without the stage telling us since we don't track a history of stage progress
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
perfect, ty 👌
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm lacking a bit of context here and it's not obvious what the pipeline does.
crates/stages/src/lib.rs
Outdated
@@ -60,18 +62,15 @@ pub enum StageError { | |||
/// | |||
/// TODO: This depends on the consensus engine and should include the validation failure reason | |||
#[error("Stage encountered a validation error.")] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
include block number in error?
/// The actual stage to execute. | ||
stage: Box<dyn Stage>, | ||
stage: Box<dyn Stage<'db, E>>, | ||
/// The unwind priority of the stage. | ||
unwind_priority: usize, | ||
/// Whether or not this stage can only execute when we reach what we believe to be the tip of |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tip of tip?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unsure what this refers to, it doesn't say tip of tip?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there's another tip
on the next line -.-
github annotations for unchanged lines could use some work
/// Create a new pipeline. | ||
pub fn new() -> Self { | ||
Default::default() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
redundant new, can just use default
.map_or(false, |(progress, target)| progress >= target); | ||
|
||
// Execute stage | ||
let output = async { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does it make sense to move this to a separate function?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unsure, it won't really be used outside of this loop. If you just want to execute a stage directly you would call Stage::execute
Do you have ideas for how to improve the rustdoc then? It just executes the stages serially, and once it has looped through all of them, it executes them again. It's the main driver in staged syncing. If a stage encounters a validation error, it unwinds the stages in reverse order and syncs again. That's all it does |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, was just lacking the context on Pipeline
/Stage
in general.
should merge and iterate over with follow ups?
Yeah let's merge and think of a better unwind priority system |
…ve-import chore(executor): removed useless import
Resolution checkpoint Resolution checkpoint paradigmxyz#2 Resolution checkpoint paradigmxyz#3 x Resolution checkpoint paradigmxyz#4 Resolution checkpoint paradigmxyz#5 Resolution checkpoint paradigmxyz#6 Resolution checkpoint paradigmxyz#7 Resolution checkpoint paradigmxyz#8 Resolve checkpoint paradigmxyz#9 (transaction primitive) Resolve checkpoint paradigmxyz#10 (rpc api transactions) Resolve checkpoint paradigmxyz#11 (building w/o feature flag) Start review Compiling with and without `optimism` feature flag Remove `DepositTx` from txpool mock tests, they never go into the txpool fmt code lint fix signature tests Co-authored-by: nicolas <[email protected]> Use free CI runners (revert before upstream) Co-authored-by: refcell <[email protected]> Signature test fixes Co-authored-by refcell <[email protected]> Fix Receipt proptest Co-authored-by BB <[email protected]> lint Fix variable-length compact for txtype/transaction Co-authored-by: Brian Bland <[email protected]> Fix basefee tests Remove unnecessary rpc deps Co-authored-by: Brian Bland <[email protected]> Co-authored-by: refcell <[email protected]> Co-authored-by: nicolas <[email protected]> Co-authored-by: Roberto <[email protected]>
Resolution checkpoint Resolution checkpoint paradigmxyz#2 Resolution checkpoint paradigmxyz#3 x Resolution checkpoint paradigmxyz#4 Resolution checkpoint paradigmxyz#5 Resolution checkpoint paradigmxyz#6 Resolution checkpoint paradigmxyz#7 Resolution checkpoint paradigmxyz#8 Resolve checkpoint paradigmxyz#9 (transaction primitive) Resolve checkpoint paradigmxyz#10 (rpc api transactions) Resolve checkpoint paradigmxyz#11 (building w/o feature flag) Start review Compiling with and without `optimism` feature flag Remove `DepositTx` from txpool mock tests, they never go into the txpool fmt code lint fix signature tests Co-authored-by: nicolas <[email protected]> Use free CI runners (revert before upstream) Co-authored-by: refcell <[email protected]> Signature test fixes Co-authored-by refcell <[email protected]> Fix Receipt proptest Co-authored-by BB <[email protected]> lint Fix variable-length compact for txtype/transaction Co-authored-by: Brian Bland <[email protected]> Fix basefee tests Remove unnecessary rpc deps Co-authored-by: Brian Bland <[email protected]> Co-authored-by: refcell <[email protected]> Co-authored-by: nicolas <[email protected]> Co-authored-by: Roberto <[email protected]>
* feat: support opBNB * chore: update revm * fix: merge conflicts * feat: add opbnb features * chore: fix fmt * fixup! chore: fix fmt * fix: add opbnb features to more code spaces * feat: add mgasps log * ci: change ut runner to github official provided * fix: mgas ps log * ci: fix unit, integration test job * fix: revm spec dispatcher * chore: update revm * fix: add fermat fork timestamp to fix revm init * update revm and alloy-chain version --------- Co-authored-by: j75689 <[email protected]> Co-authored-by: Keefe Liu <[email protected]>
Add pre-round context to next round execution
Add pre-round context to next round execution
Implements the execution and unwinding loops of the pipeline. The saving/loading of stage progress is also implemented, although it's not nice to look at w/o db abstractions