-
Notifications
You must be signed in to change notification settings - Fork 94
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(feedback): produce to ingest-feedback-events topic #3344
Changes from all commits
35a4dab
e2f5d3d
3036f02
70f6079
69a2236
6a5fca9
78d5254
b51c913
3ca3e94
ba3cc73
5843e3a
caebd46
a9e5151
de7d7b2
9cbd7a3
1ee586a
a5e9caf
a795e7f
3d6bd0b
5f203c7
cf0a70a
f8e800d
40f5d47
f8e4cff
e18e2f7
0ee36f0
d4b7e96
c6766e2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,7 +33,9 @@ use crate::services::global_config::GlobalConfigHandle; | |
use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; | ||
use crate::services::processor::Processed; | ||
use crate::statsd::RelayCounters; | ||
use crate::utils::{self, ArrayEncoding, BucketEncoder, ExtractionMode, TypedEnvelope}; | ||
use crate::utils::{ | ||
self, is_rolled_out, ArrayEncoding, BucketEncoder, ExtractionMode, TypedEnvelope, | ||
}; | ||
|
||
/// Fallback name used for attachment items without a `filename` header. | ||
const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment"; | ||
|
@@ -184,6 +186,7 @@ impl StoreService { | |
) -> Result<(), StoreError> { | ||
let retention = envelope.retention(); | ||
let event_id = envelope.event_id(); | ||
|
||
let event_item = envelope.as_mut().take_item_by(|item| { | ||
matches!( | ||
item.ty(), | ||
|
@@ -199,6 +202,17 @@ impl StoreService { | |
KafkaTopic::Attachments | ||
} else if event_item.as_ref().map(|x| x.ty()) == Some(&ItemType::Transaction) { | ||
KafkaTopic::Transactions | ||
} else if event_item.as_ref().map(|x| x.ty()) == Some(&ItemType::UserReportV2) { | ||
let feedback_ingest_topic_rollout_rate = self | ||
.global_config | ||
.current() | ||
.options | ||
.feedback_ingest_topic_rollout_rate; | ||
if is_rolled_out(scoping.organization_id, feedback_ingest_topic_rollout_rate) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is checking the rollout rate with the org, not the event -- let's please roll out slowly. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's just used as a boolean on/off in practice, so should be fine 👍 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure what the feedback event volume is, but suddenly going from 0 to N may cause a backlog until consumers scale up. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a function I can use to do it on the event level? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We will probably keep it at the org level since feedback volumes aren't that high, and we'd like to test it on all feedback in s4s. Will comment here if that changes. Holding off merging until https://getsentry.atlassian.net/browse/OPS-5543 is done. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
That's totally fine, my comment was just a note to roll out slowly to avoid any surprises 😄 |
||
KafkaTopic::Feedback | ||
} else { | ||
KafkaTopic::Events | ||
} | ||
} else { | ||
KafkaTopic::Events | ||
}; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -37,6 +37,7 @@ | |
profiles_consumer, | ||
metrics_summaries_consumer, | ||
cogs_consumer, | ||
feedback_consumer, | ||
) | ||
|
||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,4 @@ | ||
import pytest | ||
import json | ||
|
||
|
||
|
@@ -42,24 +43,35 @@ def generate_feedback_sdk_event(): | |
} | ||
|
||
|
||
@pytest.mark.parametrize("use_feedback_topic", (False, True)) | ||
def test_feedback_event_with_processing( | ||
mini_sentry, relay_with_processing, events_consumer | ||
mini_sentry, | ||
relay_with_processing, | ||
events_consumer, | ||
feedback_consumer, | ||
use_feedback_topic, | ||
): | ||
relay = relay_with_processing() | ||
mini_sentry.add_basic_project_config( | ||
42, extra={"config": {"features": ["organizations:user-feedback-ingest"]}} | ||
) | ||
|
||
_events_consumer = events_consumer(timeout=20) | ||
feedback = generate_feedback_sdk_event() | ||
if use_feedback_topic: | ||
mini_sentry.set_global_config_option("feedback.ingest-topic.rollout-rate", 1.0) | ||
consumer = feedback_consumer(timeout=20) | ||
other_consumer = events_consumer(timeout=20) | ||
else: | ||
mini_sentry.set_global_config_option("feedback.ingest-topic.rollout-rate", 0.0) | ||
consumer = events_consumer(timeout=20) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Might be worth also assigning |
||
other_consumer = feedback_consumer(timeout=20) | ||
|
||
feedback = generate_feedback_sdk_event() | ||
relay = relay_with_processing() | ||
relay.send_user_feedback(42, feedback) | ||
|
||
replay_event, replay_event_message = _events_consumer.get_event() | ||
assert replay_event["type"] == "feedback" | ||
# assert replay_event_message["retention_days"] == 90 | ||
event, message = consumer.get_event() | ||
assert event["type"] == "feedback" | ||
|
||
parsed_feedback = json.loads(bytes(replay_event_message["payload"])) | ||
parsed_feedback = json.loads(message["payload"]) | ||
# Assert required fields were returned. | ||
assert parsed_feedback["event_id"] | ||
assert parsed_feedback["type"] == feedback["type"] | ||
|
@@ -101,18 +113,25 @@ def test_feedback_event_with_processing( | |
}, | ||
} | ||
|
||
# test message wasn't dup'd to the wrong topic | ||
other_consumer.assert_empty() | ||
|
||
def test_feedback_events_without_processing(mini_sentry, relay_chain): | ||
relay = relay_chain(min_relay_version="latest") | ||
|
||
@pytest.mark.parametrize("use_feedback_topic", (False, True)) | ||
def test_feedback_events_without_processing( | ||
mini_sentry, relay_chain, use_feedback_topic | ||
): | ||
project_id = 42 | ||
mini_sentry.add_basic_project_config( | ||
project_id, | ||
extra={"config": {"features": ["organizations:user-feedback-ingest"]}}, | ||
) | ||
mini_sentry.set_global_config_option( | ||
"feedback.ingest-topic.rollout-rate", 1.0 if use_feedback_topic else 0.0 | ||
) | ||
|
||
replay_item = generate_feedback_sdk_event() | ||
|
||
relay = relay_chain(min_relay_version="latest") | ||
relay.send_user_feedback(42, replay_item) | ||
|
||
envelope = mini_sentry.captured_events.get(timeout=20) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tiny nit:
I like splitting this, so it takes up less lines and becomes a bit more readable. But need to check if rustfmt aggrees with me here ...