Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(profiles): Count processed profiles with metrics #2165

Merged
merged 9 commits into from
Jun 2, 2023
Merged
2 changes: 2 additions & 0 deletions relay-profiling/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ pub enum ProfileError {
MalformedSamples,
#[error("exceed size limit")]
ExceedSizeLimit,
#[error("too many profiles")]
TooManyProfiles,
}
1 change: 1 addition & 0 deletions relay-profiling/src/outcomes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ pub fn discard_reason(err: ProfileError) -> &'static str {
ProfileError::NoTransactionAssociated => "profiling_no_transaction_associated",
ProfileError::NotEnoughSamples => "profiling_not_enough_samples",
ProfileError::PlatformNotSupported => "profiling_platform_not_supported",
ProfileError::TooManyProfiles => "profiling_too_many_profiles",
}
}
34 changes: 28 additions & 6 deletions relay-server/src/actors/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use chrono::{DateTime, Duration as SignedDuration, Utc};
use flate2::write::{GzEncoder, ZlibEncoder};
use flate2::Compression;
use once_cell::sync::OnceCell;
use relay_profiling::ProfileError;
use serde_json::Value as SerdeValue;
use tokio::sync::Semaphore;

Expand Down Expand Up @@ -307,6 +308,9 @@ struct ProcessEnvelopeState {

/// The managed envelope before processing.
managed_envelope: ManagedEnvelope,

/// Whether there is a profiling item in the envelope.
has_profile: bool,
}

impl ProcessEnvelopeState {
Expand Down Expand Up @@ -1061,15 +1065,29 @@ impl EnvelopeProcessorService {

/// Remove profiles from the envelope if they can not be parsed
fn filter_profiles(&self, state: &mut ProcessEnvelopeState) {
let mut found_profile = false;
state.managed_envelope.retain_items(|item| match item.ty() {
ItemType::Profile => match relay_profiling::parse_metadata(&item.payload()) {
Ok(_) => ItemAction::Keep,
Err(err) => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
relay_profiling::discard_reason(err),
))),
},
ItemType::Profile => {
if !found_profile {
match relay_profiling::parse_metadata(&item.payload()) {
Ok(_) => {
found_profile = true;
ItemAction::Keep
}
Err(err) => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
relay_profiling::discard_reason(err),
))),
}
} else {
// We found a second profile, drop it.
return ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
relay_profiling::discard_reason(ProfileError::TooManyProfiles),
)));
}
}
_ => ItemAction::Keep,
});
state.has_profile = found_profile;
}

/// Normalize monitor check-ins and remove invalid ones.
Expand Down Expand Up @@ -1341,6 +1359,7 @@ impl EnvelopeProcessorService {
sampling_project_state,
project_id,
managed_envelope,
has_profile: false,
})
}

