Skip to content

Commit c44dde2

Browse files
authored
feat(metric-meta): Add support for metric metadata (#2751)
Implements `metric_meta` envelope item and storage in Redis. Epic: getsentry/sentry#60260
1 parent 5e74635 commit c44dde2

24 files changed

+875
-57
lines changed

CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44

55
- `normalize_performance_score` now handles `PerformanceScoreProfile` configs with zero weight components and component weight sums of any number greater than 0. ([#2756](https://github.com/getsentry/relay/pull/2756))
66

7+
**Internal**:
8+
9+
- Add support for metric metadata. ([#2751](https://github.com/getsentry/relay/pull/2751))
10+
711
## 23.11.1
812

913
**Features**:

Cargo.lock

+8-4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+4-2
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,16 @@ debug = true
1515

1616
[workspace.dependencies]
1717
anyhow = "1.0.66"
18-
chrono = { version = "0.4.29", default-features = false, features = [
18+
chrono = { version = "0.4.31", default-features = false, features = [
1919
"std",
2020
"serde",
2121
] }
2222
clap = { version = "4.4.6" }
2323
criterion = "0.5"
2424
futures = { version = "0.3", default-features = false, features = ["std"] }
2525
insta = { version = "1.31.0", features = ["json", "redactions", "ron"] }
26+
hash32 = "0.3.1"
27+
hashbrown = "0.13.2"
2628
itertools = "0.10.5"
2729
once_cell = "1.13.1"
2830
rand = "0.8.5"
@@ -32,7 +34,7 @@ serde_json = "1.0.93"
3234
serde_yaml = "0.9.17"
3335
schemars = { version = "=0.8.10", features = ["uuid1", "chrono"] }
3436
similar-asserts = "1.4.2"
35-
smallvec = { version = "1.10.0", features = ["serde"] }
37+
smallvec = { version = "1.11.2", features = ["serde"] }
3638
thiserror = "1.0.38"
3739
tokio = { version = "1.28.0", features = ["macros", "sync", "tracing"] }
3840
url = "2.1.1"

relay-config/src/config.rs

+20
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,14 @@ struct Metrics {
502502
/// For example, a value of `0.3` means that only 30% of the emitted metrics will be sent.
503503
/// Defaults to `1.0` (100%).
504504
sample_rate: f32,
505+
/// Code locations expiry in seconds.
506+
///
507+
/// Defaults to 15 days.
508+
meta_locations_expiry: u64,
509+
/// Maximum amount of code locations to store per metric.
510+
///
511+
/// Defaults to 5.
512+
meta_locations_max: usize,
505513
}
506514

507515
impl Default for Metrics {
@@ -513,6 +521,8 @@ impl Default for Metrics {
513521
hostname_tag: None,
514522
buffering: true,
515523
sample_rate: 1.0,
524+
meta_locations_expiry: 15 * 24 * 60 * 60,
525+
meta_locations_max: 5,
516526
}
517527
}
518528
}
@@ -1758,6 +1768,16 @@ impl Config {
17581768
self.values.metrics.sample_rate
17591769
}
17601770

1771+
/// Returns the maximum amount of code locations per metric.
1772+
pub fn metrics_meta_locations_max(&self) -> usize {
1773+
self.values.metrics.meta_locations_max
1774+
}
1775+
1776+
/// Returns the expiry for code locations.
1777+
pub fn metrics_meta_locations_expiry(&self) -> Duration {
1778+
Duration::from_secs(self.values.metrics.meta_locations_expiry)
1779+
}
1780+
17611781
/// Returns the default timeout for all upstream HTTP requests.
17621782
pub fn http_timeout(&self) -> Duration {
17631783
Duration::from_secs(self.values.http.timeout.into())

relay-dynamic-config/src/feature.rs

+3
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ pub enum Feature {
3333
/// Enable processing profiles
3434
#[serde(rename = "organizations:profiling")]
3535
Profiling,
36+
/// Enable metric metadata.
37+
#[serde(rename = "organizations:metric-meta")]
38+
MetricMeta,
3639

3740
/// Deprecated, still forwarded for older downstream Relays.
3841
#[serde(rename = "organizations:transaction-name-mark-scrubbed-as-sanitized")]

relay-metrics/Cargo.toml

+7-1
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,20 @@ edition = "2021"
99
license-file = "../LICENSE.md"
1010
publish = false
1111

12+
[features]
13+
redis = ["relay-redis/impl"]
14+
1215
[dependencies]
1316
bytecount = "0.6.0"
17+
chrono = { workspace = true }
1418
fnv = "1.0.7"
15-
hash32 = "0.3.1"
19+
hash32 = { workspace = true }
20+
hashbrown = { workspace = true }
1621
itertools = { workspace = true }
1722
relay-base-schema = { path = "../relay-base-schema" }
1823
relay-common = { path = "../relay-common" }
1924
relay-log = { path = "../relay-log" }
25+
relay-redis = { path = "../relay-redis", optional = true }
2026
relay-statsd = { path = "../relay-statsd" }
2127
relay-system = { path = "../relay-system" }
2228
serde = { workspace = true }

relay-metrics/src/bucket.rs

+1-24
Original file line numberDiff line numberDiff line change
@@ -399,29 +399,6 @@ fn parse_gauge(string: &str) -> Option<GaugeValue> {
399399
})
400400
}
401401

402-
/// Parses an MRI from a string and a separate type.
403-
///
404-
/// The given string must be a part of the MRI, including the following components:
405-
/// - (optional) The namespace. If missing, it is defaulted to `"custom"`
406-
/// - (required) The metric name.
407-
/// - (optional) The unit. If missing, it is defaulted to "none".
408-
///
409-
/// The metric type is never part of this string and must be supplied separately.
410-
fn parse_mri(string: &str, ty: MetricType) -> Option<MetricResourceIdentifier<'_>> {
411-
let (name_and_namespace, unit) = protocol::parse_name_unit(string)?;
412-
413-
let (raw_namespace, name) = name_and_namespace
414-
.split_once('/')
415-
.unwrap_or(("custom", name_and_namespace));
416-
417-
Some(MetricResourceIdentifier {
418-
ty,
419-
name: name.into(),
420-
namespace: raw_namespace.parse().ok()?,
421-
unit,
422-
})
423-
}
424-
425402
/// Parses tags in the format `tag1,tag2:value`.
426403
///
427404
/// Tag values are optional. For tags with missing values, an empty `""` value is assumed.
@@ -640,7 +617,7 @@ impl Bucket {
640617
let (mri_str, values_str) = components.next()?.split_once(':')?;
641618
let ty = components.next().and_then(|s| s.parse().ok())?;
642619

643-
let mri = parse_mri(mri_str, ty)?;
620+
let mri = MetricResourceIdentifier::parse_with_type(mri_str, ty).ok()?;
644621
let value = match ty {
645622
MetricType::Counter => BucketValue::Counter(parse_counter(values_str)?),
646623
MetricType::Distribution => BucketValue::Distribution(parse_distribution(values_str)?),

relay-metrics/src/lib.rs

+4
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
)]
7070

7171
pub mod aggregator;
72+
pub mod meta;
7273

7374
mod aggregatorservice;
7475
mod bucket;
@@ -78,5 +79,8 @@ mod statsd;
7879

7980
pub use aggregatorservice::*;
8081
pub use bucket::*;
82+
#[cfg(feature = "redis")]
83+
pub use meta::RedisMetricMetaStore;
84+
pub use meta::{MetaAggregator, MetricMeta};
8185
pub use protocol::*;
8286
pub use router::*;

relay-metrics/src/meta/aggregator.rs

+148
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
use std::{
2+
collections::{HashMap, HashSet},
3+
hash::Hash,
4+
};
5+
6+
use relay_base_schema::project::ProjectKey;
7+
8+
use super::{Item, Location, MetricMeta, StartOfDayUnixTimestamp};
9+
use crate::{statsd::MetricCounters, MetricResourceIdentifier};
10+
11+
/// A metrics meta aggregator.
12+
///
13+
/// Aggregates metric metadata based on their scope (project, mri, timestamp) and
14+
/// only keeps the most relevant entries.
15+
///
16+
/// Currently we track the first N amount of unique metric meta elements we get.
17+
///
18+
/// This should represent the actual adoption rate of different code versions.
19+
///
20+
/// This aggregator is purely in memeory and will lose its state on restart,
21+
/// which may cause multiple different items being emitted after restarts.
22+
/// For this we have de-deuplication in the storage and the volume overall
23+
/// of this happening is small enough to just add it to the storage worst case.
24+
#[derive(Debug)]
25+
pub struct MetaAggregator {
26+
/// All tracked code locations.
27+
locations: hashbrown::HashMap<Scope, HashSet<Location>>,
28+
29+
/// Maximum tracked locations.
30+
max_locations: usize,
31+
}
32+
33+
impl MetaAggregator {
34+
/// Creates a new metrics meta aggregator.
35+
pub fn new(max_locations: usize) -> Self {
36+
Self {
37+
locations: hashbrown::HashMap::new(),
38+
max_locations,
39+
}
40+
}
41+
42+
/// Adds a new meta item to the aggregator.
43+
///
44+
/// Returns a new [`MetricMeta`] element when the element should be stored
45+
/// or sent upstream for storage.
46+
///
47+
/// Returns `None` when the meta item was already seen or is not considered relevant.
48+
pub fn add(&mut self, project_key: ProjectKey, meta: MetricMeta) -> Option<MetricMeta> {
49+
let mut send_upstream = HashMap::new();
50+
51+
for (mri, items) in meta.mapping {
52+
let scope = Scope {
53+
timestamp: meta.timestamp,
54+
project_key,
55+
mri,
56+
};
57+
58+
if let Some(items) = self.add_scoped(&scope, items) {
59+
send_upstream.insert(scope.mri, items);
60+
}
61+
}
62+
63+
if send_upstream.is_empty() {
64+
return None;
65+
}
66+
67+
relay_statsd::metric!(counter(MetricCounters::MetaAggregatorUpdate) += 1);
68+
Some(MetricMeta {
69+
timestamp: meta.timestamp,
70+
mapping: send_upstream,
71+
})
72+
}
73+
74+
/// Retrieves all currently relevant metric meta for a project.
75+
pub fn get_all_relevant(&self, project_key: ProjectKey) -> impl Iterator<Item = MetricMeta> {
76+
let locations = self
77+
.locations
78+
.iter()
79+
.filter(|(scope, _)| scope.project_key == project_key);
80+
81+
let mut result = HashMap::new();
82+
83+
for (scope, locations) in locations {
84+
result
85+
.entry(scope.timestamp)
86+
.or_insert_with(|| MetricMeta {
87+
timestamp: scope.timestamp,
88+
mapping: HashMap::new(),
89+
})
90+
.mapping
91+
.entry(scope.mri.clone()) // This clone sucks
92+
.or_insert_with(Vec::new)
93+
.extend(locations.iter().cloned().map(Item::Location));
94+
}
95+
96+
result.into_values()
97+
}
98+
99+
/// Remove all contained state related to a project.
100+
pub fn clear(&mut self, project_key: ProjectKey) {
101+
self.locations
102+
.retain(|scope, _| scope.project_key != project_key);
103+
}
104+
105+
fn add_scoped(&mut self, scope: &Scope, items: Vec<Item>) -> Option<Vec<Item>> {
106+
// Entry ref needs hashbrown, we would have to clone the scope without or do a separate lookup.
107+
let locations = self.locations.entry_ref(scope).or_default();
108+
let mut send_upstream = Vec::new();
109+
110+
for item in items {
111+
match item {
112+
Item::Location(location) => {
113+
if locations.len() > self.max_locations {
114+
break;
115+
}
116+
117+
if !locations.contains(&location) {
118+
locations.insert(location.clone());
119+
send_upstream.push(Item::Location(location));
120+
}
121+
}
122+
Item::Unknown => {}
123+
}
124+
}
125+
126+
(!send_upstream.is_empty()).then_some(send_upstream)
127+
}
128+
}
129+
130+
/// The metadata scope.
131+
///
132+
/// We scope metadata by project, mri and day,
133+
/// represented as a unix timestamp at the beginning of the day.
134+
///
135+
/// The technical scope (e.g. redis key) also includes the organization id, but this
136+
/// can be inferred from the project.
137+
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
138+
struct Scope {
139+
pub timestamp: StartOfDayUnixTimestamp,
140+
pub project_key: ProjectKey,
141+
pub mri: MetricResourceIdentifier<'static>,
142+
}
143+
144+
impl From<&Scope> for Scope {
145+
fn from(value: &Scope) -> Self {
146+
value.clone()
147+
}
148+
}

relay-metrics/src/meta/mod.rs

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
//! Functionality for aggregating and storing of metrics metadata.
2+
3+
mod aggregator;
4+
mod protocol;
5+
#[cfg(feature = "redis")]
6+
mod redis;
7+
8+
pub use self::aggregator::*;
9+
pub use self::protocol::*;
10+
#[cfg(feature = "redis")]
11+
pub use self::redis::*;

0 commit comments

Comments
 (0)