Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metrics): Support dedicated topics per metrics usecase, drop metrics from unknown usecases [INGEST-1309] #1285

Merged
merged 19 commits into from
Jun 14, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions relay-metrics/src/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -753,8 +753,8 @@ enum AggregateMetricsErrorKind {
#[fail(display = "found invalid characters")]
InvalidCharacters,
/// A metric bucket had an unknown namespace in the metric name.
#[fail(display = "found invalid namespace")]
InvalidNamespace,
#[fail(display = "found unsupported namespace")]
UnsupportedNamespace,
/// A metric bucket's timestamp was out of the configured acceptable range.
#[fail(display = "found invalid timestamp")]
InvalidTimestamp,
Expand Down Expand Up @@ -1297,18 +1297,26 @@ impl Aggregator {
return Err(AggregateMetricsErrorKind::InvalidStringLength.into());
}

key.metric_name = match key.metric_name.parse::<MetricResourceIdentifier>() {
if let Err(err) = Self::normalize_metric_name(&mut key) {
relay_log::configure_scope(|scope| {
scope.set_extra(
"bucket.project_key",
key.project_key.as_str().to_owned().into(),
);
scope.set_extra("bucket.metric_name", key.metric_name.into());
});
return Err(err);
}

Ok(key)
}

fn normalize_metric_name(key: &mut BucketKey) -> Result<(), AggregateMetricsError> {
key.metric_name = match MetricResourceIdentifier::parse(&key.metric_name) {
Ok(mri) => {
if matches!(mri.namespace, MetricNamespace::Unsupported) {
relay_log::debug!("invalid metric namespace {:?}", key.metric_name);
relay_log::configure_scope(|scope| {
scope.set_extra(
"bucket.project_key",
key.project_key.as_str().to_owned().into(),
);
scope.set_extra("bucket.metric_name", key.metric_name.into());
});
return Err(AggregateMetricsErrorKind::InvalidNamespace.into());
return Err(AggregateMetricsErrorKind::UnsupportedNamespace.into());
}

let mut metric_name = mri.to_string();
Expand All @@ -1318,18 +1326,11 @@ impl Aggregator {
}
Err(_) => {
relay_log::debug!("invalid metric name {:?}", key.metric_name);
relay_log::configure_scope(|scope| {
scope.set_extra(
"bucket.project_key",
key.project_key.as_str().to_owned().into(),
);
scope.set_extra("bucket.metric_name", key.metric_name.into());
});
return Err(AggregateMetricsErrorKind::InvalidCharacters.into());
}
};

Ok(key)
Ok(())
}

/// Removes tags with invalid characters in the key, and validates tag values.
Expand Down Expand Up @@ -1536,7 +1537,6 @@ impl Aggregator {
let bucket_interval = self.config.bucket_interval;
let cost_tracker = &mut self.cost_tracker;
self.buckets.retain(|key, entry| {
dbg!(&key);
if force || entry.elapsed() {
// Take the value and leave a placeholder behind. It'll be removed right after.
let value = mem::replace(&mut entry.value, BucketValue::Counter(0.0));
Expand Down
3 changes: 3 additions & 0 deletions relay-metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
//! ...
//! ```
//!
//! Note that the name format used in the statsd protocol is different from the MRI: Metric names
//! are not prefixed with `<ty>:` as the type is somewhere else in the protocol.
//!
//! The timestamp in the item header is used to send backdated metrics. If it is omitted,
//! the `received` time of the envelope is assumed.
//!
Expand Down
120 changes: 70 additions & 50 deletions relay-metrics/src/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::borrow::Cow;
use std::collections::BTreeMap;
use std::fmt;
use std::iter::FusedIterator;
Expand Down Expand Up @@ -167,20 +166,27 @@ fn is_valid_name(name: &str) -> bool {

/// The namespace of a metric.
///
/// This successfully deserializes any kind of string, but in reality only `"sessions"` and
/// `"transactions"` is supported. Everything else is dropped both in the metrics aggregator and in
/// the store actor.
/// Namespaces make it possible to identify the product entity that the metric got extracted from, and/or
/// identify the use case that the metric belongs to. These namespaces cannot be defined freely,
/// instead they are defined by Sentry. Over time, there will be more namespaces as we introduce
/// new metrics-based products.
///
/// Right now this successfully deserializes any kind of string, but in reality only `"sessions"`
/// (for release health) and `"transactions"` (for metrics-enhanced performance) are supported.
/// Everything else is dropped both in the metrics aggregator and in the store actor.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum MetricNamespace {
/// Metrics extracted from sessions.
Sessions,
/// Metrics extracted from transaction events.
Transactions,
/// Metrics that relay doesn't know the namespace of, and will drop before aggregating.
/// Metrics whose namespace Relay does not know or recognize. These metrics are dropped before
/// aggregating. For instance, an MRI of `c:something_new/foo@none` has the namespace
/// `something_new`, but as Relay doesn't support that namespace, it gets deserialized into
/// this variant.
///
/// We could make this variant contain a string such that customer and PoP-relays can forward
/// unknown namespaces, but decided against it for now because there was no obvious usecase for
/// it that didn't require a Relay update anyway.
/// Relay currently drops all metrics whose namespace ends up being deserialized as
/// `unsupported`. We may revise that in the future.
Unsupported,
}

Expand Down Expand Up @@ -215,19 +221,19 @@ impl fmt::Display for MetricNamespace {
pub struct MetricResourceIdentifier<'a> {
/// The metric type.
pub ty: MetricType,
/// The namespace/usecase for this metric. For example `sessions` or `transactions`.
/// The namespace/usecase for this metric. For example `sessions` or `transactions`. In the
/// case of the statsd protocol, a missing namespace is converted into the value
/// `"custom"`.
pub namespace: MetricNamespace,
/// The actual name, such as `duration` as part of `d:transactions/duration@ms`
pub name: Cow<'a, str>,
pub name: &'a str,
/// The metric unit.
pub unit: MetricUnit,
}

impl<'a> std::str::FromStr for MetricResourceIdentifier<'a> {
type Err = ParseMetricError;

impl<'a> MetricResourceIdentifier<'a> {
/// Parses and validates an MRI of the form `<ty>:<ns>/<name>@<unit>`
fn from_str(name: &str) -> Result<Self, Self::Err> {
pub fn parse(name: &'a str) -> Result<Self, ParseMetricError> {
let (raw_ty, rest) = name.split_once(':').ok_or(ParseMetricError(()))?;
let ty = raw_ty.parse()?;

Expand All @@ -237,7 +243,7 @@ impl<'a> std::str::FromStr for MetricResourceIdentifier<'a> {
Ok(Self {
ty,
namespace: raw_namespace.parse()?,
name: name.to_owned().into(),
name,
unit,
})
}
Expand Down Expand Up @@ -419,26 +425,35 @@ fn parse_tags(string: &str) -> Option<BTreeMap<String, String>> {

#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
pub struct Metric {
/// The name of the metric, i.e. the MRI.
/// The metric resource identifier, in short MRI.
///
/// The metric name historically has been a freeform string. Over time we realized that we can
/// use the metric name to encode things like metric unit, type, etc in order to force
/// downstream systems to bucket by those properties, without having to update them. This is
/// how the concept of the Metrics Resource Identifier was born.
/// MRIs follow three core principles:
///
/// 1. **Robustness:** Metrics must be addressed via a stable identifier. During ingestion in
/// Relay and Snuba, metrics are preaggregated and bucketed based on this identifier, so it
/// cannot change over time without breaking bucketing.
/// 2. **Uniqueness:** The identifier for metrics must be unique across variations of units and
/// metric types, within and across use cases, as well as between projects and
/// organizations.
/// 3. **Abstraction:** The user-facing product changes its terminology over time, and splits
/// concepts into smaller parts. The internal metric identifiers must abstract from that,
/// and offer sufficient granularity to allow for such changes.
///
/// An MRI has the following format: `<type>:<namespace>/<name>@<unit>`
/// MRIs have the format `<type>:<ns>/<name>@<unit>`, comprising the following components:
///
/// * **Type:** counter (`c`), set (`s`), distribution (`d`), gauge (`g`), and evaluated (`e`)
/// for derived numeric metrics (the latter is a pure query-time construct and is not relevant
/// to Relay or ingestion). See [`MetricType`].
/// * **Namespace:** Identifying the product entity and use case affiliation of the metric. See
/// [`MetricNamespace`].
/// * **Name:** The display name of the metric in the allowed character set.
/// * **Unit:** The verbatim unit name. See [`MetricUnit`].
///
/// * `type` is the single-letter representation of the metric's type, e.g. one of `d`, `s`, `c`.
/// * `namespace` is one of the values representable by `MetricNamespace`.
/// * `name` cannot be empty, must start with a letter and can consist of ASCII alphanumerics, underscores, slashes, @s and periods.
/// * `unit` is one of the values representable by `MetricUnit`.
///
/// For parsing and normalization, the [`MetricResourceIdentifier`] struct is used in the aggregator. It is
/// also used in the kafka producer to route certain namespaces to certain topics.
///
/// Note that the format used in the statsd protocol is different: Metric names are not prefixed
/// with `<ty>:` as the type is somewhere else in the protocol.
/// Parsing a metric (or set of metrics) should not fail hard if the MRI is invalid, so this is
/// typed as string. Later in the metrics aggregator, the MRI is parsed using
/// [`MetricResourceIdentifier`] and validated for invalid characters as well.
/// [`MetricResourceIdentifier`] is also used in the kafka producer to route certain namespaces
/// to certain topics.
pub name: String,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Same as for bucket: Should we make this a MetricsResourceIdentifier? With appropriate serializing logic so it still serializes as a string.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Closing the conversation here in favor of the one on bucket.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should capture why we use String over MetricsResourceIdentifier in the docstrings, based on this convo.

/// The value of the metric.
///
Expand Down Expand Up @@ -473,7 +488,7 @@ impl Metric {
/// ensures that just the name determines correct bucketing of metrics with name collisions.
pub fn new_mri(
namespace: MetricNamespace,
name: impl fmt::Display,
name: impl AsRef<str>,
unit: MetricUnit,
value: MetricValue,
timestamp: UnixTimestamp,
Expand All @@ -482,7 +497,7 @@ impl Metric {
Self {
name: MetricResourceIdentifier {
ty: value.ty(),
name: name.to_string().into(),
name: name.as_ref(),
namespace,
unit,
}
Expand All @@ -498,26 +513,31 @@ impl Metric {
/// [<ns>/]<name>[@<unit>]:<value>|<type>[|#<tags>]`
/// ```
fn parse_str(string: &str, timestamp: UnixTimestamp) -> Option<Self> {
let (raw_namespace, rest) = string.split_once('/').unwrap_or(("custom", string));
let (raw_name_unit_value, rest) = rest.split_once('|')?;
let (raw_ty, rest) = rest.split_once('|').unwrap_or((rest, ""));
let ty = raw_ty.parse().ok()?;
let (name, unit, value) = parse_name_unit_value(raw_name_unit_value, ty)?;

let tags = if rest.starts_with('#') {
parse_tags(rest.get(1..)?)?
} else {
BTreeMap::new()
};

Some(Self::new_mri(
let mut components = string.split('|');

let name_value_str = components.next()?;
let ty = components.next().and_then(|s| s.parse().ok())?;
let (name_and_namespace, unit, value) = parse_name_unit_value(name_value_str, ty)?;
let (raw_namespace, name) = name_and_namespace
.split_once('/')
.unwrap_or(("custom", string));

let mut metric = Self::new_mri(
raw_namespace.parse().ok()?,
name,
unit,
value,
timestamp,
tags,
))
BTreeMap::new(),
);

for component in components {
if let Some('#') = component.chars().next() {
metric.tags = parse_tags(component.get(1..)?)?;
}
}

Some(metric)
}

/// Parses a single metric value from the raw protocol.
Expand Down Expand Up @@ -709,7 +729,7 @@ mod tests {
let s = "transactions/foo@second:17.5|d";
let timestamp = UnixTimestamp::from_secs(4711);
let metric = Metric::parse(s.as_bytes(), timestamp).unwrap();
let mri: MetricResourceIdentifier = metric.name.parse().unwrap();
let mri = MetricResourceIdentifier::parse(&metric.name).unwrap();
assert_eq!(mri.unit, MetricUnit::Duration(DurationUnit::Second));
}

Expand All @@ -718,7 +738,7 @@ mod tests {
let s = "transactions/foo@s:17.5|d";
let timestamp = UnixTimestamp::from_secs(4711);
let metric = Metric::parse(s.as_bytes(), timestamp).unwrap();
let mri: MetricResourceIdentifier = metric.name.parse().unwrap();
let mri = MetricResourceIdentifier::parse(&metric.name).unwrap();
assert_eq!(mri.unit, MetricUnit::Duration(DurationUnit::Second));
}

Expand Down
27 changes: 13 additions & 14 deletions relay-server/src/actors/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,20 +385,19 @@ impl StoreForwarder {
}

fn send_metric_message(&self, message: MetricKafkaMessage) -> Result<(), StoreError> {
let topic = match message.name.parse() {
Ok(MetricResourceIdentifier {
namespace: MetricNamespace::Transactions,
..
}) => KafkaTopic::MetricsTransactions,
Ok(MetricResourceIdentifier {
namespace: MetricNamespace::Sessions,
..
}) => KafkaTopic::MetricsSessions,
_ => {
relay_log::configure_scope(|scope| {
scope.set_extra("metric_message.name", message.name.into());
});
relay_log::error!("Store actor dropping unknown metric usecase");
let mri = MetricResourceIdentifier::parse(&message.name);
let topic = match mri.map(|mri| mri.namespace) {
Ok(MetricNamespace::Transactions) => KafkaTopic::MetricsTransactions,
Ok(MetricNamespace::Sessions) => KafkaTopic::MetricsSessions,
Ok(MetricNamespace::Unsupported) | Err(_) => {
relay_log::with_scope(
|scope| {
scope.set_extra("metric_message.name", message.name.into());
},
|| {
relay_log::error!("Store actor dropping unknown metric usecase");
},
);
return Ok(());
}
};
Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/metrics_extraction/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ fn extract_transaction_metrics_inner(

push_metric(Metric::new_mri(
METRIC_NAMESPACE,
format_args!("measurements.{}", name),
format!("measurements.{}", name),
stated_unit.or(default_unit).unwrap_or_default(),
MetricValue::Distribution(value),
unix_timestamp,
Expand Down Expand Up @@ -364,7 +364,7 @@ fn extract_transaction_metrics_inner(

push_metric(Metric::new_mri(
METRIC_NAMESPACE,
format_args!("breakdowns.{}.{}", breakdown, measurement_name),
format!("breakdowns.{}.{}", breakdown, measurement_name),
unit.copied().unwrap_or(MetricUnit::None),
MetricValue::Distribution(value),
unix_timestamp,
Expand Down