Expand Down Expand Up @@ -2074,6 +2093,7 @@ impl EnvelopeProcessorService {
event,
transaction_from_dsc,
&state.sampling_result,
state.has_profile,
&mut state.extracted_metrics.project_metrics,
&mut state.extracted_metrics.sampling_metrics,
);
Expand Down Expand Up @@ -2706,6 +2726,7 @@ impl Service for EnvelopeProcessorService {

#[cfg(test)]
mod tests {
use std::env;
use std::str::FromStr;

use chrono::{DateTime, TimeZone, Utc};
Expand Down Expand Up @@ -2928,6 +2949,7 @@ mod tests {
outcome_aggregator.clone(),
test_store.clone(),
),
has_profile: false,
};

// TODO: This does not test if the sampling decision is actually applied. This should be
Expand Down
6 changes: 5 additions & 1 deletion relay-server/src/envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,11 @@ impl Item {
ItemType::Metrics | ItemType::MetricBuckets => None,
ItemType::FormData => None,
ItemType::UserReport => None,
ItemType::Profile => Some(DataCategory::Profile),
ItemType::Profile => Some(if indexed {
DataCategory::ProfileIndexed
} else {
DataCategory::Profile
}),
ItemType::ReplayEvent | ItemType::ReplayRecording => Some(DataCategory::Replay),
ItemType::ClientReport => None,
ItemType::CheckIn => Some(DataCategory::Monitor),
Expand Down
64 changes: 47 additions & 17 deletions relay-server/src/metrics_extraction/transactions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use relay_metrics::{AggregatorConfig, Metric, MetricNamespace, MetricValue};

use crate::metrics_extraction::conditional_tagging::run_conditional_tagging;
use crate::metrics_extraction::transactions::types::{
CommonTag, CommonTags, ExtractMetricsError, TransactionCPRTags, TransactionMeasurementTags,
TransactionMetric,
CommonTag, CommonTags, ExtractMetricsError, TransactionCPRTags, TransactionDurationTags,
TransactionMeasurementTags, TransactionMetric,
};
use crate::metrics_extraction::IntoMetric;
use crate::statsd::RelayCounters;
Expand Down Expand Up @@ -334,17 +334,21 @@ pub fn extract_transaction_metrics(
event: &mut Event,
transaction_from_dsc: Option<&str>,
sampling_result: &SamplingResult,
has_profile: bool,
project_metrics: &mut Vec<Metric>, // output parameter
sampling_metrics: &mut Vec<Metric>, // output parameter
) -> Result<bool, ExtractMetricsError> {
let before_len = project_metrics.len();

extract_transaction_metrics_inner(
aggregator_config,
config,
event,
transaction_from_dsc,
sampling_result,
ExtractInput {
aggregator_config,
config,
event,
transaction_from_dsc,
sampling_result,
has_profile,
},
project_metrics,
sampling_metrics,
)?;
Expand All @@ -358,15 +362,21 @@ pub fn extract_transaction_metrics(
Ok(!added_slice.is_empty())
}

/// Bundled arguments for `extract_transaction_metrics_inner`.
///
/// Groups what used to be individual parameters (introduced when the
/// argument count grew too large for the linter). All references borrow
/// from the caller for the lifetime `'a`.
struct ExtractInput<'a> {
    /// Aggregator configuration; its timestamp range is used to validate the
    /// event timestamp before extracting metrics.
    aggregator_config: &'a AggregatorConfig,
    /// Project configuration for transaction metrics extraction.
    config: &'a TransactionMetricsConfig,
    /// The transaction event metrics are extracted from.
    event: &'a Event,
    /// Transaction name from the dynamic sampling context, if available;
    /// inserted as the `Transaction` tag on the root counter metric.
    transaction_from_dsc: Option<&'a str>,
    /// Dynamic sampling decision, mapped to a `"keep"`/`"drop"` tag value.
    sampling_result: &'a SamplingResult,
    /// Whether the envelope carries a profile item; tagged onto the
    /// transaction duration metric.
    has_profile: bool,
}
Comment on lines +365 to +372
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The linter finally started complaining about the number of arguments to extract_transaction_metrics_inner so I created this struct.


fn extract_transaction_metrics_inner(
aggregator_config: &AggregatorConfig,
config: &TransactionMetricsConfig,
event: &Event,
transaction_from_dsc: Option<&str>,
sampling_result: &SamplingResult,
input: ExtractInput<'_>,
metrics: &mut Vec<Metric>, // output parameter
sampling_metrics: &mut Vec<Metric>, // output parameter
) -> Result<(), ExtractMetricsError> {
let event = input.event;
if event.ty.value() != Some(&EventType::Transaction) {
return Ok(());
}
Expand All @@ -384,12 +394,16 @@ fn extract_transaction_metrics_inner(
// Validate the transaction event against the metrics timestamp limits. If the metric is too
// old or too new, we cannot extract the metric and also need to drop the transaction event
// for consistency between metrics and events.
if !aggregator_config.timestamp_range().contains(&timestamp) {
if !input
.aggregator_config
.timestamp_range()
.contains(&timestamp)
{
relay_log::debug!("event timestamp is out of the valid range for metrics");
return Err(ExtractMetricsError::InvalidTimestamp);
}

let tags = extract_universal_tags(event, config);
let tags = extract_universal_tags(event, input.config);

// Measurements
if let Some(measurements) = event.measurements.value() {
Expand Down Expand Up @@ -459,20 +473,23 @@ fn extract_transaction_metrics_inner(
TransactionMetric::Duration {
unit: DurationUnit::MilliSecond,
value: relay_common::chrono_to_positive_millis(end - start),
tags: tags.clone(),
tags: TransactionDurationTags {
has_profile: input.has_profile,
universal_tags: tags.clone(),
},
}
.into_metric(timestamp),
);

let root_counter_tags = {
let mut universal_tags = CommonTags(BTreeMap::default());
if let Some(transaction_from_dsc) = transaction_from_dsc {
if let Some(transaction_from_dsc) = input.transaction_from_dsc {
universal_tags
.0
.insert(CommonTag::Transaction, transaction_from_dsc.to_string());
}
TransactionCPRTags {
decision: match sampling_result {
decision: match input.sampling_result {
SamplingResult::Keep => "keep".to_owned(),
SamplingResult::Drop(_) => "drop".to_owned(),
},
Expand Down Expand Up @@ -1079,6 +1096,7 @@ mod tests {
event.value_mut().as_mut().unwrap(),
Some("test_transaction"),
&SamplingResult::Keep,
false,
&mut metrics,
&mut sampling_metrics,
)
Expand Down Expand Up @@ -1813,6 +1831,7 @@ mod tests {
event.value_mut().as_mut().unwrap(),
Some("test_transaction"),
&SamplingResult::Keep,
false,
&mut metrics,
&mut sampling_metrics,
)
Expand Down Expand Up @@ -1906,6 +1925,7 @@ mod tests {
event.value_mut().as_mut().unwrap(),
Some("test_transaction"),
&SamplingResult::Keep,
false,
&mut metrics,
&mut sampling_metrics,
)
Expand Down Expand Up @@ -1986,6 +2006,7 @@ mod tests {
event.value_mut().as_mut().unwrap(),
Some("test_transaction"),
&SamplingResult::Keep,
false,
&mut metrics,
&mut sampling_metrics,
)
Expand Down Expand Up @@ -2062,6 +2083,7 @@ mod tests {
event.value_mut().as_mut().unwrap(),
Some("test_transaction"),
&SamplingResult::Keep,
false,
&mut metrics,
&mut sampling_metrics,
)
Expand Down Expand Up @@ -2173,6 +2195,7 @@ mod tests {
event.value_mut().as_mut().unwrap(),
Some("test_transaction"),
&SamplingResult::Keep,
false,
&mut metrics,
&mut sampling_metrics,
)
Expand Down Expand Up @@ -2261,6 +2284,7 @@ mod tests {
event.value_mut().as_mut().unwrap(),
Some("test_transaction"),
&SamplingResult::Keep,
false,
&mut metrics,
&mut sampling_metrics,
)
Expand Down Expand Up @@ -2311,6 +2335,7 @@ mod tests {
event.value_mut().as_mut().unwrap(),
Some("test_transaction"),
&SamplingResult::Keep,
false,
&mut metrics,
&mut sampling_metrics,
)
Expand Down Expand Up @@ -2351,6 +2376,7 @@ mod tests {
event.value_mut().as_mut().unwrap(),
Some("test_transaction"),
&SamplingResult::Keep,
false,
&mut metrics,
&mut sampling_metrics,
)
Expand Down Expand Up @@ -2400,6 +2426,7 @@ mod tests {
event.value_mut().as_mut().unwrap(),
Some("test_transaction"),
&SamplingResult::Keep,
false,
&mut metrics,
&mut sampling_metrics,
);
Expand All @@ -2426,6 +2453,7 @@ mod tests {
event.value_mut().as_mut().unwrap(),
Some("test_transaction"),
&SamplingResult::Keep,
false,
&mut metrics,
&mut sampling_metrics,
)
Expand Down Expand Up @@ -2466,6 +2494,7 @@ mod tests {
event.value_mut().as_mut().unwrap(),
Some("root_transaction"),
&SamplingResult::Keep,
false,
&mut metrics,
&mut sampling_metrics,
)
Expand Down Expand Up @@ -2840,6 +2869,7 @@ mod tests {
event.value_mut().as_mut().unwrap(),
Some("test_transaction"),
&SamplingResult::Keep,
false,
&mut metrics,
&mut sampling_metrics,
)
Expand Down
18 changes: 17 additions & 1 deletion relay-server/src/metrics_extraction/transactions/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub enum TransactionMetric {
Duration {
unit: DurationUnit,
value: DistributionType,
tags: CommonTags,
tags: TransactionDurationTags,
},
/// An internal counter metric used to compute dynamic sampling biases.
///
Expand Down Expand Up @@ -96,6 +96,22 @@ impl IntoMetric for TransactionMetric {
}
}

/// Tags attached to the transaction duration metric.
///
/// Combines the universal transaction tags with an extra marker indicating
/// whether the transaction came with a profile.
#[derive(Clone, Debug, PartialEq, Eq, Ord, PartialOrd)]
pub struct TransactionDurationTags {
    // When `true`, a `has_profile: "true"` tag is added to the flattened
    // tag map (see the `From` impl below); when `false`, no tag is emitted.
    pub has_profile: bool,
    // The common tags shared by all transaction metrics.
    pub universal_tags: CommonTags,
}

impl From<TransactionDurationTags> for BTreeMap<String, String> {
fn from(tags: TransactionDurationTags) -> Self {
let mut map: BTreeMap<String, String> = tags.universal_tags.into();
if tags.has_profile {
map.insert("has_profile".to_string(), "true".to_string());
}
map
}
}

#[derive(Clone, Debug, PartialEq, Eq, Ord, PartialOrd)]
pub struct TransactionMeasurementTags {
pub measurement_rating: Option<String>,
Expand Down
6 changes: 5 additions & 1 deletion relay-server/src/utils/managed_envelope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,11 @@ impl ManagedEnvelope {
if self.context.summary.profile_quantity > 0 {
self.track_outcome(
outcome,
DataCategory::Profile,
if self.use_index_category() {
DataCategory::ProfileIndexed
} else {
DataCategory::Profile
},
self.context.summary.profile_quantity,
);
}
Expand Down
Loading