Skip to content

Commit 75a7c08

Browse files
authoredJan 28, 2025··
feat: new da_dispatcher metrics (#3464)
## What ❔ Add new metrics to the da_dispatcher: - sealed -> dispatched lag - operator balance ## Why ❔ To improve transparency and be able to create alerts based on these values ## Checklist <!-- Check your PR fulfills the following items. --> <!-- For draft PRs check the boxes as you complete them. --> - [ ] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [ ] Tests for the changes have been added / updated. - [ ] Documentation comments have been added / updated. - [ ] Code has been formatted via `zkstack dev fmt` and `zkstack dev lint`.
1 parent 416ea31 commit 75a7c08

17 files changed

+1380
-37
lines changed
 

‎core/lib/da_client/src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ pub trait DataAvailabilityClient: Sync + Send + fmt::Debug {
2323

2424
/// Returns the maximum size of the blob (in bytes) that can be dispatched. None means no limit.
2525
fn blob_size_limit(&self) -> Option<usize>;
26+
27+
async fn balance(&self) -> Result<u64, DAError>;
2628
}
2729

2830
impl Clone for Box<dyn DataAvailabilityClient> {

‎core/lib/dal/.sqlx/query-2a2083fd04ebd006eb0aa4e0e5f62f3339768a85aaff9a509901e9f42b09097b.json

-28
This file was deleted.

‎core/lib/dal/.sqlx/query-f2eeb448a856b9e57bcc2a724791fb0ee6299fddc9f89cf70c5b69c7182f0a54.json

+34
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎core/lib/dal/src/data_availability_dal.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,8 @@ impl DataAvailabilityDal<'_, '_> {
184184
r#"
185185
SELECT
186186
number,
187-
pubdata_input
187+
pubdata_input,
188+
sealed_at
188189
FROM
189190
l1_batches
190191
LEFT JOIN
@@ -195,6 +196,7 @@ impl DataAvailabilityDal<'_, '_> {
195196
AND number != 0
196197
AND data_availability.blob_id IS NULL
197198
AND pubdata_input IS NOT NULL
199+
AND sealed_at IS NOT NULL
198200
ORDER BY
199201
number
200202
LIMIT
@@ -213,6 +215,7 @@ impl DataAvailabilityDal<'_, '_> {
213215
// `unwrap` is safe here because we have a `WHERE` clause that filters out `NULL` values
214216
pubdata: row.pubdata_input.unwrap(),
215217
l1_batch_number: L1BatchNumber(row.number as u32),
218+
sealed_at: row.sealed_at.unwrap().and_utc(),
216219
})
217220
.collect())
218221
}

‎core/lib/dal/src/models/storage_data_availability.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use chrono::NaiveDateTime;
1+
use chrono::{DateTime, NaiveDateTime, Utc};
22
use zksync_types::{pubdata_da::DataAvailabilityBlob, L1BatchNumber};
33

44
/// Represents a blob in the data availability layer.
@@ -26,4 +26,5 @@ impl From<StorageDABlob> for DataAvailabilityBlob {
2626
pub struct L1BatchDA {
2727
pub pubdata: Vec<u8>,
2828
pub l1_batch_number: L1BatchNumber,
29+
pub sealed_at: DateTime<Utc>,
2930
}

‎core/node/da_clients/src/avail/client.rs

+23
Original file line numberDiff line numberDiff line change
@@ -245,4 +245,27 @@ impl DataAvailabilityClient for AvailClient {
245245
fn blob_size_limit(&self) -> Option<usize> {
246246
Some(RawAvailClient::MAX_BLOB_SIZE)
247247
}
248+
249+
async fn balance(&self) -> Result<u64, DAError> {
250+
match self.sdk_client.as_ref() {
251+
AvailClientMode::Default(client) => {
252+
let AvailClientConfig::FullClient(default_config) = &self.config.config else {
253+
unreachable!(); // validated in protobuf config
254+
};
255+
256+
let ws_client = WsClientBuilder::default()
257+
.build(default_config.api_node_url.clone().as_str())
258+
.await
259+
.map_err(to_non_retriable_da_error)?;
260+
261+
Ok(client
262+
.balance(&ws_client)
263+
.await
264+
.map_err(to_non_retriable_da_error)?)
265+
}
266+
AvailClientMode::GasRelay(_) => {
267+
Ok(0) // TODO: implement balance for gas relay (PE-304)
268+
}
269+
}
270+
}
248271
}

‎core/node/da_clients/src/avail/sdk.rs

+18-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
44
use std::{fmt::Debug, sync::Arc, time};
55

6+
use anyhow::Context;
67
use backon::{ConstantBuilder, Retryable};
78
use bytes::Bytes;
89
use jsonrpsee::{
@@ -22,7 +23,6 @@ use crate::utils::to_non_retriable_da_error;
2223

2324
const PROTOCOL_VERSION: u8 = 4;
2425

25-
/// An implementation of the `DataAvailabilityClient` trait that interacts with the Avail network.
2626
#[derive(Debug, Clone)]
2727
pub(crate) struct RawAvailClient {
2828
app_id: u32,
@@ -344,6 +344,23 @@ impl RawAvailClient {
344344

345345
Ok(tx_id)
346346
}
347+
348+
/// Returns the balance of the address controlled by the `keypair`
349+
pub async fn balance(&self, client: &Client) -> anyhow::Result<u64> {
350+
let address = to_addr(self.keypair.clone());
351+
let resp: serde_json::Value = client
352+
.request("state_getStorage", rpc_params![address])
353+
.await
354+
.context("Error calling state_getStorage RPC")?;
355+
356+
let balance = resp
357+
.as_str()
358+
.ok_or_else(|| anyhow::anyhow!("Invalid balance"))?;
359+
360+
balance
361+
.parse()
362+
.context("Unable to parse the account balance")
363+
}
347364
}
348365

349366
fn blake2<const N: usize>(data: Vec<u8>) -> [u8; N] {

‎core/node/da_clients/src/celestia/client.rs

+7
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,13 @@ impl DataAvailabilityClient for CelestiaClient {
9797
fn blob_size_limit(&self) -> Option<usize> {
9898
Some(1973786) // almost 2MB
9999
}
100+
101+
async fn balance(&self) -> Result<u64, DAError> {
102+
self.client
103+
.balance()
104+
.await
105+
.map_err(to_non_retriable_da_error)
106+
}
100107
}
101108

102109
impl Debug for CelestiaClient {

‎core/node/da_clients/src/celestia/generated/cosmos.bank.v1beta1.rs

+1,121
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// This file is @generated by prost-build.
2+
/// PageRequest is to be embedded in gRPC request messages for efficient
3+
/// pagination. Ex:
4+
///
5+
/// message SomeRequest {
6+
/// Foo some_parameter = 1;
7+
/// PageRequest pagination = 2;
8+
/// }
9+
#[derive(Clone, PartialEq, ::prost::Message)]
10+
pub struct PageRequest {
11+
/// key is a value returned in PageResponse.next_key to begin
12+
/// querying the next page most efficiently. Only one of offset or key
13+
/// should be set.
14+
#[prost(bytes = "vec", tag = "1")]
15+
pub key: ::prost::alloc::vec::Vec<u8>,
16+
/// offset is a numeric offset that can be used when key is unavailable.
17+
/// It is less efficient than using key. Only one of offset or key should
18+
/// be set.
19+
#[prost(uint64, tag = "2")]
20+
pub offset: u64,
21+
/// limit is the total number of results to be returned in the result page.
22+
/// If left empty it will default to a value to be set by each app.
23+
#[prost(uint64, tag = "3")]
24+
pub limit: u64,
25+
/// count_total is set to true to indicate that the result set should include
26+
/// a count of the total number of items available for pagination in UIs.
27+
/// count_total is only respected when offset is used. It is ignored when key
28+
/// is set.
29+
#[prost(bool, tag = "4")]
30+
pub count_total: bool,
31+
/// reverse is set to true if results are to be returned in the descending order.
32+
///
33+
/// Since: cosmos-sdk 0.43
34+
#[prost(bool, tag = "5")]
35+
pub reverse: bool,
36+
}
37+
impl ::prost::Name for PageRequest {
38+
const NAME: &'static str = "PageRequest";
39+
const PACKAGE: &'static str = "cosmos.base.query.v1beta1";
40+
fn full_name() -> ::prost::alloc::string::String {
41+
"cosmos.base.query.v1beta1.PageRequest".into()
42+
}
43+
fn type_url() -> ::prost::alloc::string::String {
44+
"/cosmos.base.query.v1beta1.PageRequest".into()
45+
}
46+
}
47+
/// PageResponse is to be embedded in gRPC response messages where the
48+
/// corresponding request message has used PageRequest.
49+
///
50+
/// message SomeResponse {
51+
/// repeated Bar results = 1;
52+
/// PageResponse page = 2;
53+
/// }
54+
#[derive(::serde::Deserialize, ::serde::Serialize)]
55+
#[serde(default)]
56+
#[derive(Clone, PartialEq, ::prost::Message)]
57+
pub struct PageResponse {
58+
/// next_key is the key to be passed to PageRequest.key to
59+
/// query the next page most efficiently. It will be empty if
60+
/// there are no more results.
61+
#[prost(bytes = "vec", tag = "1")]
62+
pub next_key: ::prost::alloc::vec::Vec<u8>,
63+
/// total is total number of results available if PageRequest.count_total
64+
/// was set, its value is undefined otherwise
65+
#[prost(uint64, tag = "2")]
66+
pub total: u64,
67+
}
68+
impl ::prost::Name for PageResponse {
69+
const NAME: &'static str = "PageResponse";
70+
const PACKAGE: &'static str = "cosmos.base.query.v1beta1";
71+
fn full_name() -> ::prost::alloc::string::String {
72+
"cosmos.base.query.v1beta1.PageResponse".into()
73+
}
74+
fn type_url() -> ::prost::alloc::string::String {
75+
"/cosmos.base.query.v1beta1.PageResponse".into()
76+
}
77+
}

‎core/node/da_clients/src/celestia/mod.rs

+10
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,16 @@ pub mod cosmos {
2424
pub mod v1beta1 {
2525
include!("generated/cosmos.base.v1beta1.rs");
2626
}
27+
28+
pub mod query {
29+
include!("generated/cosmos.base.query.v1beta1.rs");
30+
}
31+
}
32+
33+
pub mod bank {
34+
pub mod v1beta1 {
35+
include!("generated/cosmos.bank.v1beta1.rs");
36+
}
2737
}
2838

2939
pub mod tx {

‎core/node/da_clients/src/celestia/sdk.rs

+32
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use super::{
2020
query_client::QueryClient as AuthQueryClient, BaseAccount, QueryAccountRequest,
2121
QueryParamsRequest as QueryAuthParamsRequest,
2222
},
23+
bank::v1beta1::{query_client::QueryClient as BankQueryClient, QueryAllBalancesRequest},
2324
base::{
2425
node::{
2526
service_client::ServiceClient as MinGasPriceClient,
@@ -377,6 +378,37 @@ impl RawCelestiaClient {
377378
tracing::debug!(tx_hash = %tx_response.txhash, height, "transaction succeeded");
378379
Ok(Some(height))
379380
}
381+
382+
pub async fn balance(&self) -> anyhow::Result<u64> {
383+
let mut auth_query_client = BankQueryClient::new(self.grpc_channel.clone());
384+
let resp = auth_query_client
385+
.all_balances(QueryAllBalancesRequest {
386+
address: self.address.clone(),
387+
pagination: None,
388+
})
389+
.await?;
390+
391+
let micro_tia_balance = resp
392+
.into_inner()
393+
.balances
394+
.into_iter()
395+
.find(|coin| coin.denom == UNITS_SUFFIX)
396+
.map_or_else(
397+
|| {
398+
Err(anyhow::anyhow!(
399+
"no balance found for address: {}",
400+
self.address
401+
))
402+
},
403+
|coin| {
404+
coin.amount
405+
.parse::<u64>()
406+
.map_err(|e| anyhow::anyhow!("failed to parse balance: {}", e))
407+
},
408+
)?;
409+
410+
Ok(micro_tia_balance)
411+
}
380412
}
381413

382414
/// Returns a `BlobTx` for the given signed tx and blobs.

‎core/node/da_clients/src/eigen/client.rs

+4
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,8 @@ impl DataAvailabilityClient for EigenClient {
6262
fn blob_size_limit(&self) -> Option<usize> {
6363
Some(1920 * 1024) // 2mb - 128kb as a buffer
6464
}
65+
66+
async fn balance(&self) -> Result<u64, DAError> {
67+
Ok(0) // TODO fetch from API when payments are enabled in Eigen (PE-305)
68+
}
6569
}

‎core/node/da_clients/src/no_da.rs

+4
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,8 @@ impl DataAvailabilityClient for NoDAClient {
2525
fn blob_size_limit(&self) -> Option<usize> {
2626
None
2727
}
28+
29+
async fn balance(&self) -> Result<u64, DAError> {
30+
Ok(0)
31+
}
2832
}

‎core/node/da_clients/src/object_store.rs

+4
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,10 @@ impl DataAvailabilityClient for ObjectStoreDAClient {
8787
fn blob_size_limit(&self) -> Option<usize> {
8888
None
8989
}
90+
91+
async fn balance(&self) -> Result<u64, DAError> {
92+
Ok(0)
93+
}
9094
}
9195

9296
/// Used as a wrapper for the pubdata to be stored in the GCS.

‎core/node/da_dispatcher/src/da_dispatcher.rs

+30-3
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ impl DataAvailabilityDispatcher {
103103
.await?;
104104
drop(conn);
105105

106-
for batch in batches {
106+
for batch in &batches {
107107
let dispatch_latency = METRICS.blob_dispatch_latency.start();
108108
let dispatch_response = retry(self.config.max_retries(), batch.l1_batch_number, || {
109109
self.client
@@ -119,14 +119,14 @@ impl DataAvailabilityDispatcher {
119119
})?;
120120
let dispatch_latency_duration = dispatch_latency.observe();
121121

122-
let sent_at = Utc::now().naive_utc();
122+
let sent_at = Utc::now();
123123

124124
let mut conn = self.pool.connection_tagged("da_dispatcher").await?;
125125
conn.data_availability_dal()
126126
.insert_l1_batch_da(
127127
batch.l1_batch_number,
128128
dispatch_response.blob_id.as_str(),
129-
sent_at,
129+
sent_at.naive_utc(),
130130
)
131131
.await?;
132132
drop(conn);
@@ -135,13 +135,40 @@ impl DataAvailabilityDispatcher {
135135
.last_dispatched_l1_batch
136136
.set(batch.l1_batch_number.0 as usize);
137137
METRICS.blob_size.observe(batch.pubdata.len());
138+
METRICS.sealed_to_dispatched_lag.observe(
139+
sent_at
140+
.signed_duration_since(batch.sealed_at)
141+
.to_std()
142+
.context("sent_at has to be higher than sealed_at")?,
143+
);
138144
tracing::info!(
139145
"Dispatched a DA for batch_number: {}, pubdata_size: {}, dispatch_latency: {dispatch_latency_duration:?}",
140146
batch.l1_batch_number,
141147
batch.pubdata.len(),
142148
);
143149
}
144150

151+
// We don't need to report this metric every iteration, only once when the balance is changed
152+
if !batches.is_empty() {
153+
let client_arc = Arc::new(self.client.clone_boxed());
154+
155+
tokio::spawn(async move {
156+
let balance = client_arc
157+
.balance()
158+
.await
159+
.with_context(|| "Unable to retrieve DA operator balance");
160+
161+
match balance {
162+
Ok(balance) => {
163+
METRICS.operator_balance.set(balance);
164+
}
165+
Err(err) => {
166+
tracing::error!("{err}")
167+
}
168+
}
169+
});
170+
}
171+
145172
Ok(())
146173
}
147174

‎core/node/da_dispatcher/src/metrics.rs

+8-3
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@ use vise::{Buckets, Gauge, Histogram, Metrics, Unit};
44

55
/// Buckets for `blob_dispatch_latency` (from 0.1 to 120 seconds).
66
const DISPATCH_LATENCIES: Buckets =
7-
Buckets::values(&[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0]);
7+
Buckets::values(&[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0, 240.0]);
88

99
#[derive(Debug, Metrics)]
1010
#[metrics(prefix = "server_da_dispatcher")]
1111
pub(super) struct DataAvailabilityDispatcherMetrics {
12-
/// Latency of the dispatch of the blob.
12+
/// Latency of the dispatch of the blob. Only the communication with DA layer.
1313
#[metrics(buckets = DISPATCH_LATENCIES, unit = Unit::Seconds)]
1414
pub blob_dispatch_latency: Histogram<Duration>,
1515
/// The duration between the moment when the blob is dispatched and the moment when it is included.
@@ -19,14 +19,19 @@ pub(super) struct DataAvailabilityDispatcherMetrics {
1919
/// Buckets are bytes ranging from 1 KB to 16 MB, which has to satisfy all blob size values.
2020
#[metrics(buckets = Buckets::exponential(1_024.0..=16.0 * 1_024.0 * 1_024.0, 2.0), unit = Unit::Bytes)]
2121
pub blob_size: Histogram<usize>,
22-
2322
/// Number of transactions resent by the DA dispatcher.
2423
#[metrics(buckets = Buckets::linear(0.0..=10.0, 1.0))]
2524
pub dispatch_call_retries: Histogram<usize>,
2625
/// Last L1 batch that was dispatched to the DA layer.
2726
pub last_dispatched_l1_batch: Gauge<usize>,
2827
/// Last L1 batch that has its inclusion finalized by DA layer.
2928
pub last_included_l1_batch: Gauge<usize>,
29+
/// The delay between the moment batch was sealed and the moment it was dispatched. Includes
30+
/// both communication with DA layer and time it spends in the queue on the `da_dispatcher` side.
31+
#[metrics(buckets = DISPATCH_LATENCIES, unit = Unit::Seconds)]
32+
pub sealed_to_dispatched_lag: Histogram<Duration>,
33+
/// The balance of the operator wallet on DA network.
34+
pub operator_balance: Gauge<u64>,
3035
}
3136

3237
#[vise::register]

0 commit comments

Comments
 (0)
Please sign in to comment.