-
Notifications
You must be signed in to change notification settings - Fork 94
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(metrics): Add metrics buffering and sampling #821
Changes from all commits
429966f
7464415
2cf8426
feb29ee
77129c2
11e50e0
f0e34dd
af6085f
c5d05d8
08d3852
b0b0edc
99cd298
04e1ece
fd40ce2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,7 +24,7 @@ | |
//! # use std::collections::BTreeMap; | ||
//! use relay_common::metrics; | ||
//! | ||
//! metrics::configure_statsd("myprefix", "localhost:8125", BTreeMap::new()); | ||
//! metrics::configure_statsd("myprefix", "localhost:8125", BTreeMap::new(), true, 1.0); | ||
//! ``` | ||
//! | ||
//! ## Macro Usage | ||
|
@@ -60,13 +60,21 @@ | |
//! [Metric Types]: https://github.com/statsd/statsd/blob/master/docs/metric_types.md | ||
|
||
use std::collections::BTreeMap; | ||
use std::net::ToSocketAddrs; | ||
use std::net::{ToSocketAddrs, UdpSocket}; | ||
use std::ops::{Deref, DerefMut}; | ||
use std::sync::Arc; | ||
|
||
use cadence::{Metric, MetricBuilder, StatsdClient}; | ||
use cadence::{ | ||
BufferedUdpMetricSink, Metric, MetricBuilder, QueuingMetricSink, StatsdClient, UdpMetricSink, | ||
}; | ||
use lazy_static::lazy_static; | ||
use parking_lot::RwLock; | ||
use rand::distributions::{Distribution, Uniform}; | ||
|
||
use crate::LogError; | ||
|
||
/// Maximum number of metric events that can be queued before we start dropping them | ||
const METRICS_MAX_QUEUE_SIZE: usize = 100_000; | ||
|
||
/// Client configuration object to store globally. | ||
#[derive(Debug)] | ||
|
@@ -75,6 +83,8 @@ pub struct MetricsClient { | |
pub statsd_client: StatsdClient, | ||
/// Default tags to apply to every metric | ||
pub default_tags: BTreeMap<String, String>, | ||
/// Global sample rate | ||
pub sample_rate: f32, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note: After merging, this needs to be added to docs as well. |
||
} | ||
|
||
impl Deref for MetricsClient { | ||
|
@@ -98,11 +108,37 @@ impl MetricsClient { | |
where | ||
T: Metric + From<String>, | ||
{ | ||
if !self._should_send() { | ||
return; | ||
} | ||
|
||
for (k, v) in &self.default_tags { | ||
metric = metric.with_tag(k, v); | ||
} | ||
|
||
metric.send(); | ||
if let Err(error) = metric.try_send() { | ||
log::error!( | ||
"Error sending a metric: {}, maximum capacity: {}", | ||
LogError(&error), | ||
METRICS_MAX_QUEUE_SIZE | ||
); | ||
}; | ||
} | ||
|
||
fn _should_send(&self) -> bool { | ||
if self.sample_rate <= 0.0 { | ||
false | ||
} else if self.sample_rate >= 1.0 { | ||
true | ||
} else { | ||
// Using thread local RNG and uniform distribution here because Rng::gen_range is | ||
// "optimized for the case that only a single sample is made from the given range". | ||
// See https://docs.rs/rand/0.7.3/rand/distributions/uniform/struct.Uniform.html for more | ||
// details. | ||
let mut rng = rand::thread_rng(); | ||
RNG_UNIFORM_DISTRIBUTION | ||
.with(|uniform_dist| uniform_dist.sample(&mut rng) <= self.sample_rate) | ||
} | ||
} | ||
} | ||
|
||
|
@@ -112,6 +148,7 @@ lazy_static! { | |
|
||
thread_local! { | ||
static CURRENT_CLIENT: Option<Arc<MetricsClient>> = METRICS_CLIENT.read().clone(); | ||
static RNG_UNIFORM_DISTRIBUTION: Uniform<f32> = Uniform::new(0.0, 1.0); | ||
} | ||
|
||
/// Internal prelude for the macro | ||
|
@@ -140,15 +177,46 @@ pub fn configure_statsd<A: ToSocketAddrs>( | |
prefix: &str, | ||
host: A, | ||
default_tags: BTreeMap<String, String>, | ||
buffering: bool, | ||
sample_rate: f32, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This function is now sufficiently complex to become a builder. I'm happy to make the changes in a follow-up, since I'd like to move all metrics and logging related concerns from relay-common into a separate crate anyway. |
||
) { | ||
let addrs: Vec<_> = host.to_socket_addrs().unwrap().collect(); | ||
if !addrs.is_empty() { | ||
log::info!("reporting metrics to statsd at {}", addrs[0]); | ||
} | ||
let statsd_client = StatsdClient::from_udp_host(prefix, &addrs[..]).unwrap(); | ||
|
||
// Normalize sample_rate | ||
let sample_rate = sample_rate.max(0.).min(1.); | ||
log::debug!( | ||
"metrics sample rate is set to {}{}", | ||
sample_rate, | ||
if sample_rate == 0.0 { | ||
", no metrics will be reported" | ||
} else { | ||
"" | ||
} | ||
); | ||
|
||
let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); | ||
socket.set_nonblocking(true).unwrap(); | ||
tonyo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
let statsd_client = if buffering { | ||
let udp_sink = BufferedUdpMetricSink::from(host, socket).unwrap(); | ||
let queuing_sink = QueuingMetricSink::with_capacity(udp_sink, METRICS_MAX_QUEUE_SIZE); | ||
StatsdClient::from_sink(prefix, queuing_sink) | ||
} else { | ||
let simple_sink = UdpMetricSink::from(host, socket).unwrap(); | ||
StatsdClient::from_sink(prefix, simple_sink) | ||
}; | ||
log::debug!( | ||
"metrics buffering is {}", | ||
if buffering { "enabled" } else { "disabled" } | ||
); | ||
|
||
set_client(MetricsClient { | ||
statsd_client, | ||
default_tags, | ||
sample_rate, | ||
}); | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change is unrelated, can we revert?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found it useful to see what we're actually building, it's the same behavior we have for
test
andlint
targets.