Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metrics): Add metadata for buckets #3254

Merged
merged 11 commits into from
Mar 14, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
- Allow enabling SSL for Kafka. ([#3232](https://github.com/getsentry/relay/pull/3232))
- Enable HTTP compression for all APIs. ([#3233](https://github.com/getsentry/relay/pull/3233))
- Add `process.load` span to ingested mobile span ops. ([#3227](https://github.com/getsentry/relay/pull/3227))
- Track metric bucket metadata for Relay internal usage. ([#3254](https://github.com/getsentry/relay/pull/3254))
- Enforce rate limits for standalone spans. ([#3238](https://github.com/getsentry/relay/pull/3238))
- Extract `span.status_code` tag for HTTP spans. ([#3245](https://github.com/getsentry/relay/pull/3245))

Expand Down
2 changes: 1 addition & 1 deletion relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ impl ConfigObject for Credentials {
}

/// Information on a downstream Relay.
#[derive(Debug, Serialize, Deserialize, Clone)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RelayInfo {
/// The public key that this Relay uses to authenticate and sign requests.
Expand Down
1 change: 1 addition & 0 deletions relay-metrics/benches/benches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ fn bench_insert_and_flush(c: &mut Criterion) {
name: "c:transactions/foo@none".to_owned(),
value: BucketValue::counter(42.into()),
tags: BTreeMap::new(),
metadata: Default::default(),
};

let inputs = [
Expand Down
78 changes: 68 additions & 10 deletions relay-metrics/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use tokio::time::Instant;
use crate::bucket::{Bucket, BucketValue};
use crate::protocol::{self, MetricNamespace, MetricResourceIdentifier};
use crate::statsd::{MetricCounters, MetricGauges, MetricHistograms, MetricSets, MetricTimers};
use crate::BucketMetadata;

/// Any error that may occur during aggregation.
#[derive(Debug, Error, PartialEq)]
Expand Down Expand Up @@ -311,18 +312,43 @@ impl Default for AggregatorConfig {
struct QueuedBucket {
    /// The moment at which this bucket is scheduled to be flushed.
    flush_at: Instant,
    /// The aggregated metric value accumulated so far.
    value: BucketValue,
    /// Relay internal metadata carried along with the bucket and merged on every aggregation.
    metadata: BucketMetadata,
}

impl QueuedBucket {
/// Creates a new `QueuedBucket` with a given flush time.
fn new(flush_at: Instant, value: BucketValue) -> Self {
Self { flush_at, value }
fn new(flush_at: Instant, value: BucketValue, metadata: BucketMetadata) -> Self {
Self {
flush_at,
value,
metadata,
}
}

/// Returns `true` if the flush time has elapsed.
fn elapsed(&self) -> bool {
Instant::now() > self.flush_at
}

/// Merges a bucket into the current queued bucket.
///
/// Returns the value cost increase on success,
/// otherwise returns an error if the passed bucket value type does not match
/// the contained type.
fn merge(
&mut self,
value: BucketValue,
metadata: BucketMetadata,
) -> Result<usize, AggregateMetricsErrorKind> {
let cost_before = self.value.cost();

self.value
.merge(value)
.map_err(|_| AggregateMetricsErrorKind::InvalidTypes)?;
self.metadata.merge(metadata);

Ok(self.value.cost().saturating_sub(cost_before))
}
}

impl PartialEq for QueuedBucket {
Expand Down Expand Up @@ -497,6 +523,7 @@ impl Aggregator {
name: key.metric_name,
value: entry.value,
tags: key.tags,
metadata: entry.metadata,
})
.collect()
}
Expand Down Expand Up @@ -530,6 +557,7 @@ impl Aggregator {
if force || entry.elapsed() {
// Take the value and leave a placeholder behind. It'll be removed right after.
let value = mem::replace(&mut entry.value, BucketValue::counter(0.into()));
let metadata = mem::take(&mut entry.metadata);
cost_tracker.subtract_cost(key.project_key, key.cost());
cost_tracker.subtract_cost(key.project_key, value.cost());

Expand All @@ -545,6 +573,7 @@ impl Aggregator {
name: key.metric_name.clone(),
value,
tags: key.tags.clone(),
metadata,
};

buckets
Expand Down Expand Up @@ -775,13 +804,8 @@ impl Aggregator {
aggregator = &self.name,
namespace = entry.key().namespace().as_str(),
);
let bucket_value = &mut entry.get_mut().value;
let cost_before = bucket_value.cost();
bucket_value
.merge(bucket.value)
.map_err(|_| AggregateMetricsErrorKind::InvalidTypes)?;
let cost_after = bucket_value.cost();
added_cost = cost_after.saturating_sub(cost_before);

added_cost = entry.get_mut().merge(bucket.value, bucket.metadata)?;
}
Entry::Vacant(entry) => {
relay_statsd::metric!(
Expand All @@ -798,7 +822,7 @@ impl Aggregator {
let flush_at = self.config.get_flush_time(entry.key());
let value = bucket.value;
added_cost = entry.key().cost() + value.cost();
entry.insert(QueuedBucket::new(flush_at, value));
entry.insert(QueuedBucket::new(flush_at, value, bucket.metadata));
}
}

Expand Down Expand Up @@ -889,6 +913,7 @@ mod tests {
name: "c:transactions/foo".to_owned(),
value: BucketValue::counter(42.into()),
tags: BTreeMap::new(),
metadata: BucketMetadata::new(),
}
}

Expand Down Expand Up @@ -1133,6 +1158,7 @@ mod tests {
name: "c:transactions/foo@none".to_owned(),
value: BucketValue::counter(42.into()),
tags: BTreeMap::new(),
metadata: BucketMetadata::new(),
};
let bucket_key = BucketKey {
project_key,
Expand Down Expand Up @@ -1388,6 +1414,7 @@ mod tests {
name: "c:transactions/foo".to_owned(),
value: BucketValue::counter(42.into()),
tags: BTreeMap::new(),
metadata: BucketMetadata::new(),
};

let mut aggregator = Aggregator::new(test_config());
Expand Down Expand Up @@ -1418,6 +1445,7 @@ mod tests {
name: "c:transactions/foo".to_owned(),
value: BucketValue::counter(42.into()),
tags: BTreeMap::new(),
metadata: BucketMetadata::new(),
};

let mut aggregator = Aggregator::new(config);
Expand All @@ -1439,4 +1467,34 @@ mod tests {
let parsed: AggregatorConfig = serde_json::from_str(json).unwrap();
assert!(matches!(parsed.shift_key, ShiftKey::Bucket));
}

#[test]
fn test_aggregator_merge_metadata() {
    let mut config = test_config();
    config.bucket_interval = 10;

    let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
    let mut aggregator = Aggregator::new(config);

    // Two fresh buckets, each starting with the initial merge count of 1.
    let bucket1 = some_bucket();
    let bucket2 = some_bucket();

    // Create a bucket with already 3 merges.
    let mut bucket3 = some_bucket();
    bucket3.metadata.merge(BucketMetadata::new());
    bucket3.metadata.merge(BucketMetadata::new());

    // All three buckets come from `some_bucket`, so they aggregate into the
    // same slot and their metadata is merged together.
    aggregator.merge(project_key, bucket1, None).unwrap();
    aggregator.merge(project_key, bucket2, None).unwrap();
    aggregator.merge(project_key, bucket3, None).unwrap();

    // Merge counts are summed: 1 + 1 + 3 = 5.
    let buckets: Vec<_> = aggregator.buckets.values().map(|v| &v.metadata).collect();
    insta::assert_debug_snapshot!(buckets, @r###"
    [
        BucketMetadata {
            merges: 5,
        },
    ]
    "###);
}
}
3 changes: 2 additions & 1 deletion relay-metrics/src/aggregatorservice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ mod tests {
use relay_common::time::UnixTimestamp;
use relay_system::{FromMessage, Interface};

use crate::{BucketCountInquiry, BucketValue};
use crate::{BucketCountInquiry, BucketMetadata, BucketValue};

use super::*;

Expand Down Expand Up @@ -478,6 +478,7 @@ mod tests {
name: "c:transactions/foo".to_owned(),
value: BucketValue::counter(42.into()),
tags: BTreeMap::new(),
metadata: BucketMetadata::new(),
}
}

Expand Down
78 changes: 76 additions & 2 deletions relay-metrics/src/bucket.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::{BTreeMap, BTreeSet};
use std::hash::Hash;
use std::iter::FusedIterator;
use std::num::NonZeroU64;
use std::{fmt, mem};

use hash32::{FnvHasher, Hasher as _};
Expand Down Expand Up @@ -602,6 +603,13 @@ pub struct Bucket {
/// ```
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
pub tags: BTreeMap<String, String>,

/// Relay internal metadata for a metric bucket.
///
/// The metadata contains meta information about the metric bucket itself,
/// for example how many times this bucket has been aggregated in total.
#[serde(default, skip_serializing_if = "BucketMetadata::is_default")]
pub metadata: BucketMetadata,
}

impl Bucket {
Expand Down Expand Up @@ -635,6 +643,7 @@ impl Bucket {
name: mri.to_string(),
value,
tags: Default::default(),
metadata: Default::default(),
};

for component in components {
Expand Down Expand Up @@ -735,6 +744,47 @@ impl CardinalityItem for Bucket {
}
}

/// Relay internal metadata for a metric bucket.
//
// `Eq`, `Hash` and `Copy` are derived in addition to the original set: the only
// field is a `NonZeroU64`, which supports all three, and this keeps the derive
// list consistent with other types in this crate family (e.g. `RelayInfo`).
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Deserialize, Serialize)]
pub struct BucketMetadata {
    /// How many times the bucket was merged.
    ///
    /// Creating a new bucket counts as the first merge.
    /// Merging two buckets sums the number of merges.
    ///
    /// For example: merging two un-merged buckets will yield a total
    /// of `2` merges.
    pub merges: NonZeroU64,
}

impl BucketMetadata {
    /// Creates a fresh metadata instance.
    ///
    /// The returned metadata starts out with a merge count of `1`,
    /// representing the creation of the bucket itself.
    pub fn new() -> Self {
        Self {
            merges: NonZeroU64::MIN,
        }
    }

    /// Returns `true` if the metadata carries no more information than a
    /// freshly created instance.
    pub fn is_default(&self) -> bool {
        // Exhaustive destructuring: adding a field to the struct forces
        // this check to be revisited.
        let Self { merges } = self;
        *merges == NonZeroU64::MIN
    }

    /// Folds another metadata object into this one.
    ///
    /// The merge counts of both sides are added, saturating at `u64::MAX`.
    pub fn merge(&mut self, other: Self) {
        let total = self.merges.saturating_add(other.merges.get());
        self.merges = total;
    }
}

impl Default for BucketMetadata {
    fn default() -> Self {
        // The default metadata is a fresh instance with a single merge.
        Self::new()
    }
}

/// Iterator over parsed metrics returned from [`Bucket::parse_all`].
#[derive(Clone, Debug)]
pub struct ParseBuckets<'a> {
Expand Down Expand Up @@ -860,6 +910,9 @@ mod tests {
42.0,
),
tags: {},
metadata: BucketMetadata {
merges: 1,
},
}
"###);
}
Expand Down Expand Up @@ -888,6 +941,9 @@ mod tests {
],
),
tags: {},
metadata: BucketMetadata {
merges: 1,
},
}
"###);
}
Expand Down Expand Up @@ -934,6 +990,9 @@ mod tests {
},
),
tags: {},
metadata: BucketMetadata {
merges: 1,
},
}
"###);
}
Expand Down Expand Up @@ -988,6 +1047,9 @@ mod tests {
},
),
tags: {},
metadata: BucketMetadata {
merges: 1,
},
}
"###);
}
Expand All @@ -1012,6 +1074,9 @@ mod tests {
},
),
tags: {},
metadata: BucketMetadata {
merges: 1,
},
}
"###);
}
Expand All @@ -1030,6 +1095,9 @@ mod tests {
42.0,
),
tags: {},
metadata: BucketMetadata {
merges: 1,
},
}
"###);
}
Expand Down Expand Up @@ -1208,6 +1276,9 @@ mod tests {
tags: {
"route": "user_index",
},
metadata: BucketMetadata {
merges: 1,
},
},
]
"###);
Expand All @@ -1226,7 +1297,7 @@ mod tests {
]"#;

let buckets = serde_json::from_str::<Vec<Bucket>>(json).unwrap();
insta::assert_debug_snapshot!(buckets, @r#"
insta::assert_debug_snapshot!(buckets, @r###"
[
Bucket {
timestamp: UnixTimestamp(1615889440),
Expand All @@ -1236,9 +1307,12 @@ mod tests {
4.0,
),
tags: {},
metadata: BucketMetadata {
merges: 1,
},
},
]
"#);
"###);
}

#[test]
Expand Down
Loading
Loading