Skip to content

Commit 1ee46b8

Browse files
ref(profiles): Count processed profiles with metrics (#2165)
As described in #2158, track the number of processed profiles by tagging the corresponding transaction metric. Also: Do not accept multiple profiles per envelope. **Not in this PR:** * Distinguishing between `Profile` and `ProfileIndexed` * Rate limiting processed profiles based on metrics --------- Co-authored-by: Iker Barriocanal <[email protected]>
1 parent 1908076 commit 1ee46b8

File tree

9 files changed

+244
-44
lines changed

9 files changed

+244
-44
lines changed

relay-profiling/src/error.rs

+2
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,6 @@ pub enum ProfileError {
2626
MalformedSamples,
2727
#[error("exceed size limit")]
2828
ExceedSizeLimit,
29+
#[error("too many profiles")]
30+
TooManyProfiles,
2931
}

relay-profiling/src/outcomes.rs

+1
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,6 @@ pub fn discard_reason(err: ProfileError) -> &'static str {
1414
ProfileError::NoTransactionAssociated => "profiling_no_transaction_associated",
1515
ProfileError::NotEnoughSamples => "profiling_not_enough_samples",
1616
ProfileError::PlatformNotSupported => "profiling_platform_not_supported",
17+
ProfileError::TooManyProfiles => "profiling_too_many_profiles",
1718
}
1819
}

relay-server/src/actors/processor.rs

+28-6
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use chrono::{DateTime, Duration as SignedDuration, Utc};
1313
use flate2::write::{GzEncoder, ZlibEncoder};
1414
use flate2::Compression;
1515
use once_cell::sync::OnceCell;
16+
use relay_profiling::ProfileError;
1617
use serde_json::Value as SerdeValue;
1718
use tokio::sync::Semaphore;
1819

@@ -307,6 +308,9 @@ struct ProcessEnvelopeState {
307308

308309
/// The managed envelope before processing.
309310
managed_envelope: ManagedEnvelope,
311+
312+
/// Whether there is a profiling item in the envelope.
313+
has_profile: bool,
310314
}
311315

312316
impl ProcessEnvelopeState {
@@ -1061,15 +1065,29 @@ impl EnvelopeProcessorService {
10611065

10621066
/// Remove profiles from the envelope if they can not be parsed
10631067
fn filter_profiles(&self, state: &mut ProcessEnvelopeState) {
1068+
let mut found_profile = false;
10641069
state.managed_envelope.retain_items(|item| match item.ty() {
1065-
ItemType::Profile => match relay_profiling::parse_metadata(&item.payload()) {
1066-
Ok(_) => ItemAction::Keep,
1067-
Err(err) => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
1068-
relay_profiling::discard_reason(err),
1069-
))),
1070-
},
1070+
ItemType::Profile => {
1071+
if !found_profile {
1072+
match relay_profiling::parse_metadata(&item.payload()) {
1073+
Ok(_) => {
1074+
found_profile = true;
1075+
ItemAction::Keep
1076+
}
1077+
Err(err) => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
1078+
relay_profiling::discard_reason(err),
1079+
))),
1080+
}
1081+
} else {
1082+
// We found a second profile, drop it.
1083+
return ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
1084+
relay_profiling::discard_reason(ProfileError::TooManyProfiles),
1085+
)));
1086+
}
1087+
}
10711088
_ => ItemAction::Keep,
10721089
});
1090+
state.has_profile = found_profile;
10731091
}
10741092

10751093
/// Normalize monitor check-ins and remove invalid ones.
@@ -1341,6 +1359,7 @@ impl EnvelopeProcessorService {
13411359
sampling_project_state,
13421360
project_id,
13431361
managed_envelope,
1362+
has_profile: false,
13441363
})
13451364
}
13461365

