Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(feedback): produce to ingest-feedback-events topic #3344

Merged
merged 28 commits into from
Apr 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
35a4dab
first draft
aliu39 Mar 26, 2024
e2f5d3d
Merge branch 'master' into aliu/produce-to-feedback-topic
aliu39 Mar 27, 2024
3036f02
use global config option and revert event.rs
aliu39 Mar 27, 2024
70f6079
fix topic filtering
aliu39 Mar 27, 2024
69a2236
rm outdated session and metric case
aliu39 Mar 28, 2024
6a5fca9
use rollout rate as bool flag
aliu39 Mar 28, 2024
78d5254
add .vscode to gitignore
aliu39 Mar 28, 2024
b51c913
rm old ff and unused imports
aliu39 Mar 28, 2024
3ca3e94
Merge branch 'master' into aliu/produce-to-feedback-topic
aliu39 Mar 28, 2024
ba3cc73
use is_rolled_out and clean up
aliu39 Mar 28, 2024
5843e3a
Delete relay-server/.vscode/settings.json
aliu39 Mar 28, 2024
caebd46
add serde to TopicAssignments.feedback
aliu39 Mar 28, 2024
a9e5151
Merge branch 'aliu/produce-to-feedback-topic' of https://github.com/g…
aliu39 Mar 28, 2024
de7d7b2
revert .vscode
aliu39 Mar 28, 2024
9cbd7a3
namespace option and add test
aliu39 Mar 28, 2024
1ee586a
format
aliu39 Mar 28, 2024
a5e9caf
fix test
aliu39 Mar 28, 2024
a795e7f
rename back to use_feedback_topic
aliu39 Mar 28, 2024
3d6bd0b
parametrize without_processing
aliu39 Mar 28, 2024
5f203c7
add to CHANGELOG
aliu39 Mar 28, 2024
cf0a70a
Update option descrip
aliu39 Apr 1, 2024
f8e800d
rm return val for set_global_config_option
aliu39 Apr 2, 2024
40f5d47
add other_consumer.assert_empty test
aliu39 Apr 2, 2024
f8e4cff
do rollout check in if cond
aliu39 Apr 2, 2024
e18e2f7
Merge branch 'master' into aliu/produce-to-feedback-topic
aliu39 Apr 2, 2024
0ee36f0
Merge branch 'master' into aliu/produce-to-feedback-topic
aliu39 Apr 8, 2024
d4b7e96
Merge branch 'master' into aliu/produce-to-feedback-topic
aliu39 Apr 9, 2024
c6766e2
Merge branch 'master' into aliu/produce-to-feedback-topic
aliu39 Apr 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
- Drop `event_id` and `remote_addr` from all outcomes. ([#3319](https://github.com/getsentry/relay/pull/3319))
- Support for AI token metrics ([#3250](https://github.com/getsentry/relay/pull/3250))
- Accept integers in `event.user.username`. ([#3328](https://github.com/getsentry/relay/pull/3328))
- Produce user feedback to ingest-feedback-events topic, with rollout rate. ([#3344](https://github.com/getsentry/relay/pull/3344))
- Extract `cache.item_size` and `cache.hit` data into span indexed ([#3367]https://github.com/getsentry/relay/pull/3367)
- Allow IP addresses in metrics domain tag. ([#3365](https://github.com/getsentry/relay/pull/3365))
- Support the full unicode character set via UTF-8 encoding for metric tags submitted via the statsd format. Certain restricted characters require escape sequences, see [docs](https://develop.sentry.dev/sdk/metrics/#normalization) for the precise rules. ([#3358](https://github.com/getsentry/relay/pull/3358))
Expand Down
11 changes: 11 additions & 0 deletions relay-dynamic-config/src/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,17 @@ pub struct Options {
)]
pub metric_stats_rollout_rate: f32,

/// Rollout rate for producing to the ingest-feedback-events topic.
///
/// Rate needs to be between `0.0` and `1.0`.
/// If set to `1.0` all organizations will ingest to the feedback topic.
#[serde(
rename = "feedback.ingest-topic.rollout-rate",
deserialize_with = "default_on_error",
skip_serializing_if = "is_default"
)]
pub feedback_ingest_topic_rollout_rate: f32,

/// Overall sampling of span extraction.
///
/// This number represents the fraction of transactions for which
Expand Down
10 changes: 9 additions & 1 deletion relay-kafka/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,16 @@ pub enum KafkaTopic {
MetricsSummaries,
/// COGS measurements topic.
Cogs,
/// Feedback events topic.
Feedback,
}

impl KafkaTopic {
/// Returns iterator over the variants of [`KafkaTopic`].
/// It will have to be adjusted if the new variants are added.
pub fn iter() -> std::slice::Iter<'static, Self> {
use KafkaTopic::*;
static TOPICS: [KafkaTopic; 14] = [
static TOPICS: [KafkaTopic; 15] = [
Events,
Attachments,
Transactions,
Expand All @@ -75,6 +77,7 @@ impl KafkaTopic {
Spans,
MetricsSummaries,
Cogs,
Feedback,
];
TOPICS.iter()
}
Expand Down Expand Up @@ -124,6 +127,9 @@ pub struct TopicAssignments {
/// COGS measurements.
#[serde(alias = "shared-resources-usage")]
pub cogs: TopicAssignment,
/// Feedback events topic name.
#[serde(alias = "ingest-feedback-events")]
pub feedback: TopicAssignment,
}

impl TopicAssignments {
Expand All @@ -145,6 +151,7 @@ impl TopicAssignments {
KafkaTopic::Spans => &self.spans,
KafkaTopic::MetricsSummaries => &self.metrics_summaries,
KafkaTopic::Cogs => &self.cogs,
KafkaTopic::Feedback => &self.feedback,
}
}
}
Expand All @@ -166,6 +173,7 @@ impl Default for TopicAssignments {
spans: "snuba-spans".to_owned().into(),
metrics_summaries: "snuba-metrics-summaries".to_owned().into(),
cogs: "shared-resources-usage".to_owned().into(),
feedback: "ingest-feedback-events".to_owned().into(),
}
}
}
Expand Down
16 changes: 15 additions & 1 deletion relay-server/src/services/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ use crate::services::global_config::GlobalConfigHandle;
use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
use crate::services::processor::Processed;
use crate::statsd::RelayCounters;
use crate::utils::{self, ArrayEncoding, BucketEncoder, ExtractionMode, TypedEnvelope};
use crate::utils::{
self, is_rolled_out, ArrayEncoding, BucketEncoder, ExtractionMode, TypedEnvelope,
};

/// Fallback name used for attachment items without a `filename` header.
const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";
Expand Down Expand Up @@ -184,6 +186,7 @@ impl StoreService {
) -> Result<(), StoreError> {
let retention = envelope.retention();
let event_id = envelope.event_id();

let event_item = envelope.as_mut().take_item_by(|item| {
matches!(
item.ty(),
Expand All @@ -199,6 +202,17 @@ impl StoreService {
KafkaTopic::Attachments
} else if event_item.as_ref().map(|x| x.ty()) == Some(&ItemType::Transaction) {
KafkaTopic::Transactions
} else if event_item.as_ref().map(|x| x.ty()) == Some(&ItemType::UserReportV2) {
let feedback_ingest_topic_rollout_rate = self
.global_config
.current()
.options
.feedback_ingest_topic_rollout_rate;
Comment on lines +206 to +210
Copy link
Member

@Dav1dde Dav1dde Apr 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tiny nit:

Suggested change
let feedback_ingest_topic_rollout_rate = self
.global_config
.current()
.options
.feedback_ingest_topic_rollout_rate;
let global_config = self.global_config.current();
let topic_rollout_rate = global_config.options.feedback_ingest_topic_rollout_rate;

I like splitting this, so it takes up less lines and becomes a bit more readable. But need to check if rustfmt aggrees with me here ...

if is_rolled_out(scoping.organization_id, feedback_ingest_topic_rollout_rate) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is checking the rollout rate with the org, not the event -- let's please roll out slowly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's just used as a boolean on/off in practice, so should be fine 👍

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what the feedback event volume is, but suddenly going from 0 to N may cause a backlog until consumers scale up.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a function I can use to do it on the event level?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will probably keep it at the org level since feedback volumes aren't that high, and we'd like to test it on all feedback in s4s. Will comment here if that changes. Holding off merging until https://getsentry.atlassian.net/browse/OPS-5543 is done.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will probably keep it at the org level since feedback volumes aren't that high

That's totally fine, my comment was just a note to roll out slowly to avoid any surprises 😄

KafkaTopic::Feedback
} else {
KafkaTopic::Events
}
} else {
KafkaTopic::Events
};
Expand Down
1 change: 1 addition & 0 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
profiles_consumer,
metrics_summaries_consumer,
cogs_consumer,
feedback_consumer,
)


Expand Down
4 changes: 4 additions & 0 deletions tests/integration/fixtures/mini_sentry.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ def add_full_project_config(self, project_id, dsn_public_key=None, extra=None):
self.project_configs[project_id] = ret_val
return ret_val

def set_global_config_option(self, option_name, value):
# must be called before initializing relay fixture
self.global_config["options"][option_name] = value


def _get_project_id(public_key, project_configs):
for project_id, project_config in project_configs.items():
Expand Down
21 changes: 21 additions & 0 deletions tests/integration/fixtures/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def inner(options=None):
"profiles": get_topic_name("profiles"),
"metrics_summaries": get_topic_name("metrics_summaries"),
"cogs": get_topic_name("cogs"),
"feedback": get_topic_name("feedback"),
}

if not processing.get("redis"):
Expand Down Expand Up @@ -303,6 +304,16 @@ def replay_events_consumer(kafka_consumer):
)


@pytest.fixture
def feedback_consumer(kafka_consumer):
return lambda timeout=None: FeedbackConsumer(
timeout=timeout,
*kafka_consumer(
"feedback"
), # Corresponds to key in processing_config["processing"]["topics"]
)


@pytest.fixture
def monitors_consumer(kafka_consumer):
return lambda timeout=None: MonitorsConsumer(
Expand Down Expand Up @@ -449,6 +460,16 @@ def get_replay_event(self):
return payload, event


class FeedbackConsumer(ConsumerBase):
def get_event(self, timeout=None):
message = self.poll(timeout)
assert message is not None
assert message.error() is None

message_dict = msgpack.unpackb(message.value(), raw=False, use_list=False)
return json.loads(message_dict["payload"].decode("utf8")), message_dict


class MonitorsConsumer(ConsumerBase):
def get_check_in(self):
message = self.poll()
Expand Down
41 changes: 30 additions & 11 deletions tests/integration/test_feedback.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import pytest
import json


Expand Down Expand Up @@ -42,24 +43,35 @@ def generate_feedback_sdk_event():
}


@pytest.mark.parametrize("use_feedback_topic", (False, True))
def test_feedback_event_with_processing(
mini_sentry, relay_with_processing, events_consumer
mini_sentry,
relay_with_processing,
events_consumer,
feedback_consumer,
use_feedback_topic,
):
relay = relay_with_processing()
mini_sentry.add_basic_project_config(
42, extra={"config": {"features": ["organizations:user-feedback-ingest"]}}
)

_events_consumer = events_consumer(timeout=20)
feedback = generate_feedback_sdk_event()
if use_feedback_topic:
mini_sentry.set_global_config_option("feedback.ingest-topic.rollout-rate", 1.0)
consumer = feedback_consumer(timeout=20)
other_consumer = events_consumer(timeout=20)
else:
mini_sentry.set_global_config_option("feedback.ingest-topic.rollout-rate", 0.0)
consumer = events_consumer(timeout=20)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth also assigning other_consumer and calling .assert_empty() on it at the end of the test.

other_consumer = feedback_consumer(timeout=20)

feedback = generate_feedback_sdk_event()
relay = relay_with_processing()
relay.send_user_feedback(42, feedback)

replay_event, replay_event_message = _events_consumer.get_event()
assert replay_event["type"] == "feedback"
# assert replay_event_message["retention_days"] == 90
event, message = consumer.get_event()
assert event["type"] == "feedback"

parsed_feedback = json.loads(bytes(replay_event_message["payload"]))
parsed_feedback = json.loads(message["payload"])
# Assert required fields were returned.
assert parsed_feedback["event_id"]
assert parsed_feedback["type"] == feedback["type"]
Expand Down Expand Up @@ -101,18 +113,25 @@ def test_feedback_event_with_processing(
},
}

# test message wasn't dup'd to the wrong topic
other_consumer.assert_empty()

def test_feedback_events_without_processing(mini_sentry, relay_chain):
relay = relay_chain(min_relay_version="latest")

@pytest.mark.parametrize("use_feedback_topic", (False, True))
def test_feedback_events_without_processing(
mini_sentry, relay_chain, use_feedback_topic
):
project_id = 42
mini_sentry.add_basic_project_config(
project_id,
extra={"config": {"features": ["organizations:user-feedback-ingest"]}},
)
mini_sentry.set_global_config_option(
"feedback.ingest-topic.rollout-rate", 1.0 if use_feedback_topic else 0.0
)

replay_item = generate_feedback_sdk_event()

relay = relay_chain(min_relay_version="latest")
relay.send_user_feedback(42, replay_item)

envelope = mini_sentry.captured_events.get(timeout=20)
Expand Down
Loading