Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Visualisation logging for sync batch states #6034

Merged
merged 10 commits into from
Jul 29, 2024
30 changes: 30 additions & 0 deletions beacon_node/network/src/sync/range_sync/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,11 @@ impl<E: EthSpec, B: BatchConfig> BatchInfo<E, B> {
}
}
}

// Visualizes the state of this batch using state::visualize()
pub fn visualize(&self) -> &'static str {
self.state.visualize()
}
}

/// Represents a peer's attempt and providing the result for this batch.
Expand Down Expand Up @@ -525,3 +530,28 @@ impl<E: EthSpec> std::fmt::Debug for BatchState<E> {
}
}
}

impl<E: EthSpec> BatchState<E> {
/// Creates a unicode visualisation for the batch state to display in logs for quicker and
/// easier recognition
/// NOTE: May require nerd-fonts
///
/// The current icons are:
/// - Empty/Uninitialized: 
/// - Downloading: ⏬
/// - Awaiting Download: 📥
/// - Awaiting Validation:⏳
/// - Failed: ❌
/// - AwaitingProcessing: 
fn visualize(&self) -> &'static str {
match self {
BatchState::Downloading(_, _) => "⏬",
BatchState::Processing(_) => "🔄",
BatchState::AwaitingValidation(_) => "⏳",
BatchState::AwaitingDownload => "📥",
BatchState::Failed => "❌",
BatchState::AwaitingProcessing(_, _) => "🕓",
BatchState::Poisoned => "💀",
}
}
}
56 changes: 49 additions & 7 deletions beacon_node/network/src/sync/range_sync/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
let awaiting_batches = batch_id
.saturating_sub(self.optimistic_start.unwrap_or(self.processing_target))
/ EPOCHS_PER_BATCH;
debug!(self.log, "Completed batch received"; "epoch" => batch_id, "blocks" => received, "awaiting_batches" => awaiting_batches);
debug!(self.log, "Batch downloaded"; "epoch" => batch_id, "blocks" => received, "batch_state" => self.visualize_batch_state(), "awaiting_batches" => awaiting_batches);

// pre-emptively request more blocks from peers whilst we process current blocks,
self.request_batches(network)?;
Expand Down Expand Up @@ -433,6 +433,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
) -> ProcessingResult {
// the first two cases are possible if the chain advances while waiting for a processing
// result
let batch_state = self.visualize_batch_state();
let batch = match &self.current_processing_batch {
Some(processing_id) if *processing_id != batch_id => {
debug!(self.log, "Unexpected batch result";
Expand Down Expand Up @@ -465,7 +466,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {

// Log the process result and the batch for debugging purposes.
debug!(self.log, "Batch processing result"; "result" => ?result, &batch,
"batch_epoch" => batch_id, "client" => %network.client_type(&peer));
"batch_epoch" => batch_id, "client" => %network.client_type(&peer), "batch_state" => batch_state);

// We consider three cases. Batch was successfully processed, Batch failed processing due
// to a faulty peer, or batch failed processing but the peer can't be deemed faulty.
Expand Down Expand Up @@ -815,6 +816,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
peer_id: &PeerId,
request_id: Id,
) -> ProcessingResult {
let batch_state = self.visualize_batch_state();
if let Some(batch) = self.batches.get_mut(&batch_id) {
// A batch could be retried without the peer failing the request (disconnecting/
// sending an error /timeout) if the peer is removed from the chain for other
Expand All @@ -826,7 +828,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
"batch_epoch" => batch_id,
"batch_state" => ?batch.state(),
"peer_id" => %peer_id,
"request_id" => %request_id
"request_id" => %request_id,
"batch_state" => batch_state
);
return Ok(KeepChain);
}
Expand All @@ -836,7 +839,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
"batch_epoch" => batch_id,
"batch_state" => ?batch.state(),
"peer_id" => %peer_id,
"request_id" => %request_id
"request_id" => %request_id,
"batch_state" => batch_state
);
if let Some(active_requests) = self.peers.get_mut(peer_id) {
active_requests.remove(&batch_id);
Expand All @@ -854,7 +858,8 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
"Batch not found";
"batch_epoch" => batch_id,
"peer_id" => %peer_id,
"request_id" => %request_id
"request_id" => %request_id,
"batch_state" => batch_state
);
// this could be an error for an old batch, removed when the chain advances
Ok(KeepChain)
Expand Down Expand Up @@ -900,6 +905,7 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
batch_id: BatchId,
peer: PeerId,
) -> ProcessingResult {
let batch_state = self.visualize_batch_state();
if let Some(batch) = self.batches.get_mut(&batch_id) {
let (request, batch_type) = batch.to_blocks_by_range_request();
match network.blocks_and_blobs_by_range_request(
Expand All @@ -919,9 +925,9 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
.map(|epoch| epoch == batch_id)
.unwrap_or(false)
{
debug!(self.log, "Requesting optimistic batch"; "epoch" => batch_id, &batch);
debug!(self.log, "Requesting optimistic batch"; "epoch" => batch_id, &batch, "batch_state" => batch_state);
} else {
debug!(self.log, "Requesting batch"; "epoch" => batch_id, &batch);
debug!(self.log, "Requesting batch"; "epoch" => batch_id, &batch, "batch_state" => batch_state);
}
// register the batch for this peer
return self
Expand Down Expand Up @@ -1082,6 +1088,42 @@ impl<T: BeaconChainTypes> SyncingChain<T> {
}
}
}

/// Creates a string visualization of the current state of the chain, to make it easier for debugging and understanding
/// where sync is up to from glancing at the logs.
///
/// This produces a string of the form: [⏳⏬⏬⏬⏬]
/// to indicate the current buffer state of the chain. The symbols are defined on each of the
/// batch states. See [BatchState::visualize] for symbol definitions.
fn visualize_batch_state(&self) -> String {
let mut visualization_string = String::with_capacity((BATCH_BUFFER_SIZE * 3) as usize);

// Start of the block
visualization_string.push('[');

for mut batch_index in 0..BATCH_BUFFER_SIZE {
if let Some(batch) = self
.batches
.get(&(self.processing_target + batch_index as u64 * EPOCHS_PER_BATCH))
{
visualization_string.push_str(batch.visualize());
if batch_index != BATCH_BUFFER_SIZE {
// Add a space in between elements
visualization_string.push(' ');
}
} else {
// No batch exists, it is on our list to be downloaded
// Fill in the rest of the gaps
while batch_index < BATCH_BUFFER_SIZE {
visualization_string.push('');
batch_index += 1;
}
break;
}
}
visualization_string.push(']');
visualization_string
}
}

impl<T: BeaconChainTypes> slog::KV for &mut SyncingChain<T> {
Expand Down
Loading