Commit 7452732
ref(profiles): Counting profiles, V2 (#2163)

Instead of counting processed profiles in two different places (see #2054), add a `has_profile` tag to the transaction counter metric and define:

```
processed_profiles := count(processed_transactions) WHERE has_profile = true
```

## Changes

This PR contains the following changes:

* Revert the PRs around counting profiles.
* Add a `has_profile(s)` tag on the `transaction.duration` metric. (#2165)
* Update the metrics billing consumer to emit an `accepted` outcome for profiles. (getsentry/sentry#50047)

## Caveats

With this PR, rate limits will be applied consistently if they are issued for `DataCategory.Transaction`. However, if a rate limit (e.g. spike protection) is issued _only_ for `DataCategory.Profile`, it will not be enforced.

## Rollout Order

1. Merge and deploy getsentry/sentry#50047. The billing metrics consumer will then listen for profiles, but not receive any yet.
2. Deploy processing Relays. Profile counts will go down, because PoP Relays are not sending the `has_profile` tag yet.
3. Deploy PoP Relays. Profile counts should go back to normal.

## In case of rollback

1. First revert the Sentry change (getsentry/sentry#50047) and deploy it to stop counting profiles.
2. Then revert the Relay change (this PR) and deploy it to processing Relays and PoPs.

ref: #2158

---------

Co-authored-by: Iker Barriocanal <[email protected]>
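In Rust terms, the definition above amounts to filtering transaction-count buckets by tag. A minimal sketch of that counting identity (the `Tags` type, bucket tuples, and tag value encoding are stand-ins, not Relay's actual metrics code):

```rust
use std::collections::BTreeMap;

type Tags = BTreeMap<&'static str, &'static str>;

/// processed_profiles := count(processed_transactions) WHERE has_profile = true
fn processed_profiles(transaction_buckets: &[(Tags, u64)]) -> u64 {
    transaction_buckets
        .iter()
        .filter(|(tags, _)| tags.get("has_profile").copied() == Some("true"))
        .map(|(_, count)| count)
        .sum()
}

fn main() {
    let with_profile = Tags::from([("has_profile", "true")]);
    let without_profile = Tags::from([("has_profile", "false")]);

    // Three processed transactions, two of which carried a profile:
    let buckets = vec![(with_profile, 2), (without_profile, 1)];
    assert_eq!(processed_profiles(&buckets), 2);
}
```

The point of the design is that a single counter remains the source of truth: profiles are counted as a predicate over processed transactions instead of by a second, independently drifting counter.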
1 parent c44960e · commit 7452732

File tree

11 files changed: +253, -255 lines

CHANGELOG.md (+4)

```diff
@@ -2,6 +2,10 @@
 
 ## Unreleased
 
+**Bug Fixes**:
+
+- Make counting of total profiles consistent with total transactions. ([#2163](https://github.com/getsentry/relay/pull/2163))
+
 **Features**:
 
 - Add support for X-Vercel-Forwarded-For header. ([#2124](https://github.com/getsentry/relay/pull/2124))
```

relay-profiling/src/error.rs (+2)

```diff
@@ -26,6 +26,8 @@ pub enum ProfileError {
     MalformedSamples,
     #[error("exceed size limit")]
     ExceedSizeLimit,
+    #[error("too many profiles")]
+    TooManyProfiles,
     #[error("duration is too long")]
     DurationIsTooLong,
 }
```

relay-profiling/src/outcomes.rs (+1)

```diff
@@ -14,6 +14,7 @@ pub fn discard_reason(err: ProfileError) -> &'static str {
         ProfileError::NoTransactionAssociated => "profiling_no_transaction_associated",
         ProfileError::NotEnoughSamples => "profiling_not_enough_samples",
         ProfileError::PlatformNotSupported => "profiling_platform_not_supported",
+        ProfileError::TooManyProfiles => "profiling_too_many_profiles",
         ProfileError::DurationIsTooLong => "profiling_duration_is_too_long",
     }
 }
```
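Together with the new `TooManyProfiles` variant above, a dropped duplicate profile surfaces as the `profiling_too_many_profiles` discard reason. A quick sanity check (a hypothetical test; it assumes `discard_reason` and `ProfileError` are exported as they are used elsewhere in this diff, and that the `#[error]` attributes come from a `thiserror::Error` derive):

```rust
use relay_profiling::{discard_reason, ProfileError};

#[test]
fn too_many_profiles_discard_reason() {
    // Outcome reason emitted when a second profile is dropped
    // (see `filter_profiles` in relay-server below).
    assert_eq!(
        discard_reason(ProfileError::TooManyProfiles),
        "profiling_too_many_profiles"
    );
    // Display string generated by the `#[error("too many profiles")]` attribute.
    assert_eq!(ProfileError::TooManyProfiles.to_string(), "too many profiles");
}
```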

relay-server/src/actors/outcome.rs (+9, -80)

```diff
@@ -147,16 +147,12 @@ impl FromMessage<Self> for TrackOutcome {
 /// Defines the possible outcomes from processing an event.
 #[derive(Clone, Debug, PartialEq, Eq, Hash)]
 pub enum Outcome {
-    /// The event has been accepted and handled completely.
-    ///
-    /// For events and most other types, this is never emitted by Relay as the event
-    /// may be discarded by the processing pipeline after Relay.
-    /// Only the `save_event` task in Sentry finally accepts an event.
-    ///
-    /// The only data type for which this outcome is emitted by Relay is [`DataCategory::Profile`].
-    /// (See [`crate::actors::processor::EnvelopeProcessor`])
-    #[cfg(feature = "processing")]
-    Accepted,
+    // /// The event has been accepted and handled completely.
+    // ///
+    // /// This is never emitted by Relay as the event may be discarded by the processing pipeline
+    // /// after Relay. Only the `save_event` task in Sentry finally accepts an event.
+    // #[allow(dead_code)]
+    // Accepted,
     /// The event has been filtered due to a configured filter.
     Filtered(FilterStatKey),
@@ -181,8 +177,6 @@ impl Outcome {
     /// Returns the raw numeric value of this outcome for the JSON and Kafka schema.
     fn to_outcome_id(&self) -> OutcomeId {
         match self {
-            #[cfg(feature = "processing")]
-            Outcome::Accepted => OutcomeId::ACCEPTED,
             Outcome::Filtered(_) | Outcome::FilteredSampling(_) => OutcomeId::FILTERED,
             Outcome::RateLimited(_) => OutcomeId::RATE_LIMITED,
             Outcome::Invalid(_) => OutcomeId::INVALID,
@@ -194,8 +188,6 @@ impl Outcome {
     /// Returns the `reason` code field of this outcome.
     fn to_reason(&self) -> Option<Cow<str>> {
         match self {
-            #[cfg(feature = "processing")]
-            Outcome::Accepted => None,
             Outcome::Invalid(discard_reason) => Some(Cow::Borrowed(discard_reason.name())),
             Outcome::Filtered(filter_key) => Some(Cow::Borrowed(filter_key.name())),
             Outcome::FilteredSampling(rule_ids) => Some(Cow::Owned(format!("Sampled:{rule_ids}"))),
@@ -228,8 +220,6 @@ impl Outcome {
 impl fmt::Display for Outcome {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match self {
-            #[cfg(feature = "processing")]
-            Outcome::Accepted => write!(f, "accepted"),
             Outcome::Filtered(key) => write!(f, "filtered by {key}"),
             Outcome::FilteredSampling(rule_ids) => write!(f, "sampling rule {rule_ids}"),
             Outcome::RateLimited(None) => write!(f, "rate limited"),
@@ -796,16 +786,14 @@ impl OutcomeBroker {
     }
 
     #[cfg(feature = "processing")]
-    fn send_kafka_message_inner(
+    fn send_kafka_message(
         &self,
         producer: &KafkaOutcomesProducer,
         organization_id: u64,
         message: TrackRawOutcome,
     ) -> Result<(), OutcomeError> {
         relay_log::trace!("Tracking kafka outcome: {message:?}");
 
-        send_outcome_metric(&message, "kafka");
-
         let payload = serde_json::to_string(&message).map_err(OutcomeError::SerializationError)?;
 
         // At the moment, we support outcomes with optional EventId.
@@ -835,23 +823,11 @@ impl OutcomeBroker {
         }
     }
 
-    #[cfg(feature = "processing")]
-    fn send_kafka_message(
-        &self,
-        producer: &KafkaOutcomesProducer,
-        organization_id: u64,
-        message: TrackRawOutcome,
-    ) -> Result<(), OutcomeError> {
-        for message in transform_outcome(message) {
-            self.send_kafka_message_inner(producer, organization_id, message)?;
-        }
-        Ok(())
-    }
-
     fn handle_track_outcome(&self, message: TrackOutcome, config: &Config) {
         match self {
             #[cfg(feature = "processing")]
             Self::Kafka(kafka_producer) => {
+                send_outcome_metric(&message, "kafka");
                 let organization_id = message.scoping.organization_id;
                 let raw_message = TrackRawOutcome::from_outcome(message, config);
                 if let Err(error) =
@@ -876,6 +852,7 @@ impl OutcomeBroker {
         match self {
             #[cfg(feature = "processing")]
             Self::Kafka(kafka_producer) => {
+                send_outcome_metric(&message, "kafka");
                 let sharding_id = message.org_id.unwrap_or_else(|| message.project_id.value());
                 if let Err(error) = self.send_kafka_message(kafka_producer, sharding_id, message) {
                     relay_log::error!(error = &error as &dyn Error, "failed to produce outcome");
@@ -891,54 +868,6 @@ impl OutcomeBroker {
     }
 }
 
-/// Returns true if the outcome represents profiles dropped by dynamic sampling.
-#[cfg(feature = "processing")]
-fn is_sampled_profile(outcome: &TrackRawOutcome) -> bool {
-    (outcome.category == Some(DataCategory::Profile as u8)
-        || outcome.category == Some(DataCategory::ProfileIndexed as u8))
-        && outcome.outcome == OutcomeId::FILTERED
-        && outcome
-            .reason
-            .as_deref()
-            .map_or(false, |reason| reason.starts_with("Sampled:"))
-}
-
-/// Transform outcome into one or more derived outcome messages before sending it to kafka.
-#[cfg(feature = "processing")]
-fn transform_outcome(mut outcome: TrackRawOutcome) -> impl Iterator<Item = TrackRawOutcome> {
-    let mut extra = None;
-    if is_sampled_profile(&outcome) {
-        // Profiles that were dropped by dynamic sampling still count as "processed",
-        // so we emit the FILTERED outcome only for the "indexed" category instead.
-        outcome.category = Some(DataCategory::ProfileIndexed as u8);
-
-        // "processed" profiles are an abstract data category that does not represent actual data
-        // going through our pipeline. Instead, the number of accepted "processed" profiles is counted as
-        //
-        // processed_profiles = indexed_profiles + sampled_profiles
-        //
-        // The "processed" outcome for indexed_profiles is generated in processing
-        // (see `EnvelopeProcessor::count_processed_profiles()`),
-        // but for profiles dropped by dynamic sampling, all we have is the FILTERED outcome,
-        // which we transform into an ACCEPTED outcome here.
-        //
-        // The reason for doing this transformation in the kafka producer is that it should apply
-        // to both `TrackOutcome` and `TrackRawOutcome`, and it should only happen _once_.
-        //
-        // In the future, we might actually extract metrics from profiles before dynamic sampling,
-        // like we do for transactions. At that point, this code should be removed, and we should
-        // enforce rate limits and emit outcomes based on the collect profile metric, as we do for
-        // transactions.
-        extra = Some(TrackRawOutcome {
-            outcome: OutcomeId::ACCEPTED,
-            reason: None,
-            category: Some(DataCategory::Profile as u8),
-            ..outcome.clone()
-        });
-    }
-    Some(outcome).into_iter().chain(extra)
-}
-
 #[derive(Debug)]
 enum ProducerInner {
     #[cfg(feature = "processing")]
```
relay-server/src/actors/processor.rs (+28, -69)

```diff
@@ -13,6 +13,7 @@ use chrono::{DateTime, Duration as SignedDuration, Utc};
 use flate2::write::{GzEncoder, ZlibEncoder};
 use flate2::Compression;
 use once_cell::sync::OnceCell;
+use relay_profiling::ProfileError;
 use serde_json::Value as SerdeValue;
 use tokio::sync::Semaphore;
@@ -307,6 +308,9 @@ struct ProcessEnvelopeState {
 
     /// The managed envelope before processing.
     managed_envelope: ManagedEnvelope,
+
+    /// Whether there is a profiling item in the envelope.
+    has_profile: bool,
 }
 
 impl ProcessEnvelopeState {
@@ -1061,15 +1065,29 @@
 
     /// Remove profiles from the envelope if they can not be parsed
     fn filter_profiles(&self, state: &mut ProcessEnvelopeState) {
+        let mut found_profile = false;
         state.managed_envelope.retain_items(|item| match item.ty() {
-            ItemType::Profile => match relay_profiling::parse_metadata(&item.payload()) {
-                Ok(_) => ItemAction::Keep,
-                Err(err) => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
-                    relay_profiling::discard_reason(err),
-                ))),
-            },
+            ItemType::Profile => {
+                if !found_profile {
+                    match relay_profiling::parse_metadata(&item.payload()) {
+                        Ok(_) => {
+                            found_profile = true;
+                            ItemAction::Keep
+                        }
+                        Err(err) => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
+                            relay_profiling::discard_reason(err),
+                        ))),
+                    }
+                } else {
+                    // We found a second profile, drop it.
+                    return ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
+                        relay_profiling::discard_reason(ProfileError::TooManyProfiles),
+                    )));
+                }
+            }
             _ => ItemAction::Keep,
         });
+        state.has_profile = found_profile;
     }
@@ -1097,61 +1115,6 @@ impl EnvelopeProcessorService {
         })
     }
 
-    /// Count the number of profiles that are in the envelope and emit accepted outcome.
-    ///
-    /// "processed" profiles are an abstract data category that does not represent actual data
-    /// going through our pipeline. Instead, the number of accepted "processed" profiles is counted as
-    ///
-    /// ```text
-    /// processed_profiles = indexed_profiles + sampled_profiles
-    /// ```
-    ///
-    /// The "processed" outcome for sampled profiles is generated by the Kafka producer
-    /// (see `transform_outcome` in [`crate::actors::store`]), but for "indexed" profiles, we count
-    /// the corresponding number of processed profiles here.
-    ///
-    /// NOTE: Instead of emitting a [processed](`DataCategory::Profile`) outcome here,
-    /// we could also do it in sentry, in the same place where the [indexed](`DataCategory::ProfileIndexed`)
-    /// outcome is logged. We do it here to be consistent with profiles that are dropped by dynamic sampling,
-    /// which also count as "processed" even though they did not pass through the `process_profiles` step yet.
-    ///
-    ///
-    /// In the future, we might actually extract metrics from profiles before dynamic sampling,
-    /// like we do for transactions. At that point, this code should be removed, and we should
-    /// enforce rate limits and emit outcomes based on the collect profile metric, as we do for
-    /// transactions.
-    #[cfg(feature = "processing")]
-    fn count_processed_profiles(&self, state: &mut ProcessEnvelopeState) {
-        let profile_count: usize = state
-            .managed_envelope
-            .envelope_mut()
-            .items_mut()
-            .filter(|item| item.ty() == &ItemType::Profile)
-            .map(|item| {
-                item.set_profile_counted_as_processed();
-                item.quantity()
-            })
-            .sum();
-
-        if profile_count == 0 {
-            return;
-        }
-
-        self.outcome_aggregator.send(TrackOutcome {
-            timestamp: state.managed_envelope.received_at(),
-            scoping: state.managed_envelope.scoping(),
-            outcome: Outcome::Accepted,
-            event_id: None,
-            remote_addr: None,
-            category: DataCategory::Profile,
-            quantity: profile_count as u32, // truncates to `u32::MAX`
-        });
-
-        // TODO: At this point, we should also ensure that the envelope summary gets recomputed.
-        // But recomputing the summary after extracting the event is currently problematic, because it
-        // sets the envelope type to `None`. This needs to be solved in a follow-up.
-    }
-
     /// Process profiles and set the profile ID in the profile context on the transaction if successful
     #[cfg(feature = "processing")]
     fn process_profiles(&self, state: &mut ProcessEnvelopeState) {
@@ -1396,6 +1359,7 @@ impl EnvelopeProcessorService {
             sampling_project_state,
             project_id,
             managed_envelope,
+            has_profile: false,
         })
     }
@@ -2129,6 +2093,7 @@ impl EnvelopeProcessorService {
             event,
             transaction_from_dsc,
             &state.sampling_result,
+            state.has_profile,
             &mut state.extracted_metrics.project_metrics,
             &mut state.extracted_metrics.sampling_metrics,
         );
@@ -2429,9 +2394,6 @@ impl EnvelopeProcessorService {
         self.process_replays(state)?;
         self.filter_profiles(state);
 
-        // After filtering, we need to update the envelope summary:
-        state.managed_envelope.update();
-
         if state.creates_event() {
             // Some envelopes only create events in processing relays; for example, unreal events.
             // This makes it possible to get in this code block while not really having an event in
@@ -2463,11 +2425,6 @@ impl EnvelopeProcessorService {
 
         if_processing!({
             self.enforce_quotas(state)?;
-            // Any profile that reaches this point counts as "processed", regardless of whether
-            // they survive the actual `process_profiles` step. This is to be consistent with
-            // profiles that are dropped by dynamic sampling, which also count as "processed"
-            // even though they did not pass through the `process_profiles` step yet.
-            self.count_processed_profiles(state);
             // We need the event parsed in order to set the profile context on it
             self.process_profiles(state);
             self.process_check_ins(state);
@@ -2770,6 +2727,7 @@ impl Service for EnvelopeProcessorService {
 
 #[cfg(test)]
 mod tests {
+    use std::env;
     use std::str::FromStr;
 
     use chrono::{DateTime, TimeZone, Utc};
@@ -2992,6 +2950,7 @@ mod tests {
                 outcome_aggregator.clone(),
                 test_store.clone(),
             ),
+            has_profile: false,
         };
 
         // TODO: This does not test if the sampling decision is actually applied. This should be
```
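The rewritten `filter_profiles` keeps only the first parseable profile per envelope, drops any further ones as `TooManyProfiles`, and records whether a profile survived. A stand-alone sketch of that behavior (plain types in place of Relay's `ManagedEnvelope` and `ItemAction`; `profiling_invalid_json` is an illustrative stand-in reason for a parse failure):

```rust
// Stand-alone sketch of the new retention logic, not Relay's envelope code.
#[derive(Debug, PartialEq)]
enum Action {
    Keep,
    DropInvalid(&'static str), // discard reason
}

/// Keep the first parseable profile item per envelope; every further profile
/// is dropped as `profiling_too_many_profiles`, as in the diff above.
fn filter_profiles(parse_results: &[Result<(), &'static str>]) -> (Vec<Action>, bool) {
    let mut found_profile = false;
    let actions = parse_results
        .iter()
        .map(|result| {
            if found_profile {
                return Action::DropInvalid("profiling_too_many_profiles");
            }
            match *result {
                Ok(()) => {
                    found_profile = true;
                    Action::Keep
                }
                Err(reason) => Action::DropInvalid(reason),
            }
        })
        .collect();
    // `found_profile` is what feeds the `has_profile` tag on the
    // transaction metric in `ProcessEnvelopeState`.
    (actions, found_profile)
}

fn main() {
    // Two valid profiles and one malformed profile in the same envelope:
    let items = [Ok(()), Err("profiling_invalid_json"), Ok(())];
    let (actions, has_profile) = filter_profiles(&items);
    assert_eq!(actions[0], Action::Keep);
    assert_eq!(actions[1], Action::DropInvalid("profiling_invalid_json"));
    assert_eq!(actions[2], Action::DropInvalid("profiling_too_many_profiles"));
    assert!(has_profile);
}
```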