@@ -2074,6 +2093,7 @@ impl EnvelopeProcessorService {
20742093
event,
20752094
transaction_from_dsc,
20762095
&state.sampling_result,
2096+
state.has_profile,
20772097
&mut state.extracted_metrics.project_metrics,
20782098
&mut state.extracted_metrics.sampling_metrics,
20792099
);
@@ -2706,6 +2726,7 @@ impl Service for EnvelopeProcessorService {
27062726

27072727
#[cfg(test)]
27082728
mod tests {
2729+
use std::env;
27092730
use std::str::FromStr;
27102731

27112732
use chrono::{DateTime, TimeZone, Utc};
@@ -2928,6 +2949,7 @@ mod tests {
29282949
outcome_aggregator.clone(),
29292950
test_store.clone(),
29302951
),
2952+
has_profile: false,
29312953
};
29322954

29332955
// TODO: This does not test if the sampling decision is actually applied. This should be

relay-server/src/envelope.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -550,7 +550,11 @@ impl Item {
550550
ItemType::Metrics | ItemType::MetricBuckets => None,
551551
ItemType::FormData => None,
552552
ItemType::UserReport => None,
553-
ItemType::Profile => Some(DataCategory::Profile),
553+
ItemType::Profile => Some(if indexed {
554+
DataCategory::ProfileIndexed
555+
} else {
556+
DataCategory::Profile
557+
}),
554558
ItemType::ReplayEvent | ItemType::ReplayRecording => Some(DataCategory::Replay),
555559
ItemType::ClientReport => None,
556560
ItemType::CheckIn => Some(DataCategory::Monitor),

relay-server/src/metrics_extraction/transactions/mod.rs

+47-17
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ use relay_metrics::{AggregatorConfig, Metric, MetricNamespace, MetricValue};
1515

1616
use crate::metrics_extraction::conditional_tagging::run_conditional_tagging;
1717
use crate::metrics_extraction::transactions::types::{
18-
CommonTag, CommonTags, ExtractMetricsError, TransactionCPRTags, TransactionMeasurementTags,
19-
TransactionMetric,
18+
CommonTag, CommonTags, ExtractMetricsError, TransactionCPRTags, TransactionDurationTags,
19+
TransactionMeasurementTags, TransactionMetric,
2020
};
2121
use crate::metrics_extraction::IntoMetric;
2222
use crate::statsd::RelayCounters;
@@ -334,17 +334,21 @@ pub fn extract_transaction_metrics(
334334
event: &mut Event,
335335
transaction_from_dsc: Option<&str>,
336336
sampling_result: &SamplingResult,
337+
has_profile: bool,
337338
project_metrics: &mut Vec<Metric>, // output parameter
338339
sampling_metrics: &mut Vec<Metric>, // output parameter
339340
) -> Result<bool, ExtractMetricsError> {
340341
let before_len = project_metrics.len();
341342

342343
extract_transaction_metrics_inner(
343-
aggregator_config,
344-
config,
345-
event,
346-
transaction_from_dsc,
347-
sampling_result,
344+
ExtractInput {
345+
aggregator_config,
346+
config,
347+
event,
348+
transaction_from_dsc,
349+
sampling_result,
350+
has_profile,
351+
},
348352
project_metrics,
349353
sampling_metrics,
350354
)?;
@@ -358,15 +362,21 @@ pub fn extract_transaction_metrics(
358362
Ok(!added_slice.is_empty())
359363
}
360364

365+
struct ExtractInput<'a> {
366+
aggregator_config: &'a AggregatorConfig,
367+
config: &'a TransactionMetricsConfig,
368+
event: &'a Event,
369+
transaction_from_dsc: Option<&'a str>,
370+
sampling_result: &'a SamplingResult,
371+
has_profile: bool,
372+
}
373+
361374
fn extract_transaction_metrics_inner(
362-
aggregator_config: &AggregatorConfig,
363-
config: &TransactionMetricsConfig,
364-
event: &Event,
365-
transaction_from_dsc: Option<&str>,
366-
sampling_result: &SamplingResult,
375+
input: ExtractInput<'_>,
367376
metrics: &mut Vec<Metric>, // output parameter
368377
sampling_metrics: &mut Vec<Metric>, // output parameter
369378
) -> Result<(), ExtractMetricsError> {
379+
let event = input.event;
370380
if event.ty.value() != Some(&EventType::Transaction) {
371381
return Ok(());
372382
}
@@ -384,12 +394,16 @@ fn extract_transaction_metrics_inner(
384394
// Validate the transaction event against the metrics timestamp limits. If the metric is too
385395
// old or too new, we cannot extract the metric and also need to drop the transaction event
386396
// for consistency between metrics and events.
387-
if !aggregator_config.timestamp_range().contains(&timestamp) {
397+
if !input
398+
.aggregator_config
399+
.timestamp_range()
400+
.contains(&timestamp)
401+
{
388402
relay_log::debug!("event timestamp is out of the valid range for metrics");
389403
return Err(ExtractMetricsError::InvalidTimestamp);
390404
}
391405

392-
let tags = extract_universal_tags(event, config);
406+
let tags = extract_universal_tags(event, input.config);
393407

394408
// Measurements
395409
if let Some(measurements) = event.measurements.value() {
@@ -459,20 +473,23 @@ fn extract_transaction_metrics_inner(
459473
TransactionMetric::Duration {
460474
unit: DurationUnit::MilliSecond,
461475
value: relay_common::chrono_to_positive_millis(end - start),
462-
tags: tags.clone(),
476+
tags: TransactionDurationTags {
477+
has_profile: input.has_profile,
478+
universal_tags: tags.clone(),
479+
},
463480
}
464481
.into_metric(timestamp),
465482
);
466483

467484
let root_counter_tags = {
468485
let mut universal_tags = CommonTags(BTreeMap::default());
469-
if let Some(transaction_from_dsc) = transaction_from_dsc {
486+
if let Some(transaction_from_dsc) = input.transaction_from_dsc {
470487
universal_tags
471488
.0
472489
.insert(CommonTag::Transaction, transaction_from_dsc.to_string());
473490
}
474491
TransactionCPRTags {
475-
decision: match sampling_result {
492+
decision: match input.sampling_result {
476493
SamplingResult::Keep => "keep".to_owned(),
477494
SamplingResult::Drop(_) => "drop".to_owned(),
478495
},
@@ -1079,6 +1096,7 @@ mod tests {
10791096
event.value_mut().as_mut().unwrap(),
10801097
Some("test_transaction"),
10811098
&SamplingResult::Keep,
1099+
false,
10821100
&mut metrics,
10831101
&mut sampling_metrics,
10841102
)
@@ -1813,6 +1831,7 @@ mod tests {
18131831
event.value_mut().as_mut().unwrap(),
18141832
Some("test_transaction"),
18151833
&SamplingResult::Keep,
1834+
false,
18161835
&mut metrics,
18171836
&mut sampling_metrics,
18181837
)
@@ -1906,6 +1925,7 @@ mod tests {
19061925
event.value_mut().as_mut().unwrap(),
19071926
Some("test_transaction"),
19081927
&SamplingResult::Keep,
1928+
false,
19091929
&mut metrics,
19101930
&mut sampling_metrics,
19111931
)
@@ -1986,6 +2006,7 @@ mod tests {
19862006
event.value_mut().as_mut().unwrap(),
19872007
Some("test_transaction"),
19882008
&SamplingResult::Keep,
2009+
false,
19892010
&mut metrics,
19902011
&mut sampling_metrics,
19912012
)
@@ -2062,6 +2083,7 @@ mod tests {
20622083
event.value_mut().as_mut().unwrap(),
20632084
Some("test_transaction"),
20642085
&SamplingResult::Keep,
2086+
false,
20652087
&mut metrics,
20662088
&mut sampling_metrics,
20672089
)
@@ -2173,6 +2195,7 @@ mod tests {
21732195
event.value_mut().as_mut().unwrap(),
21742196
Some("test_transaction"),
21752197
&SamplingResult::Keep,
2198+
false,
21762199
&mut metrics,
21772200
&mut sampling_metrics,
21782201
)
@@ -2261,6 +2284,7 @@ mod tests {
22612284
event.value_mut().as_mut().unwrap(),
22622285
Some("test_transaction"),
22632286
&SamplingResult::Keep,
2287+
false,
22642288
&mut metrics,
22652289
&mut sampling_metrics,
22662290
)
@@ -2311,6 +2335,7 @@ mod tests {
23112335
event.value_mut().as_mut().unwrap(),
23122336
Some("test_transaction"),
23132337
&SamplingResult::Keep,
2338+
false,
23142339
&mut metrics,
23152340
&mut sampling_metrics,
23162341
)
@@ -2351,6 +2376,7 @@ mod tests {
23512376
event.value_mut().as_mut().unwrap(),
23522377
Some("test_transaction"),
23532378
&SamplingResult::Keep,
2379+
false,
23542380
&mut metrics,
23552381
&mut sampling_metrics,
23562382
)
@@ -2400,6 +2426,7 @@ mod tests {
24002426
event.value_mut().as_mut().unwrap(),
24012427
Some("test_transaction"),
24022428
&SamplingResult::Keep,
2429+
false,
24032430
&mut metrics,
24042431
&mut sampling_metrics,
24052432
);
@@ -2426,6 +2453,7 @@ mod tests {
24262453
event.value_mut().as_mut().unwrap(),
24272454
Some("test_transaction"),
24282455
&SamplingResult::Keep,
2456+
false,
24292457
&mut metrics,
24302458
&mut sampling_metrics,
24312459
)
@@ -2466,6 +2494,7 @@ mod tests {
24662494
event.value_mut().as_mut().unwrap(),
24672495
Some("root_transaction"),
24682496
&SamplingResult::Keep,
2497+
false,
24692498
&mut metrics,
24702499
&mut sampling_metrics,
24712500
)
@@ -2840,6 +2869,7 @@ mod tests {
28402869
event.value_mut().as_mut().unwrap(),
28412870
Some("test_transaction"),
28422871
&SamplingResult::Keep,
2872+
false,
28432873
&mut metrics,
28442874
&mut sampling_metrics,
28452875
)

relay-server/src/metrics_extraction/transactions/types.rs

+17-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ pub enum TransactionMetric {
1919
Duration {
2020
unit: DurationUnit,
2121
value: DistributionType,
22-
tags: CommonTags,
22+
tags: TransactionDurationTags,
2323
},
2424
/// An internal counter metric used to compute dynamic sampling biases.
2525
///
@@ -96,6 +96,22 @@ impl IntoMetric for TransactionMetric {
9696
}
9797
}
9898

99+
#[derive(Clone, Debug, PartialEq, Eq, Ord, PartialOrd)]
100+
pub struct TransactionDurationTags {
101+
pub has_profile: bool,
102+
pub universal_tags: CommonTags,
103+
}
104+
105+
impl From<TransactionDurationTags> for BTreeMap<String, String> {
106+
fn from(tags: TransactionDurationTags) -> Self {
107+
let mut map: BTreeMap<String, String> = tags.universal_tags.into();
108+
if tags.has_profile {
109+
map.insert("has_profile".to_string(), "true".to_string());
110+
}
111+
map
112+
}
113+
}
114+
99115
#[derive(Clone, Debug, PartialEq, Eq, Ord, PartialOrd)]
100116
pub struct TransactionMeasurementTags {
101117
pub measurement_rating: Option<String>,

relay-server/src/utils/managed_envelope.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,11 @@ impl ManagedEnvelope {
326326
if self.context.summary.profile_quantity > 0 {
327327
self.track_outcome(
328328
outcome,
329-
DataCategory::Profile,
329+
if self.use_index_category() {
330+
DataCategory::ProfileIndexed
331+
} else {
332+
DataCategory::Profile
333+
},
330334
self.context.summary.profile_quantity,
331335
);
332336
}

0 commit comments

Comments
 (0)