From a61d0db2fcbcdad99eee7ef1c555a0329f6d7b9c Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Thu, 1 Feb 2024 19:44:04 -0500 Subject: [PATCH 01/17] feat(metrics-summaries): Emit summaries to their own topic --- relay-kafka/src/config.rs | 9 +++- relay-server/src/services/store.rs | 71 +++++++++++++++++++++++++++--- 2 files changed, 72 insertions(+), 8 deletions(-) diff --git a/relay-kafka/src/config.rs b/relay-kafka/src/config.rs index a5f0414ac98..ff9f74ab45d 100644 --- a/relay-kafka/src/config.rs +++ b/relay-kafka/src/config.rs @@ -51,6 +51,8 @@ pub enum KafkaTopic { Monitors, /// Standalone spans without a transaction. Spans, + /// Summary for metrics collected during a span. + MetricsSummaries, } impl KafkaTopic { @@ -58,7 +60,7 @@ impl KafkaTopic { /// It will have to be adjusted if the new variants are added. pub fn iter() -> std::slice::Iter<'static, Self> { use KafkaTopic::*; - static TOPICS: [KafkaTopic; 13] = [ + static TOPICS: [KafkaTopic; 14] = [ Events, Attachments, Transactions, @@ -72,6 +74,7 @@ impl KafkaTopic { ReplayRecordings, Monitors, Spans, + MetricsSummaries, ]; TOPICS.iter() } @@ -111,6 +114,8 @@ pub struct TopicAssignments { pub monitors: TopicAssignment, /// Standalone spans without a transaction. pub spans: TopicAssignment, + /// Summary for metrics collected during a span. + pub metrics_summaries: TopicAssignment, } impl TopicAssignments { @@ -131,6 +136,7 @@ impl TopicAssignments { KafkaTopic::ReplayRecordings => &self.replay_recordings, KafkaTopic::Monitors => &self.monitors, KafkaTopic::Spans => &self.spans, + KafkaTopic::MetricsSummaries => &self.metrics_summaries, } } } @@ -152,6 +158,7 @@ impl Default for TopicAssignments { replay_recordings: "ingest-replay-recordings".to_owned().into(), monitors: "ingest-monitors".to_owned().into(), spans: "snuba-spans".to_owned().into(), + metrics_summaries: "snuba-metrics-summaries".to_owned().into(), } } } diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 36ae64d3629..00b0a8a543b 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -871,6 +871,12 @@ impl StoreService { } }; + span.duration_ms = ((span.end_timestamp - span.start_timestamp) * 1e3) as u32; + span.event_id = event_id; + span.project_id = scoping.project_id.value(); + span.retention_days = retention_days; + span.start_timestamp_ms = (span.start_timestamp * 1e3) as u64; + if let Some(measurements) = &mut span.measurements { measurements.retain(|_, v| { v.as_ref() @@ -896,13 +902,38 @@ impl StoreService { false } }); - } - span.duration_ms = ((span.end_timestamp - span.start_timestamp) * 1e3) as u32; - span.event_id = event_id; - span.project_id = scoping.project_id.value(); - span.retention_days = retention_days; - span.start_timestamp_ms = (span.start_timestamp * 1e3) as u64; + let sentry_tags = span.sentry_tags.unwrap_or_default(); + let group: u64 = sentry_tags + .get("group") + .map(|group| u64::from_str_radix(group, 16).unwrap_or_default()) + .unwrap_or_default(); + + for (mri, summary) in metrics_summary.value().as_ref() { + self.produce( + KafkaTopic::MetricsSummaries, + scoping.organization_id, + KafkaMessage::MetricsSummary(MetricsSummaryKafkaMessage { + count: summary.count, + duration_ms: span.duration_ms, + end_timestamp: span.end_timestamp, + group: span.group, + is_segment: span.is_segment, + max: summary.max, + mri, + min: summary.min, + project_id: span.project_id, + retention_days: span.retention_days, + segment_id: span.segment_id, + span_id: span.span_id, + sum: summary.sum, + tags: summary.tags, + trace_id: span.trace_id, + }), + ); + } + span.metrics_summary = None; + } self.produce( KafkaTopic::Spans, @@ -1278,6 +1309,25 @@ struct SpanKafkaMessage<'a> { trace_id: &'a str, } +#[derive(Debug, Deserialize, Serialize)] +struct MetricsSummaryKafkaMessage<'a> { + count: u64, + duration_ms: u32, + end_timestamp: f64, + group: &'a str, + is_segment: bool, + max: f64, + mri: &'a str, + min: f64, + project_id: u64, + retention_days: u16, + segment_id: &'a str, + span_id: &'a str, + sum: f64, + tags: BTreeMap<&'a str, String>, + trace_id: &'a str, +} + /// An enum over all possible ingest messages. #[derive(Debug, Serialize)] #[serde(tag = "type", rename_all = "snake_case")] @@ -1299,6 +1349,7 @@ enum KafkaMessage<'a> { ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage), CheckIn(CheckInKafkaMessage), Span(SpanKafkaMessage<'a>), + MetricsSummary(MetricsSummaryKafkaMessage<'a>), } impl Message for KafkaMessage<'_> { @@ -1315,6 +1366,7 @@ impl Message for KafkaMessage<'_> { KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked", KafkaMessage::CheckIn(_) => "check_in", KafkaMessage::Span(_) => "span", + KafkaMessage::MetricsSummary(_) => "metrics_summary", } } @@ -1337,7 +1389,8 @@ impl Message for KafkaMessage<'_> { Self::Session(_) | Self::Profile(_) | Self::ReplayRecordingNotChunked(_) - | Self::Span(_) => Uuid::nil(), + | Self::Span(_) + | Self::MetricsSummary(_) => Uuid::nil(), // TODO(ja): Determine a partitioning key Self::Metric { .. } => Uuid::nil(), @@ -1383,6 +1436,10 @@ impl Message for KafkaMessage<'_> { KafkaMessage::Span(message) => { serde_json::to_vec(message).map_err(ClientError::InvalidJson) } + KafkaMessage::MetricsSummary(message) => { + serde_json::to_vec(message).map_err(ClientError::InvalidJson) + } + _ => rmp_serde::to_vec_named(&self).map_err(ClientError::InvalidMsgPack), } } From 4f549c95b0a3f006fae2d3df6816432e7c62576f Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Fri, 2 Feb 2024 10:27:12 -0500 Subject: [PATCH 02/17] Loop through all summaries properly --- relay-server/src/services/store.rs | 69 +++++++++++++++++------------- 1 file changed, 39 insertions(+), 30 deletions(-) diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 00b0a8a543b..e95a8660897 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -903,34 +903,43 @@ impl StoreService { } }); - let sentry_tags = span.sentry_tags.unwrap_or_default(); - let group: u64 = sentry_tags - .get("group") - .map(|group| u64::from_str_radix(group, 16).unwrap_or_default()) - .unwrap_or_default(); - - for (mri, summary) in metrics_summary.value().as_ref() { - self.produce( - KafkaTopic::MetricsSummaries, - scoping.organization_id, - KafkaMessage::MetricsSummary(MetricsSummaryKafkaMessage { - count: summary.count, - duration_ms: span.duration_ms, - end_timestamp: span.end_timestamp, - group: span.group, - is_segment: span.is_segment, - max: summary.max, - mri, - min: summary.min, - project_id: span.project_id, - retention_days: span.retention_days, - segment_id: span.segment_id, - span_id: span.span_id, - sum: summary.sum, - tags: summary.tags, - trace_id: span.trace_id, - }), - ); + let group = span + .sentry_tags + .as_ref() + .and_then(|sentry_tags| sentry_tags.get("group")) + .unwrap_or(&""); + + for (mri, summaries) in metrics_summary { + let Some(summaries) = summaries else { + continue; + }; + for summary in summaries { + let Some(summary) = summary else { + continue; + }; + // Ignore immedate errors on produce. + let _ = self.produce( + KafkaTopic::MetricsSummaries, + scoping.organization_id, + KafkaMessage::MetricsSummary(MetricsSummaryKafkaMessage { + count: summary.count.unwrap_or_default(), + duration_ms: span.duration_ms, + end_timestamp: span.end_timestamp, + group, + is_segment: span.is_segment, + max: summary.max.unwrap_or_default(), + mri, + min: summary.min.unwrap_or_default(), + project_id: span.project_id, + retention_days: span.retention_days, + segment_id: span.segment_id.unwrap_or_default(), + span_id: span.span_id, + sum: summary.sum.unwrap_or_default(), + tags: summary.tags.clone().unwrap_or_default(), + trace_id: span.trace_id, + }), + ); + } } span.metrics_summary = None; } @@ -1300,7 +1309,7 @@ struct SpanKafkaMessage<'a> { #[serde(default, skip_serializing_if = "Option::is_none")] segment_id: Option<&'a str>, #[serde(default, skip_serializing_if = "Option::is_none")] - sentry_tags: Option<&'a RawValue>, + sentry_tags: Option>, span_id: &'a str, #[serde(default)] start_timestamp_ms: u64, @@ -1324,7 +1333,7 @@ struct MetricsSummaryKafkaMessage<'a> { segment_id: &'a str, span_id: &'a str, sum: f64, - tags: BTreeMap<&'a str, String>, + tags: BTreeMap, trace_id: &'a str, } From e093dee6ee6863fd880e812c9b26c8e6dadc4df2 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Fri, 2 Feb 2024 10:28:01 -0500 Subject: [PATCH 03/17] Add a CHANGELOG entry --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d36377c8e5f..7ac5525e9d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ - Drop spans ending outside the valid timestamp range. ([#3013](https://github.com/getsentry/relay/pull/3013)) - Extract INP metrics from spans. ([#2969](https://github.com/getsentry/relay/pull/2969)) - Add ability to rate limit metric buckets by namespace. ([#2941](https://github.com/getsentry/relay/pull/2941)) +- Push metrics summaries to their own topic. ([#3045](https://github.com/getsentry/relay/pull/3045)) ## 24.1.1 From 9f43434f72873e815aec22e49d67636d1e1c9cdb Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Fri, 2 Feb 2024 15:25:06 -0500 Subject: [PATCH 04/17] Make sure metrics summaries are not on the span anymore --- tests/integration/test_store.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/integration/test_store.py b/tests/integration/test_store.py index 98fd3a693a0..29dd53ee6b1 100644 --- a/tests/integration/test_store.py +++ b/tests/integration/test_store.py @@ -1809,7 +1809,6 @@ def test_span_extraction_with_ddm( start_timestamp.replace(tzinfo=timezone.utc).timestamp() * 1e3 ), "trace_id": "a0fa8803753e40fd8124b21eeb2986b5", - "_metrics_summary": metrics_summary, } spans_consumer.assert_empty() @@ -1877,7 +1876,6 @@ def test_span_extraction_with_ddm_missing_values( start_timestamp.replace(tzinfo=timezone.utc).timestamp() * 1e3 ), "trace_id": "a0fa8803753e40fd8124b21eeb2986b5", - "_metrics_summary": metrics_summary, "measurements": {}, } From 02a5777af4e8b79531f0f066346ddd5b7e123273 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Tue, 6 Feb 2024 18:04:05 -0500 Subject: [PATCH 05/17] Log error when we produce a metrics summary to Kafka --- relay-server/src/services/store.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index e95a8660897..77b7bb235f6 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -917,8 +917,8 @@ impl StoreService { let Some(summary) = summary else { continue; }; - // Ignore immedate errors on produce. - let _ = self.produce( + // Ignore immediate errors on produce. + if let Err(error) = self.produce( KafkaTopic::MetricsSummaries, scoping.organization_id, KafkaMessage::MetricsSummary(MetricsSummaryKafkaMessage { @@ -938,7 +938,12 @@ impl StoreService { tags: summary.tags.clone().unwrap_or_default(), trace_id: span.trace_id, }), - ); + ) { + relay_log::error!( + error = &error as &dyn std::error::Error, + "failed to push metrics summary to kafka", + ); + } } } span.metrics_summary = None; From c4fec7a8ec9839f5de5d2873bb31dc01ea932335 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Tue, 6 Feb 2024 18:19:06 -0500 Subject: [PATCH 06/17] Make the Sentry tag value a String to avoid problems with special characters --- relay-server/src/services/store.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 77b7bb235f6..db65654a5d0 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -906,8 +906,8 @@ impl StoreService { let group = span .sentry_tags .as_ref() - .and_then(|sentry_tags| sentry_tags.get("group")) - .unwrap_or(&""); + .and_then(|sentry_tags| sentry_tags.get("group").cloned()) + .unwrap_or_default(); for (mri, summaries) in metrics_summary { let Some(summaries) = summaries else { @@ -925,7 +925,7 @@ impl StoreService { count: summary.count.unwrap_or_default(), duration_ms: span.duration_ms, end_timestamp: span.end_timestamp, - group, + group: &group, is_segment: span.is_segment, max: summary.max.unwrap_or_default(), mri, @@ -1314,7 +1314,7 @@ struct SpanKafkaMessage<'a> { #[serde(default, skip_serializing_if = "Option::is_none")] segment_id: Option<&'a str>, #[serde(default, skip_serializing_if = "Option::is_none")] - sentry_tags: Option>, + sentry_tags: Option>, span_id: &'a str, #[serde(default)] start_timestamp_ms: u64, From adf09c7dc6409a6e44b7c1031495b97b2681ef0a Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Tue, 6 Feb 2024 18:33:58 -0500 Subject: [PATCH 07/17] Set values as optional as the consumer handles missing values --- relay-server/src/services/store.rs | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index db65654a5d0..d90b1a7242a 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -922,20 +922,20 @@ impl StoreService { KafkaTopic::MetricsSummaries, scoping.organization_id, KafkaMessage::MetricsSummary(MetricsSummaryKafkaMessage { - count: summary.count.unwrap_or_default(), + count: summary.count, duration_ms: span.duration_ms, end_timestamp: span.end_timestamp, group: &group, is_segment: span.is_segment, - max: summary.max.unwrap_or_default(), + max: summary.max, mri, - min: summary.min.unwrap_or_default(), + min: summary.min, project_id: span.project_id, retention_days: span.retention_days, segment_id: span.segment_id.unwrap_or_default(), span_id: span.span_id, - sum: summary.sum.unwrap_or_default(), - tags: summary.tags.clone().unwrap_or_default(), + sum: summary.sum, + tags: summary.tags.clone(), trace_id: span.trace_id, }), ) { @@ -1325,21 +1325,27 @@ struct SpanKafkaMessage<'a> { #[derive(Debug, Deserialize, Serialize)] struct MetricsSummaryKafkaMessage<'a> { - count: u64, duration_ms: u32, end_timestamp: f64, group: &'a str, is_segment: bool, - max: f64, mri: &'a str, - min: f64, project_id: u64, retention_days: u16, segment_id: &'a str, span_id: &'a str, - sum: f64, - tags: BTreeMap, trace_id: &'a str, + + #[serde(default, skip_serializing_if = "Option::is_none")] + count: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + max: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + min: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + sum: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + tags: Option>, } /// An enum over all possible ingest messages. From bcb4c3dcbe9fbfd57c9880697513f7081905e9ea Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Tue, 6 Feb 2024 18:52:01 -0500 Subject: [PATCH 08/17] Make tags a borrowable value --- relay-server/src/services/store.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index d90b1a7242a..2182a8d1828 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -935,7 +935,7 @@ impl StoreService { segment_id: span.segment_id.unwrap_or_default(), span_id: span.span_id, sum: summary.sum, - tags: summary.tags.clone(), + tags: summary.tags, trace_id: span.trace_id, }), ) { @@ -1257,7 +1257,7 @@ struct SpanMeasurement { } #[derive(Debug, Deserialize, Serialize)] -struct SpanMetricsSummary { +struct SpanMetricsSummary<'a> { #[serde(default, skip_serializing_if = "Option::is_none")] count: Option, #[serde(default, skip_serializing_if = "Option::is_none")] @@ -1266,11 +1266,11 @@ struct SpanMetricsSummary { min: Option, #[serde(default, skip_serializing_if = "Option::is_none")] sum: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - tags: Option>, + #[serde(borrow, default, skip_serializing_if = "Option::is_none")] + tags: Option<&'a RawValue>, } -type SpanMetricsSummaries = Vec>; +type SpanMetricsSummaries<'a> = Vec>>; #[derive(Debug, Deserialize, Serialize)] struct SpanKafkaMessage<'a> { @@ -1298,7 +1298,7 @@ struct SpanKafkaMessage<'a> { rename = "_metrics_summary", skip_serializing_if = "Option::is_none" )] - metrics_summary: Option, Option>>, + metrics_summary: Option, Option>>>, #[serde(default, skip_serializing_if = "Option::is_none")] parent_span_id: Option<&'a str>, #[serde(default, skip_serializing_if = "Option::is_none")] @@ -1345,7 +1345,7 @@ struct MetricsSummaryKafkaMessage<'a> { #[serde(default, skip_serializing_if = "Option::is_none")] sum: Option, #[serde(default, skip_serializing_if = "Option::is_none")] - tags: Option>, + tags: Option<&'a RawValue>, } /// An enum over all possible ingest messages. From bed5657ec4a2db29cf8836c3e26e87027dd45409 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Fri, 9 Feb 2024 10:29:35 -0700 Subject: [PATCH 09/17] Move metrics summaries processing to their own function and helper struct --- relay-server/src/services/store.rs | 177 +++++++++++++++++------------ 1 file changed, 107 insertions(+), 70 deletions(-) diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 2182a8d1828..45e528d8a7c 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -885,69 +885,7 @@ impl StoreService { }); } - if let Some(metrics_summary) = &mut span.metrics_summary { - metrics_summary.retain(|_, mut v| { - if let Some(v) = &mut v { - v.retain(|v| { - if let Some(v) = v { - return v.min.is_some() - || v.max.is_some() - || v.sum.is_some() - || v.count.is_some(); - } - false - }); - !v.is_empty() - } else { - false - } - }); - - let group = span - .sentry_tags - .as_ref() - .and_then(|sentry_tags| sentry_tags.get("group").cloned()) - .unwrap_or_default(); - - for (mri, summaries) in metrics_summary { - let Some(summaries) = summaries else { - continue; - }; - for summary in summaries { - let Some(summary) = summary else { - continue; - }; - // Ignore immediate errors on produce. - if let Err(error) = self.produce( - KafkaTopic::MetricsSummaries, - scoping.organization_id, - KafkaMessage::MetricsSummary(MetricsSummaryKafkaMessage { - count: summary.count, - duration_ms: span.duration_ms, - end_timestamp: span.end_timestamp, - group: &group, - is_segment: span.is_segment, - max: summary.max, - mri, - min: summary.min, - project_id: span.project_id, - retention_days: span.retention_days, - segment_id: span.segment_id.unwrap_or_default(), - span_id: span.span_id, - sum: summary.sum, - tags: summary.tags, - trace_id: span.trace_id, - }), - ) { - relay_log::error!( - error = &error as &dyn std::error::Error, - "failed to push metrics summary to kafka", - ); - } - } - } - span.metrics_summary = None; - } + self.produce_metrics_summary(scoping, start_time, item, &span); self.produce( KafkaTopic::Spans, @@ -972,6 +910,101 @@ impl StoreService { Ok(()) } + + fn produce_metrics_summary( + &self, + scoping: Scoping, + start_time: Instant, + item: &Item, + span: &SpanKafkaMessage, + ) { + let payload = item.payload(); + let d = &mut Deserializer::from_slice(&payload); + let mut metrics_summary: SpanWithMetricsSummary = match serde_path_to_error::deserialize(d) + { + Ok(span) => span, + Err(error) => { + relay_log::error!( + error = &error as &dyn std::error::Error, + "failed to parse span" + ); + self.outcome_aggregator.send(TrackOutcome { + category: DataCategory::SpanIndexed, + event_id: None, + outcome: Outcome::Invalid(DiscardReason::InvalidSpan), + quantity: 1, + remote_addr: None, + scoping, + timestamp: instant_to_date_time(start_time), + }); + return; + } + }; + let Some(metrics_summary) = &mut metrics_summary.metrics_summary else { + return; + }; + + metrics_summary.retain(|_, mut v| { + if let Some(v) = &mut v { + v.retain(|v| { + if let Some(v) = v { + return v.min.is_some() + || v.max.is_some() + || v.sum.is_some() + || v.count.is_some(); + } + false + }); + !v.is_empty() + } else { + false + } + }); + + let group = span + .sentry_tags + .as_ref() + .and_then(|sentry_tags| sentry_tags.get("group")) + .map_or("", String::as_str); + + for (mri, summaries) in metrics_summary { + let Some(summaries) = summaries else { + continue; + }; + for summary in summaries { + let Some(summary) = summary else { + continue; + }; + // Ignore immediate errors on produce. + if let Err(error) = self.produce( + KafkaTopic::MetricsSummaries, + scoping.organization_id, + KafkaMessage::MetricsSummary(MetricsSummaryKafkaMessage { + count: summary.count, + duration_ms: span.duration_ms, + end_timestamp: span.end_timestamp, + group, + is_segment: span.is_segment, + max: summary.max, + mri, + min: summary.min, + project_id: span.project_id, + retention_days: span.retention_days, + segment_id: span.segment_id.unwrap_or_default(), + span_id: span.span_id, + sum: summary.sum, + tags: summary.tags, + trace_id: span.trace_id, + }), + ) { + relay_log::error!( + error = &error as &dyn std::error::Error, + "failed to push metrics summary to kafka", + ); + } + } + } + } } impl Service for StoreService { @@ -1292,13 +1325,6 @@ struct SpanKafkaMessage<'a> { #[serde(borrow, default, skip_serializing_if = "Option::is_none")] measurements: Option, Option>>, - #[serde( - borrow, - default, - rename = "_metrics_summary", - skip_serializing_if = "Option::is_none" - )] - metrics_summary: Option, Option>>>, #[serde(default, skip_serializing_if = "Option::is_none")] parent_span_id: Option<&'a str>, #[serde(default, skip_serializing_if = "Option::is_none")] @@ -1323,6 +1349,17 @@ struct SpanKafkaMessage<'a> { trace_id: &'a str, } +#[derive(Debug, Deserialize)] +struct SpanWithMetricsSummary<'a> { + #[serde( + borrow, + default, + rename(deserialize = "_metrics_summary"), + skip_serializing_if = "Option::is_none" + )] + metrics_summary: Option, Option>>>, +} + #[derive(Debug, Deserialize, Serialize)] struct MetricsSummaryKafkaMessage<'a> { duration_ms: u32, From cb72014cd3f8dd411b93d8e141f86afbf63c1ab8 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Fri, 9 Feb 2024 10:43:45 -0700 Subject: [PATCH 10/17] Destructure all the things! --- .../src/services/processor/span/processing.rs | 4 +- relay-server/src/services/store.rs | 46 +++++++++++++------ 2 files changed, 34 insertions(+), 16 deletions(-) diff --git a/relay-server/src/services/processor/span/processing.rs b/relay-server/src/services/processor/span/processing.rs index e3885fe56de..e0998316dca 100644 --- a/relay-server/src/services/processor/span/processing.rs +++ b/relay-server/src/services/processor/span/processing.rs @@ -293,7 +293,7 @@ fn normalize( let NormalizeSpanConfig { received_at, - timestamp_range: timestmap_range, + timestamp_range, max_tag_value_size, performance_score, measurements, @@ -317,7 +317,7 @@ fn normalize( )?; if let Some(span) = annotated_span.value() { - validate_span(span, Some(×tmap_range))?; + validate_span(span, Some(×tamp_range))?; } process_value( annotated_span, diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 45e528d8a7c..fcdf03de2d6 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -943,6 +943,17 @@ impl StoreService { let Some(metrics_summary) = &mut metrics_summary.metrics_summary else { return; }; + let &SpanKafkaMessage { + end_timestamp, + duration_ms, + is_segment, + project_id, + segment_id, + retention_days, + span_id, + trace_id, + .. + } = span; metrics_summary.retain(|_, mut v| { if let Some(v) = &mut v { @@ -972,7 +983,14 @@ impl StoreService { continue; }; for summary in summaries { - let Some(summary) = summary else { + let &mut Some(SpanMetricsSummary { + count, + max, + min, + sum, + tags, + }) = summary + else { continue; }; // Ignore immediate errors on produce. @@ -980,21 +998,21 @@ impl StoreService { KafkaTopic::MetricsSummaries, scoping.organization_id, KafkaMessage::MetricsSummary(MetricsSummaryKafkaMessage { - count: summary.count, - duration_ms: span.duration_ms, - end_timestamp: span.end_timestamp, + count, + duration_ms, + end_timestamp, group, - is_segment: span.is_segment, - max: summary.max, + is_segment, + max, + min, mri, - min: summary.min, - project_id: span.project_id, - retention_days: span.retention_days, - segment_id: span.segment_id.unwrap_or_default(), - span_id: span.span_id, - sum: summary.sum, - tags: summary.tags, - trace_id: span.trace_id, + project_id, + retention_days, + segment_id: segment_id.unwrap_or_default(), + span_id, + sum, + tags, + trace_id, }), ) { relay_log::error!( From dbd4cea46a8326f2e5dac32f01e16dae220e8713 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Fri, 9 Feb 2024 11:10:48 -0700 Subject: [PATCH 11/17] Remove borrow where appropriate and make a saner code overall --- relay-server/src/services/store.rs | 67 ++++++++++++++---------------- 1 file changed, 31 insertions(+), 36 deletions(-) diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index fcdf03de2d6..99b37101695 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -983,7 +983,7 @@ impl StoreService { continue; }; for summary in summaries { - let &mut Some(SpanMetricsSummary { + let Some(SpanMetricsSummary { count, max, min, @@ -1307,22 +1307,6 @@ struct SpanMeasurement { value: Option, } -#[derive(Debug, Deserialize, Serialize)] -struct SpanMetricsSummary<'a> { - #[serde(default, skip_serializing_if = "Option::is_none")] - count: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - max: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - min: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - sum: Option, - #[serde(borrow, default, skip_serializing_if = "Option::is_none")] - tags: Option<&'a RawValue>, -} - -type SpanMetricsSummaries<'a> = Vec>>; - #[derive(Debug, Deserialize, Serialize)] struct SpanKafkaMessage<'a> { #[serde(skip_serializing)] @@ -1368,17 +1352,28 @@ struct SpanKafkaMessage<'a> { } #[derive(Debug, Deserialize)] -struct SpanWithMetricsSummary<'a> { - #[serde( - borrow, - default, - rename(deserialize = "_metrics_summary"), - skip_serializing_if = "Option::is_none" - )] - metrics_summary: Option, Option>>>, +struct SpanMetricsSummary { + #[serde(default)] + count: Option, + #[serde(default)] + max: Option, + #[serde(default)] + min: Option, + #[serde(default)] + sum: Option, + #[serde(default)] + tags: BTreeMap, +} + +type SpanMetricsSummaries = Vec>; + +#[derive(Debug, Deserialize)] +struct SpanWithMetricsSummary { + #[serde(default, rename(deserialize = "_metrics_summary"))] + metrics_summary: Option>>, } -#[derive(Debug, Deserialize, Serialize)] +#[derive(Debug, Serialize)] struct MetricsSummaryKafkaMessage<'a> { duration_ms: u32, end_timestamp: f64, @@ -1391,16 +1386,16 @@ struct MetricsSummaryKafkaMessage<'a> { span_id: &'a str, trace_id: &'a str, - #[serde(default, skip_serializing_if = "Option::is_none")] - count: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - max: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - min: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - sum: Option, - #[serde(default, skip_serializing_if = "Option::is_none")] - tags: Option<&'a RawValue>, + #[serde(skip_serializing_if = "Option::is_none")] + count: &'a Option, + #[serde(skip_serializing_if = "Option::is_none")] + max: &'a Option, + #[serde(skip_serializing_if = "Option::is_none")] + min: &'a Option, + #[serde(skip_serializing_if = "Option::is_none")] + sum: &'a Option, + #[serde(skip_serializing_if = "BTreeMap::is_empty")] + tags: &'a BTreeMap, } /// An enum over all possible ingest messages. From 16c70442fa8d050e1a444e99447a588810c6a43f Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Fri, 9 Feb 2024 19:24:55 -0700 Subject: [PATCH 12/17] Verify we're getting the summary on the right topic --- tests/integration/conftest.py | 1 + tests/integration/fixtures/processing.py | 27 ++++++++++++++++++++++++ tests/integration/test_store.py | 10 +++++++-- 3 files changed, 36 insertions(+), 2 deletions(-) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 654379b2f7d..708581abb65 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -30,6 +30,7 @@ monitors_consumer, spans_consumer, profiles_consumer, + metrics_summaries_consumer, ) diff --git a/tests/integration/fixtures/processing.py b/tests/integration/fixtures/processing.py index cc15d6ea004..8af4f2fd76d 100644 --- a/tests/integration/fixtures/processing.py +++ b/tests/integration/fixtures/processing.py @@ -57,6 +57,7 @@ def inner(options=None): "monitors": get_topic_name("monitors"), "spans": get_topic_name("spans"), "profiles": get_topic_name("profiles"), + "metrics_summaries": get_topic_name("metrics_summaries"), } if not processing.get("redis"): @@ -315,6 +316,13 @@ def profiles_consumer(kafka_consumer): return lambda: ProfileConsumer(*kafka_consumer("profiles")) +@pytest.fixture +def metrics_summaries_consumer(kafka_consumer): + return lambda timeout=None: MetricsSummariesConsumer( + timeout=timeout, *kafka_consumer("metrics_summaries") + ) + + class MetricsConsumer(ConsumerBase): def get_metric(self, timeout=None): message = self.poll(timeout=timeout) @@ -469,3 +477,22 @@ def get_profile(self): assert message.error() is None return msgpack.loads(message.value()), message.headers() + + +class MetricsSummariesConsumer(ConsumerBase): + def get_metrics_summary(self): + message = self.poll() + assert message is not None + assert message.error() is None + + return json.loads(message.value()) + + def get_metrics_summaries(self, timeout=None, max_attempts=100): + for _ in range(max_attempts): + message = self.poll(timeout=timeout) + + if message is None: + return + else: + assert message.error() is None + yield json.loads(message.value()) diff --git a/tests/integration/test_store.py b/tests/integration/test_store.py index 74d37133ee4..4ec0c6d5bdd 100644 --- a/tests/integration/test_store.py +++ b/tests/integration/test_store.py @@ -1812,12 +1812,14 @@ def test_span_ingestion( ] -def test_span_extraction_with_ddm( +def test_span_extraction_with_metrics_summary( mini_sentry, relay_with_processing, spans_consumer, + metrics_summaries_consumer, ): spans_consumer = spans_consumer() + metrics_summaries_consumer = metrics_summaries_consumer() relay = relay_with_processing() project_id = 42 @@ -1827,8 +1829,9 @@ def test_span_extraction_with_ddm( ] event = make_transaction({"event_id": "cbf6960622e14a45abc1f03b2055b186"}) + mri = "c:spans/some_metric@none" metrics_summary = { - "c:spans/some_metric@none": [ + mri: [ { "min": 1.0, "max": 2.0, @@ -1868,6 +1871,9 @@ def test_span_extraction_with_ddm( } spans_consumer.assert_empty() + metrics_summary = metrics_summaries_consumer.get_metrics_summary() + + assert metrics_summary["mri"] == mri def test_span_extraction_with_ddm_missing_values( From 45902155343b0f17c1fc9d82e11ed65f5a11cf8b Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Mon, 12 Feb 2024 09:02:07 -0700 Subject: [PATCH 13/17] Send the received timestamp --- relay-server/src/services/store.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 99b37101695..307f5ec564c 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -944,12 +944,13 @@ impl StoreService { return; }; let &SpanKafkaMessage { - end_timestamp, duration_ms, + end_timestamp, is_segment, project_id, - segment_id, + received, retention_days, + segment_id, span_id, trace_id, .. @@ -1007,6 +1008,7 @@ impl StoreService { min, mri, project_id, + received, retention_days, segment_id: segment_id.unwrap_or_default(), span_id, @@ -1381,6 +1383,7 @@ struct MetricsSummaryKafkaMessage<'a> { is_segment: bool, mri: &'a str, project_id: u64, + received: f64, retention_days: u16, segment_id: &'a str, span_id: &'a str, From fab864744f89f3d37bbd15c7fb04995295584a38 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Tue, 13 Feb 2024 13:09:20 -0700 Subject: [PATCH 14/17] Inline summary validation in for loop --- relay-server/src/services/store.rs | 24 ++++++------------------ 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 307f5ec564c..676a6a06dcd 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -955,24 +955,6 @@ impl StoreService { trace_id, .. } = span; - - metrics_summary.retain(|_, mut v| { - if let Some(v) = &mut v { - v.retain(|v| { - if let Some(v) = v { - return v.min.is_some() - || v.max.is_some() - || v.sum.is_some() - || v.count.is_some(); - } - false - }); - !v.is_empty() - } else { - false - } - }); - let group = span .sentry_tags .as_ref() @@ -994,6 +976,12 @@ impl StoreService { else { continue; }; + + // If none of the values are there, the summary is invalid. + if count.is_none() && max.is_none() && min.is_none() && sum.is_none() { + continue; + } + // Ignore immediate errors on produce. if let Err(error) = self.produce( KafkaTopic::MetricsSummaries, From 91e71c1bd8688a7715e60c3fb4bc377216b23fd3 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Tue, 13 Feb 2024 13:10:43 -0700 Subject: [PATCH 15/17] Remove outcome since we'll still emit a valid one for the span --- relay-server/src/services/store.rs | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 676a6a06dcd..8a5b1196535 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -926,17 +926,8 @@ impl StoreService { Err(error) => { relay_log::error!( error = &error as &dyn std::error::Error, - "failed to parse span" + "failed to parse metrics summary of span" ); - self.outcome_aggregator.send(TrackOutcome { - category: DataCategory::SpanIndexed, - event_id: None, - outcome: Outcome::Invalid(DiscardReason::InvalidSpan), - quantity: 1, - remote_addr: None, - scoping, - timestamp: instant_to_date_time(start_time), - }); return; } }; From d75ff60d65d88c71d69b3fe600a5a35beeee2db9 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Tue, 13 Feb 2024 13:21:44 -0700 Subject: [PATCH 16/17] Remove unneeded argument --- relay-server/src/services/store.rs | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 8a5b1196535..a81820ed63a 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -885,7 +885,7 @@ impl StoreService { }); } - self.produce_metrics_summary(scoping, start_time, item, &span); + self.produce_metrics_summary(scoping, item, &span); self.produce( KafkaTopic::Spans, @@ -911,13 +911,7 @@ impl StoreService { Ok(()) } - fn produce_metrics_summary( - &self, - scoping: Scoping, - start_time: Instant, - item: &Item, - span: &SpanKafkaMessage, - ) { + fn produce_metrics_summary(&self, scoping: Scoping, item: &Item, span: &SpanKafkaMessage) { let payload = item.payload(); let d = &mut Deserializer::from_slice(&payload); let mut metrics_summary: SpanWithMetricsSummary = match serde_path_to_error::deserialize(d) From bff8cd161c3206e2844e321033aaa4043e081fbd Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Tue, 13 Feb 2024 16:33:22 -0700 Subject: [PATCH 17/17] Validate count value is not 0 --- relay-server/src/services/store.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index a81820ed63a..236a336197e 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -962,8 +962,16 @@ impl StoreService { continue; }; + let &mut Some(count) = count else { + continue; + }; + + if count == 0 { + continue; + } + // If none of the values are there, the summary is invalid. - if count.is_none() && max.is_none() && min.is_none() && sum.is_none() { + if max.is_none() && min.is_none() && sum.is_none() { continue; } @@ -1362,8 +1370,7 @@ struct MetricsSummaryKafkaMessage<'a> { span_id: &'a str, trace_id: &'a str, - #[serde(skip_serializing_if = "Option::is_none")] - count: &'a Option, + count: u64, #[serde(skip_serializing_if = "Option::is_none")] max: &'a Option, #[serde(skip_serializing_if = "Option::is_none")]