feat(metric-stats): Report cardinality to metric stats #3360

Merged 2 commits on Apr 3, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -18,6 +18,7 @@
- Apply rate limits to span metrics. ([#3255](https://github.com/getsentry/relay/pull/3255))
- Extract metrics from transaction spans. ([#3273](https://github.com/getsentry/relay/pull/3273), [#3324](https://github.com/getsentry/relay/pull/3324))
- Implement volume metric stats. ([#3281](https://github.com/getsentry/relay/pull/3281))
- Implement cardinality metric stats. ([#3360](https://github.com/getsentry/relay/pull/3360))
- Scrub transactions before enforcing quotas. ([#3248](https://github.com/getsentry/relay/pull/3248))
- Implement metric name based cardinality limits. ([#3313](https://github.com/getsentry/relay/pull/3313))
- Kafka topic config supports default topic names as keys. ([#3282](https://github.com/getsentry/relay/pull/3282))
6 changes: 3 additions & 3 deletions relay-cardinality/benches/redis_impl.rs
@@ -43,7 +43,7 @@ struct NoopReporter;
impl<'a> Reporter<'a> for NoopReporter {
fn reject(&mut self, _limit_id: &'a CardinalityLimit, _entry_id: EntryId) {}

fn cardinality(&mut self, _limit: &'a CardinalityLimit, _report: CardinalityReport) {}
fn report_cardinality(&mut self, _limit: &'a CardinalityLimit, _report: CardinalityReport) {}
}

#[derive(Debug)]
@@ -58,7 +58,7 @@ struct Params {
}

impl Params {
fn new(limit: u64, rounds: usize, num_hashes: usize) -> Self {
fn new(limit: u32, rounds: usize, num_hashes: usize) -> Self {
Self {
limits: vec![CardinalityLimit {
id: "limit".to_owned(),
@@ -140,7 +140,7 @@ impl Params {
EntryId(usize::MAX - i as usize),
MetricNamespace::Custom,
&self.name,
i as u32,
i,
)
})
.collect::<Vec<_>>()
2 changes: 1 addition & 1 deletion relay-cardinality/src/config.rs
@@ -28,7 +28,7 @@ pub struct CardinalityLimit {
/// The sliding window to enforce the cardinality limits in.
pub window: SlidingWindow,
/// The cardinality limit.
pub limit: u64,
pub limit: u32,

/// Scope which the limit applies to.
pub scope: CardinalityScope,
12 changes: 6 additions & 6 deletions relay-cardinality/src/limiter.rs
@@ -44,7 +44,7 @@ pub struct CardinalityReport {
pub name: Option<MetricName>,

/// The current cardinality.
pub cardinality: u64,
pub cardinality: u32,
}

/// Accumulator of all cardinality limiter decisions.
@@ -59,7 +59,7 @@ pub trait Reporter<'a> {
///
/// For example, with a name-scoped limit this can be called once for every
/// metric name matching the limit.
fn cardinality(&mut self, limit: &'a CardinalityLimit, report: CardinalityReport);
fn report_cardinality(&mut self, limit: &'a CardinalityLimit, report: CardinalityReport);
}

/// Limiter responsible to enforce limits.
@@ -204,7 +204,7 @@ impl<'a> Reporter<'a> for DefaultReporter<'a> {
}

#[inline(always)]
fn cardinality(&mut self, limit: &'a CardinalityLimit, report: CardinalityReport) {
fn report_cardinality(&mut self, limit: &'a CardinalityLimit, report: CardinalityReport) {
if !limit.report {
return;
}
@@ -624,7 +624,7 @@ mod tests {
I: IntoIterator<Item = Entry<'b>>,
T: Reporter<'a>,
{
reporter.cardinality(
reporter.report_cardinality(
&limits[0],
CardinalityReport {
organization_id: Some(scoping.organization_id),
@@ -634,7 +634,7 @@
},
);

reporter.cardinality(
reporter.report_cardinality(
&limits[0],
CardinalityReport {
organization_id: Some(scoping.organization_id),
@@ -644,7 +644,7 @@
},
);

reporter.cardinality(
reporter.report_cardinality(
&limits[2],
CardinalityReport {
organization_id: Some(scoping.organization_id),
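The rename from `cardinality` to `report_cardinality` makes the reporting hook explicit at call sites and keeps it distinct from the `cardinality` field on `CardinalityReport`. A minimal sketch of an implementor, assuming only the two trait methods visible in this diff (`reject` and `report_cardinality`) and crate-root re-exports of the types (module paths are assumptions, not confirmed by the diff):

```rust
use std::collections::BTreeMap;

// Paths are assumptions; the items live in relay-cardinality's `limiter`
// and `config` modules and may be re-exported differently.
use relay_cardinality::{CardinalityLimit, CardinalityReport, EntryId, Reporter};

/// Hypothetical reporter that records every cardinality report it receives,
/// keyed by limit id, alongside the rejected entries.
#[derive(Default)]
struct RecordingReporter {
    rejected: Vec<EntryId>,
    reports: BTreeMap<String, Vec<CardinalityReport>>,
}

impl<'a> Reporter<'a> for RecordingReporter {
    fn reject(&mut self, _limit: &'a CardinalityLimit, entry_id: EntryId) {
        // Remember which entries were rejected so the caller can drop them.
        self.rejected.push(entry_id);
    }

    fn report_cardinality(&mut self, limit: &'a CardinalityLimit, report: CardinalityReport) {
        // May be called multiple times per limit, e.g. once per metric name
        // for a name-scoped limit.
        self.reports.entry(limit.id.clone()).or_default().push(report);
    }
}
```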
6 changes: 3 additions & 3 deletions relay-cardinality/src/redis/cache.rs
@@ -96,7 +96,7 @@ impl<'a> CacheRead<'a> {
Self { inner, timestamp }
}

pub fn check(&self, scope: &QuotaScoping, hash: u32, limit: u64) -> CacheOutcome {
pub fn check(&self, scope: &QuotaScoping, hash: u32, limit: u32) -> CacheOutcome {
let Some(cache) = self.inner.cache.get(scope) else {
return CacheOutcome::Unknown;
};
@@ -166,15 +166,15 @@ struct ScopedCache {
}

impl ScopedCache {
fn check(&self, slot: Slot, hash: u32, limit: u64) -> CacheOutcome {
fn check(&self, slot: Slot, hash: u32, limit: u32) -> CacheOutcome {
if slot != self.current_slot {
return CacheOutcome::Unknown;
}

if self.hashes.contains(&hash) {
// Local cache copy contains the hash -> accept it straight away
CacheOutcome::Accepted
} else if self.hashes.len() as u64 >= limit {
} else if self.hashes.len().try_into().unwrap_or(u32::MAX) >= limit {
// The local cache already holds at least as many items as the cardinality
// limit -> this new item/hash is rejected.
CacheOutcome::Rejected
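With `limit` narrowed to `u32`, the cache compares its `usize` length against the limit via a saturating conversion instead of widening to `u64`: a cache that somehow exceeded `u32::MAX` entries simply counts as over any limit rather than causing a failed conversion. A standalone sketch of the same comparison (the helper name is illustrative, not from the diff):

```rust
/// Illustrative helper (not part of this change): compare a `usize` set size
/// against a `u32` cardinality limit without panicking or silently wrapping.
fn is_over_limit(len: usize, limit: u32) -> bool {
    // Saturate to `u32::MAX`: a set that large is over any `u32` limit.
    let len = u32::try_from(len).unwrap_or(u32::MAX);
    len >= limit
}

fn main() {
    assert!(!is_over_limit(3, 5));
    assert!(is_over_limit(5, 5));
    assert!(is_over_limit(usize::MAX, u32::MAX));
}
```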
10 changes: 5 additions & 5 deletions relay-cardinality/src/redis/limiter.rs
@@ -91,7 +91,7 @@ impl RedisSetLimiter {
.inspect(|(_, result)| {
metric!(
histogram(CardinalityLimiterHistograms::RedisSetCardinality) =
result.cardinality,
result.cardinality as u64,
id = state.id(),
);
})
@@ -163,7 +163,7 @@ impl Limiter for RedisSetLimiter {
})?;

for result in results {
reporter.cardinality(state.cardinality_limit(), result.to_report());
reporter.report_cardinality(state.cardinality_limit(), result.to_report());

// This always acquires a write lock, but we only hit this
// if we previously didn't satisfy the request from the cache,
@@ -188,7 +188,7 @@

struct CheckedLimits {
scope: QuotaScoping,
cardinality: u64,
cardinality: u32,
entries: Vec<RedisEntry>,
statuses: Vec<Status>,
}
@@ -292,7 +292,7 @@ mod tests {
}

#[track_caller]
fn assert_cardinality(&self, limit: &CardinalityLimit, cardinality: u64) {
fn assert_cardinality(&self, limit: &CardinalityLimit, cardinality: u32) {
let Some(r) = self.reports.get(limit) else {
panic!("expected cardinality report for limit {limit:?}");
};
@@ -306,7 +306,7 @@
self.entries.insert(entry_id);
}

fn cardinality(&mut self, limit: &'a CardinalityLimit, report: CardinalityReport) {
fn report_cardinality(&mut self, limit: &'a CardinalityLimit, report: CardinalityReport) {
let reports = self.reports.entry(limit.clone()).or_default();
reports.push(report);
reports.sort();
8 changes: 4 additions & 4 deletions relay-cardinality/src/redis/script.rs
@@ -36,7 +36,7 @@ impl FromRedisValue for Status {
#[derive(Debug)]
pub struct CardinalityScriptResult {
/// Cardinality of the limit.
pub cardinality: u64,
pub cardinality: u32,
/// Status for each hash passed to the script.
pub statuses: Vec<Status>,
}
@@ -124,7 +124,7 @@ impl CardinalityScript {
/// Returns a [`redis::ScriptInvocation`] with all keys and arguments prepared.
fn prepare_invocation(
&self,
limit: u64,
limit: u32,
expire: u64,
hashes: impl Iterator<Item = u32>,
keys: impl Iterator<Item = String>,
@@ -156,7 +156,7 @@ impl<'a> CardinalityScriptPipeline<'a> {
/// Adds another invocation of the script to the pipeline.
pub fn add_invocation(
&mut self,
limit: u64,
limit: u32,
expire: u64,
hashes: impl Iterator<Item = u32>,
keys: impl Iterator<Item = String>,
@@ -196,7 +196,7 @@ mod tests {
fn invoke_one(
&self,
con: &mut Connection,
limit: u64,
limit: u32,
expire: u64,
hashes: impl Iterator<Item = u32>,
keys: impl Iterator<Item = String>,
2 changes: 1 addition & 1 deletion relay-cardinality/src/redis/state.rs
@@ -16,7 +16,7 @@ use crate::{
#[derive(Debug)]
pub struct LimitState<'a> {
/// The limit of the quota.
pub limit: u64,
pub limit: u32,

/// Scoping of the quota.
partial_scope: PartialQuotaScoping,
90 changes: 83 additions & 7 deletions relay-server/src/metric_stats.rs
@@ -1,8 +1,11 @@
use std::collections::BTreeMap;
use std::sync::{Arc, OnceLock};

use relay_cardinality::{CardinalityLimit, CardinalityReport};
use relay_config::Config;
use relay_metrics::{Aggregator, Bucket, BucketValue, MergeBuckets, MetricName, UnixTimestamp};
use relay_metrics::{
Aggregator, Bucket, BucketValue, GaugeValue, MergeBuckets, MetricName, UnixTimestamp,
};
use relay_quotas::Scoping;
use relay_system::Addr;

@@ -18,6 +21,14 @@ fn volume_metric_mri() -> MetricName {
.clone()
}

fn cardinality_metric_mri() -> MetricName {
static CARDINALITY_METRIC_MRI: OnceLock<MetricName> = OnceLock::new();

CARDINALITY_METRIC_MRI
.get_or_init(|| "g:metric_stats/cardinality@none".into())
.clone()
}

/// Tracks stats about metrics.
///
/// Metric stats are similar to outcomes for envelopes, they record
@@ -47,8 +58,8 @@ impl MetricStats {
}

/// Tracks the metric volume and outcome for the bucket.
pub fn track(&self, scoping: Scoping, bucket: Bucket, outcome: Outcome) {
if !self.config.processing_enabled() || !self.is_rolled_out(scoping.organization_id) {
pub fn track_metric(&self, scoping: Scoping, bucket: Bucket, outcome: Outcome) {
if !self.is_enabled(scoping) {
return;
}

@@ -66,6 +77,35 @@ impl MetricStats {
.send(MergeBuckets::new(scoping.project_key, vec![volume]));
}

/// Tracks the cardinality of a metric.
pub fn track_cardinality(
&self,
scoping: Scoping,
limit: &CardinalityLimit,
report: &CardinalityReport,
) {
if !self.is_enabled(scoping) {
return;
}

let Some(cardinality) = self.to_cardinality_metric(limit, report) else {
return;
};

relay_log::trace!(
"Tracking cardinality '{}' for mri '{}': {}",
limit.id,
report.name.as_deref().unwrap_or("-"),
report.cardinality,
);
self.aggregator
.send(MergeBuckets::new(scoping.project_key, vec![cardinality]));
}

fn is_enabled(&self, scoping: Scoping) -> bool {
self.config.processing_enabled() && self.is_rolled_out(scoping.organization_id)
}

fn is_rolled_out(&self, organization_id: u64) -> bool {
let rate = self
.global_config
@@ -109,6 +149,42 @@ impl MetricStats {
metadata: Default::default(),
})
}

fn to_cardinality_metric(
&self,
limit: &CardinalityLimit,
report: &CardinalityReport,
) -> Option<Bucket> {
let cardinality = report.cardinality;
if cardinality == 0 {
return None;
}

let name = report.name.as_ref()?;
let namespace = name.namespace();

if !namespace.has_metric_stats() {
return None;
}

let tags = BTreeMap::from([
("mri".to_owned(), name.to_string()),
("mri.namespace".to_owned(), namespace.to_string()),
(
"cardinality.window".to_owned(),
limit.window.window_seconds.to_string(),
),
]);

Some(Bucket {
timestamp: UnixTimestamp::now(),
width: 0,
name: cardinality_metric_mri(),
value: BucketValue::Gauge(GaugeValue::single(cardinality.into())),
tags,
metadata: Default::default(),
})
}
}

#[cfg(test)]
@@ -163,10 +239,10 @@ mod tests {
let scoping = scoping();
let mut bucket = Bucket::parse(b"rt@millisecond:57|d", UnixTimestamp::now()).unwrap();

ms.track(scoping, bucket.clone(), Outcome::Accepted);
ms.track_metric(scoping, bucket.clone(), Outcome::Accepted);

bucket.metadata.merges = bucket.metadata.merges.saturating_add(41);
ms.track(
ms.track_metric(
scoping,
bucket,
Outcome::RateLimited(Some(ReasonCode::new("foobar"))),
@@ -224,7 +300,7 @@

let scoping = scoping();
let bucket = Bucket::parse(b"rt@millisecond:57|d", UnixTimestamp::now()).unwrap();
ms.track(scoping, bucket, Outcome::Accepted);
ms.track_metric(scoping, bucket, Outcome::Accepted);

drop(ms);

@@ -238,7 +314,7 @@
let scoping = scoping();
let bucket =
Bucket::parse(b"transactions/rt@millisecond:57|d", UnixTimestamp::now()).unwrap();
ms.track(scoping, bucket, Outcome::Accepted);
ms.track_metric(scoping, bucket, Outcome::Accepted);

drop(ms);

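`track_cardinality` mirrors the existing volume tracking: each `CardinalityReport` becomes a gauge bucket named `g:metric_stats/cardinality@none`, tagged with the reported MRI, its namespace, and the limit's window, and is merged into the aggregator under the reporting project. The wiring that feeds reports into it lives outside this diff; below is a sketch of the intended call pattern, assuming the caller holds a `MetricStats` handle, the project's `Scoping`, and the limiter's per-limit reports (the helper function itself is hypothetical):

```rust
use relay_cardinality::{CardinalityLimit, CardinalityReport};
use relay_quotas::Scoping;

use crate::metric_stats::MetricStats;

/// Hypothetical glue: forward every cardinality report produced by the
/// limiter to metric stats, one gauge data point per (limit, report) pair.
pub fn report_cardinality_to_metric_stats(
    metric_stats: &MetricStats,
    scoping: Scoping,
    reports: &[(CardinalityLimit, Vec<CardinalityReport>)],
) {
    for (limit, limit_reports) in reports {
        for report in limit_reports {
            // No-op unless processing is enabled, the organization is rolled
            // out, the report carries a metric name, and that name's
            // namespace has metric stats enabled.
            metric_stats.track_cardinality(scoping, limit, report);
        }
    }
}
```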
4 changes: 3 additions & 1 deletion relay-server/src/service.rs
@@ -152,7 +152,7 @@ impl ServiceState {
config.clone(),
global_config_handle.clone(),
outcome_aggregator.clone(),
metric_stats,
metric_stats.clone(),
)?
.start_in(rt),
),
@@ -180,6 +180,8 @@ impl ServiceState {
aggregator.clone(),
#[cfg(feature = "processing")]
store.clone(),
#[cfg(feature = "processing")]
metric_stats,
)
.spawn_handler(processor_rx);
