Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(replay): separate feedback from attachments in the same envelope #3403

Merged
merged 21 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

## Unreleased


**Bug fixes:**

- Respect country code TLDs when scrubbing span tags. ([#3458](https://github.com/getsentry/relay/pull/3458))

**Features**:

- Separate the logic for producing UserReportV2 events (user feedback) and handle attachments in the same envelope as feedback. ([#3403](https://github.com/getsentry/relay/pull/3403))
- Use same keys for OTel span attributes and Sentry span data. ([#3457](https://github.com/getsentry/relay/pull/3457))
- Support passing owner when upserting Monitors. ([#3468](https://github.com/getsentry/relay/pull/3468))

Expand Down
12 changes: 12 additions & 0 deletions relay-dynamic-config/src/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,18 @@ pub struct Options {
)]
pub feedback_ingest_topic_rollout_rate: f32,

/// Flag for handling feedback and attachments in the same envelope. Temporary FF for fast-revert
/// (will remove after user feedback GA release).
Comment on lines +170 to +171
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without context, this comment is not clear to me what the flag should do. What do you think about something like the following?

Suggested change
/// Flag for handling feedback and attachments in the same envelope. Temporary FF for fast-revert
/// (will remove after user feedback GA release).
/// Kill switch to stop producing feedback items separately into Kafka. The topic is controlled by `feedback_ingest_topic_rollout_rate`.

///
/// Enabling this will also separate the logic for producing feedback, to its own match case in
/// StoreService::store_envelope
#[serde(
rename = "feedback.ingest-inline-attachments",
deserialize_with = "default_on_error",
skip_serializing_if = "is_default"
)]
pub feedback_ingest_same_envelope_attachments: bool,

/// Overall sampling of span extraction.
///
/// This number represents the fraction of transactions for which
Expand Down
57 changes: 52 additions & 5 deletions relay-server/src/services/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,19 @@ impl StoreService {
let retention = envelope.retention();
let event_id = envelope.event_id();

let feedback_ingest_same_envelope_attachments = self
.global_config
.current()
.options
.feedback_ingest_same_envelope_attachments;

let event_item = envelope.as_mut().take_item_by(|item| {
matches!(
item.ty(),
ItemType::Event
| ItemType::Transaction
| ItemType::Security
| ItemType::UserReportV2
(item.ty(), feedback_ingest_same_envelope_attachments),
(ItemType::Event, _)
| (ItemType::Transaction, _)
| (ItemType::Security, _)
| (ItemType::UserReportV2, false)
)
});
let client = envelope.meta().client();
Expand Down Expand Up @@ -244,6 +250,17 @@ impl StoreService {
item,
)?;
}
ItemType::UserReportV2 if feedback_ingest_same_envelope_attachments => {
let remote_addr = envelope.meta().client_addr().map(|addr| addr.to_string());
self.produce_user_report_v2(
event_id.ok_or(StoreError::NoEventId)?,
scoping.project_id,
scoping.organization_id,
start_time,
item,
remote_addr,
)?;
}
ItemType::Profile => self.produce_profile(
scoping.organization_id,
scoping.project_id,
Expand Down Expand Up @@ -673,6 +690,36 @@ impl StoreService {
self.produce(KafkaTopic::Attachments, message)
}

fn produce_user_report_v2(
&self,
event_id: EventId,
project_id: ProjectId,
organization_id: u64,
start_time: Instant,
item: &Item,
remote_addr: Option<String>,
) -> Result<(), StoreError> {
// check rollout rate option (effectively a FF) to determine whether to produce to new infra
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #3344

let global_config = self.global_config.current();
let feedback_ingest_topic_rollout_rate =
global_config.options.feedback_ingest_topic_rollout_rate;
let topic = if is_rolled_out(organization_id, feedback_ingest_topic_rollout_rate) {
KafkaTopic::Feedback
} else {
KafkaTopic::Events
};
Comment on lines +706 to +710
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of scope for this PR, but should we hard-code this to Feedback soon? Or are some installations not ready for that yet? (self-hosted, devenv, ...)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this has to do with ops deployment of the feedback topic -- right now, its only in s4s and not other regions. This check and rollout rate will eventually be removed too


let message = KafkaMessage::Event(EventKafkaMessage {
project_id,
event_id,
payload: item.payload(),
start_time: UnixTimestamp::from_instant(start_time).as_secs(),
remote_addr,
attachments: vec![],
});
self.produce(topic, message)
}

fn send_metric_message(
&self,
namespace: MetricNamespace,
Expand Down
69 changes: 69 additions & 0 deletions tests/.idea/workspace.xml
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, remove this file.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion tests/integration/fixtures/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,9 @@ def transactions_consumer(kafka_consumer):

@pytest.fixture
def attachments_consumer(kafka_consumer):
return lambda: AttachmentsConsumer(*kafka_consumer("attachments"))
return lambda timeout=None: AttachmentsConsumer(
timeout=timeout, *kafka_consumer("attachments")
)


@pytest.fixture
Expand Down
Loading
Loading