From 4abe34159fc91e91ed4de0423b3783a570fe275c Mon Sep 17 00:00:00 2001 From: Riccardo Busetti <riccardob36@gmail.com> Date: Mon, 24 Feb 2025 09:41:56 +0100 Subject: [PATCH 01/14] feat(pool): Use new AsyncPool for the processor and store services --- Cargo.lock | 1 + relay-config/src/config.rs | 14 +- relay-server/Cargo.toml | 1 + relay-server/src/service.rs | 19 +- relay-server/src/services/processor.rs | 26 +- relay-server/src/services/store.rs | 21 +- relay-server/src/testutils.rs | 9 +- relay-server/src/utils/thread_pool.rs | 345 +++++++++++-------------- 8 files changed, 211 insertions(+), 225 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6014c10c408..4974c2d5fd0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4007,6 +4007,7 @@ dependencies = [ "relay-statsd", "relay-system", "relay-test", + "relay-threading", "reqwest", "rmp-serde", "semver", diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 8561a297bf4..fcfeb6a9fa2 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -638,6 +638,11 @@ pub struct Limits { /// The total number of threads spawned will roughly be `2 * max_thread_count`. Defaults to /// the number of logical CPU cores on the host. pub max_thread_count: usize, + /// The maximum number of concurrent tasks to be run for each asynchronous thread pool. + /// + /// For each thread of the asynchronous pool, up to `max_concurrency_per_pool` futures + /// (aka tasks) will be polled simultaneously. + pub max_concurrency_per_pool: usize, /// The maximum number of seconds a query is allowed to take across retries. Individual requests /// have lower timeouts. Defaults to 30 seconds. pub query_timeout: u64, @@ -646,7 +651,7 @@ pub struct Limits { pub shutdown_timeout: u64, /// Server keep-alive timeout in seconds. /// - /// By default keep-alive is set to a 5 seconds. + /// By default, keep-alive is set to 5 seconds. pub keepalive_timeout: u64, /// Server idle timeout in seconds. /// @@ -695,6 +700,7 @@ impl Default for Limits { max_replay_uncompressed_size: ByteSize::mebibytes(100), max_replay_message_size: ByteSize::mebibytes(15), max_thread_count: num_cpus::get(), + max_concurrency_per_pool: 1, query_timeout: 30, shutdown_timeout: 10, keepalive_timeout: 5, @@ -2349,6 +2355,12 @@ impl Config { self.values.limits.max_thread_count } + /// Returns the number of tasks that will be polled simultaneously by each thread of the + /// asynchronous pool. + pub fn pool_tasks_concurrency(&self) -> usize { + self.values.limits.max_concurrency_per_pool + } + /// Returns the maximum size of a project config query. pub fn query_batch_size(&self) -> usize { self.values.cache.batch_size diff --git a/relay-server/Cargo.toml b/relay-server/Cargo.toml index 5e2cbe06253..4252c940e44 100644 --- a/relay-server/Cargo.toml +++ b/relay-server/Cargo.toml @@ -88,6 +88,7 @@ relay-sampling = { workspace = true } relay-spans = { workspace = true } relay-statsd = { workspace = true } relay-system = { workspace = true } +relay-threading = { workspace = true } reqwest = { workspace = true, features = [ "gzip", "hickory-dns", diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index a621a0d4d11..095ea83006f 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -13,13 +13,15 @@ use crate::services::health_check::{HealthCheck, HealthCheckService}; use crate::services::metrics::RouterService; use crate::services::outcome::{OutcomeProducer, OutcomeProducerService, TrackOutcome}; use crate::services::outcome_aggregator::OutcomeAggregator; -use crate::services::processor::{self, EnvelopeProcessor, EnvelopeProcessorService}; +use crate::services::processor::{ + self, EnvelopeProcessor, EnvelopeProcessorService, EnvelopeProcessorServicePool, +}; use crate::services::projects::cache::{ProjectCacheHandle, ProjectCacheService}; use crate::services::projects::source::ProjectSource; use crate::services::relays::{RelayCache, RelayCacheService}; use crate::services::stats::RelayStats; #[cfg(feature = "processing")] -use crate::services::store::StoreService; +use crate::services::store::{StoreService, StoreServicePool}; use crate::services::test_store::{TestStore, TestStoreService}; use crate::services::upstream::{UpstreamRelay, UpstreamRelayService}; use crate::utils::{MemoryChecker, MemoryStat, ThreadKind}; @@ -28,7 +30,6 @@ use anyhow::Context; use anyhow::Result; use axum::extract::FromRequestParts; use axum::http::request::Parts; -use rayon::ThreadPool; use relay_cogs::Cogs; use relay_config::Config; #[cfg(feature = "processing")] @@ -97,7 +98,7 @@ pub fn create_runtime(name: &'static str, threads: usize) -> relay_system::Runti .build() } -fn create_processor_pool(config: &Config) -> Result<ThreadPool> { +fn create_processor_pool(config: &Config) -> Result<EnvelopeProcessorServicePool> { // Adjust thread count for small cpu counts to not have too many idle cores // and distribute workload better. let thread_count = match config.cpu_concurrency() { @@ -107,17 +108,17 @@ fn create_processor_pool(config: &Config) -> Result<ThreadPool> { }; relay_log::info!("starting {thread_count} envelope processing workers"); - let pool = crate::utils::ThreadPoolBuilder::new("processor") + let pool = crate::utils::ThreadPoolBuilder::new("processor", tokio::runtime::Handle::current()) .num_threads(thread_count) + .max_concurrency(config.pool_tasks_concurrency()) .thread_kind(ThreadKind::Worker) - .runtime(tokio::runtime::Handle::current()) .build()?; Ok(pool) } #[cfg(feature = "processing")] -fn create_store_pool(config: &Config) -> Result<ThreadPool> { +fn create_store_pool(config: &Config) -> Result<StoreServicePool> { // Spawn a store worker for every 12 threads in the processor pool. // This ratio was found empirically and may need adjustments in the future. // @@ -126,9 +127,9 @@ fn create_store_pool(config: &Config) -> Result<ThreadPool> { let thread_count = config.cpu_concurrency().div_ceil(12); relay_log::info!("starting {thread_count} store workers"); - let pool = crate::utils::ThreadPoolBuilder::new("store") + let pool = crate::utils::ThreadPoolBuilder::new("store", tokio::runtime::Handle::current()) .num_threads(thread_count) - .runtime(tokio::runtime::Handle::current()) + .max_concurrency(config.pool_tasks_concurrency()) .build()?; Ok(pool) diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index cf1de45982d..33aae4b7fcc 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -14,6 +14,8 @@ use bytes::Bytes; use chrono::{DateTime, Utc}; use flate2::write::{GzEncoder, ZlibEncoder}; use flate2::Compression; +use futures::future::BoxFuture; +use futures::FutureExt; use relay_base_schema::project::{ProjectId, ProjectKey}; use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token}; use relay_common::time::UnixTimestamp; @@ -60,10 +62,10 @@ use crate::services::upstream::{ }; use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers}; use crate::utils::{ - self, InvalidProcessingGroupType, ManagedEnvelope, SamplingResult, ThreadPool, TypedEnvelope, - WorkerGroup, + self, InvalidProcessingGroupType, ManagedEnvelope, SamplingResult, TypedEnvelope, }; use relay_base_schema::organization::OrganizationId; +use relay_threading::AsyncPool; #[cfg(feature = "processing")] use { crate::services::store::{Store, StoreEnvelope}, @@ -1078,6 +1080,9 @@ impl FromMessage<SubmitClientReports> for EnvelopeProcessor { } } +/// The asynchronous thread pool used for scheduling processing tasks in the processor. +pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>; + /// Service implementing the [`EnvelopeProcessor`] interface. /// /// This service handles messages in a worker pool with configurable concurrency. @@ -1110,7 +1115,7 @@ impl Default for Addrs { } struct InnerProcessor { - workers: WorkerGroup, + pool: EnvelopeProcessorServicePool, config: Arc<Config>, global_config: GlobalConfigHandle, project_cache: ProjectCacheHandle, @@ -1130,7 +1135,7 @@ impl EnvelopeProcessorService { /// Creates a multi-threaded envelope processor. #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))] pub fn new( - pool: ThreadPool, + pool: EnvelopeProcessorServicePool, config: Arc<Config>, global_config: GlobalConfigHandle, project_cache: ProjectCacheHandle, @@ -1160,7 +1165,7 @@ impl EnvelopeProcessorService { }; let inner = InnerProcessor { - workers: WorkerGroup::new(pool), + pool, global_config, project_cache, cogs, @@ -3085,7 +3090,7 @@ impl EnvelopeProcessorService { self.inner.rate_limiter.is_some() } - fn handle_message(&self, message: EnvelopeProcessor) { + async fn handle_message(&self, message: EnvelopeProcessor) { let ty = message.variant(); let feature_weights = self.feature_weights(&message); @@ -3157,8 +3162,13 @@ impl Service for EnvelopeProcessorService { while let Some(message) = rx.recv().await { let service = self.clone(); self.inner - .workers - .spawn(move || service.handle_message(message)) + .pool + .spawn_async( + async move { + service.handle_message(message).await; + } + .boxed(), + ) .await; } } diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index b9256c38b9c..61faa2e9c23 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -10,12 +10,15 @@ use std::sync::Arc; use bytes::Bytes; use chrono::{DateTime, Utc}; +use futures::future::BoxFuture; +use futures::FutureExt; use relay_base_schema::data_category::DataCategory; use relay_base_schema::project::ProjectId; use relay_common::time::UnixTimestamp; use relay_config::Config; use relay_event_schema::protocol::{EventId, VALID_PLATFORMS}; +use crate::envelope::{AttachmentType, Envelope, Item, ItemType}; use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message}; use relay_metrics::{ Bucket, BucketView, BucketViewValue, BucketsView, ByNamespace, FiniteF64, GaugeValue, @@ -24,13 +27,12 @@ use relay_metrics::{ use relay_quotas::Scoping; use relay_statsd::metric; use relay_system::{Addr, FromMessage, Interface, NoResponse, Service}; +use relay_threading::AsyncPool; use serde::{Deserialize, Serialize}; use serde_json::value::RawValue; use serde_json::Deserializer; use uuid::Uuid; -use crate::envelope::{AttachmentType, Envelope, Item, ItemType}; - use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes}; use crate::service::ServiceError; use crate::services::global_config::GlobalConfigHandle; @@ -91,6 +93,9 @@ pub struct StoreMetrics { pub retention: u16, } +/// The asynchronous thread pool used for scheduling storing tasks in the envelope store. +pub type StoreServicePool = AsyncPool<BoxFuture<'static, ()>>; + /// Service interface for the [`StoreEnvelope`] message. #[derive(Debug)] pub enum Store { @@ -128,7 +133,7 @@ impl FromMessage<StoreMetrics> for Store { /// Service implementing the [`Store`] interface. pub struct StoreService { - workers: WorkerGroup, + pool: StoreServicePool, config: Arc<Config>, global_config: GlobalConfigHandle, outcome_aggregator: Addr<TrackOutcome>, @@ -138,7 +143,7 @@ pub struct StoreService { impl StoreService { pub fn create( - pool: ThreadPool, + pool: StoreServicePool, config: Arc<Config>, global_config: GlobalConfigHandle, outcome_aggregator: Addr<TrackOutcome>, @@ -146,7 +151,7 @@ impl StoreService { ) -> anyhow::Result<Self> { let producer = Producer::create(&config)?; Ok(Self { - workers: WorkerGroup::new(pool), + pool, config, global_config, outcome_aggregator, @@ -1049,8 +1054,10 @@ impl Service for StoreService { while let Some(message) = rx.recv().await { let service = Arc::clone(&this); - this.workers - .spawn(move || service.handle_message(message)) + // For now, we run each task synchronously, in the future we might explore how to make + // the store async. + this.pool + .spawn_async(async move { service.handle_message(message) }.boxed()) .await; } diff --git a/relay-server/src/testutils.rs b/relay-server/src/testutils.rs index a4921378089..6dd616fcfd6 100644 --- a/relay-server/src/testutils.rs +++ b/relay-server/src/testutils.rs @@ -18,11 +18,11 @@ use crate::metrics::{MetricOutcomes, MetricStats}; use crate::service::create_redis_pools; use crate::services::global_config::GlobalConfigHandle; use crate::services::outcome::TrackOutcome; -use crate::services::processor::{self, EnvelopeProcessorService}; +use crate::services::processor::{self, EnvelopeProcessorService, EnvelopeProcessorServicePool}; use crate::services::projects::cache::ProjectCacheHandle; use crate::services::projects::project::ProjectInfo; use crate::services::test_store::TestStore; -use crate::utils::{ThreadPool, ThreadPoolBuilder}; +use crate::utils::ThreadPoolBuilder; pub fn state_with_rule_and_condition( sample_rate: Option<f64>, @@ -174,10 +174,9 @@ pub fn processor_services() -> (Addr<TrackOutcome>, Addr<TestStore>) { (outcome_aggregator, test_store) } -fn create_processor_pool() -> ThreadPool { - ThreadPoolBuilder::new("processor") +fn create_processor_pool() -> EnvelopeProcessorServicePool { + ThreadPoolBuilder::new("processor", tokio::runtime::Handle::current()) .num_threads(1) - .runtime(tokio::runtime::Handle::current()) .build() .unwrap() } diff --git a/relay-server/src/utils/thread_pool.rs b/relay-server/src/utils/thread_pool.rs index e6b6650318f..1f128743bac 100644 --- a/relay-server/src/utils/thread_pool.rs +++ b/relay-server/src/utils/thread_pool.rs @@ -1,9 +1,9 @@ -use std::sync::Arc; -use std::thread; +use std::future::Future; +use std::{io, thread}; + use tokio::runtime::Handle; -pub use rayon::{ThreadPool, ThreadPoolBuildError}; -use tokio::sync::Semaphore; +use relay_threading::{AsyncPool, AsyncPoolBuilder}; /// A thread kind. /// @@ -17,126 +17,87 @@ pub enum ThreadKind { Worker, } -/// Used to create a new [`ThreadPool`] thread pool. +/// Used to create a new [`AsyncPool`] thread pool. pub struct ThreadPoolBuilder { name: &'static str, - runtime: Option<Handle>, + runtime: Handle, num_threads: usize, + max_concurrency: usize, kind: ThreadKind, } impl ThreadPoolBuilder { /// Creates a new named thread pool builder. - pub fn new(name: &'static str) -> Self { + pub fn new(name: &'static str, runtime: Handle) -> Self { Self { name, - runtime: None, + runtime, num_threads: 0, + max_concurrency: 1, kind: ThreadKind::Default, } } /// Sets the number of threads to be used in the rayon thread-pool. /// - /// See also [`rayon::ThreadPoolBuilder::num_threads`]. + /// See also [`AsyncPoolBuilder::num_threads`]. pub fn num_threads(mut self, num_threads: usize) -> Self { self.num_threads = num_threads; self } - /// Configures the [`ThreadKind`] for all threads spawned in the pool. - pub fn thread_kind(mut self, kind: ThreadKind) -> Self { - self.kind = kind; + /// Sets the maximum number of concurrent tasks per thread. + /// + /// See also [`AsyncPoolBuilder::max_concurrency`]. + pub fn max_concurrency(mut self, max_concurrency: usize) -> Self { + self.max_concurrency = max_concurrency; self } - /// Sets the Tokio runtime which will be made available in the workers. - pub fn runtime(mut self, runtime: Handle) -> Self { - self.runtime = Some(runtime); + /// Configures the [`ThreadKind`] for all threads spawned in the pool. + pub fn thread_kind(mut self, kind: ThreadKind) -> Self { + self.kind = kind; self } /// Creates and returns the thread pool. - pub fn build(self) -> Result<ThreadPool, ThreadPoolBuildError> { - rayon::ThreadPoolBuilder::new() + pub fn build<F>(self) -> Result<AsyncPool<F>, io::Error> + where + F: Future<Output = ()> + Send + 'static, + { + AsyncPoolBuilder::new(self.runtime) .num_threads(self.num_threads) + .max_concurrency(self.max_concurrency) .thread_name(move |id| format!("pool-{name}-{id}", name = self.name)) - // In case of panic, log that there was a panic but keep the thread alive and don't - // exist. - .panic_handler(move |_panic| { - relay_log::error!("thread in pool {name} paniced!", name = self.name) + // In case of panic in a task sent to the pool, we catch it to continue the remaining + // work and just log an error. + .task_panic_handler(move |_panic| { + relay_log::error!( + "task in pool {name} panicked, other tasks will continue execution", + name = self.name + ) + }) + // In case of panic in the thread, log it. After a panic in the thread, it will stop. + .thread_panic_handler(move |_panic| { + relay_log::error!("thread in pool {name} panicked", name = self.name) }) .spawn_handler(|thread| { let mut b = thread::Builder::new(); + if let Some(name) = thread.name() { b = b.name(name.to_owned()); } - if let Some(stack_size) = thread.stack_size() { - b = b.stack_size(stack_size); - } - let runtime = self.runtime.clone(); b.spawn(move || { set_current_thread_priority(self.kind); - let _guard = runtime.as_ref().map(|runtime| runtime.enter()); thread.run() })?; + Ok(()) }) .build() } } -/// A [`WorkerGroup`] adds an async back-pressure mechanism to a [`ThreadPool`]. -pub struct WorkerGroup { - pool: ThreadPool, - semaphore: Arc<Semaphore>, -} - -impl WorkerGroup { - /// Creates a new worker group from a thread pool. - pub fn new(pool: ThreadPool) -> Self { - // Use `current_num_threads() * 2` to guarantee all threads immediately have a new item to work on. - let semaphore = Arc::new(Semaphore::new(pool.current_num_threads() * 2)); - Self { pool, semaphore } - } - - /// Spawns an asynchronous task on the thread pool. - /// - /// If the thread pool is saturated the returned future is pending until - /// the thread pool has capacity to work on the task. - /// - /// # Examples: - /// - /// ```ignore - /// # async fn test(mut messages: tokio::sync::mpsc::Receiver<()>) { - /// # use relay_server::utils::{WorkerGroup, ThreadPoolBuilder}; - /// # use std::thread; - /// # use std::time::Duration; - /// # let pool = ThreadPoolBuilder::new("test").num_threads(1).build().unwrap(); - /// let workers = WorkerGroup::new(pool); - /// - /// while let Some(message) = messages.recv().await { - /// workers.spawn(move || { - /// thread::sleep(Duration::from_secs(1)); - /// println!("worked on message {message:?}") - /// }).await; - /// } - /// # } - /// ``` - pub async fn spawn(&self, op: impl FnOnce() + Send + 'static) { - let semaphore = Arc::clone(&self.semaphore); - let permit = semaphore - .acquire_owned() - .await - .expect("the semaphore is never closed"); - - self.pool.spawn(move || { - op(); - drop(permit); - }); - } -} - #[cfg(unix)] fn set_current_thread_priority(kind: ThreadKind) { // Lower priorities cause more favorable scheduling. @@ -173,123 +134,117 @@ fn set_current_thread_priority(_kind: ThreadKind) { #[cfg(test)] mod tests { - use std::sync::Barrier; - use std::time::Duration; - - use futures::FutureExt; - - use super::*; - - #[test] - fn test_thread_pool_num_threads() { - let pool = ThreadPoolBuilder::new("s").num_threads(3).build().unwrap(); - assert_eq!(pool.current_num_threads(), 3); - } - - #[test] - fn test_thread_pool_runtime() { - let rt = tokio::runtime::Runtime::new().unwrap(); - - let pool = ThreadPoolBuilder::new("s") - .num_threads(1) - .runtime(rt.handle().clone()) - .build() - .unwrap(); - - let has_runtime = pool.install(|| tokio::runtime::Handle::try_current().is_ok()); - assert!(has_runtime); - } - - #[test] - fn test_thread_pool_no_runtime() { - let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap(); - - let has_runtime = pool.install(|| tokio::runtime::Handle::try_current().is_ok()); - assert!(!has_runtime); - } - - #[test] - fn test_thread_pool_panic() { - let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap(); - let barrier = Arc::new(Barrier::new(2)); - pool.spawn({ - let barrier = Arc::clone(&barrier); - move || { - barrier.wait(); - panic!(); - } - }); - barrier.wait(); - - pool.spawn({ - let barrier = Arc::clone(&barrier); - move || { - barrier.wait(); - } - }); - barrier.wait(); - } - - #[test] - #[cfg(unix)] - fn test_thread_pool_priority() { - fn get_current_priority() -> i32 { - unsafe { libc::getpriority(libc::PRIO_PROCESS, 0) } - } - - let default_prio = get_current_priority(); - - { - let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap(); - let prio = pool.install(get_current_priority); - // Default pool priority must match current priority. - assert_eq!(prio, default_prio); - } - - { - let pool = ThreadPoolBuilder::new("s") - .num_threads(1) - .thread_kind(ThreadKind::Worker) - .build() - .unwrap(); - let prio = pool.install(get_current_priority); - // Worker must be higher than the default priority (higher number = lower priority). - assert!(prio > default_prio); - } - } - - #[test] - fn test_worker_group_backpressure() { - let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap(); - let workers = WorkerGroup::new(pool); - - // Num Threads * 2 is the limit after backpressure kicks in - let barrier = Arc::new(Barrier::new(2)); - - let spawn = || { - let barrier = Arc::clone(&barrier); - workers - .spawn(move || { - barrier.wait(); - }) - .now_or_never() - .is_some() - }; - - for _ in 0..15 { - // Pool should accept two immediately. - assert!(spawn()); - assert!(spawn()); - // Pool should reject because there are already 2 tasks active. - assert!(!spawn()); - - // Unblock the barrier - barrier.wait(); // first spawn - barrier.wait(); // second spawn - - // wait a tiny bit to make sure the semaphore handle is dropped - thread::sleep(Duration::from_millis(50)); - } - } + // #[test] + // fn test_thread_pool_num_threads() { + // let pool = ThreadPoolBuilder::new("s").num_threads(3).build().unwrap(); + // assert_eq!(pool.current_num_threads(), 3); + // } + // + // #[test] + // fn test_thread_pool_runtime() { + // let rt = tokio::runtime::Runtime::new().unwrap(); + // + // let pool = ThreadPoolBuilder::new("s") + // .num_threads(1) + // .runtime(rt.handle().clone()) + // .build() + // .unwrap(); + // + // let has_runtime = pool.install(|| tokio::runtime::Handle::try_current().is_ok()); + // assert!(has_runtime); + // } + // + // #[test] + // fn test_thread_pool_no_runtime() { + // let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap(); + // + // let has_runtime = pool.install(|| tokio::runtime::Handle::try_current().is_ok()); + // assert!(!has_runtime); + // } + // + // #[test] + // fn test_thread_pool_panic() { + // let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap(); + // let barrier = Arc::new(Barrier::new(2)); + // + // pool.spawn({ + // let barrier = Arc::clone(&barrier); + // move || { + // barrier.wait(); + // panic!(); + // } + // }); + // barrier.wait(); + // + // pool.spawn({ + // let barrier = Arc::clone(&barrier); + // move || { + // barrier.wait(); + // } + // }); + // barrier.wait(); + // } + // + // #[test] + // #[cfg(unix)] + // fn test_thread_pool_priority() { + // fn get_current_priority() -> i32 { + // unsafe { libc::getpriority(libc::PRIO_PROCESS, 0) } + // } + // + // let default_prio = get_current_priority(); + // + // { + // let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap(); + // let prio = pool.install(get_current_priority); + // // Default pool priority must match current priority. + // assert_eq!(prio, default_prio); + // } + // + // { + // let pool = ThreadPoolBuilder::new("s") + // .num_threads(1) + // .thread_kind(ThreadKind::Worker) + // .build() + // .unwrap(); + // let prio = pool.install(get_current_priority); + // // Worker must be higher than the default priority (higher number = lower priority). + // assert!(prio > default_prio); + // } + // } + // + // #[test] + // fn test_worker_group_backpressure() { + // let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap(); + // let workers = WorkerGroup::new(pool); + // + // // Num Threads * 2 is the limit after backpressure kicks in + // let barrier = Arc::new(Barrier::new(2)); + // + // let spawn = || { + // let barrier = Arc::clone(&barrier); + // workers + // .spawn(move || { + // barrier.wait(); + // }) + // .now_or_never() + // .is_some() + // }; + // + // for _ in 0..15 { + // // Pool should accept two immediately. + // assert!(spawn()); + // assert!(spawn()); + // // Pool should reject because there are already 2 tasks active. + // assert!(!spawn()); + // + // // Unblock the barrier + // barrier.wait(); // first spawn + // barrier.wait(); // second spawn + // + // // wait a tiny bit to make sure the semaphore handle is dropped + // thread::sleep(Duration::from_millis(50)); + // } + // } } From e845b369576e874c6e77d8a6eb5f4296d87a5484 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti <riccardob36@gmail.com> Date: Mon, 24 Feb 2025 09:43:43 +0100 Subject: [PATCH 02/14] Fix --- relay-server/src/services/store.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 61faa2e9c23..41994745336 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -39,7 +39,7 @@ use crate::services::global_config::GlobalConfigHandle; use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome}; use crate::services::processor::Processed; use crate::statsd::{RelayCounters, RelayGauges, RelayTimers}; -use crate::utils::{FormDataIter, ThreadPool, TypedEnvelope, WorkerGroup}; +use crate::utils::{FormDataIter, TypedEnvelope}; /// Fallback name used for attachment items without a `filename` header. const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment"; From ce32fa1c45769c67ca900ecdc8bbb403efc4eafe Mon Sep 17 00:00:00 2001 From: Riccardo Busetti <riccardob36@gmail.com> Date: Mon, 24 Feb 2025 10:56:30 +0100 Subject: [PATCH 03/14] Fix tests --- relay-config/src/config.rs | 12 -- relay-server/src/service.rs | 6 +- relay-server/src/utils/thread_pool.rs | 238 +++++++++++++------------- 3 files changed, 121 insertions(+), 135 deletions(-) diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index fcfeb6a9fa2..9aa310b15a7 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -638,11 +638,6 @@ pub struct Limits { /// The total number of threads spawned will roughly be `2 * max_thread_count`. Defaults to /// the number of logical CPU cores on the host. pub max_thread_count: usize, - /// The maximum number of concurrent tasks to be run for each asynchronous thread pool. - /// - /// For each thread of the asynchronous pool, up to `max_concurrency_per_pool` futures - /// (aka tasks) will be polled simultaneously. - pub max_concurrency_per_pool: usize, /// The maximum number of seconds a query is allowed to take across retries. Individual requests /// have lower timeouts. Defaults to 30 seconds. pub query_timeout: u64, @@ -700,7 +695,6 @@ impl Default for Limits { max_replay_uncompressed_size: ByteSize::mebibytes(100), max_replay_message_size: ByteSize::mebibytes(15), max_thread_count: num_cpus::get(), - max_concurrency_per_pool: 1, query_timeout: 30, shutdown_timeout: 10, keepalive_timeout: 5, @@ -2355,12 +2349,6 @@ impl Config { self.values.limits.max_thread_count } - /// Returns the number of tasks that will be polled simultaneously by each thread of the - /// asynchronous pool. - pub fn pool_tasks_concurrency(&self) -> usize { - self.values.limits.max_concurrency_per_pool - } - /// Returns the maximum size of a project config query. pub fn query_batch_size(&self) -> usize { self.values.cache.batch_size diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 095ea83006f..7141341d889 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -24,7 +24,7 @@ use crate::services::stats::RelayStats; use crate::services::store::{StoreService, StoreServicePool}; use crate::services::test_store::{TestStore, TestStoreService}; use crate::services::upstream::{UpstreamRelay, UpstreamRelayService}; -use crate::utils::{MemoryChecker, MemoryStat, ThreadKind}; +use crate::utils::{MemoryChecker, MemoryStat, ThreadKind, ThreadWorkloadType}; #[cfg(feature = "processing")] use anyhow::Context; use anyhow::Result; @@ -110,7 +110,7 @@ fn create_processor_pool(config: &Config) -> Result<EnvelopeProcessorServicePool let pool = crate::utils::ThreadPoolBuilder::new("processor", tokio::runtime::Handle::current()) .num_threads(thread_count) - .max_concurrency(config.pool_tasks_concurrency()) + .thread_workload_type(ThreadWorkloadType::WithIO) .thread_kind(ThreadKind::Worker) .build()?; @@ -129,7 +129,7 @@ fn create_store_pool(config: &Config) -> Result<StoreServicePool> { let pool = crate::utils::ThreadPoolBuilder::new("store", tokio::runtime::Handle::current()) .num_threads(thread_count) - .max_concurrency(config.pool_tasks_concurrency()) + .thread_workload_type(ThreadWorkloadType::CpuBound) .build()?; Ok(pool) diff --git a/relay-server/src/utils/thread_pool.rs b/relay-server/src/utils/thread_pool.rs index 1f128743bac..4f1dbcce405 100644 --- a/relay-server/src/utils/thread_pool.rs +++ b/relay-server/src/utils/thread_pool.rs @@ -5,6 +5,21 @@ use tokio::runtime::Handle; use relay_threading::{AsyncPool, AsyncPoolBuilder}; +/// The workload type that a thread executes. +#[derive(Default, Debug, Clone, Copy)] +pub enum ThreadWorkloadType { + /// The thread spends some of its time doing I/O together with CPU-bound work which in the + /// context of async, would lead to the future yielding with I/O is performed. + /// + /// This is the default since it's assumed that an async pool is used mostly because we want to + /// execute asynchronous work that comprises mainly of I/O. + #[default] + WithIO, + /// The thread spends all of its time doing CPU-bound work which in the context of async, would + /// lead to the future blocking the entire executor. + CpuBound, +} + /// A thread kind. /// /// The thread kind has an effect on how threads are prioritized and scheduled. @@ -22,7 +37,7 @@ pub struct ThreadPoolBuilder { name: &'static str, runtime: Handle, num_threads: usize, - max_concurrency: usize, + workload_type: ThreadWorkloadType, kind: ThreadKind, } @@ -33,7 +48,7 @@ impl ThreadPoolBuilder { name, runtime, num_threads: 0, - max_concurrency: 1, + workload_type: ThreadWorkloadType::CpuBound, kind: ThreadKind::Default, } } @@ -46,11 +61,9 @@ impl ThreadPoolBuilder { self } - /// Sets the maximum number of concurrent tasks per thread. - /// - /// See also [`AsyncPoolBuilder::max_concurrency`]. - pub fn max_concurrency(mut self, max_concurrency: usize) -> Self { - self.max_concurrency = max_concurrency; + /// Configures the [`ThreadWorkloadType`] for all threads spawned in the pool. + pub fn thread_workload_type(mut self, workload_type: ThreadWorkloadType) -> Self { + self.workload_type = workload_type; self } @@ -65,9 +78,10 @@ impl ThreadPoolBuilder { where F: Future<Output = ()> + Send + 'static, { + let max_concurrency = self.max_concurrency(); AsyncPoolBuilder::new(self.runtime) .num_threads(self.num_threads) - .max_concurrency(self.max_concurrency) + .max_concurrency(max_concurrency) .thread_name(move |id| format!("pool-{name}-{id}", name = self.name)) // In case of panic in a task sent to the pool, we catch it to continue the remaining // work and just log an error. @@ -96,6 +110,18 @@ impl ThreadPoolBuilder { }) .build() } + + /// Computes the maximum concurrency of the async pool. + fn max_concurrency(&self) -> usize { + match self.workload_type { + // If the work in the pool has I/O, we want to be able to drive more tasks concurrently, + // so we do 10 tasks per thread (empirically determined). + ThreadWorkloadType::WithIO => 10, + // If the work in the pool is exclusively cpu bound, we want 1 task per thread since it + // won't yield being CPU-bound. + ThreadWorkloadType::CpuBound => 1, + } + } } #[cfg(unix)] @@ -134,117 +160,89 @@ fn set_current_thread_priority(_kind: ThreadKind) { #[cfg(test)] mod tests { + use crate::utils::{ThreadKind, ThreadPoolBuilder}; + use futures::FutureExt; + use std::sync::atomic::{AtomicI32, Ordering}; + use std::sync::Arc; + use tokio::runtime::Handle; + use tokio::sync::Barrier; - // #[test] - // fn test_thread_pool_num_threads() { - // let pool = ThreadPoolBuilder::new("s").num_threads(3).build().unwrap(); - // assert_eq!(pool.current_num_threads(), 3); - // } - // - // #[test] - // fn test_thread_pool_runtime() { - // let rt = tokio::runtime::Runtime::new().unwrap(); - // - // let pool = ThreadPoolBuilder::new("s") - // .num_threads(1) - // .runtime(rt.handle().clone()) - // .build() - // .unwrap(); - // - // let has_runtime = pool.install(|| tokio::runtime::Handle::try_current().is_ok()); - // assert!(has_runtime); - // } - // - // #[test] - // fn test_thread_pool_no_runtime() { - // let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap(); - // - // let has_runtime = pool.install(|| tokio::runtime::Handle::try_current().is_ok()); - // assert!(!has_runtime); - // } - // - // #[test] - // fn test_thread_pool_panic() { - // let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap(); - // let barrier = Arc::new(Barrier::new(2)); - // - // pool.spawn({ - // let barrier = Arc::clone(&barrier); - // move || { - // barrier.wait(); - // panic!(); - // } - // }); - // barrier.wait(); - // - // pool.spawn({ - // let barrier = Arc::clone(&barrier); - // move || { - // barrier.wait(); - // } - // }); - // barrier.wait(); - // } - // - // #[test] - // #[cfg(unix)] - // fn test_thread_pool_priority() { - // fn get_current_priority() -> i32 { - // unsafe { libc::getpriority(libc::PRIO_PROCESS, 0) } - // } - // - // let default_prio = get_current_priority(); - // - // { - // let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap(); - // let prio = pool.install(get_current_priority); - // // Default pool priority must match current priority. - // assert_eq!(prio, default_prio); - // } - // - // { - // let pool = ThreadPoolBuilder::new("s") - // .num_threads(1) - // .thread_kind(ThreadKind::Worker) - // .build() - // .unwrap(); - // let prio = pool.install(get_current_priority); - // // Worker must be higher than the default priority (higher number = lower priority). - // assert!(prio > default_prio); - // } - // } - // - // #[test] - // fn test_worker_group_backpressure() { - // let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap(); - // let workers = WorkerGroup::new(pool); - // - // // Num Threads * 2 is the limit after backpressure kicks in - // let barrier = Arc::new(Barrier::new(2)); - // - // let spawn = || { - // let barrier = Arc::clone(&barrier); - // workers - // .spawn(move || { - // barrier.wait(); - // }) - // .now_or_never() - // .is_some() - // }; - // - // for _ in 0..15 { - // // Pool should accept two immediately. - // assert!(spawn()); - // assert!(spawn()); - // // Pool should reject because there are already 2 tasks active. - // assert!(!spawn()); - // - // // Unblock the barrier - // barrier.wait(); // first spawn - // barrier.wait(); // second spawn - // - // // wait a tiny bit to make sure the semaphore handle is dropped - // thread::sleep(Duration::from_millis(50)); - // } - // } + #[tokio::test] + async fn test_thread_pool_panic() { + let pool = ThreadPoolBuilder::new("s", Handle::current()) + .num_threads(1) + .build() + .unwrap(); + let barrier = Arc::new(Barrier::new(2)); + + let barrier_clone = barrier.clone(); + pool.spawn( + async move { + barrier_clone.wait().await; + panic!(); + } + .boxed(), + ); + barrier.wait().await; + + let barrier_clone = barrier.clone(); + pool.spawn( + async move { + barrier_clone.wait().await; + } + .boxed(), + ); + barrier.wait().await; + } + + #[tokio::test] + #[cfg(unix)] + async fn test_thread_pool_priority() { + fn get_current_priority() -> i32 { + unsafe { libc::getpriority(libc::PRIO_PROCESS, 0) } + } + + let default_priority = get_current_priority(); + + { + let pool = ThreadPoolBuilder::new("s", Handle::current()) + .num_threads(1) + .build() + .unwrap(); + + let barrier = Arc::new(Barrier::new(2)); + let priority = Arc::new(AtomicI32::new(0)); + let barrier_clone = barrier.clone(); + let priority_clone = priority.clone(); + pool.spawn(async move { + priority_clone.store(get_current_priority(), Ordering::SeqCst); + barrier_clone.wait().await; + }); + barrier.wait().await; + + // Default pool priority must match current priority. + assert_eq!(priority.load(Ordering::SeqCst), default_priority); + } + + { + let pool = ThreadPoolBuilder::new("s", Handle::current()) + .num_threads(1) + .thread_kind(ThreadKind::Worker) + .build() + .unwrap(); + + let barrier = Arc::new(Barrier::new(2)); + let priority = Arc::new(AtomicI32::new(0)); + let barrier_clone = barrier.clone(); + let priority_clone = priority.clone(); + pool.spawn(async move { + priority_clone.store(get_current_priority(), Ordering::SeqCst); + barrier_clone.wait().await; + }); + barrier.wait().await; + + // Worker must be higher than the default priority (higher number = lower priority). + assert!(priority.load(Ordering::SeqCst) > default_priority); + } + } } From 6694b9a2942e8640c160e4c3d6435ad47bf830af Mon Sep 17 00:00:00 2001 From: Riccardo Busetti <riccardob36@gmail.com> Date: Mon, 24 Feb 2025 11:29:22 +0100 Subject: [PATCH 04/14] Fix --- relay-config/src/config.rs | 10 +++++++ relay-server/src/service.rs | 6 ++-- relay-server/src/utils/thread_pool.rs | 42 +++++---------------------- 3 files changed, 21 insertions(+), 37 deletions(-) diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 9aa310b15a7..50c3944b7ce 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -638,6 +638,10 @@ pub struct Limits { /// The total number of threads spawned will roughly be `2 * max_thread_count`. Defaults to /// the number of logical CPU cores on the host. pub max_thread_count: usize, + /// The maximum number of tasks to execute within each thread of the asynchronous pool. + /// + /// Each thread will be able to drive concurrently at most `max_pool_concurrency` futures. + pub max_pool_concurrency: usize, /// The maximum number of seconds a query is allowed to take across retries. Individual requests /// have lower timeouts. Defaults to 30 seconds. pub query_timeout: u64, @@ -695,6 +699,7 @@ impl Default for Limits { max_replay_uncompressed_size: ByteSize::mebibytes(100), max_replay_message_size: ByteSize::mebibytes(15), max_thread_count: num_cpus::get(), + max_pool_concurrency: 0, query_timeout: 30, shutdown_timeout: 10, keepalive_timeout: 5, @@ -2349,6 +2354,11 @@ impl Config { self.values.limits.max_thread_count } + /// Returns the number of tasks that can run concurrently. + pub fn pool_concurrency(&self) -> usize { + self.values.limits.max_pool_concurrency + } + /// Returns the maximum size of a project config query. pub fn query_batch_size(&self) -> usize { self.values.cache.batch_size diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 7141341d889..e0d65fcc579 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -24,7 +24,7 @@ use crate::services::stats::RelayStats; use crate::services::store::{StoreService, StoreServicePool}; use crate::services::test_store::{TestStore, TestStoreService}; use crate::services::upstream::{UpstreamRelay, UpstreamRelayService}; -use crate::utils::{MemoryChecker, MemoryStat, ThreadKind, ThreadWorkloadType}; +use crate::utils::{MemoryChecker, MemoryStat, ThreadKind}; #[cfg(feature = "processing")] use anyhow::Context; use anyhow::Result; @@ -110,7 +110,7 @@ fn create_processor_pool(config: &Config) -> Result<EnvelopeProcessorServicePool let pool = crate::utils::ThreadPoolBuilder::new("processor", tokio::runtime::Handle::current()) .num_threads(thread_count) - .thread_workload_type(ThreadWorkloadType::WithIO) + .max_concurrency(config.pool_concurrency()) .thread_kind(ThreadKind::Worker) .build()?; @@ -129,7 +129,7 @@ fn create_store_pool(config: &Config) -> Result<StoreServicePool> { let pool = crate::utils::ThreadPoolBuilder::new("store", tokio::runtime::Handle::current()) .num_threads(thread_count) - .thread_workload_type(ThreadWorkloadType::CpuBound) + .max_concurrency(config.pool_concurrency()) .build()?; Ok(pool) diff --git a/relay-server/src/utils/thread_pool.rs b/relay-server/src/utils/thread_pool.rs index 4f1dbcce405..e2267afb980 100644 --- a/relay-server/src/utils/thread_pool.rs +++ b/relay-server/src/utils/thread_pool.rs @@ -5,21 +5,6 @@ use tokio::runtime::Handle; use relay_threading::{AsyncPool, AsyncPoolBuilder}; -/// The workload type that a thread executes. -#[derive(Default, Debug, Clone, Copy)] -pub enum ThreadWorkloadType { - /// The thread spends some of its time doing I/O together with CPU-bound work which in the - /// context of async, would lead to the future yielding with I/O is performed. - /// - /// This is the default since it's assumed that an async pool is used mostly because we want to - /// execute asynchronous work that comprises mainly of I/O. - #[default] - WithIO, - /// The thread spends all of its time doing CPU-bound work which in the context of async, would - /// lead to the future blocking the entire executor. - CpuBound, -} - /// A thread kind. /// /// The thread kind has an effect on how threads are prioritized and scheduled. @@ -37,7 +22,7 @@ pub struct ThreadPoolBuilder { name: &'static str, runtime: Handle, num_threads: usize, - workload_type: ThreadWorkloadType, + max_concurrency: usize, kind: ThreadKind, } @@ -48,7 +33,7 @@ impl ThreadPoolBuilder { name, runtime, num_threads: 0, - workload_type: ThreadWorkloadType::CpuBound, + max_concurrency: 1, kind: ThreadKind::Default, } } @@ -61,9 +46,11 @@ impl ThreadPoolBuilder { self } - /// Configures the [`ThreadWorkloadType`] for all threads spawned in the pool. - pub fn thread_workload_type(mut self, workload_type: ThreadWorkloadType) -> Self { - self.workload_type = workload_type; + /// Sets the maximum number of tasks that can run concurrently per thread. + /// + /// See also [`AsyncPoolBuilder::max_concurrency`]. + pub fn max_concurrency(mut self, max_concurrency: usize) -> Self { + self.max_concurrency = max_concurrency; self } @@ -78,10 +65,9 @@ impl ThreadPoolBuilder { where F: Future<Output = ()> + Send + 'static, { - let max_concurrency = self.max_concurrency(); AsyncPoolBuilder::new(self.runtime) .num_threads(self.num_threads) - .max_concurrency(max_concurrency) + .max_concurrency(self.max_concurrency) .thread_name(move |id| format!("pool-{name}-{id}", name = self.name)) // In case of panic in a task sent to the pool, we catch it to continue the remaining // work and just log an error. @@ -110,18 +96,6 @@ impl ThreadPoolBuilder { }) .build() } - - /// Computes the maximum concurrency of the async pool. - fn max_concurrency(&self) -> usize { - match self.workload_type { - // If the work in the pool has I/O, we want to be able to drive more tasks concurrently, - // so we do 10 tasks per thread (empirically determined). - ThreadWorkloadType::WithIO => 10, - // If the work in the pool is exclusively cpu bound, we want 1 task per thread since it - // won't yield being CPU-bound. - ThreadWorkloadType::CpuBound => 1, - } - } } #[cfg(unix)] From 5a62978d69bf51602f50004dbe438140dbb341a6 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti <riccardob36@gmail.com> Date: Mon, 24 Feb 2025 15:54:12 +0100 Subject: [PATCH 05/14] Fix --- Cargo.lock | 1 + relay-server/src/utils/thread_pool.rs | 1 - relay-threading/Cargo.toml | 4 +- relay-threading/src/builder.rs | 9 +++ relay-threading/src/lib.rs | 1 + relay-threading/src/metrics.rs | 18 ++++++ relay-threading/src/multiplexing.rs | 86 ++++++++++++++++++++++++--- relay-threading/src/pool.rs | 49 +++++++++++---- 8 files changed, 147 insertions(+), 22 deletions(-) create mode 100644 relay-threading/src/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index 4974c2d5fd0..aa8e0a38bcb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4090,6 +4090,7 @@ dependencies = [ "flume", "futures", "pin-project-lite", + "relay-statsd", "tokio", ] diff --git a/relay-server/src/utils/thread_pool.rs b/relay-server/src/utils/thread_pool.rs index e2267afb980..4295cef8eb7 100644 --- a/relay-server/src/utils/thread_pool.rs +++ b/relay-server/src/utils/thread_pool.rs @@ -83,7 +83,6 @@ impl ThreadPoolBuilder { }) .spawn_handler(|thread| { let mut b = thread::Builder::new(); - if let Some(name) = thread.name() { b = b.name(name.to_owned()); } diff --git a/relay-threading/Cargo.toml b/relay-threading/Cargo.toml index 7e0038ab985..72ac995997f 100644 --- a/relay-threading/Cargo.toml +++ b/relay-threading/Cargo.toml @@ -10,11 +10,13 @@ license-file = "../LICENSE.md" publish = false [dependencies] -flume = { workspace = true } +flume = { workspace = true, features = ["async"] } futures = { workspace = true } tokio = { workspace = true } pin-project-lite = { workspace = true } +relay-statsd = { workspace = true } + [dev-dependencies] criterion = { workspace = true, features = ["async_tokio"] } futures = { workspace = true, features = ["executor"] } diff --git a/relay-threading/src/builder.rs b/relay-threading/src/builder.rs index 3879a1afa59..d4203e6d49d 100644 --- a/relay-threading/src/builder.rs +++ b/relay-threading/src/builder.rs @@ -16,6 +16,7 @@ pub(crate) type PanicHandler = dyn Fn(Box<dyn Any + Send>) + Send + Sync; /// and panic handling strategies. pub struct AsyncPoolBuilder<S = DefaultSpawn> { pub(crate) runtime: tokio::runtime::Handle, + pub(crate) pool_name: Option<Arc<str>>, pub(crate) thread_name: Option<Box<dyn FnMut(usize) -> String>>, pub(crate) thread_panic_handler: Option<Arc<PanicHandler>>, pub(crate) task_panic_handler: Option<Arc<PanicHandler>>, @@ -31,6 +32,7 @@ impl AsyncPoolBuilder<DefaultSpawn> { pub fn new(runtime: tokio::runtime::Handle) -> AsyncPoolBuilder<DefaultSpawn> { AsyncPoolBuilder { runtime, + pool_name: None, thread_name: None, thread_panic_handler: None, task_panic_handler: None, @@ -45,6 +47,12 @@ impl<S> AsyncPoolBuilder<S> where S: ThreadSpawn, { + /// Specifies a custom name for this pool. + pub fn pool_name(mut self, pool_name: String) -> Self { + self.pool_name = Some(pool_name.into()); + self + } + /// Specifies a custom naming convention for threads in the [`AsyncPool`]. /// /// The provided closure receives the thread's index and returns a name, @@ -91,6 +99,7 @@ where { AsyncPoolBuilder { runtime: self.runtime, + pool_name: self.pool_name, thread_name: self.thread_name, thread_panic_handler: self.thread_panic_handler, task_panic_handler: self.task_panic_handler, diff --git a/relay-threading/src/lib.rs b/relay-threading/src/lib.rs index 3c647b987a0..69355e72eb8 100644 --- a/relay-threading/src/lib.rs +++ b/relay-threading/src/lib.rs @@ -50,6 +50,7 @@ //! recovery from panics in either thread execution or individual tasks. mod builder; +mod metrics; mod multiplexing; mod pool; diff --git a/relay-threading/src/metrics.rs b/relay-threading/src/metrics.rs new file mode 100644 index 00000000000..500ca320f4f --- /dev/null +++ b/relay-threading/src/metrics.rs @@ -0,0 +1,18 @@ +use relay_statsd::GaugeMetric; + +/// Gauge metrics emitted by the asynchronous pool. +pub enum AsyncPoolGauges { + /// Number of futures queued up for execution in the asynchronous pool. + AsyncPoolQueueSize, + /// Number of futures being driven in each thread of the asynchronous pool. + AsyncPoolFuturesPerThread, +} + +impl GaugeMetric for AsyncPoolGauges { + fn name(&self) -> &'static str { + match self { + Self::AsyncPoolQueueSize => "async_pool.queue_size", + Self::AsyncPoolFuturesPerThread => "async_pool.futures_per_thread", + } + } +} diff --git a/relay-threading/src/multiplexing.rs b/relay-threading/src/multiplexing.rs index 2083ddd181c..1dcb5a7433a 100644 --- a/relay-threading/src/multiplexing.rs +++ b/relay-threading/src/multiplexing.rs @@ -4,6 +4,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use crate::metrics::AsyncPoolGauges; use crate::PanicHandler; use futures::future::CatchUnwind; use futures::stream::{FusedStream, FuturesUnordered, Stream}; @@ -101,6 +102,8 @@ pin_project! { /// /// This multiplexer is primarily used by the [`AsyncPool`] to manage task execution on worker threads. pub struct Multiplexed<S, F> { + pool_name: Arc<str>, + thread_name: Arc<str>, max_concurrency: usize, #[pin] rx: S, @@ -117,8 +120,16 @@ where /// /// Tasks from the stream will be scheduled for execution concurrently, and an optional panic handler /// can be provided to manage errors during task execution. - pub fn new(max_concurrency: usize, rx: S, panic_handler: Option<Arc<PanicHandler>>) -> Self { + pub fn new( + pool_name: Arc<str>, + thread_name: Arc<str>, + max_concurrency: usize, + rx: S, + panic_handler: Option<Arc<PanicHandler>>, + ) -> Self { Self { + pool_name, + thread_name, max_concurrency, rx, tasks: Tasks::new(panic_handler), @@ -141,6 +152,13 @@ where let mut this = self.project(); loop { + // We report how many tasks are being concurrently polled in this future. + relay_statsd::metric!( + gauge(AsyncPoolGauges::AsyncPoolFuturesPerThread) = this.tasks.len() as u64, + pool_name = &this.pool_name, + thread_name = &this.thread_name + ); + this.tasks.as_mut().poll_tasks_until_pending(cx); // If we can't get anymore tasks, and we don't have anything else to process, we report @@ -195,7 +213,13 @@ mod tests { #[test] fn test_multiplexer_with_no_futures() { let (_, rx) = flume::bounded::<BoxFuture<'static, _>>(10); - futures::executor::block_on(Multiplexed::new(1, rx.into_stream(), None)); + futures::executor::block_on(Multiplexed::new( + "my_pool".into(), + "my_thread".into(), + 1, + rx.into_stream(), + None, + )); } #[test] @@ -218,6 +242,8 @@ mod tests { panic_handler_called_clone.store(true, Ordering::SeqCst); }; futures::executor::block_on(Multiplexed::new( + "my_pool".into(), + "my_thread".into(), 1, rx.into_stream(), Some(Arc::new(panic_handler)), @@ -243,7 +269,13 @@ mod tests { drop(tx); let result = std::panic::catch_unwind(AssertUnwindSafe(|| { - futures::executor::block_on(Multiplexed::new(1, rx.into_stream(), None)) + futures::executor::block_on(Multiplexed::new( + "my_pool".into(), + "my_thread".into(), + 1, + rx.into_stream(), + None, + )) })); // The count is expected to have been incremented and the handler called. @@ -264,7 +296,13 @@ mod tests { drop(tx); - futures::executor::block_on(Multiplexed::new(1, rx.into_stream(), None)); + futures::executor::block_on(Multiplexed::new( + "my_pool".into(), + "my_thread".into(), + 1, + rx.into_stream(), + None, + )); // The count is expected to have been incremented. assert_eq!(count.load(Ordering::SeqCst), 1); @@ -285,7 +323,13 @@ mod tests { drop(tx); - futures::executor::block_on(Multiplexed::new(1, rx.into_stream(), None)); + futures::executor::block_on(Multiplexed::new( + "my_pool".into(), + "my_thread".into(), + 1, + rx.into_stream(), + None, + )); // The order of completion is expected to match the order of submission. assert_eq!(*entries.lock().unwrap(), (0..5).collect::<Vec<_>>()); @@ -304,7 +348,13 @@ mod tests { drop(tx); - futures::executor::block_on(Multiplexed::new(5, rx.into_stream(), None)); + futures::executor::block_on(Multiplexed::new( + "my_pool".into(), + "my_thread".into(), + 5, + rx.into_stream(), + None, + )); // The count is expected to have been incremented. assert_eq!(count.load(Ordering::SeqCst), 1); @@ -325,7 +375,13 @@ mod tests { drop(tx); - futures::executor::block_on(Multiplexed::new(5, rx.into_stream(), None)); + futures::executor::block_on(Multiplexed::new( + "my_pool".into(), + "my_thread".into(), + 5, + rx.into_stream(), + None, + )); // The order of completion is expected to be the same as the order of submission. assert_eq!(*entries.lock().unwrap(), (0..5).collect::<Vec<_>>()); @@ -349,7 +405,13 @@ mod tests { drop(tx); - futures::executor::block_on(Multiplexed::new(5, rx.into_stream(), None)); + futures::executor::block_on(Multiplexed::new( + "my_pool".into(), + "my_thread".into(), + 5, + rx.into_stream(), + None, + )); // The order of completion is expected to be the same as the order of submission. assert_eq!(*entries.lock().unwrap(), (0..3).collect::<Vec<_>>()); @@ -379,7 +441,13 @@ mod tests { drop(tx); - futures::executor::block_on(Multiplexed::new(5, rx.into_stream(), None)); + futures::executor::block_on(Multiplexed::new( + "my_pool".into(), + "my_thread".into(), + 5, + rx.into_stream(), + None, + )); // The order of completion may vary; verify that all expected elements are present. let mut entries = entries.lock().unwrap(); diff --git a/relay-threading/src/pool.rs b/relay-threading/src/pool.rs index a0163b19e10..6d6b332607d 100644 --- a/relay-threading/src/pool.rs +++ b/relay-threading/src/pool.rs @@ -1,21 +1,26 @@ use std::future::Future; use std::io; use std::panic::AssertUnwindSafe; -use std::sync::Arc; +use std::sync::{Arc, LazyLock}; use futures::future::BoxFuture; use futures::FutureExt; use crate::builder::AsyncPoolBuilder; +use crate::metrics::AsyncPoolGauges; use crate::multiplexing::Multiplexed; use crate::PanicHandler; +/// Default name of the pool. +static DEFAULT_POOL_NAME: LazyLock<Arc<str>> = LazyLock::new(|| Arc::from("unnamed")); + /// [`AsyncPool`] is a thread-based executor that runs asynchronous tasks on dedicated worker threads. /// /// The pool collects tasks through a bounded channel and distributes them among threads, each of which runs its own /// Tokio executor. This design enables controlled concurrency and efficient use of system resources. #[derive(Debug)] pub struct AsyncPool<F> { + name: Arc<str>, tx: flume::Sender<F>, } @@ -31,18 +36,23 @@ where where S: ThreadSpawn, { + let pool_name = builder.pool_name.unwrap_or(DEFAULT_POOL_NAME.clone()); let (tx, rx) = flume::bounded(builder.num_threads * 2); - for index in 0..builder.num_threads { + for thread_id in 0..builder.num_threads { let rx = rx.clone(); + let thread_name: Option<Arc<str>> = + builder.thread_name.as_mut().map(|f| f(thread_id).into()); let thread = Thread { - index, + id: thread_id, max_concurrency: builder.max_concurrency, - name: builder.thread_name.as_mut().map(|f| f(index)), + name: thread_name.clone(), runtime: builder.runtime.clone(), panic_handler: builder.thread_panic_handler.clone(), task: Multiplexed::new( + pool_name.clone(), + thread_name.unwrap_or(format!("thread-{}", thread_id).into()), builder.max_concurrency, rx.into_stream(), builder.task_panic_handler.clone(), @@ -53,7 +63,10 @@ where builder.spawn_handler.spawn(thread)?; } - Ok(Self { tx }) + Ok(Self { + name: pool_name.clone(), + tx, + }) } } @@ -74,6 +87,8 @@ where self.tx.send(future).is_ok(), "failed to schedule task: all worker threads have terminated (either none were spawned or all have panicked)" ); + + self.track_queue_size() } /// Asynchronously enqueues a future for execution within the [`AsyncPool`]. @@ -89,14 +104,26 @@ where self.tx.send_async(future).await.is_ok(), "failed to schedule task: all worker threads have terminated (either none were spawned or all have panicked)" ); + + self.track_queue_size() + } + + /// Tracks the amount of elements in the queue containing all futures to be scheduled. + fn track_queue_size(&self) { + // On each poll, we report how many items are in the queue containing futures to dispatch + // across all threads. + relay_statsd::metric!( + gauge(AsyncPoolGauges::AsyncPoolQueueSize) = self.tx.len() as u64, + pool_name = &self.name + ); } } /// [`Thread`] represents a dedicated worker thread within an [`AsyncPool`] that executes scheduled tasks. pub struct Thread { - index: usize, + id: usize, max_concurrency: usize, - name: Option<String>, + name: Option<Arc<str>>, runtime: tokio::runtime::Handle, panic_handler: Option<Arc<PanicHandler>>, task: BoxFuture<'static, ()>, @@ -106,8 +133,8 @@ impl Thread { /// Returns the unique index assigned to this [`Thread`]. /// /// The index can help identify the thread during debugging or logging. - pub fn index(&self) -> usize { - self.index + pub fn id(&self) -> usize { + self.id } /// Returns the maximum number of concurrent tasks permitted on this [`Thread`]. @@ -356,9 +383,9 @@ mod tests { }; Thread { - index: 0, + id: 0, max_concurrency: 1, - name: Some("test-thread".to_owned()), + name: Some("test-thread".into()), runtime: runtime.handle().clone(), panic_handler: Some(Arc::new(panic_handler)), task: async move { From 8729efa8595d68a5307e9dbfb5efca78371480f0 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti <riccardob36@gmail.com> Date: Mon, 24 Feb 2025 17:04:47 +0100 Subject: [PATCH 06/14] Fix --- relay-server/src/utils/thread_pool.rs | 3 ++- relay-threading/src/builder.rs | 6 +++--- relay-threading/src/multiplexing.rs | 26 +++++++++++++------------- relay-threading/src/pool.rs | 19 +++++++++---------- 4 files changed, 27 insertions(+), 27 deletions(-) diff --git a/relay-server/src/utils/thread_pool.rs b/relay-server/src/utils/thread_pool.rs index 4295cef8eb7..57f95f02e79 100644 --- a/relay-server/src/utils/thread_pool.rs +++ b/relay-server/src/utils/thread_pool.rs @@ -66,9 +66,10 @@ impl ThreadPoolBuilder { F: Future<Output = ()> + Send + 'static, { AsyncPoolBuilder::new(self.runtime) + .pool_name(self.name) + .thread_name(move |id| format!("pool-{name}-{id}", name = self.name)) .num_threads(self.num_threads) .max_concurrency(self.max_concurrency) - .thread_name(move |id| format!("pool-{name}-{id}", name = self.name)) // In case of panic in a task sent to the pool, we catch it to continue the remaining // work and just log an error. .task_panic_handler(move |_panic| { diff --git a/relay-threading/src/builder.rs b/relay-threading/src/builder.rs index d4203e6d49d..f6565263f2d 100644 --- a/relay-threading/src/builder.rs +++ b/relay-threading/src/builder.rs @@ -16,7 +16,7 @@ pub(crate) type PanicHandler = dyn Fn(Box<dyn Any + Send>) + Send + Sync; /// and panic handling strategies. pub struct AsyncPoolBuilder<S = DefaultSpawn> { pub(crate) runtime: tokio::runtime::Handle, - pub(crate) pool_name: Option<Arc<str>>, + pub(crate) pool_name: Option<&'static str>, pub(crate) thread_name: Option<Box<dyn FnMut(usize) -> String>>, pub(crate) thread_panic_handler: Option<Arc<PanicHandler>>, pub(crate) task_panic_handler: Option<Arc<PanicHandler>>, @@ -48,8 +48,8 @@ where S: ThreadSpawn, { /// Specifies a custom name for this pool. - pub fn pool_name(mut self, pool_name: String) -> Self { - self.pool_name = Some(pool_name.into()); + pub fn pool_name(mut self, pool_name: &'static str) -> Self { + self.pool_name = Some(pool_name); self } diff --git a/relay-threading/src/multiplexing.rs b/relay-threading/src/multiplexing.rs index 1dcb5a7433a..c80b4d650ed 100644 --- a/relay-threading/src/multiplexing.rs +++ b/relay-threading/src/multiplexing.rs @@ -102,8 +102,8 @@ pin_project! { /// /// This multiplexer is primarily used by the [`AsyncPool`] to manage task execution on worker threads. pub struct Multiplexed<S, F> { - pool_name: Arc<str>, - thread_name: Arc<str>, + pool_name: &'static str, + thread_name: String, max_concurrency: usize, #[pin] rx: S, @@ -121,8 +121,8 @@ where /// Tasks from the stream will be scheduled for execution concurrently, and an optional panic handler /// can be provided to manage errors during task execution. pub fn new( - pool_name: Arc<str>, - thread_name: Arc<str>, + pool_name: &'static str, + thread_name: String, max_concurrency: usize, rx: S, panic_handler: Option<Arc<PanicHandler>>, @@ -214,7 +214,7 @@ mod tests { fn test_multiplexer_with_no_futures() { let (_, rx) = flume::bounded::<BoxFuture<'static, _>>(10); futures::executor::block_on(Multiplexed::new( - "my_pool".into(), + "my_pool", "my_thread".into(), 1, rx.into_stream(), @@ -242,7 +242,7 @@ mod tests { panic_handler_called_clone.store(true, Ordering::SeqCst); }; futures::executor::block_on(Multiplexed::new( - "my_pool".into(), + "my_pool", "my_thread".into(), 1, rx.into_stream(), @@ -270,7 +270,7 @@ mod tests { let result = std::panic::catch_unwind(AssertUnwindSafe(|| { futures::executor::block_on(Multiplexed::new( - "my_pool".into(), + "my_pool", "my_thread".into(), 1, rx.into_stream(), @@ -297,7 +297,7 @@ mod tests { drop(tx); futures::executor::block_on(Multiplexed::new( - "my_pool".into(), + "my_pool", "my_thread".into(), 1, rx.into_stream(), @@ -324,7 +324,7 @@ mod tests { drop(tx); futures::executor::block_on(Multiplexed::new( - "my_pool".into(), + "my_pool", "my_thread".into(), 1, rx.into_stream(), @@ -349,7 +349,7 @@ mod tests { drop(tx); futures::executor::block_on(Multiplexed::new( - "my_pool".into(), + "my_pool", "my_thread".into(), 5, rx.into_stream(), @@ -376,7 +376,7 @@ mod tests { drop(tx); futures::executor::block_on(Multiplexed::new( - "my_pool".into(), + "my_pool", "my_thread".into(), 5, rx.into_stream(), @@ -406,7 +406,7 @@ mod tests { drop(tx); futures::executor::block_on(Multiplexed::new( - "my_pool".into(), + "my_pool", "my_thread".into(), 5, rx.into_stream(), @@ -442,7 +442,7 @@ mod tests { drop(tx); futures::executor::block_on(Multiplexed::new( - "my_pool".into(), + "my_pool", "my_thread".into(), 5, rx.into_stream(), diff --git a/relay-threading/src/pool.rs b/relay-threading/src/pool.rs index 6d6b332607d..e2ae5b61f57 100644 --- a/relay-threading/src/pool.rs +++ b/relay-threading/src/pool.rs @@ -1,7 +1,7 @@ use std::future::Future; use std::io; use std::panic::AssertUnwindSafe; -use std::sync::{Arc, LazyLock}; +use std::sync::Arc; use futures::future::BoxFuture; use futures::FutureExt; @@ -12,7 +12,7 @@ use crate::multiplexing::Multiplexed; use crate::PanicHandler; /// Default name of the pool. -static DEFAULT_POOL_NAME: LazyLock<Arc<str>> = LazyLock::new(|| Arc::from("unnamed")); +static DEFAULT_POOL_NAME: &str = "unnamed"; /// [`AsyncPool`] is a thread-based executor that runs asynchronous tasks on dedicated worker threads. /// @@ -20,7 +20,7 @@ static DEFAULT_POOL_NAME: LazyLock<Arc<str>> = LazyLock::new(|| Arc::from("unnam /// Tokio executor. This design enables controlled concurrency and efficient use of system resources. #[derive(Debug)] pub struct AsyncPool<F> { - name: Arc<str>, + name: &'static str, tx: flume::Sender<F>, } @@ -36,13 +36,12 @@ where where S: ThreadSpawn, { - let pool_name = builder.pool_name.unwrap_or(DEFAULT_POOL_NAME.clone()); + let pool_name = builder.pool_name.unwrap_or(DEFAULT_POOL_NAME); let (tx, rx) = flume::bounded(builder.num_threads * 2); for thread_id in 0..builder.num_threads { let rx = rx.clone(); - let thread_name: Option<Arc<str>> = - builder.thread_name.as_mut().map(|f| f(thread_id).into()); + let thread_name: Option<String> = builder.thread_name.as_mut().map(|f| f(thread_id)); let thread = Thread { id: thread_id, @@ -51,8 +50,8 @@ where runtime: builder.runtime.clone(), panic_handler: builder.thread_panic_handler.clone(), task: Multiplexed::new( - pool_name.clone(), - thread_name.unwrap_or(format!("thread-{}", thread_id).into()), + pool_name, + thread_name.unwrap_or(format!("thread-{}", thread_id)), builder.max_concurrency, rx.into_stream(), builder.task_panic_handler.clone(), @@ -64,7 +63,7 @@ where } Ok(Self { - name: pool_name.clone(), + name: pool_name, tx, }) } @@ -123,7 +122,7 @@ where pub struct Thread { id: usize, max_concurrency: usize, - name: Option<Arc<str>>, + name: Option<String>, runtime: tokio::runtime::Handle, panic_handler: Option<Arc<PanicHandler>>, task: BoxFuture<'static, ()>, From d2b76013695ae05c766d7e0641e08b3aaecfc3e7 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti <riccardob36@gmail.com> Date: Mon, 24 Feb 2025 17:06:17 +0100 Subject: [PATCH 07/14] Fix --- relay-threading/src/pool.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay-threading/src/pool.rs b/relay-threading/src/pool.rs index e2ae5b61f57..04cd41dc1a3 100644 --- a/relay-threading/src/pool.rs +++ b/relay-threading/src/pool.rs @@ -12,7 +12,7 @@ use crate::multiplexing::Multiplexed; use crate::PanicHandler; /// Default name of the pool. -static DEFAULT_POOL_NAME: &str = "unnamed"; +const DEFAULT_POOL_NAME: &str = "unnamed"; /// [`AsyncPool`] is a thread-based executor that runs asynchronous tasks on dedicated worker threads. /// From 51261dc5a42ba9e1bd5867d71ccab2332a9e255d Mon Sep 17 00:00:00 2001 From: Riccardo Busetti <riccardob36@gmail.com> Date: Tue, 25 Feb 2025 10:16:26 +0100 Subject: [PATCH 08/14] Fix min concurrency --- relay-config/src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 50c3944b7ce..6476c16eddd 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -699,7 +699,7 @@ impl Default for Limits { max_replay_uncompressed_size: ByteSize::mebibytes(100), max_replay_message_size: ByteSize::mebibytes(15), max_thread_count: num_cpus::get(), - max_pool_concurrency: 0, + max_pool_concurrency: 1, query_timeout: 30, shutdown_timeout: 10, keepalive_timeout: 5, From 5880f0fc85ad097df7b306c9e639bc4808433d6e Mon Sep 17 00:00:00 2001 From: Riccardo Busetti <riccardob36@gmail.com> Date: Tue, 25 Feb 2025 10:18:25 +0100 Subject: [PATCH 09/14] Update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a7bd942b6b6..96086ee559d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ - Track an utilization metric for internal services. ([#4501](https://github.com/getsentry/relay/pull/4501)) - Add new `relay-threading` crate with asynchronous thread pool. ([#4500](https://github.com/getsentry/relay/pull/4500)) +- Adopt new `AsyncPool` for the `EnvelopeProcessorService` and `StoreService`. ([#4520](https://github.com/getsentry/relay/pull/4520)) ## 25.2.0 From 28b8666a1cf40dcfe9d25a71049bf0c48a964b10 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti <riccardob36@gmail.com> Date: Tue, 25 Feb 2025 11:24:31 +0100 Subject: [PATCH 10/14] Improve --- relay-threading/src/pool.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/relay-threading/src/pool.rs b/relay-threading/src/pool.rs index 04cd41dc1a3..b07cf4de40d 100644 --- a/relay-threading/src/pool.rs +++ b/relay-threading/src/pool.rs @@ -67,12 +67,7 @@ where tx, }) } -} -impl<F> AsyncPool<F> -where - F: Future<Output = ()>, -{ /// Schedules a future for execution within the [`AsyncPool`]. /// /// The task is added to the pool's internal queue to be executed by an available worker thread. From 0af7ec08c8a7aad6d259e4167a1abc4b6ab7d3cf Mon Sep 17 00:00:00 2001 From: Riccardo Busetti <riccardob36@gmail.com> Date: Tue, 25 Feb 2025 13:16:36 +0100 Subject: [PATCH 11/14] Fix PR comments --- relay-config/src/config.rs | 9 ++++++--- relay-server/src/services/processor.rs | 9 ++------- relay-server/src/utils/thread_pool.rs | 4 +++- relay-threading/src/multiplexing.rs | 17 +++++++++-------- 4 files changed, 20 insertions(+), 19 deletions(-) diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 6476c16eddd..fce846d2f86 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -638,9 +638,12 @@ pub struct Limits { /// The total number of threads spawned will roughly be `2 * max_thread_count`. Defaults to /// the number of logical CPU cores on the host. pub max_thread_count: usize, - /// The maximum number of tasks to execute within each thread of the asynchronous pool. + /// Controls the maximum concurrency of each worker thread. /// - /// Each thread will be able to drive concurrently at most `max_pool_concurrency` futures. + /// Increasing the concurrency, can lead to a better utilization of worker threads by + /// increasing the amount of I/O done concurrently. + // + /// Currently has no effect on defaults to `1`. pub max_pool_concurrency: usize, /// The maximum number of seconds a query is allowed to take across retries. Individual requests /// have lower timeouts. Defaults to 30 seconds. @@ -2354,7 +2357,7 @@ impl Config { self.values.limits.max_thread_count } - /// Returns the number of tasks that can run concurrently. + /// Returns the number of tasks that can run concurrently in the worker pool. pub fn pool_concurrency(&self) -> usize { self.values.limits.max_pool_concurrency } diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 33aae4b7fcc..3dae2b70f40 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -3090,7 +3090,7 @@ impl EnvelopeProcessorService { self.inner.rate_limiter.is_some() } - async fn handle_message(&self, message: EnvelopeProcessor) { + fn handle_message(&self, message: EnvelopeProcessor) { let ty = message.variant(); let feature_weights = self.feature_weights(&message); @@ -3163,12 +3163,7 @@ impl Service for EnvelopeProcessorService { let service = self.clone(); self.inner .pool - .spawn_async( - async move { - service.handle_message(message).await; - } - .boxed(), - ) + .spawn_async(async move { service.handle_message(message) }.boxed()) .await; } } diff --git a/relay-server/src/utils/thread_pool.rs b/relay-server/src/utils/thread_pool.rs index 57f95f02e79..4b05ac00e2e 100644 --- a/relay-server/src/utils/thread_pool.rs +++ b/relay-server/src/utils/thread_pool.rs @@ -76,7 +76,9 @@ impl ThreadPoolBuilder { relay_log::error!( "task in pool {name} panicked, other tasks will continue execution", name = self.name - ) + ); + // We want to propagate the panic to Relay since the failure of a thread is + std::process::exit(1); }) // In case of panic in the thread, log it. After a panic in the thread, it will stop. .thread_panic_handler(move |_panic| { diff --git a/relay-threading/src/multiplexing.rs b/relay-threading/src/multiplexing.rs index c80b4d650ed..2dcf6365b8a 100644 --- a/relay-threading/src/multiplexing.rs +++ b/relay-threading/src/multiplexing.rs @@ -152,13 +152,6 @@ where let mut this = self.project(); loop { - // We report how many tasks are being concurrently polled in this future. - relay_statsd::metric!( - gauge(AsyncPoolGauges::AsyncPoolFuturesPerThread) = this.tasks.len() as u64, - pool_name = &this.pool_name, - thread_name = &this.thread_name - ); - this.tasks.as_mut().poll_tasks_until_pending(cx); // If we can't get anymore tasks, and we don't have anything else to process, we report @@ -176,7 +169,15 @@ where // At this point, we are free to start driving another future. match this.rx.as_mut().poll_next(cx) { - Poll::Ready(Some(task)) => this.tasks.push(task), + Poll::Ready(Some(task)) => { + this.tasks.push(task); + // We report how many tasks are being concurrently polled in this future. + relay_statsd::metric!( + gauge(AsyncPoolGauges::AsyncPoolFuturesPerThread) = this.tasks.len() as u64, + pool_name = &this.pool_name, + thread_name = &this.thread_name + ); + } // The stream is exhausted and there are no remaining tasks. Poll::Ready(None) if this.tasks.is_empty() => return Poll::Ready(()), // The stream is exhausted but tasks remain active. Now we need to make sure we From 9ce102434c3e5ceab0bc555ffad5907630411d81 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti <riccardob36@gmail.com> Date: Tue, 25 Feb 2025 13:22:36 +0100 Subject: [PATCH 12/14] Fix PR comments --- relay-server/src/utils/thread_pool.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/relay-server/src/utils/thread_pool.rs b/relay-server/src/utils/thread_pool.rs index 4b05ac00e2e..f8d48db73e2 100644 --- a/relay-server/src/utils/thread_pool.rs +++ b/relay-server/src/utils/thread_pool.rs @@ -77,12 +77,12 @@ impl ThreadPoolBuilder { "task in pool {name} panicked, other tasks will continue execution", name = self.name ); - // We want to propagate the panic to Relay since the failure of a thread is - std::process::exit(1); }) // In case of panic in the thread, log it. After a panic in the thread, it will stop. - .thread_panic_handler(move |_panic| { - relay_log::error!("thread in pool {name} panicked", name = self.name) + .thread_panic_handler(move |panic| { + relay_log::error!("thread in pool {name} panicked", name = self.name); + // We want to propagate the panic to Relay since the failure of a thread is + std::panic::resume_unwind(panic); }) .spawn_handler(|thread| { let mut b = thread::Builder::new(); From 1079e2f9c26edd50c42fe639bb67501b5f163ee5 Mon Sep 17 00:00:00 2001 From: Riccardo Busetti <riccardob36@gmail.com> Date: Tue, 25 Feb 2025 13:24:31 +0100 Subject: [PATCH 13/14] Fix PR comments --- relay-server/src/utils/thread_pool.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/relay-server/src/utils/thread_pool.rs b/relay-server/src/utils/thread_pool.rs index f8d48db73e2..e5bf8af2199 100644 --- a/relay-server/src/utils/thread_pool.rs +++ b/relay-server/src/utils/thread_pool.rs @@ -81,7 +81,6 @@ impl ThreadPoolBuilder { // In case of panic in the thread, log it. After a panic in the thread, it will stop. .thread_panic_handler(move |panic| { relay_log::error!("thread in pool {name} panicked", name = self.name); - // We want to propagate the panic to Relay since the failure of a thread is std::panic::resume_unwind(panic); }) .spawn_handler(|thread| { From e71187a096e3913fb134a6ae5a4c5c9d4a9a964a Mon Sep 17 00:00:00 2001 From: Riccardo Busetti <riccardob36@gmail.com> Date: Tue, 25 Feb 2025 16:36:00 +0100 Subject: [PATCH 14/14] Fix PR comments --- relay-threading/src/multiplexing.rs | 71 ++++------------------------- relay-threading/src/pool.rs | 1 - 2 files changed, 9 insertions(+), 63 deletions(-) diff --git a/relay-threading/src/multiplexing.rs b/relay-threading/src/multiplexing.rs index 2dcf6365b8a..e68dd051a26 100644 --- a/relay-threading/src/multiplexing.rs +++ b/relay-threading/src/multiplexing.rs @@ -103,7 +103,6 @@ pin_project! { /// This multiplexer is primarily used by the [`AsyncPool`] to manage task execution on worker threads. pub struct Multiplexed<S, F> { pool_name: &'static str, - thread_name: String, max_concurrency: usize, #[pin] rx: S, @@ -122,14 +121,12 @@ where /// can be provided to manage errors during task execution. pub fn new( pool_name: &'static str, - thread_name: String, max_concurrency: usize, rx: S, panic_handler: Option<Arc<PanicHandler>>, ) -> Self { Self { pool_name, - thread_name, max_concurrency, rx, tasks: Tasks::new(panic_handler), @@ -174,8 +171,7 @@ where // We report how many tasks are being concurrently polled in this future. relay_statsd::metric!( gauge(AsyncPoolGauges::AsyncPoolFuturesPerThread) = this.tasks.len() as u64, - pool_name = &this.pool_name, - thread_name = &this.thread_name + pool_name = &this.pool_name ); } // The stream is exhausted and there are no remaining tasks. @@ -214,13 +210,7 @@ mod tests { #[test] fn test_multiplexer_with_no_futures() { let (_, rx) = flume::bounded::<BoxFuture<'static, _>>(10); - futures::executor::block_on(Multiplexed::new( - "my_pool", - "my_thread".into(), - 1, - rx.into_stream(), - None, - )); + futures::executor::block_on(Multiplexed::new("my_pool", 1, rx.into_stream(), None)); } #[test] @@ -244,7 +234,6 @@ mod tests { }; futures::executor::block_on(Multiplexed::new( "my_pool", - "my_thread".into(), 1, rx.into_stream(), Some(Arc::new(panic_handler)), @@ -270,13 +259,7 @@ mod tests { drop(tx); let result = std::panic::catch_unwind(AssertUnwindSafe(|| { - futures::executor::block_on(Multiplexed::new( - "my_pool", - "my_thread".into(), - 1, - rx.into_stream(), - None, - )) + futures::executor::block_on(Multiplexed::new("my_pool", 1, rx.into_stream(), None)) })); // The count is expected to have been incremented and the handler called. @@ -297,13 +280,7 @@ mod tests { drop(tx); - futures::executor::block_on(Multiplexed::new( - "my_pool", - "my_thread".into(), - 1, - rx.into_stream(), - None, - )); + futures::executor::block_on(Multiplexed::new("my_pool", 1, rx.into_stream(), None)); // The count is expected to have been incremented. assert_eq!(count.load(Ordering::SeqCst), 1); @@ -324,13 +301,7 @@ mod tests { drop(tx); - futures::executor::block_on(Multiplexed::new( - "my_pool", - "my_thread".into(), - 1, - rx.into_stream(), - None, - )); + futures::executor::block_on(Multiplexed::new("my_pool", 1, rx.into_stream(), None)); // The order of completion is expected to match the order of submission. assert_eq!(*entries.lock().unwrap(), (0..5).collect::<Vec<_>>()); @@ -349,13 +320,7 @@ mod tests { drop(tx); - futures::executor::block_on(Multiplexed::new( - "my_pool", - "my_thread".into(), - 5, - rx.into_stream(), - None, - )); + futures::executor::block_on(Multiplexed::new("my_pool", 5, rx.into_stream(), None)); // The count is expected to have been incremented. assert_eq!(count.load(Ordering::SeqCst), 1); @@ -376,13 +341,7 @@ mod tests { drop(tx); - futures::executor::block_on(Multiplexed::new( - "my_pool", - "my_thread".into(), - 5, - rx.into_stream(), - None, - )); + futures::executor::block_on(Multiplexed::new("my_pool", 5, rx.into_stream(), None)); // The order of completion is expected to be the same as the order of submission. assert_eq!(*entries.lock().unwrap(), (0..5).collect::<Vec<_>>()); @@ -406,13 +365,7 @@ mod tests { drop(tx); - futures::executor::block_on(Multiplexed::new( - "my_pool", - "my_thread".into(), - 5, - rx.into_stream(), - None, - )); + futures::executor::block_on(Multiplexed::new("my_pool", 5, rx.into_stream(), None)); // The order of completion is expected to be the same as the order of submission. assert_eq!(*entries.lock().unwrap(), (0..3).collect::<Vec<_>>()); @@ -442,13 +395,7 @@ mod tests { drop(tx); - futures::executor::block_on(Multiplexed::new( - "my_pool", - "my_thread".into(), - 5, - rx.into_stream(), - None, - )); + futures::executor::block_on(Multiplexed::new("my_pool", 5, rx.into_stream(), None)); // The order of completion may vary; verify that all expected elements are present. let mut entries = entries.lock().unwrap(); diff --git a/relay-threading/src/pool.rs b/relay-threading/src/pool.rs index b07cf4de40d..c72288679b3 100644 --- a/relay-threading/src/pool.rs +++ b/relay-threading/src/pool.rs @@ -51,7 +51,6 @@ where panic_handler: builder.thread_panic_handler.clone(), task: Multiplexed::new( pool_name, - thread_name.unwrap_or(format!("thread-{}", thread_id)), builder.max_concurrency, rx.into_stream(), builder.task_panic_handler.clone(),