
feat(metrics-summaries): Emit summaries to their own topic #3045

Merged Feb 15, 2024 · 22 commits (changes shown from 17 commits)

Commits:
a61d0db feat(metrics-summaries): Emit summaries to their own topic (phacops, Feb 2, 2024)
4f549c9 Loop through all summaries properly (phacops, Feb 2, 2024)
e093dee Add a CHANGELOG entry (phacops, Feb 2, 2024)
9f43434 Make sure metrics summaries are not on the span anymore (phacops, Feb 2, 2024)
02a5777 Log error when we produce a metrics summary to Kafka (phacops, Feb 6, 2024)
c4fec7a Make the Sentry tag value a String to avoid problems with special cha… (phacops, Feb 6, 2024)
adf09c7 Set values as optional as the consumer handles missing values (phacops, Feb 6, 2024)
bcb4c3d Make tags a borrowable value (phacops, Feb 6, 2024)
39c9dbc Merge branch 'master' into pierre/metrics-summaries-push-to-separate-… (phacops, Feb 6, 2024)
bed5657 Move metrics summaries processing to their own function and helper st… (phacops, Feb 9, 2024)
cb72014 Destructure all the things! (phacops, Feb 9, 2024)
dbd4cea Remove borrow where appropriate and make a saner code overall (phacops, Feb 9, 2024)
b20d3cc Merge branch 'master' into pierre/metrics-summaries-push-to-separate-… (phacops, Feb 9, 2024)
3397b4d Merge branch 'master' into pierre/metrics-summaries-push-to-separate-… (phacops, Feb 10, 2024)
16c7044 Verify we're getting the summary on the right topic (phacops, Feb 10, 2024)
4590215 Send the received timestamp (phacops, Feb 12, 2024)
91d83e4 Merge branch 'master' into pierre/metrics-summaries-push-to-separate-… (phacops, Feb 12, 2024)
fab8647 Inline summary validation in for loop (phacops, Feb 13, 2024)
91e71c1 Remove outcome since we'll still emit a valid one for the span (phacops, Feb 13, 2024)
e9a0788 Merge branch 'master' into pierre/metrics-summaries-push-to-separate-… (phacops, Feb 13, 2024)
d75ff60 Remove unneeded argument (phacops, Feb 13, 2024)
bff8cd1 Validate count value is not 0 (phacops, Feb 13, 2024)
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@
**Internal**:

- Set the span op on segments. ([#3082](https://github.com/getsentry/relay/pull/3082))
- Push metrics summaries to their own topic. ([#3045](https://github.com/getsentry/relay/pull/3045))

## 24.1.2

9 changes: 8 additions & 1 deletion relay-kafka/src/config.rs
@@ -51,14 +51,16 @@ pub enum KafkaTopic {
Monitors,
/// Standalone spans without a transaction.
Spans,
/// Summary for metrics collected during a span.
MetricsSummaries,
}

impl KafkaTopic {
/// Returns iterator over the variants of [`KafkaTopic`].
/// It will have to be adjusted if the new variants are added.
pub fn iter() -> std::slice::Iter<'static, Self> {
use KafkaTopic::*;
- static TOPICS: [KafkaTopic; 13] = [
+ static TOPICS: [KafkaTopic; 14] = [
Events,
Attachments,
Transactions,
@@ -72,6 +74,7 @@ impl KafkaTopic {
ReplayRecordings,
Monitors,
Spans,
MetricsSummaries,
];
TOPICS.iter()
}
@@ -111,6 +114,8 @@ pub struct TopicAssignments {
pub monitors: TopicAssignment,
/// Standalone spans without a transaction.
pub spans: TopicAssignment,
/// Summary for metrics collected during a span.
pub metrics_summaries: TopicAssignment,
Comment on lines +117 to +118

Member: Let's make sure we have the topic created and relay configuration in place before merging this, to not risk auto-creating the topic on the default cluster.

Contributor Author: The topic was already created by @mwarkentin on the spans cluster but I should probably add some config to Relay in order to not auto-create it on the events cluster.

}

impl TopicAssignments {
@@ -131,6 +136,7 @@ impl TopicAssignments {
KafkaTopic::ReplayRecordings => &self.replay_recordings,
KafkaTopic::Monitors => &self.monitors,
KafkaTopic::Spans => &self.spans,
KafkaTopic::MetricsSummaries => &self.metrics_summaries,
}
}
}
@@ -152,6 +158,7 @@ impl Default for TopicAssignments {
replay_recordings: "ingest-replay-recordings".to_owned().into(),
monitors: "ingest-monitors".to_owned().into(),
spans: "snuba-spans".to_owned().into(),
metrics_summaries: "snuba-metrics-summaries".to_owned().into(),
}
}
}
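The default assignments above map each logical topic to a physical Kafka topic name, with metrics summaries defaulting to `snuba-metrics-summaries`. A minimal std-only sketch of that lookup (a hypothetical miniature, not Relay's actual `TopicAssignments` API):

```rust
use std::collections::BTreeMap;

// Hypothetical miniature of KafkaTopic/TopicAssignments: each logical
// topic resolves to a physical topic name, defaulting as in the PR.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
enum Topic {
    Spans,
    MetricsSummaries,
}

fn default_assignments() -> BTreeMap<Topic, String> {
    let mut m = BTreeMap::new();
    m.insert(Topic::Spans, "snuba-spans".to_owned());
    m.insert(Topic::MetricsSummaries, "snuba-metrics-summaries".to_owned());
    m
}

fn main() {
    let assignments = default_assignments();
    // A producer would resolve the physical topic name like this:
    let name = &assignments[&Topic::MetricsSummaries];
    assert_eq!(name, "snuba-metrics-summaries");
    println!("{name}");
}
```

Keeping the assignment separate from the enum is what lets operators point `metrics_summaries` at a different cluster in configuration, which is exactly the concern raised in the review thread above.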
4 changes: 2 additions & 2 deletions relay-server/src/services/processor/span/processing.rs
@@ -299,7 +299,7 @@ fn normalize(

let NormalizeSpanConfig {
received_at,
- timestamp_range: timestmap_range,
+ timestamp_range,
max_tag_value_size,
performance_score,
measurements,
@@ -323,7 +323,7 @@
)?;

if let Some(span) = annotated_span.value() {
- validate_span(span, Some(&timestmap_range))?;
+ validate_span(span, Some(&timestamp_range))?;
}
process_value(
annotated_span,
228 changes: 179 additions & 49 deletions relay-server/src/services/store.rs
@@ -871,6 +871,12 @@ impl StoreService {
}
};

span.duration_ms = ((span.end_timestamp - span.start_timestamp) * 1e3) as u32;
span.event_id = event_id;
span.project_id = scoping.project_id.value();
span.retention_days = retention_days;
span.start_timestamp_ms = (span.start_timestamp * 1e3) as u64;

if let Some(measurements) = &mut span.measurements {
measurements.retain(|_, v| {
v.as_ref()
@@ -879,30 +885,7 @@
});
}

if let Some(metrics_summary) = &mut span.metrics_summary {
metrics_summary.retain(|_, mut v| {
if let Some(v) = &mut v {
v.retain(|v| {
if let Some(v) = v {
return v.min.is_some()
|| v.max.is_some()
|| v.sum.is_some()
|| v.count.is_some();
}
false
});
!v.is_empty()
} else {
false
}
});
}

span.duration_ms = ((span.end_timestamp - span.start_timestamp) * 1e3) as u32;
span.event_id = event_id;
span.project_id = scoping.project_id.value();
span.retention_days = retention_days;
span.start_timestamp_ms = (span.start_timestamp * 1e3) as u64;
self.produce_metrics_summary(scoping, start_time, item, &span);

self.produce(
KafkaTopic::Spans,
@@ -927,6 +910,121 @@

Ok(())
}
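The message preparation above derives integer millisecond fields (`duration_ms`, `start_timestamp_ms`) from the span's fractional-second timestamps before producing. A sketch of that arithmetic, assuming the same `* 1e3` truncating casts as the diff (helper names are ours, not Relay's):

```rust
// Timestamps arrive as fractional seconds since epoch; the Kafka
// message carries integer milliseconds. The casts truncate toward zero.
fn duration_ms(start_timestamp: f64, end_timestamp: f64) -> u32 {
    ((end_timestamp - start_timestamp) * 1e3) as u32
}

fn start_timestamp_ms(start_timestamp: f64) -> u64 {
    (start_timestamp * 1e3) as u64
}

fn main() {
    let start = 1707700000.250; // seconds since epoch
    let end = 1707700001.500;
    assert_eq!(duration_ms(start, end), 1250);
    assert_eq!(start_timestamp_ms(start), 1707700000250);
}
```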

fn produce_metrics_summary(
&self,
scoping: Scoping,
start_time: Instant,
item: &Item,
span: &SpanKafkaMessage,
) {
let payload = item.payload();
let d = &mut Deserializer::from_slice(&payload);
let mut metrics_summary: SpanWithMetricsSummary = match serde_path_to_error::deserialize(d)
{
Ok(span) => span,
Err(error) => {
relay_log::error!(
error = &error as &dyn std::error::Error,
"failed to parse span"
);
self.outcome_aggregator.send(TrackOutcome {
category: DataCategory::SpanIndexed,
event_id: None,
outcome: Outcome::Invalid(DiscardReason::InvalidSpan),
quantity: 1,
remote_addr: None,
scoping,
timestamp: instant_to_date_time(start_time),
});
return;
}
};
let Some(metrics_summary) = &mut metrics_summary.metrics_summary else {
return;
};
let &SpanKafkaMessage {
duration_ms,
end_timestamp,
is_segment,
project_id,
received,
retention_days,
segment_id,
span_id,
trace_id,
..
} = span;

metrics_summary.retain(|_, mut v| {
if let Some(v) = &mut v {
v.retain(|v| {
if let Some(v) = v {
return v.min.is_some()
|| v.max.is_some()
|| v.sum.is_some()
|| v.count.is_some();
}
false
});
!v.is_empty()
} else {
false
}
});

let group = span
.sentry_tags
.as_ref()
.and_then(|sentry_tags| sentry_tags.get("group"))
.map_or("", String::as_str);

for (mri, summaries) in metrics_summary {
let Some(summaries) = summaries else {
continue;
};
for summary in summaries {
let Some(SpanMetricsSummary {
count,
max,
min,
sum,
tags,
}) = summary
else {
continue;
};
// Ignore immediate errors on produce.
if let Err(error) = self.produce(
KafkaTopic::MetricsSummaries,
scoping.organization_id,
KafkaMessage::MetricsSummary(MetricsSummaryKafkaMessage {
count,
duration_ms,
end_timestamp,
group,
is_segment,
max,
min,
mri,
project_id,
received,
retention_days,
segment_id: segment_id.unwrap_or_default(),
span_id,
sum,
tags,
trace_id,
}),
) {
relay_log::error!(
error = &error as &dyn std::error::Error,
"failed to push metrics summary to kafka",
);
}
}
}
}
}
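`produce_metrics_summary` prunes the deserialized `_metrics_summary` map before producing: summaries with none of `min`/`max`/`sum`/`count` set are dropped, and MRIs whose summary list becomes empty are removed entirely. A self-contained sketch of that nested `retain`, with simplified stand-in types (names illustrative):

```rust
use std::collections::BTreeMap;

// Simplified stand-in for SpanMetricsSummary: only the fields the
// filter inspects.
struct Summary {
    min: Option<f64>,
    max: Option<f64>,
    sum: Option<f64>,
    count: Option<u64>,
}

impl Summary {
    fn has_any_value(&self) -> bool {
        self.min.is_some() || self.max.is_some() || self.sum.is_some() || self.count.is_some()
    }
}

type Summaries = BTreeMap<String, Option<Vec<Option<Summary>>>>;

// Mirrors the retain logic above: drop `None` entries and summaries
// with no statistics, then drop MRIs whose list became empty.
fn prune(summaries: &mut Summaries) {
    summaries.retain(|_, v| {
        if let Some(v) = v {
            v.retain(|s| s.as_ref().map_or(false, Summary::has_any_value));
            !v.is_empty()
        } else {
            false
        }
    });
}

fn main() {
    let mut m: Summaries = BTreeMap::new();
    m.insert("d:custom/empty@none".into(), Some(vec![None]));
    m.insert(
        "d:custom/kept@none".into(),
        Some(vec![Some(Summary { min: Some(1.0), max: None, sum: None, count: Some(2) })]),
    );
    m.insert("d:custom/missing@none".into(), None);
    prune(&mut m);
    assert_eq!(m.len(), 1);
    assert!(m.contains_key("d:custom/kept@none"));
}
```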

impl Service for StoreService {
@@ -1211,22 +1309,6 @@ struct SpanMeasurement {
value: Option<f64>,
}

#[derive(Debug, Deserialize, Serialize)]
struct SpanMetricsSummary {
#[serde(default, skip_serializing_if = "Option::is_none")]
count: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
max: Option<f64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
min: Option<f64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
sum: Option<f64>,
#[serde(default, skip_serializing_if = "Option::is_none")]
tags: Option<BTreeMap<String, String>>,
}

type SpanMetricsSummaries = Vec<Option<SpanMetricsSummary>>;

#[derive(Debug, Deserialize, Serialize)]
struct SpanKafkaMessage<'a> {
#[serde(skip_serializing)]
@@ -1247,13 +1329,6 @@ struct SpanKafkaMessage<'a> {

#[serde(borrow, default, skip_serializing_if = "Option::is_none")]
measurements: Option<BTreeMap<Cow<'a, str>, Option<SpanMeasurement>>>,
#[serde(
borrow,
default,
rename = "_metrics_summary",
skip_serializing_if = "Option::is_none"
)]
metrics_summary: Option<BTreeMap<Cow<'a, str>, Option<SpanMetricsSummaries>>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
parent_span_id: Option<&'a str>,
#[serde(default, skip_serializing_if = "Option::is_none")]
@@ -1269,7 +1344,7 @@
#[serde(default, skip_serializing_if = "Option::is_none")]
segment_id: Option<&'a str>,
#[serde(default, skip_serializing_if = "Option::is_none")]
- sentry_tags: Option<&'a RawValue>,
+ sentry_tags: Option<BTreeMap<&'a str, String>>,
span_id: &'a str,
#[serde(default)]
start_timestamp_ms: u64,
@@ -1278,6 +1353,54 @@
trace_id: &'a str,
}

#[derive(Debug, Deserialize)]
struct SpanMetricsSummary {
#[serde(default)]
count: Option<u64>,
#[serde(default)]
max: Option<f64>,
#[serde(default)]
min: Option<f64>,
#[serde(default)]
sum: Option<f64>,
#[serde(default)]
tags: BTreeMap<String, String>,
}

type SpanMetricsSummaries = Vec<Option<SpanMetricsSummary>>;

#[derive(Debug, Deserialize)]
struct SpanWithMetricsSummary {
#[serde(default, rename(deserialize = "_metrics_summary"))]
metrics_summary: Option<BTreeMap<String, Option<SpanMetricsSummaries>>>,
}

#[derive(Debug, Serialize)]
struct MetricsSummaryKafkaMessage<'a> {
duration_ms: u32,
end_timestamp: f64,
group: &'a str,
is_segment: bool,
mri: &'a str,
project_id: u64,
received: f64,
retention_days: u16,
segment_id: &'a str,
span_id: &'a str,
trace_id: &'a str,

#[serde(skip_serializing_if = "Option::is_none")]
count: &'a Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
max: &'a Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
min: &'a Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
sum: &'a Option<f64>,
#[serde(skip_serializing_if = "BTreeMap::is_empty")]
tags: &'a BTreeMap<String, String>,
}

/// An enum over all possible ingest messages.
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
@@ -1299,6 +1422,7 @@ enum KafkaMessage<'a> {
ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage),
CheckIn(CheckInKafkaMessage),
Span(SpanKafkaMessage<'a>),
MetricsSummary(MetricsSummaryKafkaMessage<'a>),
}

impl Message for KafkaMessage<'_> {
@@ -1315,6 +1439,7 @@ impl Message for KafkaMessage<'_> {
KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
KafkaMessage::CheckIn(_) => "check_in",
KafkaMessage::Span(_) => "span",
KafkaMessage::MetricsSummary(_) => "metrics_summary",
}
}

@@ -1337,7 +1462,8 @@
Self::Session(_)
| Self::Profile(_)
| Self::ReplayRecordingNotChunked(_)
| Self::Span(_) => Uuid::nil(),
| Self::Span(_)
| Self::MetricsSummary(_) => Uuid::nil(),

// TODO(ja): Determine a partitioning key
Self::Metric { .. } => Uuid::nil(),
@@ -1383,6 +1509,10 @@
KafkaMessage::Span(message) => {
serde_json::to_vec(message).map_err(ClientError::InvalidJson)
}
KafkaMessage::MetricsSummary(message) => {
serde_json::to_vec(message).map_err(ClientError::InvalidJson)
}

_ => rmp_serde::to_vec_named(&self).map_err(ClientError::InvalidMsgPack),
}
}
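Each `KafkaMessage` variant above reports a stable type tag via `variant()`, and the new `MetricsSummary` variant is serialized as JSON like `Span`. A minimal sketch of that variant-to-tag routing; the `topic()` helper here is illustrative only (in Relay the physical topic comes from `TopicAssignments` configuration, not from the message itself):

```rust
// Minimal stand-in for the KafkaMessage routing in the diff: each
// variant reports a stable type tag; the topic mapping shown is the
// default assignment, added for illustration.
enum KafkaMessage {
    Span,
    MetricsSummary,
}

impl KafkaMessage {
    fn variant(&self) -> &'static str {
        match self {
            KafkaMessage::Span => "span",
            KafkaMessage::MetricsSummary => "metrics_summary",
        }
    }

    fn topic(&self) -> &'static str {
        match self {
            KafkaMessage::Span => "snuba-spans",
            KafkaMessage::MetricsSummary => "snuba-metrics-summaries",
        }
    }
}

fn main() {
    let msg = KafkaMessage::MetricsSummary;
    assert_eq!(msg.variant(), "metrics_summary");
    assert_eq!(msg.topic(), "snuba-metrics-summaries");
}
```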
1 change: 1 addition & 0 deletions tests/integration/conftest.py
Expand Up @@ -30,6 +30,7 @@
monitors_consumer,
spans_consumer,
profiles_consumer,
metrics_summaries_consumer,
)

