feat(metrics): Support dedicated topics per metrics usecase, drop metrics from unknown usecases [INGEST-1309] #1285

Merged
merged 19 commits into from
Jun 14, 2022
Changes from 4 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -21,6 +21,7 @@
- Add support for profile outcomes. ([#1272](https://github.com/getsentry/relay/pull/1272))
- Avoid potential panics when scrubbing minidumps. ([#1282](https://github.com/getsentry/relay/pull/1282))
- Fix typescript profile validation. ([#1283](https://github.com/getsentry/relay/pull/1283))
- Support dedicated topics per metrics usecase, drop metrics from unknown usecases. ([#1285](https://github.com/getsentry/relay/pull/1285))

## 22.5.0

24 changes: 20 additions & 4 deletions relay-config/src/config.rs
@@ -775,8 +775,12 @@ pub enum KafkaTopic {
OutcomesBilling,
/// Session health updates.
Sessions,
/// Aggregate Metrics.
Metrics,
/// The default topic for metrics. We use this mainly for tests at the moment.
MetricsDefault,
/// Any metric that is extracted from sessions.
MetricsSessions,
/// Any metric that is extracted from transactions.
MetricsTransactions,
/// Profiles
Profiles,
}
@@ -797,8 +801,14 @@ pub struct TopicAssignments {
pub outcomes_billing: Option<TopicAssignment>,
/// Session health topic name.
pub sessions: TopicAssignment,
/// Metrics topic name.
/// Default topic name for all aggregate metrics. Specialized topics for session-based and
/// transaction-based metrics can be configured via `metrics_sessions` and
/// `metrics_transactions` each.
pub metrics: TopicAssignment,
/// Topic name for metrics extracted from sessions. Defaults to the assignment of `metrics`.
pub metrics_sessions: Option<TopicAssignment>,
/// Topic name for metrics extracted from transactions. Defaults to the assignment of `metrics`.
pub metrics_transactions: Option<TopicAssignment>,
Member

nit: If we introduce a `MetricNamespace` enum, this could become a mapping of `MetricNamespace => TopicAssignment`, which would allow for more generic code in all other places. Provided that all unknown namespace strings are mapped to `MetricNamespace::Unknown`, this would even allow creating the config before updating Relay.

I have not fully thought through all implications here, however.
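
To make the idea concrete, here is a minimal sketch of what such a mapping could look like. Everything in it is an assumption for illustration: `MetricNamespace`, its variants, the `topic_for` helper, and the use of plain `String` topic names in place of `TopicAssignment` are hypothetical and not part of this PR.

```rust
use std::collections::BTreeMap;
use std::str::FromStr;

/// Hypothetical namespace enum as floated in the comment above; not part of this PR.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum MetricNamespace {
    Sessions,
    Transactions,
    /// Catch-all, so that unrecognized namespace strings still parse.
    Unknown,
}

impl FromStr for MetricNamespace {
    type Err = std::convert::Infallible;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(match s {
            "sessions" => Self::Sessions,
            "transactions" => Self::Transactions,
            _ => Self::Unknown,
        })
    }
}

/// Looks up the topic configured for a namespace and falls back to the
/// default metrics topic when no dedicated topic is configured.
/// Topic names are plain strings here, standing in for `TopicAssignment`.
fn topic_for<'a>(
    topics: &'a BTreeMap<MetricNamespace, String>,
    default: &'a str,
    namespace: MetricNamespace,
) -> &'a str {
    topics.get(&namespace).map(String::as_str).unwrap_or(default)
}
```

With a map that only contains an entry for `MetricNamespace::Sessions`, a lookup for any other namespace would resolve to the default topic, which mirrors the `unwrap_or(&self.metrics)` fallback in this diff.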

/// Stacktrace topic name
pub profiles: TopicAssignment,
}
@@ -813,7 +823,11 @@ impl TopicAssignments {
KafkaTopic::Outcomes => &self.outcomes,
KafkaTopic::OutcomesBilling => self.outcomes_billing.as_ref().unwrap_or(&self.outcomes),
KafkaTopic::Sessions => &self.sessions,
KafkaTopic::Metrics => &self.metrics,
KafkaTopic::MetricsDefault => &self.metrics,
KafkaTopic::MetricsSessions => self.metrics_sessions.as_ref().unwrap_or(&self.metrics),
KafkaTopic::MetricsTransactions => {
self.metrics_transactions.as_ref().unwrap_or(&self.metrics)
}
KafkaTopic::Profiles => &self.profiles,
}
}
@@ -829,6 +843,8 @@ impl Default for TopicAssignments {
outcomes_billing: None,
sessions: "ingest-sessions".to_owned().into(),
metrics: "ingest-metrics".to_owned().into(),
metrics_sessions: None,
metrics_transactions: None,
profiles: "profiles".to_owned().into(),
}
}
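
The fallback behaviour of the new optional assignments in `relay-config/src/config.rs` can be sketched in isolation. This is a simplified stand-in, not Relay's code: `TopicAssignment` is reduced to a newtype around the topic name, only the three metrics topics are modelled, and the method name `get` and the topic name `ingest-metrics-sessions` are assumptions.

```rust
/// Simplified stand-in for Relay's `TopicAssignment`; here it only wraps a topic name.
#[derive(Debug, Clone, PartialEq)]
struct TopicAssignment(String);

/// Only the metrics-related topics from this diff are modelled.
#[derive(Debug, Clone, Copy)]
enum KafkaTopic {
    MetricsDefault,
    MetricsSessions,
    MetricsTransactions,
}

struct TopicAssignments {
    metrics: TopicAssignment,
    metrics_sessions: Option<TopicAssignment>,
    metrics_transactions: Option<TopicAssignment>,
}

impl TopicAssignments {
    /// Resolves a topic to its assignment. Dedicated metrics topics that are
    /// not configured (`None`) fall back to the assignment of `metrics`.
    fn get(&self, topic: KafkaTopic) -> &TopicAssignment {
        match topic {
            KafkaTopic::MetricsDefault => &self.metrics,
            KafkaTopic::MetricsSessions => {
                self.metrics_sessions.as_ref().unwrap_or(&self.metrics)
            }
            KafkaTopic::MetricsTransactions => {
                self.metrics_transactions.as_ref().unwrap_or(&self.metrics)
            }
        }
    }
}
```

Under these assumptions, a config that sets only `metrics_sessions` would route session metrics to the dedicated topic while transaction metrics keep going to whatever `metrics` points at.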
22 changes: 11 additions & 11 deletions relay-metrics/src/aggregation.rs
@@ -14,7 +14,7 @@ use relay_common::{MonotonicResult, ProjectKey, UnixTimestamp};
use relay_system::{Controller, Shutdown};

use crate::statsd::{MetricCounters, MetricGauges, MetricHistograms, MetricSets, MetricTimers};
use crate::{protocol, Metric, MetricType, MetricUnit, MetricValue};
use crate::{protocol, Metric, MetricResourceIdentifier, MetricType, MetricUnit, MetricValue};

/// Interval for the flush cycle of the [`Aggregator`].
const FLUSH_INTERVAL: Duration = Duration::from_millis(100);
@@ -1129,7 +1129,7 @@ impl Aggregator {
return Err(AggregateMetricsErrorKind::InvalidStringLength.into());
}

if !protocol::is_valid_mri(&key.metric_name) {
if key.metric_name.parse::<MetricResourceIdentifier>().is_err() {
relay_log::debug!("invalid metric name {:?}", key.metric_name);
relay_log::configure_scope(|scope| {
scope.set_extra(
@@ -1584,7 +1584,7 @@ mod tests {

fn some_metric() -> Metric {
Metric {
name: "c:foo".to_owned(),
name: "c:transactions/foo".to_owned(),
unit: MetricUnit::None,
value: MetricValue::Counter(42.),
timestamp: UnixTimestamp::from_secs(999994711),
@@ -1910,7 +1910,7 @@ mod tests {
BucketKey {
project_key: ProjectKey("a94ae32be2584e0bbd7a4cbb95971fee"),
timestamp: UnixTimestamp(999994711),
metric_name: "c:foo",
metric_name: "c:transactions/foo",
metric_type: Counter,
metric_unit: None,
tags: {},
@@ -1959,7 +1959,7 @@ mod tests {
BucketKey {
project_key: ProjectKey("a94ae32be2584e0bbd7a4cbb95971fee"),
timestamp: UnixTimestamp(999994710),
metric_name: "c:foo",
metric_name: "c:transactions/foo",
metric_type: Counter,
metric_unit: None,
tags: {},
@@ -1972,7 +1972,7 @@ mod tests {
BucketKey {
project_key: ProjectKey("a94ae32be2584e0bbd7a4cbb95971fee"),
timestamp: UnixTimestamp(999994720),
metric_name: "c:foo",
metric_name: "c:transactions/foo",
metric_type: Counter,
metric_unit: None,
tags: {},
@@ -2239,7 +2239,7 @@ mod tests {
let bucket_key = BucketKey {
project_key,
timestamp: UnixTimestamp::now(),
metric_name: "c:hergus.bergus".to_owned(),
metric_name: "c:transactions/hergus.bergus".to_owned(),
metric_type: MetricType::Counter,
metric_unit: MetricUnit::None,
tags: {
@@ -2289,7 +2289,7 @@ mod tests {
let short_metric = BucketKey {
project_key,
timestamp: UnixTimestamp::now(),
metric_name: "c:a_short_metric".to_owned(),
metric_name: "c:transactions/a_short_metric".to_owned(),
metric_type: MetricType::Counter,
metric_unit: MetricUnit::None,
tags: BTreeMap::new(),
@@ -2299,7 +2299,7 @@ mod tests {
let long_metric = BucketKey {
project_key,
timestamp: UnixTimestamp::now(),
metric_name: "c:long_name_a_very_long_name_its_super_long_really_but_like_super_long_probably_the_longest_name_youve_seen_and_even_the_longest_name_ever_its_extremly_long_i_cant_tell_how_long_it_is_because_i_dont_have_that_many_fingers_thus_i_cant_count_the_many_characters_this_long_name_is".to_owned(),
metric_name: "c:transactions/long_name_a_very_long_name_its_super_long_really_but_like_super_long_probably_the_longest_name_youve_seen_and_even_the_longest_name_ever_its_extremly_long_i_cant_tell_how_long_it_is_because_i_dont_have_that_many_fingers_thus_i_cant_count_the_many_characters_this_long_name_is".to_owned(),
metric_type: MetricType::Counter,
metric_unit: MetricUnit::None,
tags: BTreeMap::new(),
@@ -2314,7 +2314,7 @@ mod tests {
let short_metric_long_tag_key = BucketKey {
project_key,
timestamp: UnixTimestamp::now(),
metric_name: "c:a_short_metric_with_long_tag_key".to_owned(),
metric_name: "c:transactions/a_short_metric_with_long_tag_key".to_owned(),
metric_type: MetricType::Counter,
metric_unit: MetricUnit::None,
tags: BTreeMap::from([("i_run_out_of_creativity_so_here_we_go_Lorem_Ipsum_is_simply_dummy_text_of_the_printing_and_typesetting_industry_Lorem_Ipsum_has_been_the_industrys_standard_dummy_text_ever_since_the_1500s_when_an_unknown_printer_took_a_galley_of_type_and_scrambled_it_to_make_a_type_specimen_book".into(), "tag_value".into())]),
@@ -2326,7 +2326,7 @@ mod tests {
let short_metric_long_tag_value = BucketKey {
project_key,
timestamp: UnixTimestamp::now(),
metric_name: "c:a_short_metric_with_long_tag_value".to_owned(),
metric_name: "c:transactions/a_short_metric_with_long_tag_value".to_owned(),
metric_type: MetricType::Counter,
metric_unit: MetricUnit::None,
tags: BTreeMap::from([("tag_key".into(), "i_run_out_of_creativity_so_here_we_go_Lorem_Ipsum_is_simply_dummy_text_of_the_printing_and_typesetting_industry_Lorem_Ipsum_has_been_the_industrys_standard_dummy_text_ever_since_the_1500s_when_an_unknown_printer_took_a_galley_of_type_and_scrambled_it_to_make_a_type_specimen_book".into())]),
49 changes: 35 additions & 14 deletions relay-metrics/src/protocol.rs
@@ -152,20 +152,41 @@ fn is_valid_name(name: &str) -> bool {
false
}

/// Validates an MRI of the form `<ty>:<ns>/<name>@<unit>`
///
/// Note that the format used in the statsd protocol is different: Metric names are not prefixed
/// with `<ty>:` as the type is somewhere else in the protocol.
pub(crate) fn is_valid_mri(name: &str) -> bool {
let mut components = name.splitn(2, ':');
if components.next().is_none() {
return false;
}

if let Some(name_unit) = components.next() {
parse_name_unit(name_unit).is_some()
} else {
false
/// A metric name as MRI, a naming scheme which includes most of the metric's bucket key
/// (excl. timestamp and tags).
pub struct MetricResourceIdentifier {
/// The metric type.
pub ty: MetricType,
/// The namespace/usecase for this metric. For example `sessions` or `transactions`.
pub namespace: String,
/// The actual name, such as `duration` as part of `d:transactions/duration@ms`
pub name: String,
/// The metric unit.
pub unit: MetricUnit,
}

impl std::str::FromStr for MetricResourceIdentifier {
type Err = ParseMetricError;

/// Parses and validates an MRI of the form `<ty>:<ns>/<name>@<unit>`
///
/// Note that the format used in the statsd protocol is different: Metric names are not prefixed
/// with `<ty>:` as the type is somewhere else in the protocol.
fn from_str(name: &str) -> Result<Self, Self::Err> {
let (raw_ty, rest) = name.split_once(':').ok_or(ParseMetricError(()))?;
let ty = raw_ty.parse()?;

let (raw_namespace, rest) = rest.split_once('/').unwrap_or(("custom", rest));
let namespace = raw_namespace.to_owned();

let (name, unit) = parse_name_unit(rest).ok_or(ParseMetricError(()))?;

Ok(Self {
ty,
namespace,
name,
unit,
})
}
}
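
The splitting steps in `from_str` above can be tried out with a standalone sketch. It is deliberately simplified and rests on assumptions: the `Mri` struct and `parse_mri` function are hypothetical names, the type is kept as a string and checked against an assumed set of one-letter codes, and the unit is split off at `@` without the validation that Relay's `parse_name_unit` performs.

```rust
/// Hypothetical, simplified MRI; Relay's real type uses `MetricType` and `MetricUnit`.
#[derive(Debug, PartialEq)]
struct Mri {
    ty: String,
    namespace: String,
    name: String,
    unit: Option<String>,
}

/// Parses `<ty>:<ns>/<name>@<unit>`, following the same splitting steps as the diff.
fn parse_mri(input: &str) -> Option<Mri> {
    // Step 1: the type prefix is everything before the first colon.
    let (ty, rest) = input.split_once(':')?;
    // Assumption: the set of valid one-letter type codes.
    if !matches!(ty, "c" | "d" | "s" | "g") {
        return None;
    }

    // Step 2: the namespace is optional and defaults to "custom", as in the diff.
    let (namespace, rest) = rest.split_once('/').unwrap_or(("custom", rest));

    // Step 3: simplified stand-in for `parse_name_unit`; no name or unit validation.
    let (name, unit) = match rest.split_once('@') {
        Some((name, unit)) => (name, Some(unit.to_owned())),
        None => (rest, None),
    };
    if name.is_empty() {
        return None;
    }

    Some(Mri {
        ty: ty.to_owned(),
        namespace: namespace.to_owned(),
        name: name.to_owned(),
        unit,
    })
}
```

In this sketch, `d:transactions/duration@ms` splits into the namespace `transactions`, the name `duration` and the unit `ms`, while a name without a namespace such as `c:foo` ends up in the `custom` namespace.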

38 changes: 33 additions & 5 deletions relay-server/src/actors/store.rs
@@ -18,7 +18,7 @@ use relay_common::{ProjectId, UnixTimestamp, Uuid};
use relay_config::{Config, KafkaTopic};
use relay_general::protocol::{self, EventId, SessionAggregates, SessionStatus, SessionUpdate};
use relay_log::LogError;
use relay_metrics::{Bucket, BucketValue, MetricUnit};
use relay_metrics::{Bucket, BucketValue, MetricResourceIdentifier, MetricUnit};
use relay_quotas::Scoping;
use relay_statsd::metric;

@@ -57,7 +57,9 @@ struct Producers {
attachments: Producer,
transactions: Producer,
sessions: Producer,
metrics: Producer,
metrics_default: Producer,
metrics_sessions: Producer,
metrics_transactions: Producer,
profiles: Producer,
}

@@ -74,7 +76,9 @@ impl Producers {
None
}
KafkaTopic::Sessions => Some(&self.sessions),
KafkaTopic::Metrics => Some(&self.metrics),
KafkaTopic::MetricsDefault => Some(&self.metrics_default),
KafkaTopic::MetricsSessions => Some(&self.metrics_sessions),
KafkaTopic::MetricsTransactions => Some(&self.metrics_transactions),
Comment on lines +79 to +80
Contributor

Question: can these ever be None?

KafkaTopic::Profiles => Some(&self.profiles),
}
}
@@ -131,7 +135,21 @@ impl StoreForwarder {
events: make_producer(&*config, &mut reused_producers, KafkaTopic::Events)?,
transactions: make_producer(&*config, &mut reused_producers, KafkaTopic::Transactions)?,
sessions: make_producer(&*config, &mut reused_producers, KafkaTopic::Sessions)?,
metrics: make_producer(&*config, &mut reused_producers, KafkaTopic::Metrics)?,
metrics_default: make_producer(
&*config,
&mut reused_producers,
KafkaTopic::MetricsDefault,
)?,
metrics_sessions: make_producer(
&*config,
&mut reused_producers,
KafkaTopic::MetricsSessions,
)?,
metrics_transactions: make_producer(
&*config,
&mut reused_producers,
KafkaTopic::MetricsTransactions,
)?,
profiles: make_producer(&*config, &mut reused_producers, KafkaTopic::Profiles)?,
};

@@ -374,8 +392,18 @@ impl StoreForwarder {
}

fn send_metric_message(&self, message: MetricKafkaMessage) -> Result<(), StoreError> {
let topic = match message.name.parse() {
Member

nit: This would be easier to read if you map the namespace here. For instance:

Suggested change
let topic = match message.name.parse() {
let mri: MetricResourceIdentifier = message.name.parse();
let topic = match mri.map(|mri| mri.namespace) {
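
A self-contained variant of this suggestion might look as follows. It is a sketch under assumptions rather than the code that was merged: `Mri` is a hypothetical, minimal stand-in for `MetricResourceIdentifier` that keeps only the namespace, `topic_for_metric` is an invented helper name, and the parse result is held as a `Result` so that the namespace can be matched as a `&str` via `as_deref`.

```rust
use std::str::FromStr;

/// Only the metrics-related topics from this diff are modelled.
#[derive(Debug, Clone, Copy, PartialEq)]
enum KafkaTopic {
    MetricsDefault,
    MetricsSessions,
    MetricsTransactions,
}

/// Hypothetical stand-in for `MetricResourceIdentifier` that keeps only the namespace.
struct Mri {
    namespace: String,
}

impl FromStr for Mri {
    type Err = ();

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // A type prefix is required; a missing namespace defaults to "custom".
        let (_ty, rest) = s.split_once(':').ok_or(())?;
        let (namespace, _name) = rest.split_once('/').unwrap_or(("custom", rest));
        Ok(Mri {
            namespace: namespace.to_owned(),
        })
    }
}

/// Maps the parse result to its namespace first, then matches on plain strings.
fn topic_for_metric(name: &str) -> KafkaTopic {
    let namespace = name.parse::<Mri>().map(|mri| mri.namespace);
    match namespace.as_deref() {
        Ok("transactions") => KafkaTopic::MetricsTransactions,
        Ok("sessions") => KafkaTopic::MetricsSessions,
        // Unparseable names and all other namespaces go to the default topic.
        _ => KafkaTopic::MetricsDefault,
    }
}
```

Matching on `Ok("transactions")` instead of a struct pattern with an `if` guard keeps each arm to a single line, which seems to be the readability gain the comment is after.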

Ok(MetricResourceIdentifier { namespace, .. }) if namespace == "transactions" => {
KafkaTopic::MetricsTransactions
}
Ok(MetricResourceIdentifier { namespace, .. }) if namespace == "sessions" => {
KafkaTopic::MetricsSessions
}
_ => KafkaTopic::MetricsDefault,
};

relay_log::trace!("Sending metric message to kafka");
self.produce(KafkaTopic::Metrics, KafkaMessage::Metric(message))?;
self.produce(topic, KafkaMessage::Metric(message))?;
metric!(
counter(RelayCounters::ProcessingMessageProduced) += 1,
event_type = "metric"