feat(profiles): Emit "accepted" outcomes for profiles filtered by sampling #2054

Merged · 15 commits · Apr 24, 2023
88 changes: 79 additions & 9 deletions relay-server/src/actors/outcome.rs
@@ -148,12 +148,16 @@ impl FromMessage<Self> for TrackOutcome {
/// Defines the possible outcomes from processing an event.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum Outcome {
// /// The event has been accepted and handled completely.
// ///
// /// This is never emitted by Relay as the event may be discarded by the processing pipeline
// /// after Relay. Only the `save_event` task in Sentry finally accepts an event.
// #[allow(dead_code)]
// Accepted,
/// The event has been accepted and handled completely.
///
/// For events and most other types, this is never emitted by Relay as the event
/// may be discarded by the processing pipeline after Relay.
/// Only the `save_event` task in Sentry finally accepts an event.
///
/// The only data type for which this outcome is emitted by Relay is [`DataCategory::Profile`].
/// (See [`crate::actors::processor::EnvelopeProcessor`])
#[cfg(feature = "processing")]
Accepted,
/// The event has been filtered due to a configured filter.
Filtered(FilterStatKey),

@@ -178,6 +182,8 @@ impl Outcome {
/// Returns the raw numeric value of this outcome for the JSON and Kafka schema.
fn to_outcome_id(&self) -> OutcomeId {
match self {
#[cfg(feature = "processing")]
Outcome::Accepted => OutcomeId::ACCEPTED,
Outcome::Filtered(_) | Outcome::FilteredSampling(_) => OutcomeId::FILTERED,
Outcome::RateLimited(_) => OutcomeId::RATE_LIMITED,
Outcome::Invalid(_) => OutcomeId::INVALID,
@@ -189,6 +195,8 @@ impl Outcome {
/// Returns the `reason` code field of this outcome.
fn to_reason(&self) -> Option<Cow<str>> {
match self {
#[cfg(feature = "processing")]
Outcome::Accepted => None,
Outcome::Invalid(discard_reason) => Some(Cow::Borrowed(discard_reason.name())),
Outcome::Filtered(filter_key) => Some(Cow::Borrowed(filter_key.name())),
Outcome::FilteredSampling(rule_ids) => Some(Cow::Owned(format!("Sampled:{rule_ids}"))),
@@ -221,6 +229,8 @@ impl Outcome {
impl fmt::Display for Outcome {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
#[cfg(feature = "processing")]
Outcome::Accepted => write!(f, "accepted"),
Outcome::Filtered(key) => write!(f, "filtered by {key}"),
Outcome::FilteredSampling(rule_ids) => write!(f, "sampling rule {rule_ids}"),
Outcome::RateLimited(None) => write!(f, "rate limited"),
@@ -787,14 +797,16 @@ impl OutcomeBroker {
}

#[cfg(feature = "processing")]
fn send_kafka_message(
fn send_kafka_message_inner(
&self,
producer: &KafkaOutcomesProducer,
organization_id: u64,
message: TrackRawOutcome,
) -> Result<(), OutcomeError> {
relay_log::trace!("Tracking kafka outcome: {:?}", message);

send_outcome_metric(&message, "kafka");

let payload = serde_json::to_string(&message).map_err(OutcomeError::SerializationError)?;

// At the moment, we support outcomes with optional EventId.
@@ -824,11 +836,23 @@ impl OutcomeBroker {
}
}

#[cfg(feature = "processing")]
fn send_kafka_message(
&self,
producer: &KafkaOutcomesProducer,
organization_id: u64,
message: TrackRawOutcome,
) -> Result<(), OutcomeError> {
for message in transform_outcome(message) {
self.send_kafka_message_inner(producer, organization_id, message)?;
}
Ok(())
}

fn handle_track_outcome(&self, message: TrackOutcome, config: &Config) {
match self {
#[cfg(feature = "processing")]
Self::Kafka(kafka_producer) => {
send_outcome_metric(&message, "kafka");
let organization_id = message.scoping.organization_id;
let raw_message = TrackRawOutcome::from_outcome(message, config);
if let Err(error) =
@@ -853,7 +877,6 @@
match self {
#[cfg(feature = "processing")]
Self::Kafka(kafka_producer) => {
send_outcome_metric(&message, "kafka");
let sharding_id = message.org_id.unwrap_or_else(|| message.project_id.value());
if let Err(error) = self.send_kafka_message(kafka_producer, sharding_id, message) {
relay_log::error!("failed to produce outcome: {}", LogError(&error));
@@ -869,6 +892,53 @@ impl OutcomeBroker {
}
}

/// Returns true if the outcome represents profiles dropped by dynamic sampling.
#[cfg(feature = "processing")]
fn is_sampled_profile(outcome: &TrackRawOutcome) -> bool {
outcome.category == Some(DataCategory::Profile as u8)
&& outcome.outcome == OutcomeId::FILTERED
&& outcome
.reason
.as_deref()
.map_or(false, |reason| reason.starts_with("Sampled:"))
}
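
The reason-prefix check above relies on the `Sampled:{rule_ids}` format that `Outcome::FilteredSampling` produces in `to_reason()`. A minimal illustration (the concrete reason strings here are examples, not taken from this diff):

    assert!("Sampled:1,2".starts_with("Sampled:"));         // dynamic-sampling reason → matches
    assert!(!"browser-extensions".starts_with("Sampled:")); // ordinary filter reason → no match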

/// Transforms an outcome into one or more derived outcome messages before sending them to Kafka.
#[cfg(feature = "processing")]
fn transform_outcome(mut outcome: TrackRawOutcome) -> impl Iterator<Item = TrackRawOutcome> {
let mut extra = None;
if is_sampled_profile(&outcome) {
// Profiles that were dropped by dynamic sampling still count as "processed",
// so we emit the FILTERED outcome only for the "indexed" category instead.
outcome.category = Some(DataCategory::ProfileIndexed as u8);

// "processed" profiles are an abstract data category that does not represent actual data
// going through our pipeline. Instead, the number of accepted "processed" profiles is counted as
//
// processed_profiles = indexed_profiles + sampled_profiles
//
// The "processed" outcome for indexed_profiles is generated in processing
// (see `EnvelopeProcessor::count_processed_profiles()`),
// but for profiles dropped by dynamic sampling, all we have is the FILTERED outcome,
// which we transform into an ACCEPTED outcome here.
//
// The reason for doing this transformation in the kafka producer is that it should apply
// to both `TrackOutcome` and `TrackRawOutcome`, and it should only happen _once_.
//
// In the future, we might actually extract metrics from profiles before dynamic sampling,
// like we do for transactions. At that point, this code should be removed, and we should
// enforce rate limits and emit outcomes based on the collect profile metric, as we do for
// transactions.
extra = Some(TrackRawOutcome {
outcome: OutcomeId::ACCEPTED,
reason: None,
category: Some(DataCategory::Profile as u8),
..outcome.clone()
});
}
Some(outcome).into_iter().chain(extra)
}
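
As a sketch of the intended fan-out, here is a self-contained analogue built on a simplified stand-in struct. The real `TrackRawOutcome` has more fields, and the `OutcomeId`/`DataCategory` discriminants below are placeholders, not the actual values:

    #[derive(Clone, Debug, PartialEq)]
    struct RawOutcome {
        outcome: u8,          // stand-in for OutcomeId
        reason: Option<String>,
        category: Option<u8>, // stand-in for DataCategory as u8
    }

    const ACCEPTED: u8 = 0; // placeholder discriminants
    const FILTERED: u8 = 1;
    const PROFILE: u8 = 6;
    const PROFILE_INDEXED: u8 = 11;

    fn transform(mut outcome: RawOutcome) -> impl Iterator<Item = RawOutcome> {
        let mut extra = None;
        let is_sampled_profile = outcome.category == Some(PROFILE)
            && outcome.outcome == FILTERED
            && outcome.reason.as_deref().map_or(false, |r| r.starts_with("Sampled:"));
        if is_sampled_profile {
            // Re-categorize the FILTERED outcome to the "indexed" category...
            outcome.category = Some(PROFILE_INDEXED);
            // ...and derive an ACCEPTED outcome for the "processed" category.
            // The `..` base mirrors the real code, where the remaining fields
            // (timestamp, ids, ...) are copied from the original message.
            extra = Some(RawOutcome {
                outcome: ACCEPTED,
                reason: None,
                category: Some(PROFILE),
                ..outcome.clone()
            });
        }
        Some(outcome).into_iter().chain(extra)
    }

    fn main() {
        let filtered = RawOutcome {
            outcome: FILTERED,
            reason: Some("Sampled:1".to_owned()),
            category: Some(PROFILE),
        };
        let messages: Vec<_> = transform(filtered).collect();
        assert_eq!(messages.len(), 2);
        assert_eq!(messages[0].category, Some(PROFILE_INDEXED)); // original, re-categorized
        assert_eq!(messages[1].outcome, ACCEPTED);               // derived "processed" outcome
    }

Any message that is not a sampling-filtered profile passes through as exactly one unchanged item, so all other outcomes are unaffected by the transformation.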

#[derive(Debug)]
enum ProducerInner {
#[cfg(feature = "processing")]
53 changes: 53 additions & 0 deletions relay-server/src/actors/processor.rs
@@ -1070,6 +1070,51 @@ impl EnvelopeProcessorService {
})
}

/// Counts the number of profiles in the envelope and emits an "accepted" outcome.
///
/// "processed" profiles are an abstract data category that does not represent actual data
/// going through our pipeline. Instead, the number of accepted "processed" profiles is counted as
///
/// processed_profiles = indexed_profiles + sampled_profiles
///
/// The "processed" outcome for sampled profiles is generated by the Kafka producer
/// (see `transform_outcome` in [`crate::actors::store`]), but for "indexed" profiles, we count
/// the corresponding number of processed profiles here.
///
/// NOTE: Instead of emitting a [processed](`DataCategory::Profile`) outcome here,
/// we could also do it in sentry, in the same place where the [indexed](`DataCategory::ProfileIndexed`)
/// outcome is logged. We do it here to be consistent with profiles that are dropped by dynamic sampling,
/// which also count as "processed" even though they did not pass through the `process_profiles` step yet.
///
/// In the future, we might actually extract metrics from profiles before dynamic sampling,
/// like we do for transactions. At that point, this code should be removed, and we should
/// enforce rate limits and emit outcomes based on the collect profile metric, as we do for
[Review — Contributor]
The sentence ends here, kind of leaving you hanging at the end.

Suggested change:
- /// enforce rate limits and emit outcomes based on the collect profile metric, as we do for
+ /// enforce rate limits and emit outcomes based on the collect profile metric, as we do for transactions

[Review — Member Author]
Thanks!

#[cfg(feature = "processing")]
fn count_processed_profiles(&self, state: &mut ProcessEnvelopeState) {
let profile_count: usize = state
.managed_envelope
.envelope()
.items()
.filter(|item| item.ty() == &ItemType::Profile)
.map(|item| item.quantity())
.sum();
[Review — Contributor]
Technically, the SDKs will only send 1 profile per envelope.

[Review — Member Author]
Makes sense, but Relay does not enforce that AFAIK, so it's safer to handle the case where there are multiple profiles.

if profile_count == 0 {
return;
}

self.outcome_aggregator.send(TrackOutcome {
timestamp: state.managed_envelope.received_at(),
scoping: state.managed_envelope.scoping(),
outcome: Outcome::Accepted,
event_id: None,
remote_addr: None,
category: DataCategory::Profile,
quantity: profile_count as u32, // `as` cast: counts above `u32::MAX` are truncated
})
}
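
To make the accounting concrete, an illustrative example (the counts are made up): if 100 profiles enter Relay and dynamic sampling drops 80, the 20 survivors get their "processed" outcome from `count_processed_profiles`, while the 80 dropped ones get theirs from `transform_outcome` in the Kafka producer:

    let indexed_profiles: u32 = 20; // survived sampling; counted here as Accepted
    let sampled_profiles: u32 = 80; // dropped by sampling; Accepted derived in transform_outcome
    let processed_profiles = indexed_profiles + sampled_profiles;
    assert_eq!(processed_profiles, 100); // every received profile counts as "processed"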

/// Process profiles and set the profile ID in the profile context on the transaction if successful
#[cfg(feature = "processing")]
fn process_profiles(&self, state: &mut ProcessEnvelopeState) {
@@ -2245,6 +2290,9 @@ impl EnvelopeProcessorService {
self.process_replays(state)?;
self.filter_profiles(state);

// After filtering, we need to update the envelope summary:
state.managed_envelope.update();

if state.creates_event() {
// Some envelopes only create events in processing relays; for example, unreal events.
// This makes it possible to get in this code block while not really having an event in
@@ -2276,6 +2324,11 @@

if_processing!({
self.enforce_quotas(state)?;
// Any profile that reaches this point counts as "processed", regardless of whether
// they survive the actual `process_profiles` step. This is to be consistent with
// profiles that are dropped by dynamic sampling, which also count as "processed"
// even though they did not pass through the `process_profiles` step yet.
self.count_processed_profiles(state);
// We need the event parsed in order to set the profile context on it
self.process_profiles(state);
self.process_check_ins(state);