
Commit 4801ec2

feat(profiles): Emit "accepted" outcomes for profiles filtered by sampling (#2054)
To facilitate a billing model that is consistent between transactions and profiles, #2051 introduced a new data category for profiles, such that

```
processed profiles = indexed profiles + sampled profiles  # "sampled" as in dropped by dynamic sampling
```

just as is already the case for transactions:

```
processed transactions = indexed transactions + sampled transactions
```

In other words, "processed profiles" should count _all_ valid, non-rate-limited profiles seen by Relay, even if they were dropped by dynamic sampling.

## Difference between transactions and profiles

For transactions, we extract _metrics_ before dynamic sampling, and those metrics are what we rate limit and eventually log as "accepted" for the "processed" transaction data category (`DataCategory.Transaction`). For profiles, we do not extract metrics (yet), so the outcomes for the "processed" profile data category have to be calculated in a different way.

## How this PR achieves this goal

1. In processing Relays, if an envelope still contains profiles _after_ dynamic sampling, log an `Accepted` outcome for the "processed" category. By restricting this to processing Relays, we can be sure that every profile is only counted once.
2. Also in processing Relays, shortly before reporting outcomes to Kafka:
   1. Translate `Filtered` outcomes with reason `Sampled:` to an `Accepted` outcome. This counts all profiles dropped by dynamic sampling, regardless of where the dynamic sampling took place (external Relay, PoP Relay, processing Relay).
   2. Also send the original `Filtered` outcome, but with data category `DataCategory.ProfileIndexed`.

By adding up the counts of these two disjoint sets, we should correctly count all profiles, regardless of whether they were sampled or not (see the sketch below for a worked example).

## Alternative proposal (rejected for now)

In order to _actually_ line up the behavior of transactions and profiles, we could start extracting a simple counter metric for profiles before dynamic sampling, and let that metric represent `DataCategory.Profile` -- this would mean that rate limits are applied to the metric, and the accepted outcome would be emitted from the `billing_metrics_consumer`, just like we do for `DataCategory.Transaction`. See [this internal doc](https://www.notion.so/sentry/Implementation-Concerns-412caf18c2f04f579bb551b98c9dad8c) for more.

## Why the current approach was chosen

I personally believe that the alternative proposal described above would be more correct and easier to maintain, but:

1. By containing all new logic to processing Relays, we will correctly count "processed" profiles even if they were dropped by dynamic sampling in outdated external Relays.
2. In a wider sense, by containing all new logic to processing Relays, we can iterate faster (deploy hotfixes, etc.) without having to worry about the behavior of external Relays (which we cannot update).
3. This change is easy to revert if needed. The alternative solution would be distributed across nodes, from external Relays to Sentry.
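As a back-of-the-envelope illustration of the counting above (the numbers are made up; this is only a sketch, not code from this change):

```rust
fn main() {
    // Hypothetical traffic: processing Relays see 100 valid, non-rate-limited profiles.
    let indexed_profiles: u32 = 25; // kept by dynamic sampling and stored
    let sampled_profiles: u32 = 75; // dropped by dynamic sampling

    // Every profile receives exactly one "accepted" outcome for the "processed"
    // category (`DataCategory.Profile`), so the two disjoint sets add up to the total:
    let processed_profiles = indexed_profiles + sampled_profiles;
    assert_eq!(processed_profiles, 100);
}
```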
1 parent 3fc247b commit 4801ec2

File tree

7 files changed, +501 −13 lines changed


CHANGELOG.md (+1)

```diff
@@ -14,6 +14,7 @@
 - Lower default max compressed replay recording segment size to 10 MiB. ([#2031](https://github.com/getsentry/relay/pull/2031))
 - Increase chunking limit to 15MB for replay recordings. ([#2032](https://github.com/getsentry/relay/pull/2032))
 - Add a data category for indexed profiles. ([#2051](https://github.com/getsentry/relay/pull/2051))
+- Differentiate between `Profile` and `ProfileIndexed` outcomes. ([#2054](https://github.com/getsentry/relay/pull/2054))
 
 ## 23.4.0
```

relay-server/src/actors/outcome.rs (+82 −9)

```diff
@@ -148,12 +148,16 @@ impl FromMessage<Self> for TrackOutcome {
 /// Defines the possible outcomes from processing an event.
 #[derive(Clone, Debug, PartialEq, Eq, Hash)]
 pub enum Outcome {
-    // /// The event has been accepted and handled completely.
-    // ///
-    // /// This is never emitted by Relay as the event may be discarded by the processing pipeline
-    // /// after Relay. Only the `save_event` task in Sentry finally accepts an event.
-    // #[allow(dead_code)]
-    // Accepted,
+    /// The event has been accepted and handled completely.
+    ///
+    /// For events and most other types, this is never emitted by Relay as the event
+    /// may be discarded by the processing pipeline after Relay.
+    /// Only the `save_event` task in Sentry finally accepts an event.
+    ///
+    /// The only data type for which this outcome is emitted by Relay is [`DataCategory::Profile`].
+    /// (See [`crate::actors::processor::EnvelopeProcessor`].)
+    #[cfg(feature = "processing")]
+    Accepted,
     /// The event has been filtered due to a configured filter.
     Filtered(FilterStatKey),
@@ -178,6 +182,8 @@ impl Outcome {
     /// Returns the raw numeric value of this outcome for the JSON and Kafka schema.
     fn to_outcome_id(&self) -> OutcomeId {
         match self {
+            #[cfg(feature = "processing")]
+            Outcome::Accepted => OutcomeId::ACCEPTED,
             Outcome::Filtered(_) | Outcome::FilteredSampling(_) => OutcomeId::FILTERED,
             Outcome::RateLimited(_) => OutcomeId::RATE_LIMITED,
             Outcome::Invalid(_) => OutcomeId::INVALID,
@@ -189,6 +195,8 @@ impl Outcome {
     /// Returns the `reason` code field of this outcome.
     fn to_reason(&self) -> Option<Cow<str>> {
         match self {
+            #[cfg(feature = "processing")]
+            Outcome::Accepted => None,
             Outcome::Invalid(discard_reason) => Some(Cow::Borrowed(discard_reason.name())),
             Outcome::Filtered(filter_key) => Some(Cow::Borrowed(filter_key.name())),
             Outcome::FilteredSampling(rule_ids) => Some(Cow::Owned(format!("Sampled:{rule_ids}"))),
@@ -221,6 +229,8 @@ impl Outcome {
 impl fmt::Display for Outcome {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match self {
+            #[cfg(feature = "processing")]
+            Outcome::Accepted => write!(f, "accepted"),
             Outcome::Filtered(key) => write!(f, "filtered by {key}"),
             Outcome::FilteredSampling(rule_ids) => write!(f, "sampling rule {rule_ids}"),
             Outcome::RateLimited(None) => write!(f, "rate limited"),
@@ -787,14 +797,16 @@ impl OutcomeBroker {
     }
 
     #[cfg(feature = "processing")]
-    fn send_kafka_message(
+    fn send_kafka_message_inner(
         &self,
         producer: &KafkaOutcomesProducer,
         organization_id: u64,
         message: TrackRawOutcome,
     ) -> Result<(), OutcomeError> {
         relay_log::trace!("Tracking kafka outcome: {:?}", message);
 
+        send_outcome_metric(&message, "kafka");
+
         let payload = serde_json::to_string(&message).map_err(OutcomeError::SerializationError)?;
 
         // At the moment, we support outcomes with optional EventId.
@@ -824,11 +836,23 @@ impl OutcomeBroker {
         }
     }
 
+    #[cfg(feature = "processing")]
+    fn send_kafka_message(
+        &self,
+        producer: &KafkaOutcomesProducer,
+        organization_id: u64,
+        message: TrackRawOutcome,
+    ) -> Result<(), OutcomeError> {
+        for message in transform_outcome(message) {
+            self.send_kafka_message_inner(producer, organization_id, message)?;
+        }
+        Ok(())
+    }
+
     fn handle_track_outcome(&self, message: TrackOutcome, config: &Config) {
         match self {
             #[cfg(feature = "processing")]
             Self::Kafka(kafka_producer) => {
-                send_outcome_metric(&message, "kafka");
                 let organization_id = message.scoping.organization_id;
                 let raw_message = TrackRawOutcome::from_outcome(message, config);
                 if let Err(error) =
@@ -853,7 +877,6 @@ impl OutcomeBroker {
         match self {
             #[cfg(feature = "processing")]
             Self::Kafka(kafka_producer) => {
-                send_outcome_metric(&message, "kafka");
                 let sharding_id = message.org_id.unwrap_or_else(|| message.project_id.value());
                 if let Err(error) = self.send_kafka_message(kafka_producer, sharding_id, message) {
                     relay_log::error!("failed to produce outcome: {}", LogError(&error));
@@ -869,6 +892,56 @@ impl OutcomeBroker {
     }
 }
 
+/// Returns `true` if the outcome represents profiles dropped by dynamic sampling.
+#[cfg(feature = "processing")]
+fn is_sampled_profile(outcome: &TrackRawOutcome) -> bool {
+    // Older external Relays will still emit a `Profile` outcome.
+    // Newer Relays will emit a `ProfileIndexed` outcome.
+    (outcome.category == Some(DataCategory::Profile as u8)
+        || outcome.category == Some(DataCategory::ProfileIndexed as u8))
+        && outcome.outcome == OutcomeId::FILTERED
+        && outcome
+            .reason
+            .as_deref()
+            .map_or(false, |reason| reason.starts_with("Sampled:"))
+}
+
+/// Transforms an outcome into one or more derived outcome messages before sending it to Kafka.
+#[cfg(feature = "processing")]
+fn transform_outcome(mut outcome: TrackRawOutcome) -> impl Iterator<Item = TrackRawOutcome> {
+    let mut extra = None;
+    if is_sampled_profile(&outcome) {
+        // Profiles that were dropped by dynamic sampling still count as "processed",
+        // so we emit the FILTERED outcome only for the "indexed" category instead.
+        outcome.category = Some(DataCategory::ProfileIndexed as u8);
+
+        // "processed" profiles are an abstract data category that does not represent actual
+        // data going through our pipeline. Instead, the number of accepted "processed"
+        // profiles is counted as
+        //
+        //     processed_profiles = indexed_profiles + sampled_profiles
+        //
+        // The "processed" outcome for indexed_profiles is generated in processing
+        // (see `EnvelopeProcessor::count_processed_profiles()`), but for profiles dropped by
+        // dynamic sampling, all we have is the FILTERED outcome, which we transform into an
+        // ACCEPTED outcome here.
+        //
+        // The reason for doing this transformation in the Kafka producer is that it should
+        // apply to both `TrackOutcome` and `TrackRawOutcome`, and it should only happen _once_.
+        //
+        // In the future, we might actually extract metrics from profiles before dynamic
+        // sampling, like we do for transactions. At that point, this code should be removed,
+        // and we should enforce rate limits and emit outcomes based on the collected profile
+        // metric, as we do for transactions.
+        extra = Some(TrackRawOutcome {
+            outcome: OutcomeId::ACCEPTED,
+            reason: None,
+            category: Some(DataCategory::Profile as u8),
+            ..outcome.clone()
+        });
+    }
+    Some(outcome).into_iter().chain(extra)
+}
+
 #[derive(Debug)]
 enum ProducerInner {
     #[cfg(feature = "processing")]
```
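For intuition, here is a minimal, self-contained sketch of the fan-out performed by `transform_outcome` above. The types are simplified stand-ins, not the real `TrackRawOutcome` or `DataCategory`:

```rust
#[derive(Debug)]
struct RawOutcome {
    outcome: &'static str,  // "filtered", "accepted", ...
    reason: Option<String>, // e.g. Some("Sampled:123")
    category: &'static str, // "profile", "profile_indexed", ...
}

/// Mirrors the logic above: a profile outcome filtered by dynamic sampling fans out into
/// (1) the original FILTERED outcome, re-categorized as "indexed", and (2) an additional
/// ACCEPTED outcome for the abstract "processed" profile category.
fn transform(mut outcome: RawOutcome) -> Vec<RawOutcome> {
    let is_sampled_profile = matches!(outcome.category, "profile" | "profile_indexed")
        && outcome.outcome == "filtered"
        && outcome.reason.as_deref().map_or(false, |r| r.starts_with("Sampled:"));

    if is_sampled_profile {
        outcome.category = "profile_indexed";
        let accepted = RawOutcome {
            outcome: "accepted",
            reason: None,
            category: "profile",
        };
        vec![outcome, accepted]
    } else {
        vec![outcome]
    }
}

fn main() {
    let dropped_by_sampling = RawOutcome {
        outcome: "filtered",
        reason: Some("Sampled:123".to_owned()),
        category: "profile",
    };
    // Prints one "filtered"/"profile_indexed" message and one "accepted"/"profile" message.
    for msg in transform(dropped_by_sampling) {
        println!("{msg:?}");
    }
}
```

Any outcome that is not a sampled profile passes through unchanged, so non-profile categories and genuine inbound-filter outcomes are unaffected.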

relay-server/src/actors/processor.rs (+56)

```diff
@@ -1070,6 +1070,54 @@ impl EnvelopeProcessorService {
         })
     }
 
+    /// Counts the number of profiles in the envelope and emits an accepted outcome for them.
+    ///
+    /// "processed" profiles are an abstract data category that does not represent actual data
+    /// going through our pipeline. Instead, the number of accepted "processed" profiles is
+    /// counted as
+    ///
+    /// ```text
+    /// processed_profiles = indexed_profiles + sampled_profiles
+    /// ```
+    ///
+    /// The "processed" outcome for sampled profiles is generated by the Kafka producer
+    /// (see `transform_outcome` in [`crate::actors::outcome`]), but for "indexed" profiles, we
+    /// count the corresponding number of processed profiles here.
+    ///
+    /// NOTE: Instead of emitting a [processed](`DataCategory::Profile`) outcome here, we could
+    /// also do it in Sentry, in the same place where the [indexed](`DataCategory::ProfileIndexed`)
+    /// outcome is logged. We do it here to be consistent with profiles that are dropped by
+    /// dynamic sampling, which also count as "processed" even though they did not pass through
+    /// the `process_profiles` step yet.
+    ///
+    /// In the future, we might actually extract metrics from profiles before dynamic sampling,
+    /// like we do for transactions. At that point, this code should be removed, and we should
+    /// enforce rate limits and emit outcomes based on the collected profile metric, as we do
+    /// for transactions.
+    #[cfg(feature = "processing")]
+    fn count_processed_profiles(&self, state: &mut ProcessEnvelopeState) {
+        let profile_count: usize = state
+            .managed_envelope
+            .envelope()
+            .items()
+            .filter(|item| item.ty() == &ItemType::Profile)
+            .map(|item| item.quantity())
+            .sum();
+
+        if profile_count == 0 {
+            return;
+        }
+
+        self.outcome_aggregator.send(TrackOutcome {
+            timestamp: state.managed_envelope.received_at(),
+            scoping: state.managed_envelope.scoping(),
+            outcome: Outcome::Accepted,
+            event_id: None,
+            remote_addr: None,
+            category: DataCategory::Profile,
+            quantity: profile_count as u32, // truncates to `u32::MAX`
+        })
+    }
+
     /// Process profiles and set the profile ID in the profile context on the transaction if successful
     #[cfg(feature = "processing")]
     fn process_profiles(&self, state: &mut ProcessEnvelopeState) {
@@ -2245,6 +2293,9 @@
         self.process_replays(state)?;
         self.filter_profiles(state);
 
+        // After filtering, we need to update the envelope summary:
+        state.managed_envelope.update();
+
         if state.creates_event() {
             // Some envelopes only create events in processing relays; for example, unreal events.
             // This makes it possible to get in this code block while not really having an event in
@@ -2276,6 +2327,11 @@
 
         if_processing!({
             self.enforce_quotas(state)?;
+            // Any profile that reaches this point counts as "processed", regardless of whether
+            // it survives the actual `process_profiles` step. This is to be consistent with
+            // profiles that are dropped by dynamic sampling, which also count as "processed"
+            // even though they did not pass through the `process_profiles` step yet.
+            self.count_processed_profiles(state);
             // We need the event parsed in order to set the profile context on it
             self.process_profiles(state);
             self.process_check_ins(state);
```

relay-server/src/envelope.rs (+5 −1)

```diff
@@ -550,7 +550,11 @@ impl Item {
             ItemType::Metrics | ItemType::MetricBuckets => None,
             ItemType::FormData => None,
             ItemType::UserReport => None,
-            ItemType::Profile => Some(DataCategory::Profile),
+            ItemType::Profile => Some(if indexed {
+                DataCategory::ProfileIndexed
+            } else {
+                DataCategory::Profile
+            }),
             ItemType::ReplayEvent | ItemType::ReplayRecording => Some(DataCategory::Replay),
             ItemType::ClientReport => None,
             ItemType::CheckIn => Some(DataCategory::Monitor),
```

relay-server/src/utils/managed_envelope.rs (+5 −1)

```diff
@@ -312,7 +312,11 @@ impl ManagedEnvelope {
         if self.context.summary.profile_quantity > 0 {
             self.track_outcome(
                 outcome,
-                DataCategory::Profile,
+                if self.context.summary.event_metrics_extracted {
+                    DataCategory::ProfileIndexed
+                } else {
+                    DataCategory::Profile
+                },
                 self.context.summary.profile_quantity,
             );
         }
```

relay-server/src/utils/rate_limits.rs (+11 −2)

```diff
@@ -510,8 +510,13 @@ where
 
             // It makes no sense to store profiles without transactions, so if the event
             // is rate limited, rate limit profiles as well.
+            let profile_category = if summary.event_metrics_extracted {
+                DataCategory::ProfileIndexed
+            } else {
+                DataCategory::Profile
+            };
             enforcement.profiles =
-                CategoryLimit::new(DataCategory::Profile, summary.profile_quantity, longest);
+                CategoryLimit::new(profile_category, summary.profile_quantity, longest);
 
             rate_limits.merge(event_limits);
         }
@@ -548,7 +553,11 @@ where
             let item_scoping = scoping.item(DataCategory::Profile);
             let profile_limits = (self.check)(item_scoping, summary.profile_quantity)?;
             enforcement.profiles = CategoryLimit::new(
-                DataCategory::Profile,
+                if summary.event_metrics_extracted {
+                    DataCategory::ProfileIndexed
+                } else {
+                    DataCategory::Profile
+                },
                 summary.profile_quantity,
                 profile_limits.longest(),
             );
```
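The same category selection appears three times above (envelope items, envelope outcome tracking, and rate limiting). As a standalone sketch of that rule, with a stand-in enum rather than the real `DataCategory`:

```rust
// Stand-in for Relay's `DataCategory`, reduced to the two profile variants.
#[derive(Debug, PartialEq)]
enum ProfileCategory {
    Profile,        // "processed" profiles: every valid, non-rate-limited profile
    ProfileIndexed, // "indexed" profiles: only those that are actually stored
}

// Mirrors the change above: once metrics have been extracted for the event,
// profile outcomes are tracked under the indexed category; otherwise the item
// still represents the "processed" category.
fn profile_category(event_metrics_extracted: bool) -> ProfileCategory {
    if event_metrics_extracted {
        ProfileCategory::ProfileIndexed
    } else {
        ProfileCategory::Profile
    }
}

fn main() {
    assert_eq!(profile_category(true), ProfileCategory::ProfileIndexed);
    assert_eq!(profile_category(false), ProfileCategory::Profile);
}
```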
