From 4abe34159fc91e91ed4de0423b3783a570fe275c Mon Sep 17 00:00:00 2001
From: Riccardo Busetti <riccardob36@gmail.com>
Date: Mon, 24 Feb 2025 09:41:56 +0100
Subject: [PATCH 01/14] feat(pool): Use new AsyncPool for the processor and
 store services

---
 Cargo.lock                             |   1 +
 relay-config/src/config.rs             |  14 +-
 relay-server/Cargo.toml                |   1 +
 relay-server/src/service.rs            |  19 +-
 relay-server/src/services/processor.rs |  26 +-
 relay-server/src/services/store.rs     |  21 +-
 relay-server/src/testutils.rs          |   9 +-
 relay-server/src/utils/thread_pool.rs  | 345 +++++++++++--------------
 8 files changed, 211 insertions(+), 225 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 6014c10c408..4974c2d5fd0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4007,6 +4007,7 @@ dependencies = [
  "relay-statsd",
  "relay-system",
  "relay-test",
+ "relay-threading",
  "reqwest",
  "rmp-serde",
  "semver",
diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs
index 8561a297bf4..fcfeb6a9fa2 100644
--- a/relay-config/src/config.rs
+++ b/relay-config/src/config.rs
@@ -638,6 +638,11 @@ pub struct Limits {
     /// The total number of threads spawned will roughly be `2 * max_thread_count`. Defaults to
     /// the number of logical CPU cores on the host.
     pub max_thread_count: usize,
+    /// The maximum number of concurrent tasks to run in each asynchronous thread pool.
+    ///
+    /// For each thread of the asynchronous pool, up to `max_concurrency_per_pool` futures
+    /// (i.e., tasks) will be polled simultaneously.
+    pub max_concurrency_per_pool: usize,
     /// The maximum number of seconds a query is allowed to take across retries. Individual requests
     /// have lower timeouts. Defaults to 30 seconds.
     pub query_timeout: u64,
@@ -646,7 +651,7 @@ pub struct Limits {
     pub shutdown_timeout: u64,
     /// Server keep-alive timeout in seconds.
     ///
-    /// By default keep-alive is set to a 5 seconds.
+    /// By default, keep-alive is set to 5 seconds.
     pub keepalive_timeout: u64,
     /// Server idle timeout in seconds.
     ///
@@ -695,6 +700,7 @@ impl Default for Limits {
             max_replay_uncompressed_size: ByteSize::mebibytes(100),
             max_replay_message_size: ByteSize::mebibytes(15),
             max_thread_count: num_cpus::get(),
+            max_concurrency_per_pool: 1,
             query_timeout: 30,
             shutdown_timeout: 10,
             keepalive_timeout: 5,
@@ -2349,6 +2355,12 @@ impl Config {
         self.values.limits.max_thread_count
     }
 
+    /// Returns the number of tasks that will be polled simultaneously by each thread of the
+    /// asynchronous pool.
+    pub fn pool_tasks_concurrency(&self) -> usize {
+        self.values.limits.max_concurrency_per_pool
+    }
+
     /// Returns the maximum size of a project config query.
     pub fn query_batch_size(&self) -> usize {
         self.values.cache.batch_size
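
The two limits compose multiplicatively. A back-of-envelope sketch (the `max_in_flight` helper is illustrative, not part of the patch):

    /// Illustrative only: the ceiling on futures polled simultaneously by one
    /// pool is threads times per-thread concurrency.
    fn max_in_flight(threads: usize, concurrency_per_thread: usize) -> usize {
        threads * concurrency_per_thread
    }

    fn main() {
        // With this patch's defaults (max_thread_count = logical cores,
        // max_concurrency_per_pool = 1), an 8-core host polls at most 8 futures.
        assert_eq!(max_in_flight(8, 1), 8);
    }
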
diff --git a/relay-server/Cargo.toml b/relay-server/Cargo.toml
index 5e2cbe06253..4252c940e44 100644
--- a/relay-server/Cargo.toml
+++ b/relay-server/Cargo.toml
@@ -88,6 +88,7 @@ relay-sampling = { workspace = true }
 relay-spans = { workspace = true }
 relay-statsd = { workspace = true }
 relay-system = { workspace = true }
+relay-threading = { workspace = true }
 reqwest = { workspace = true, features = [
   "gzip",
   "hickory-dns",
diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs
index a621a0d4d11..095ea83006f 100644
--- a/relay-server/src/service.rs
+++ b/relay-server/src/service.rs
@@ -13,13 +13,15 @@ use crate::services::health_check::{HealthCheck, HealthCheckService};
 use crate::services::metrics::RouterService;
 use crate::services::outcome::{OutcomeProducer, OutcomeProducerService, TrackOutcome};
 use crate::services::outcome_aggregator::OutcomeAggregator;
-use crate::services::processor::{self, EnvelopeProcessor, EnvelopeProcessorService};
+use crate::services::processor::{
+    self, EnvelopeProcessor, EnvelopeProcessorService, EnvelopeProcessorServicePool,
+};
 use crate::services::projects::cache::{ProjectCacheHandle, ProjectCacheService};
 use crate::services::projects::source::ProjectSource;
 use crate::services::relays::{RelayCache, RelayCacheService};
 use crate::services::stats::RelayStats;
 #[cfg(feature = "processing")]
-use crate::services::store::StoreService;
+use crate::services::store::{StoreService, StoreServicePool};
 use crate::services::test_store::{TestStore, TestStoreService};
 use crate::services::upstream::{UpstreamRelay, UpstreamRelayService};
 use crate::utils::{MemoryChecker, MemoryStat, ThreadKind};
@@ -28,7 +30,6 @@ use anyhow::Context;
 use anyhow::Result;
 use axum::extract::FromRequestParts;
 use axum::http::request::Parts;
-use rayon::ThreadPool;
 use relay_cogs::Cogs;
 use relay_config::Config;
 #[cfg(feature = "processing")]
@@ -97,7 +98,7 @@ pub fn create_runtime(name: &'static str, threads: usize) -> relay_system::Runti
         .build()
 }
 
-fn create_processor_pool(config: &Config) -> Result<ThreadPool> {
+fn create_processor_pool(config: &Config) -> Result<EnvelopeProcessorServicePool> {
     // Adjust thread count for small cpu counts to not have too many idle cores
     // and distribute workload better.
     let thread_count = match config.cpu_concurrency() {
@@ -107,17 +108,17 @@ fn create_processor_pool(config: &Config) -> Result<ThreadPool> {
     };
     relay_log::info!("starting {thread_count} envelope processing workers");
 
-    let pool = crate::utils::ThreadPoolBuilder::new("processor")
+    let pool = crate::utils::ThreadPoolBuilder::new("processor", tokio::runtime::Handle::current())
         .num_threads(thread_count)
+        .max_concurrency(config.pool_tasks_concurrency())
         .thread_kind(ThreadKind::Worker)
-        .runtime(tokio::runtime::Handle::current())
         .build()?;
 
     Ok(pool)
 }
 
 #[cfg(feature = "processing")]
-fn create_store_pool(config: &Config) -> Result<ThreadPool> {
+fn create_store_pool(config: &Config) -> Result<StoreServicePool> {
     // Spawn a store worker for every 12 threads in the processor pool.
     // This ratio was found empirically and may need adjustments in the future.
     //
@@ -126,9 +127,9 @@ fn create_store_pool(config: &Config) -> Result<ThreadPool> {
     let thread_count = config.cpu_concurrency().div_ceil(12);
     relay_log::info!("starting {thread_count} store workers");
 
-    let pool = crate::utils::ThreadPoolBuilder::new("store")
+    let pool = crate::utils::ThreadPoolBuilder::new("store", tokio::runtime::Handle::current())
         .num_threads(thread_count)
-        .runtime(tokio::runtime::Handle::current())
+        .max_concurrency(config.pool_tasks_concurrency())
         .build()?;
 
     Ok(pool)
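
For reference, the builder flow both pools now share, condensed into one function. This is a sketch over the crate-internal types rewired above and assumes an active Tokio runtime:

    use anyhow::Result;
    use tokio::runtime::Handle;

    use crate::services::processor::EnvelopeProcessorServicePool;
    use crate::utils::{ThreadKind, ThreadPoolBuilder};

    // Condensed from `create_processor_pool`; `Handle::current()` panics
    // when called outside of a running Tokio runtime.
    fn build_pool(threads: usize, concurrency: usize) -> Result<EnvelopeProcessorServicePool> {
        let pool = ThreadPoolBuilder::new("processor", Handle::current())
            .num_threads(threads)
            .max_concurrency(concurrency)
            .thread_kind(ThreadKind::Worker)
            .build()?;
        Ok(pool)
    }
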
diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs
index cf1de45982d..33aae4b7fcc 100644
--- a/relay-server/src/services/processor.rs
+++ b/relay-server/src/services/processor.rs
@@ -14,6 +14,8 @@ use bytes::Bytes;
 use chrono::{DateTime, Utc};
 use flate2::write::{GzEncoder, ZlibEncoder};
 use flate2::Compression;
+use futures::future::BoxFuture;
+use futures::FutureExt;
 use relay_base_schema::project::{ProjectId, ProjectKey};
 use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
 use relay_common::time::UnixTimestamp;
@@ -60,10 +62,10 @@ use crate::services::upstream::{
 };
 use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};
 use crate::utils::{
-    self, InvalidProcessingGroupType, ManagedEnvelope, SamplingResult, ThreadPool, TypedEnvelope,
-    WorkerGroup,
+    self, InvalidProcessingGroupType, ManagedEnvelope, SamplingResult, TypedEnvelope,
 };
 use relay_base_schema::organization::OrganizationId;
+use relay_threading::AsyncPool;
 #[cfg(feature = "processing")]
 use {
     crate::services::store::{Store, StoreEnvelope},
@@ -1078,6 +1080,9 @@ impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
     }
 }
 
+/// The asynchronous thread pool used for scheduling processing tasks in the processor.
+pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;
+
 /// Service implementing the [`EnvelopeProcessor`] interface.
 ///
 /// This service handles messages in a worker pool with configurable concurrency.
@@ -1110,7 +1115,7 @@ impl Default for Addrs {
 }
 
 struct InnerProcessor {
-    workers: WorkerGroup,
+    pool: EnvelopeProcessorServicePool,
     config: Arc<Config>,
     global_config: GlobalConfigHandle,
     project_cache: ProjectCacheHandle,
@@ -1130,7 +1135,7 @@ impl EnvelopeProcessorService {
     /// Creates a multi-threaded envelope processor.
     #[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
     pub fn new(
-        pool: ThreadPool,
+        pool: EnvelopeProcessorServicePool,
         config: Arc<Config>,
         global_config: GlobalConfigHandle,
         project_cache: ProjectCacheHandle,
@@ -1160,7 +1165,7 @@ impl EnvelopeProcessorService {
         };
 
         let inner = InnerProcessor {
-            workers: WorkerGroup::new(pool),
+            pool,
             global_config,
             project_cache,
             cogs,
@@ -3085,7 +3090,7 @@ impl EnvelopeProcessorService {
         self.inner.rate_limiter.is_some()
     }
 
-    fn handle_message(&self, message: EnvelopeProcessor) {
+    async fn handle_message(&self, message: EnvelopeProcessor) {
         let ty = message.variant();
         let feature_weights = self.feature_weights(&message);
 
@@ -3157,8 +3162,13 @@ impl Service for EnvelopeProcessorService {
         while let Some(message) = rx.recv().await {
             let service = self.clone();
             self.inner
-                .workers
-                .spawn(move || service.handle_message(message))
+                .pool
+                .spawn_async(
+                    async move {
+                        service.handle_message(message).await;
+                    }
+                    .boxed(),
+                )
                 .await;
         }
     }
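
The alias uses `BoxFuture` because `AsyncPool<F>` is generic over a single concrete future type, while every `async move { .. }` block has its own anonymous type. A minimal sketch of the type erasure (the `erase` helper is hypothetical):

    use std::future::Future;

    use futures::future::BoxFuture;
    use futures::FutureExt;

    // Boxing erases the anonymous future type so that heterogeneous async
    // blocks all fit the pool's single task type, `BoxFuture<'static, ()>`.
    fn erase(task: impl Future<Output = ()> + Send + 'static) -> BoxFuture<'static, ()> {
        task.boxed()
    }
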
diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs
index b9256c38b9c..61faa2e9c23 100644
--- a/relay-server/src/services/store.rs
+++ b/relay-server/src/services/store.rs
@@ -10,12 +10,15 @@ use std::sync::Arc;
 
 use bytes::Bytes;
 use chrono::{DateTime, Utc};
+use futures::future::BoxFuture;
+use futures::FutureExt;
 use relay_base_schema::data_category::DataCategory;
 use relay_base_schema::project::ProjectId;
 use relay_common::time::UnixTimestamp;
 use relay_config::Config;
 use relay_event_schema::protocol::{EventId, VALID_PLATFORMS};
 
+use crate::envelope::{AttachmentType, Envelope, Item, ItemType};
 use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message};
 use relay_metrics::{
     Bucket, BucketView, BucketViewValue, BucketsView, ByNamespace, FiniteF64, GaugeValue,
@@ -24,13 +27,12 @@ use relay_metrics::{
 use relay_quotas::Scoping;
 use relay_statsd::metric;
 use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
+use relay_threading::AsyncPool;
 use serde::{Deserialize, Serialize};
 use serde_json::value::RawValue;
 use serde_json::Deserializer;
 use uuid::Uuid;
 
-use crate::envelope::{AttachmentType, Envelope, Item, ItemType};
-
 use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
 use crate::service::ServiceError;
 use crate::services::global_config::GlobalConfigHandle;
@@ -91,6 +93,9 @@ pub struct StoreMetrics {
     pub retention: u16,
 }
 
+/// The asynchronous thread pool used for scheduling tasks in the envelope store.
+pub type StoreServicePool = AsyncPool<BoxFuture<'static, ()>>;
+
 /// Service interface for the [`StoreEnvelope`] message.
 #[derive(Debug)]
 pub enum Store {
@@ -128,7 +133,7 @@ impl FromMessage<StoreMetrics> for Store {
 
 /// Service implementing the [`Store`] interface.
 pub struct StoreService {
-    workers: WorkerGroup,
+    pool: StoreServicePool,
     config: Arc<Config>,
     global_config: GlobalConfigHandle,
     outcome_aggregator: Addr<TrackOutcome>,
@@ -138,7 +143,7 @@ pub struct StoreService {
 
 impl StoreService {
     pub fn create(
-        pool: ThreadPool,
+        pool: StoreServicePool,
         config: Arc<Config>,
         global_config: GlobalConfigHandle,
         outcome_aggregator: Addr<TrackOutcome>,
@@ -146,7 +151,7 @@ impl StoreService {
     ) -> anyhow::Result<Self> {
         let producer = Producer::create(&config)?;
         Ok(Self {
-            workers: WorkerGroup::new(pool),
+            pool,
             config,
             global_config,
             outcome_aggregator,
@@ -1049,8 +1054,10 @@ impl Service for StoreService {
 
         while let Some(message) = rx.recv().await {
             let service = Arc::clone(&this);
-            this.workers
-                .spawn(move || service.handle_message(message))
+            // For now, we run each task synchronously; in the future, we might
+            // explore making the store async.
+            this.pool
+                .spawn_async(async move { service.handle_message(message) }.boxed())
                 .await;
         }
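
Since `handle_message` stays synchronous, the async block above never yields; each task occupies its worker thread until it finishes, much like the old rayon closure did. A sketch of the wrapping pattern (the `wrap_sync_work` helper is illustrative):

    use futures::future::BoxFuture;
    use futures::FutureExt;

    // Synchronous work wrapped in an async block runs to completion without
    // yielding, so one task fully occupies one unit of pool concurrency.
    fn wrap_sync_work(work: impl FnOnce() + Send + 'static) -> BoxFuture<'static, ()> {
        async move { work() }.boxed()
    }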
 
diff --git a/relay-server/src/testutils.rs b/relay-server/src/testutils.rs
index a4921378089..6dd616fcfd6 100644
--- a/relay-server/src/testutils.rs
+++ b/relay-server/src/testutils.rs
@@ -18,11 +18,11 @@ use crate::metrics::{MetricOutcomes, MetricStats};
 use crate::service::create_redis_pools;
 use crate::services::global_config::GlobalConfigHandle;
 use crate::services::outcome::TrackOutcome;
-use crate::services::processor::{self, EnvelopeProcessorService};
+use crate::services::processor::{self, EnvelopeProcessorService, EnvelopeProcessorServicePool};
 use crate::services::projects::cache::ProjectCacheHandle;
 use crate::services::projects::project::ProjectInfo;
 use crate::services::test_store::TestStore;
-use crate::utils::{ThreadPool, ThreadPoolBuilder};
+use crate::utils::ThreadPoolBuilder;
 
 pub fn state_with_rule_and_condition(
     sample_rate: Option<f64>,
@@ -174,10 +174,9 @@ pub fn processor_services() -> (Addr<TrackOutcome>, Addr<TestStore>) {
     (outcome_aggregator, test_store)
 }
 
-fn create_processor_pool() -> ThreadPool {
-    ThreadPoolBuilder::new("processor")
+fn create_processor_pool() -> EnvelopeProcessorServicePool {
+    ThreadPoolBuilder::new("processor", tokio::runtime::Handle::current())
         .num_threads(1)
-        .runtime(tokio::runtime::Handle::current())
         .build()
         .unwrap()
 }
diff --git a/relay-server/src/utils/thread_pool.rs b/relay-server/src/utils/thread_pool.rs
index e6b6650318f..1f128743bac 100644
--- a/relay-server/src/utils/thread_pool.rs
+++ b/relay-server/src/utils/thread_pool.rs
@@ -1,9 +1,9 @@
-use std::sync::Arc;
-use std::thread;
+use std::future::Future;
+use std::{io, thread};
+
 use tokio::runtime::Handle;
 
-pub use rayon::{ThreadPool, ThreadPoolBuildError};
-use tokio::sync::Semaphore;
+use relay_threading::{AsyncPool, AsyncPoolBuilder};
 
 /// A thread kind.
 ///
@@ -17,126 +17,87 @@ pub enum ThreadKind {
     Worker,
 }
 
-/// Used to create a new [`ThreadPool`] thread pool.
+/// Used to create a new [`AsyncPool`] thread pool.
 pub struct ThreadPoolBuilder {
     name: &'static str,
-    runtime: Option<Handle>,
+    runtime: Handle,
     num_threads: usize,
+    max_concurrency: usize,
     kind: ThreadKind,
 }
 
 impl ThreadPoolBuilder {
     /// Creates a new named thread pool builder.
-    pub fn new(name: &'static str) -> Self {
+    pub fn new(name: &'static str, runtime: Handle) -> Self {
         Self {
             name,
-            runtime: None,
+            runtime,
             num_threads: 0,
+            max_concurrency: 1,
             kind: ThreadKind::Default,
         }
     }
 
-    /// Sets the number of threads to be used in the rayon thread-pool.
+    /// Sets the number of threads to be used in the thread pool.
     ///
-    /// See also [`rayon::ThreadPoolBuilder::num_threads`].
+    /// See also [`AsyncPoolBuilder::num_threads`].
     pub fn num_threads(mut self, num_threads: usize) -> Self {
         self.num_threads = num_threads;
         self
     }
 
-    /// Configures the [`ThreadKind`] for all threads spawned in the pool.
-    pub fn thread_kind(mut self, kind: ThreadKind) -> Self {
-        self.kind = kind;
+    /// Sets the maximum number of concurrent tasks per thread.
+    ///
+    /// See also [`AsyncPoolBuilder::max_concurrency`].
+    pub fn max_concurrency(mut self, max_concurrency: usize) -> Self {
+        self.max_concurrency = max_concurrency;
         self
     }
 
-    /// Sets the Tokio runtime which will be made available in the workers.
-    pub fn runtime(mut self, runtime: Handle) -> Self {
-        self.runtime = Some(runtime);
+    /// Configures the [`ThreadKind`] for all threads spawned in the pool.
+    pub fn thread_kind(mut self, kind: ThreadKind) -> Self {
+        self.kind = kind;
         self
     }
 
     /// Creates and returns the thread pool.
-    pub fn build(self) -> Result<ThreadPool, ThreadPoolBuildError> {
-        rayon::ThreadPoolBuilder::new()
+    pub fn build<F>(self) -> Result<AsyncPool<F>, io::Error>
+    where
+        F: Future<Output = ()> + Send + 'static,
+    {
+        AsyncPoolBuilder::new(self.runtime)
             .num_threads(self.num_threads)
+            .max_concurrency(self.max_concurrency)
             .thread_name(move |id| format!("pool-{name}-{id}", name = self.name))
-            // In case of panic, log that there was a panic but keep the thread alive and don't
-            // exist.
-            .panic_handler(move |_panic| {
-                relay_log::error!("thread in pool {name} paniced!", name = self.name)
+            // In case a task sent to the pool panics, we catch the panic, log an
+            // error, and continue with the remaining work.
+            .task_panic_handler(move |_panic| {
+                relay_log::error!(
+                    "task in pool {name} panicked, other tasks will continue execution",
+                    name = self.name
+                )
+            })
+            // In case of a panic in the thread, log it; the thread stops after panicking.
+            .thread_panic_handler(move |_panic| {
+                relay_log::error!("thread in pool {name} panicked", name = self.name)
             })
             .spawn_handler(|thread| {
                 let mut b = thread::Builder::new();
+
                 if let Some(name) = thread.name() {
                     b = b.name(name.to_owned());
                 }
-                if let Some(stack_size) = thread.stack_size() {
-                    b = b.stack_size(stack_size);
-                }
-                let runtime = self.runtime.clone();
                 b.spawn(move || {
                     set_current_thread_priority(self.kind);
-                    let _guard = runtime.as_ref().map(|runtime| runtime.enter());
                     thread.run()
                 })?;
+
                 Ok(())
             })
             .build()
     }
 }
 
-/// A [`WorkerGroup`] adds an async back-pressure mechanism to a [`ThreadPool`].
-pub struct WorkerGroup {
-    pool: ThreadPool,
-    semaphore: Arc<Semaphore>,
-}
-
-impl WorkerGroup {
-    /// Creates a new worker group from a thread pool.
-    pub fn new(pool: ThreadPool) -> Self {
-        // Use `current_num_threads() * 2` to guarantee all threads immediately have a new item to work on.
-        let semaphore = Arc::new(Semaphore::new(pool.current_num_threads() * 2));
-        Self { pool, semaphore }
-    }
-
-    /// Spawns an asynchronous task on the thread pool.
-    ///
-    /// If the thread pool is saturated the returned future is pending until
-    /// the thread pool has capacity to work on the task.
-    ///
-    /// # Examples:
-    ///
-    /// ```ignore
-    /// # async fn test(mut messages: tokio::sync::mpsc::Receiver<()>) {
-    /// # use relay_server::utils::{WorkerGroup, ThreadPoolBuilder};
-    /// # use std::thread;
-    /// # use std::time::Duration;
-    /// # let pool = ThreadPoolBuilder::new("test").num_threads(1).build().unwrap();
-    /// let workers = WorkerGroup::new(pool);
-    ///
-    /// while let Some(message) = messages.recv().await {
-    ///     workers.spawn(move || {
-    ///         thread::sleep(Duration::from_secs(1));
-    ///         println!("worked on message {message:?}")
-    ///     }).await;
-    /// }
-    /// # }
-    /// ```
-    pub async fn spawn(&self, op: impl FnOnce() + Send + 'static) {
-        let semaphore = Arc::clone(&self.semaphore);
-        let permit = semaphore
-            .acquire_owned()
-            .await
-            .expect("the semaphore is never closed");
-
-        self.pool.spawn(move || {
-            op();
-            drop(permit);
-        });
-    }
-}
-
 #[cfg(unix)]
 fn set_current_thread_priority(kind: ThreadKind) {
     // Lower priorities cause more favorable scheduling.
@@ -173,123 +134,117 @@ fn set_current_thread_priority(_kind: ThreadKind) {
 
 #[cfg(test)]
 mod tests {
-    use std::sync::Barrier;
-    use std::time::Duration;
-
-    use futures::FutureExt;
-
-    use super::*;
-
-    #[test]
-    fn test_thread_pool_num_threads() {
-        let pool = ThreadPoolBuilder::new("s").num_threads(3).build().unwrap();
-        assert_eq!(pool.current_num_threads(), 3);
-    }
-
-    #[test]
-    fn test_thread_pool_runtime() {
-        let rt = tokio::runtime::Runtime::new().unwrap();
-
-        let pool = ThreadPoolBuilder::new("s")
-            .num_threads(1)
-            .runtime(rt.handle().clone())
-            .build()
-            .unwrap();
-
-        let has_runtime = pool.install(|| tokio::runtime::Handle::try_current().is_ok());
-        assert!(has_runtime);
-    }
-
-    #[test]
-    fn test_thread_pool_no_runtime() {
-        let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap();
-
-        let has_runtime = pool.install(|| tokio::runtime::Handle::try_current().is_ok());
-        assert!(!has_runtime);
-    }
-
-    #[test]
-    fn test_thread_pool_panic() {
-        let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap();
-        let barrier = Arc::new(Barrier::new(2));
 
-        pool.spawn({
-            let barrier = Arc::clone(&barrier);
-            move || {
-                barrier.wait();
-                panic!();
-            }
-        });
-        barrier.wait();
-
-        pool.spawn({
-            let barrier = Arc::clone(&barrier);
-            move || {
-                barrier.wait();
-            }
-        });
-        barrier.wait();
-    }
-
-    #[test]
-    #[cfg(unix)]
-    fn test_thread_pool_priority() {
-        fn get_current_priority() -> i32 {
-            unsafe { libc::getpriority(libc::PRIO_PROCESS, 0) }
-        }
-
-        let default_prio = get_current_priority();
-
-        {
-            let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap();
-            let prio = pool.install(get_current_priority);
-            // Default pool priority must match current priority.
-            assert_eq!(prio, default_prio);
-        }
-
-        {
-            let pool = ThreadPoolBuilder::new("s")
-                .num_threads(1)
-                .thread_kind(ThreadKind::Worker)
-                .build()
-                .unwrap();
-            let prio = pool.install(get_current_priority);
-            // Worker must be higher than the default priority (higher number = lower priority).
-            assert!(prio > default_prio);
-        }
-    }
-
-    #[test]
-    fn test_worker_group_backpressure() {
-        let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap();
-        let workers = WorkerGroup::new(pool);
-
-        // Num Threads * 2 is the limit after backpressure kicks in
-        let barrier = Arc::new(Barrier::new(2));
-
-        let spawn = || {
-            let barrier = Arc::clone(&barrier);
-            workers
-                .spawn(move || {
-                    barrier.wait();
-                })
-                .now_or_never()
-                .is_some()
-        };
-
-        for _ in 0..15 {
-            // Pool should accept two immediately.
-            assert!(spawn());
-            assert!(spawn());
-            // Pool should reject because there are already 2 tasks active.
-            assert!(!spawn());
-
-            // Unblock the barrier
-            barrier.wait(); // first spawn
-            barrier.wait(); // second spawn
-
-            // wait a tiny bit to make sure the semaphore handle is dropped
-            thread::sleep(Duration::from_millis(50));
-        }
-    }
+    // #[test]
+    // fn test_thread_pool_num_threads() {
+    //     let pool = ThreadPoolBuilder::new("s").num_threads(3).build().unwrap();
+    //     assert_eq!(pool.current_num_threads(), 3);
+    // }
+    //
+    // #[test]
+    // fn test_thread_pool_runtime() {
+    //     let rt = tokio::runtime::Runtime::new().unwrap();
+    //
+    //     let pool = ThreadPoolBuilder::new("s")
+    //         .num_threads(1)
+    //         .runtime(rt.handle().clone())
+    //         .build()
+    //         .unwrap();
+    //
+    //     let has_runtime = pool.install(|| tokio::runtime::Handle::try_current().is_ok());
+    //     assert!(has_runtime);
+    // }
+    //
+    // #[test]
+    // fn test_thread_pool_no_runtime() {
+    //     let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap();
+    //
+    //     let has_runtime = pool.install(|| tokio::runtime::Handle::try_current().is_ok());
+    //     assert!(!has_runtime);
+    // }
+    //
+    // #[test]
+    // fn test_thread_pool_panic() {
+    //     let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap();
+    //     let barrier = Arc::new(Barrier::new(2));
+    //
+    //     pool.spawn({
+    //         let barrier = Arc::clone(&barrier);
+    //         move || {
+    //             barrier.wait();
+    //             panic!();
+    //         }
+    //     });
+    //     barrier.wait();
+    //
+    //     pool.spawn({
+    //         let barrier = Arc::clone(&barrier);
+    //         move || {
+    //             barrier.wait();
+    //         }
+    //     });
+    //     barrier.wait();
+    // }
+    //
+    // #[test]
+    // #[cfg(unix)]
+    // fn test_thread_pool_priority() {
+    //     fn get_current_priority() -> i32 {
+    //         unsafe { libc::getpriority(libc::PRIO_PROCESS, 0) }
+    //     }
+    //
+    //     let default_prio = get_current_priority();
+    //
+    //     {
+    //         let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap();
+    //         let prio = pool.install(get_current_priority);
+    //         // Default pool priority must match current priority.
+    //         assert_eq!(prio, default_prio);
+    //     }
+    //
+    //     {
+    //         let pool = ThreadPoolBuilder::new("s")
+    //             .num_threads(1)
+    //             .thread_kind(ThreadKind::Worker)
+    //             .build()
+    //             .unwrap();
+    //         let prio = pool.install(get_current_priority);
+    //         // Worker must be higher than the default priority (higher number = lower priority).
+    //         assert!(prio > default_prio);
+    //     }
+    // }
+    //
+    // #[test]
+    // fn test_worker_group_backpressure() {
+    //     let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap();
+    //     let workers = WorkerGroup::new(pool);
+    //
+    //     // Num Threads * 2 is the limit after backpressure kicks in
+    //     let barrier = Arc::new(Barrier::new(2));
+    //
+    //     let spawn = || {
+    //         let barrier = Arc::clone(&barrier);
+    //         workers
+    //             .spawn(move || {
+    //                 barrier.wait();
+    //             })
+    //             .now_or_never()
+    //             .is_some()
+    //     };
+    //
+    //     for _ in 0..15 {
+    //         // Pool should accept two immediately.
+    //         assert!(spawn());
+    //         assert!(spawn());
+    //         // Pool should reject because there are already 2 tasks active.
+    //         assert!(!spawn());
+    //
+    //         // Unblock the barrier
+    //         barrier.wait(); // first spawn
+    //         barrier.wait(); // second spawn
+    //
+    //         // wait a tiny bit to make sure the semaphore handle is dropped
+    //         thread::sleep(Duration::from_millis(50));
+    //     }
+    // }
 }
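
For context on what the removed `WorkerGroup` provided: backpressure came from a semaphore sized at `2 * num_threads`, so `spawn` suspended callers once the pool was saturated; the await on `spawn_async` now plays that role. A sketch of the retired mechanism, reconstructed from the deleted code above:

    use std::sync::Arc;

    use tokio::sync::Semaphore;

    // The retired pattern: acquire a permit before handing work to rayon and
    // release it when the job finishes, making callers await free capacity.
    async fn spawn_with_backpressure(
        semaphore: Arc<Semaphore>,
        pool: Arc<rayon::ThreadPool>,
        op: impl FnOnce() + Send + 'static,
    ) {
        let permit = semaphore.acquire_owned().await.expect("never closed");
        pool.spawn(move || {
            op();
            drop(permit); // frees capacity for the next queued task
        });
    }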

From e845b369576e874c6e77d8a6eb5f4296d87a5484 Mon Sep 17 00:00:00 2001
From: Riccardo Busetti <riccardob36@gmail.com>
Date: Mon, 24 Feb 2025 09:43:43 +0100
Subject: [PATCH 02/14] Fix

---
 relay-server/src/services/store.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs
index 61faa2e9c23..41994745336 100644
--- a/relay-server/src/services/store.rs
+++ b/relay-server/src/services/store.rs
@@ -39,7 +39,7 @@ use crate::services::global_config::GlobalConfigHandle;
 use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
 use crate::services::processor::Processed;
 use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
-use crate::utils::{FormDataIter, ThreadPool, TypedEnvelope, WorkerGroup};
+use crate::utils::{FormDataIter, TypedEnvelope};
 
 /// Fallback name used for attachment items without a `filename` header.
 const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";

From ce32fa1c45769c67ca900ecdc8bbb403efc4eafe Mon Sep 17 00:00:00 2001
From: Riccardo Busetti <riccardob36@gmail.com>
Date: Mon, 24 Feb 2025 10:56:30 +0100
Subject: [PATCH 03/14] Fix tests

---
 relay-config/src/config.rs            |  12 --
 relay-server/src/service.rs           |   6 +-
 relay-server/src/utils/thread_pool.rs | 238 +++++++++++++-------------
 3 files changed, 121 insertions(+), 135 deletions(-)

diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs
index fcfeb6a9fa2..9aa310b15a7 100644
--- a/relay-config/src/config.rs
+++ b/relay-config/src/config.rs
@@ -638,11 +638,6 @@ pub struct Limits {
     /// The total number of threads spawned will roughly be `2 * max_thread_count`. Defaults to
     /// the number of logical CPU cores on the host.
     pub max_thread_count: usize,
-    /// The maximum number of concurrent tasks to run in each asynchronous thread pool.
-    ///
-    /// For each thread of the asynchronous pool, up to `max_concurrency_per_pool` futures
-    /// (i.e., tasks) will be polled simultaneously.
-    pub max_concurrency_per_pool: usize,
     /// The maximum number of seconds a query is allowed to take across retries. Individual requests
     /// have lower timeouts. Defaults to 30 seconds.
     pub query_timeout: u64,
@@ -700,7 +695,6 @@ impl Default for Limits {
             max_replay_uncompressed_size: ByteSize::mebibytes(100),
             max_replay_message_size: ByteSize::mebibytes(15),
             max_thread_count: num_cpus::get(),
-            max_concurrency_per_pool: 1,
             query_timeout: 30,
             shutdown_timeout: 10,
             keepalive_timeout: 5,
@@ -2355,12 +2349,6 @@ impl Config {
         self.values.limits.max_thread_count
     }
 
-    /// Returns the number of tasks that will be polled simultaneously by each thread of the
-    /// asynchronous pool.
-    pub fn pool_tasks_concurrency(&self) -> usize {
-        self.values.limits.max_concurrency_per_pool
-    }
-
     /// Returns the maximum size of a project config query.
     pub fn query_batch_size(&self) -> usize {
         self.values.cache.batch_size
diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs
index 095ea83006f..7141341d889 100644
--- a/relay-server/src/service.rs
+++ b/relay-server/src/service.rs
@@ -24,7 +24,7 @@ use crate::services::stats::RelayStats;
 use crate::services::store::{StoreService, StoreServicePool};
 use crate::services::test_store::{TestStore, TestStoreService};
 use crate::services::upstream::{UpstreamRelay, UpstreamRelayService};
-use crate::utils::{MemoryChecker, MemoryStat, ThreadKind};
+use crate::utils::{MemoryChecker, MemoryStat, ThreadKind, ThreadWorkloadType};
 #[cfg(feature = "processing")]
 use anyhow::Context;
 use anyhow::Result;
@@ -110,7 +110,7 @@ fn create_processor_pool(config: &Config) -> Result<EnvelopeProcessorServicePool
 
     let pool = crate::utils::ThreadPoolBuilder::new("processor", tokio::runtime::Handle::current())
         .num_threads(thread_count)
-        .max_concurrency(config.pool_tasks_concurrency())
+        .thread_workload_type(ThreadWorkloadType::WithIO)
         .thread_kind(ThreadKind::Worker)
         .build()?;
 
@@ -129,7 +129,7 @@ fn create_store_pool(config: &Config) -> Result<StoreServicePool> {
 
     let pool = crate::utils::ThreadPoolBuilder::new("store", tokio::runtime::Handle::current())
         .num_threads(thread_count)
-        .max_concurrency(config.pool_tasks_concurrency())
+        .thread_workload_type(ThreadWorkloadType::CpuBound)
         .build()?;
 
     Ok(pool)
diff --git a/relay-server/src/utils/thread_pool.rs b/relay-server/src/utils/thread_pool.rs
index 1f128743bac..4f1dbcce405 100644
--- a/relay-server/src/utils/thread_pool.rs
+++ b/relay-server/src/utils/thread_pool.rs
@@ -5,6 +5,21 @@ use tokio::runtime::Handle;
 
 use relay_threading::{AsyncPool, AsyncPoolBuilder};
 
+/// The workload type that a thread executes.
+#[derive(Default, Debug, Clone, Copy)]
+pub enum ThreadWorkloadType {
+    /// The thread spends some of its time doing I/O alongside CPU-bound work, which in an
+    /// async context leads to the future yielding while I/O is performed.
+    ///
+    /// This is the default, since an async pool is assumed to be used mostly to execute
+    /// asynchronous work that consists mainly of I/O.
+    #[default]
+    WithIO,
+    /// The thread spends all of its time doing CPU-bound work, which in an async context
+    /// leads to the future blocking the entire executor.
+    CpuBound,
+}
+
 /// A thread kind.
 ///
 /// The thread kind has an effect on how threads are prioritized and scheduled.
@@ -22,7 +37,7 @@ pub struct ThreadPoolBuilder {
     name: &'static str,
     runtime: Handle,
     num_threads: usize,
-    max_concurrency: usize,
+    workload_type: ThreadWorkloadType,
     kind: ThreadKind,
 }
 
@@ -33,7 +48,7 @@ impl ThreadPoolBuilder {
             name,
             runtime,
             num_threads: 0,
-            max_concurrency: 1,
+            workload_type: ThreadWorkloadType::CpuBound,
             kind: ThreadKind::Default,
         }
     }
@@ -46,11 +61,9 @@ impl ThreadPoolBuilder {
         self
     }
 
-    /// Sets the maximum number of concurrent tasks per thread.
-    ///
-    /// See also [`AsyncPoolBuilder::max_concurrency`].
-    pub fn max_concurrency(mut self, max_concurrency: usize) -> Self {
-        self.max_concurrency = max_concurrency;
+    /// Configures the [`ThreadWorkloadType`] for all threads spawned in the pool.
+    pub fn thread_workload_type(mut self, workload_type: ThreadWorkloadType) -> Self {
+        self.workload_type = workload_type;
         self
     }
 
@@ -65,9 +78,10 @@ impl ThreadPoolBuilder {
     where
         F: Future<Output = ()> + Send + 'static,
     {
+        let max_concurrency = self.max_concurrency();
         AsyncPoolBuilder::new(self.runtime)
             .num_threads(self.num_threads)
-            .max_concurrency(self.max_concurrency)
+            .max_concurrency(max_concurrency)
             .thread_name(move |id| format!("pool-{name}-{id}", name = self.name))
             // In case a task sent to the pool panics, we catch the panic, log an
             // error, and continue with the remaining work.
@@ -96,6 +110,18 @@ impl ThreadPoolBuilder {
             })
             .build()
     }
+
+    /// Computes the maximum concurrency of the async pool.
+    fn max_concurrency(&self) -> usize {
+        match self.workload_type {
+            // If the work in the pool involves I/O, we want to drive more tasks concurrently,
+            // so we allow 10 tasks per thread (empirically determined).
+            ThreadWorkloadType::WithIO => 10,
+            // If the work in the pool is exclusively CPU-bound, we want 1 task per thread,
+            // since CPU-bound work never yields to the executor.
+            ThreadWorkloadType::CpuBound => 1,
+        }
+    }
 }
 
 #[cfg(unix)]
@@ -134,117 +160,89 @@ fn set_current_thread_priority(_kind: ThreadKind) {
 
 #[cfg(test)]
 mod tests {
+    use crate::utils::{ThreadKind, ThreadPoolBuilder};
+    use futures::FutureExt;
+    use std::sync::atomic::{AtomicI32, Ordering};
+    use std::sync::Arc;
+    use tokio::runtime::Handle;
+    use tokio::sync::Barrier;
 
-    // #[test]
-    // fn test_thread_pool_num_threads() {
-    //     let pool = ThreadPoolBuilder::new("s").num_threads(3).build().unwrap();
-    //     assert_eq!(pool.current_num_threads(), 3);
-    // }
-    //
-    // #[test]
-    // fn test_thread_pool_runtime() {
-    //     let rt = tokio::runtime::Runtime::new().unwrap();
-    //
-    //     let pool = ThreadPoolBuilder::new("s")
-    //         .num_threads(1)
-    //         .runtime(rt.handle().clone())
-    //         .build()
-    //         .unwrap();
-    //
-    //     let has_runtime = pool.install(|| tokio::runtime::Handle::try_current().is_ok());
-    //     assert!(has_runtime);
-    // }
-    //
-    // #[test]
-    // fn test_thread_pool_no_runtime() {
-    //     let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap();
-    //
-    //     let has_runtime = pool.install(|| tokio::runtime::Handle::try_current().is_ok());
-    //     assert!(!has_runtime);
-    // }
-    //
-    // #[test]
-    // fn test_thread_pool_panic() {
-    //     let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap();
-    //     let barrier = Arc::new(Barrier::new(2));
-    //
-    //     pool.spawn({
-    //         let barrier = Arc::clone(&barrier);
-    //         move || {
-    //             barrier.wait();
-    //             panic!();
-    //         }
-    //     });
-    //     barrier.wait();
-    //
-    //     pool.spawn({
-    //         let barrier = Arc::clone(&barrier);
-    //         move || {
-    //             barrier.wait();
-    //         }
-    //     });
-    //     barrier.wait();
-    // }
-    //
-    // #[test]
-    // #[cfg(unix)]
-    // fn test_thread_pool_priority() {
-    //     fn get_current_priority() -> i32 {
-    //         unsafe { libc::getpriority(libc::PRIO_PROCESS, 0) }
-    //     }
-    //
-    //     let default_prio = get_current_priority();
-    //
-    //     {
-    //         let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap();
-    //         let prio = pool.install(get_current_priority);
-    //         // Default pool priority must match current priority.
-    //         assert_eq!(prio, default_prio);
-    //     }
-    //
-    //     {
-    //         let pool = ThreadPoolBuilder::new("s")
-    //             .num_threads(1)
-    //             .thread_kind(ThreadKind::Worker)
-    //             .build()
-    //             .unwrap();
-    //         let prio = pool.install(get_current_priority);
-    //         // Worker must be higher than the default priority (higher number = lower priority).
-    //         assert!(prio > default_prio);
-    //     }
-    // }
-    //
-    // #[test]
-    // fn test_worker_group_backpressure() {
-    //     let pool = ThreadPoolBuilder::new("s").num_threads(1).build().unwrap();
-    //     let workers = WorkerGroup::new(pool);
-    //
-    //     // Num Threads * 2 is the limit after backpressure kicks in
-    //     let barrier = Arc::new(Barrier::new(2));
-    //
-    //     let spawn = || {
-    //         let barrier = Arc::clone(&barrier);
-    //         workers
-    //             .spawn(move || {
-    //                 barrier.wait();
-    //             })
-    //             .now_or_never()
-    //             .is_some()
-    //     };
-    //
-    //     for _ in 0..15 {
-    //         // Pool should accept two immediately.
-    //         assert!(spawn());
-    //         assert!(spawn());
-    //         // Pool should reject because there are already 2 tasks active.
-    //         assert!(!spawn());
-    //
-    //         // Unblock the barrier
-    //         barrier.wait(); // first spawn
-    //         barrier.wait(); // second spawn
-    //
-    //         // wait a tiny bit to make sure the semaphore handle is dropped
-    //         thread::sleep(Duration::from_millis(50));
-    //     }
-    // }
+    #[tokio::test]
+    async fn test_thread_pool_panic() {
+        let pool = ThreadPoolBuilder::new("s", Handle::current())
+            .num_threads(1)
+            .build()
+            .unwrap();
+        let barrier = Arc::new(Barrier::new(2));
+
+        let barrier_clone = barrier.clone();
+        pool.spawn(
+            async move {
+                barrier_clone.wait().await;
+                panic!();
+            }
+            .boxed(),
+        );
+        barrier.wait().await;
+
+        let barrier_clone = barrier.clone();
+        pool.spawn(
+            async move {
+                barrier_clone.wait().await;
+            }
+            .boxed(),
+        );
+        barrier.wait().await;
+    }
+
+    #[tokio::test]
+    #[cfg(unix)]
+    async fn test_thread_pool_priority() {
+        fn get_current_priority() -> i32 {
+            unsafe { libc::getpriority(libc::PRIO_PROCESS, 0) }
+        }
+
+        let default_priority = get_current_priority();
+
+        {
+            let pool = ThreadPoolBuilder::new("s", Handle::current())
+                .num_threads(1)
+                .build()
+                .unwrap();
+
+            let barrier = Arc::new(Barrier::new(2));
+            let priority = Arc::new(AtomicI32::new(0));
+            let barrier_clone = barrier.clone();
+            let priority_clone = priority.clone();
+            pool.spawn(async move {
+                priority_clone.store(get_current_priority(), Ordering::SeqCst);
+                barrier_clone.wait().await;
+            });
+            barrier.wait().await;
+
+            // Default pool priority must match current priority.
+            assert_eq!(priority.load(Ordering::SeqCst), default_priority);
+        }
+
+        {
+            let pool = ThreadPoolBuilder::new("s", Handle::current())
+                .num_threads(1)
+                .thread_kind(ThreadKind::Worker)
+                .build()
+                .unwrap();
+
+            let barrier = Arc::new(Barrier::new(2));
+            let priority = Arc::new(AtomicI32::new(0));
+            let barrier_clone = barrier.clone();
+            let priority_clone = priority.clone();
+            pool.spawn(async move {
+                priority_clone.store(get_current_priority(), Ordering::SeqCst);
+                barrier_clone.wait().await;
+            });
+            barrier.wait().await;
+
+            // Worker must be higher than the default priority (higher number = lower priority).
+            assert!(priority.load(Ordering::SeqCst) > default_priority);
+        }
+    }
 }

From 6694b9a2942e8640c160e4c3d6435ad47bf830af Mon Sep 17 00:00:00 2001
From: Riccardo Busetti <riccardob36@gmail.com>
Date: Mon, 24 Feb 2025 11:29:22 +0100
Subject: [PATCH 04/14] Fix

---
 relay-config/src/config.rs            | 10 +++++++
 relay-server/src/service.rs           |  6 ++--
 relay-server/src/utils/thread_pool.rs | 42 +++++----------------------
 3 files changed, 21 insertions(+), 37 deletions(-)

diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs
index 9aa310b15a7..50c3944b7ce 100644
--- a/relay-config/src/config.rs
+++ b/relay-config/src/config.rs
@@ -638,6 +638,10 @@ pub struct Limits {
     /// The total number of threads spawned will roughly be `2 * max_thread_count`. Defaults to
     /// the number of logical CPU cores on the host.
     pub max_thread_count: usize,
+    /// The maximum number of tasks to execute within each thread of the asynchronous pool.
+    ///
+    /// Each thread will be able to drive at most `max_pool_concurrency` futures concurrently.
+    pub max_pool_concurrency: usize,
     /// The maximum number of seconds a query is allowed to take across retries. Individual requests
     /// have lower timeouts. Defaults to 30 seconds.
     pub query_timeout: u64,
@@ -695,6 +699,7 @@ impl Default for Limits {
             max_replay_uncompressed_size: ByteSize::mebibytes(100),
             max_replay_message_size: ByteSize::mebibytes(15),
             max_thread_count: num_cpus::get(),
+            max_pool_concurrency: 0,
             query_timeout: 30,
             shutdown_timeout: 10,
             keepalive_timeout: 5,
@@ -2349,6 +2354,11 @@ impl Config {
         self.values.limits.max_thread_count
     }
 
+    /// Returns the maximum number of tasks that each pool thread can run concurrently.
+    pub fn pool_concurrency(&self) -> usize {
+        self.values.limits.max_pool_concurrency
+    }
+
     /// Returns the maximum size of a project config query.
     pub fn query_batch_size(&self) -> usize {
         self.values.cache.batch_size
diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs
index 7141341d889..e0d65fcc579 100644
--- a/relay-server/src/service.rs
+++ b/relay-server/src/service.rs
@@ -24,7 +24,7 @@ use crate::services::stats::RelayStats;
 use crate::services::store::{StoreService, StoreServicePool};
 use crate::services::test_store::{TestStore, TestStoreService};
 use crate::services::upstream::{UpstreamRelay, UpstreamRelayService};
-use crate::utils::{MemoryChecker, MemoryStat, ThreadKind, ThreadWorkloadType};
+use crate::utils::{MemoryChecker, MemoryStat, ThreadKind};
 #[cfg(feature = "processing")]
 use anyhow::Context;
 use anyhow::Result;
@@ -110,7 +110,7 @@ fn create_processor_pool(config: &Config) -> Result<EnvelopeProcessorServicePool
 
     let pool = crate::utils::ThreadPoolBuilder::new("processor", tokio::runtime::Handle::current())
         .num_threads(thread_count)
-        .thread_workload_type(ThreadWorkloadType::WithIO)
+        .max_concurrency(config.pool_concurrency())
         .thread_kind(ThreadKind::Worker)
         .build()?;
 
@@ -129,7 +129,7 @@ fn create_store_pool(config: &Config) -> Result<StoreServicePool> {
 
     let pool = crate::utils::ThreadPoolBuilder::new("store", tokio::runtime::Handle::current())
         .num_threads(thread_count)
-        .thread_workload_type(ThreadWorkloadType::CpuBound)
+        .max_concurrency(config.pool_concurrency())
         .build()?;
 
     Ok(pool)
diff --git a/relay-server/src/utils/thread_pool.rs b/relay-server/src/utils/thread_pool.rs
index 4f1dbcce405..e2267afb980 100644
--- a/relay-server/src/utils/thread_pool.rs
+++ b/relay-server/src/utils/thread_pool.rs
@@ -5,21 +5,6 @@ use tokio::runtime::Handle;
 
 use relay_threading::{AsyncPool, AsyncPoolBuilder};
 
-/// The workload type that a thread executes.
-#[derive(Default, Debug, Clone, Copy)]
-pub enum ThreadWorkloadType {
-    /// The thread spends some of its time doing I/O alongside CPU-bound work, which in an
-    /// async context leads to the future yielding while I/O is performed.
-    ///
-    /// This is the default, since an async pool is assumed to be used mostly to execute
-    /// asynchronous work that consists mainly of I/O.
-    #[default]
-    WithIO,
-    /// The thread spends all of its time doing CPU-bound work, which in an async context
-    /// leads to the future blocking the entire executor.
-    CpuBound,
-}
-
 /// A thread kind.
 ///
 /// The thread kind has an effect on how threads are prioritized and scheduled.
@@ -37,7 +22,7 @@ pub struct ThreadPoolBuilder {
     name: &'static str,
     runtime: Handle,
     num_threads: usize,
-    workload_type: ThreadWorkloadType,
+    max_concurrency: usize,
     kind: ThreadKind,
 }
 
@@ -48,7 +33,7 @@ impl ThreadPoolBuilder {
             name,
             runtime,
             num_threads: 0,
-            workload_type: ThreadWorkloadType::CpuBound,
+            max_concurrency: 1,
             kind: ThreadKind::Default,
         }
     }
@@ -61,9 +46,11 @@ impl ThreadPoolBuilder {
         self
     }
 
-    /// Configures the [`ThreadWorkloadType`] for all threads spawned in the pool.
-    pub fn thread_workload_type(mut self, workload_type: ThreadWorkloadType) -> Self {
-        self.workload_type = workload_type;
+    /// Sets the maximum number of tasks that can run concurrently per thread.
+    ///
+    /// See also [`AsyncPoolBuilder::max_concurrency`].
+    pub fn max_concurrency(mut self, max_concurrency: usize) -> Self {
+        self.max_concurrency = max_concurrency;
         self
     }
 
@@ -78,10 +65,9 @@ impl ThreadPoolBuilder {
     where
         F: Future<Output = ()> + Send + 'static,
     {
-        let max_concurrency = self.max_concurrency();
         AsyncPoolBuilder::new(self.runtime)
             .num_threads(self.num_threads)
-            .max_concurrency(max_concurrency)
+            .max_concurrency(self.max_concurrency)
             .thread_name(move |id| format!("pool-{name}-{id}", name = self.name))
             // In case a task sent to the pool panics, we catch the panic, log an
             // error, and continue with the remaining work.
@@ -110,18 +96,6 @@ impl ThreadPoolBuilder {
             })
             .build()
     }
-
-    /// Computes the maximum concurrency of the async pool.
-    fn max_concurrency(&self) -> usize {
-        match self.workload_type {
-            // If the work in the pool involves I/O, we want to drive more tasks concurrently,
-            // so we allow 10 tasks per thread (empirically determined).
-            ThreadWorkloadType::WithIO => 10,
-            // If the work in the pool is exclusively CPU-bound, we want 1 task per thread,
-            // since CPU-bound work never yields to the executor.
-            ThreadWorkloadType::CpuBound => 1,
-        }
-    }
 }
 
 #[cfg(unix)]
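
This round-trips the design: the workload presets from the previous patch are folded back into an explicit `max_concurrency` knob driven by config. The removed presets can still serve as a guideline when picking `max_pool_concurrency` by hand (illustrative helper, not part of the patch):

    // Rule of thumb carried over from the removed `ThreadWorkloadType`:
    // I/O-heavy pools benefit from roughly 10 tasks per thread, while
    // purely CPU-bound pools should stick to 1.
    fn suggested_concurrency(io_bound: bool) -> usize {
        if io_bound { 10 } else { 1 }
    }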

From 5a62978d69bf51602f50004dbe438140dbb341a6 Mon Sep 17 00:00:00 2001
From: Riccardo Busetti <riccardob36@gmail.com>
Date: Mon, 24 Feb 2025 15:54:12 +0100
Subject: [PATCH 05/14] Fix

---
 Cargo.lock                            |  1 +
 relay-server/src/utils/thread_pool.rs |  1 -
 relay-threading/Cargo.toml            |  4 +-
 relay-threading/src/builder.rs        |  9 +++
 relay-threading/src/lib.rs            |  1 +
 relay-threading/src/metrics.rs        | 18 ++++++
 relay-threading/src/multiplexing.rs   | 86 ++++++++++++++++++++++++---
 relay-threading/src/pool.rs           | 49 +++++++++++----
 8 files changed, 147 insertions(+), 22 deletions(-)
 create mode 100644 relay-threading/src/metrics.rs

diff --git a/Cargo.lock b/Cargo.lock
index 4974c2d5fd0..aa8e0a38bcb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4090,6 +4090,7 @@ dependencies = [
  "flume",
  "futures",
  "pin-project-lite",
+ "relay-statsd",
  "tokio",
 ]
 
diff --git a/relay-server/src/utils/thread_pool.rs b/relay-server/src/utils/thread_pool.rs
index e2267afb980..4295cef8eb7 100644
--- a/relay-server/src/utils/thread_pool.rs
+++ b/relay-server/src/utils/thread_pool.rs
@@ -83,7 +83,6 @@ impl ThreadPoolBuilder {
             })
             .spawn_handler(|thread| {
                 let mut b = thread::Builder::new();
-
                 if let Some(name) = thread.name() {
                     b = b.name(name.to_owned());
                 }
diff --git a/relay-threading/Cargo.toml b/relay-threading/Cargo.toml
index 7e0038ab985..72ac995997f 100644
--- a/relay-threading/Cargo.toml
+++ b/relay-threading/Cargo.toml
@@ -10,11 +10,13 @@ license-file = "../LICENSE.md"
 publish = false
 
 [dependencies]
-flume = { workspace = true }
+flume = { workspace = true, features = ["async"] }
 futures = { workspace = true }
 tokio = { workspace = true }
 pin-project-lite = { workspace = true }
 
+relay-statsd = { workspace = true }
+
 [dev-dependencies]
 criterion = { workspace = true, features = ["async_tokio"] }
 futures = { workspace = true, features = ["executor"] }
diff --git a/relay-threading/src/builder.rs b/relay-threading/src/builder.rs
index 3879a1afa59..d4203e6d49d 100644
--- a/relay-threading/src/builder.rs
+++ b/relay-threading/src/builder.rs
@@ -16,6 +16,7 @@ pub(crate) type PanicHandler = dyn Fn(Box<dyn Any + Send>) + Send + Sync;
 /// and panic handling strategies.
 pub struct AsyncPoolBuilder<S = DefaultSpawn> {
     pub(crate) runtime: tokio::runtime::Handle,
+    pub(crate) pool_name: Option<Arc<str>>,
     pub(crate) thread_name: Option<Box<dyn FnMut(usize) -> String>>,
     pub(crate) thread_panic_handler: Option<Arc<PanicHandler>>,
     pub(crate) task_panic_handler: Option<Arc<PanicHandler>>,
@@ -31,6 +32,7 @@ impl AsyncPoolBuilder<DefaultSpawn> {
     pub fn new(runtime: tokio::runtime::Handle) -> AsyncPoolBuilder<DefaultSpawn> {
         AsyncPoolBuilder {
             runtime,
+            pool_name: None,
             thread_name: None,
             thread_panic_handler: None,
             task_panic_handler: None,
@@ -45,6 +47,12 @@ impl<S> AsyncPoolBuilder<S>
 where
     S: ThreadSpawn,
 {
+    /// Specifies a custom name for this pool.
+    pub fn pool_name(mut self, pool_name: String) -> Self {
+        self.pool_name = Some(pool_name.into());
+        self
+    }
+
     /// Specifies a custom naming convention for threads in the [`AsyncPool`].
     ///
     /// The provided closure receives the thread's index and returns a name,
@@ -91,6 +99,7 @@ where
     {
         AsyncPoolBuilder {
             runtime: self.runtime,
+            pool_name: self.pool_name,
             thread_name: self.thread_name,
             thread_panic_handler: self.thread_panic_handler,
             task_panic_handler: self.task_panic_handler,
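
A sketch of naming a pool with the new `pool_name` builder method, so the name can later be attached to pool metrics as a tag (assuming the default spawn strategy):

    use futures::future::BoxFuture;
    use relay_threading::{AsyncPool, AsyncPoolBuilder};

    // Builds a named pool; `pool_name` is the method added in this patch.
    fn build_named_pool(
        handle: tokio::runtime::Handle,
    ) -> std::io::Result<AsyncPool<BoxFuture<'static, ()>>> {
        AsyncPoolBuilder::new(handle)
            .pool_name("processor".to_owned())
            .num_threads(2)
            .build()
    }
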
diff --git a/relay-threading/src/lib.rs b/relay-threading/src/lib.rs
index 3c647b987a0..69355e72eb8 100644
--- a/relay-threading/src/lib.rs
+++ b/relay-threading/src/lib.rs
@@ -50,6 +50,7 @@
 //! recovery from panics in either thread execution or individual tasks.
 
 mod builder;
+mod metrics;
 mod multiplexing;
 mod pool;
 
diff --git a/relay-threading/src/metrics.rs b/relay-threading/src/metrics.rs
new file mode 100644
index 00000000000..500ca320f4f
--- /dev/null
+++ b/relay-threading/src/metrics.rs
@@ -0,0 +1,18 @@
+use relay_statsd::GaugeMetric;
+
+/// Gauge metrics emitted by the asynchronous pool.
+pub enum AsyncPoolGauges {
+    /// Number of futures queued up for execution in the asynchronous pool.
+    AsyncPoolQueueSize,
+    /// Number of futures being driven in each thread of the asynchronous pool.
+    AsyncPoolFuturesPerThread,
+}
+
+impl GaugeMetric for AsyncPoolGauges {
+    fn name(&self) -> &'static str {
+        match self {
+            Self::AsyncPoolQueueSize => "async_pool.queue_size",
+            Self::AsyncPoolFuturesPerThread => "async_pool.futures_per_thread",
+        }
+    }
+}
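
These gauges are emitted through `relay_statsd::metric!`, with the pool (and thread) name attached as tags, as the multiplexer change below shows. A condensed sketch of the emission pattern (the `report_queue_size` helper is hypothetical):

    use crate::metrics::AsyncPoolGauges;

    // Emits the queue-size gauge tagged with the pool's name; `queued` stands
    // in for however the pool measures its backlog.
    fn report_queue_size(pool_name: &str, queued: u64) {
        relay_statsd::metric!(
            gauge(AsyncPoolGauges::AsyncPoolQueueSize) = queued,
            pool_name = pool_name
        );
    }
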
diff --git a/relay-threading/src/multiplexing.rs b/relay-threading/src/multiplexing.rs
index 2083ddd181c..1dcb5a7433a 100644
--- a/relay-threading/src/multiplexing.rs
+++ b/relay-threading/src/multiplexing.rs
@@ -4,6 +4,7 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
+use crate::metrics::AsyncPoolGauges;
 use crate::PanicHandler;
 use futures::future::CatchUnwind;
 use futures::stream::{FusedStream, FuturesUnordered, Stream};
@@ -101,6 +102,8 @@ pin_project! {
     ///
     /// This multiplexer is primarily used by the [`AsyncPool`] to manage task execution on worker threads.
     pub struct Multiplexed<S, F> {
+        pool_name: Arc<str>,
+        thread_name: Arc<str>,
         max_concurrency: usize,
         #[pin]
         rx: S,
@@ -117,8 +120,16 @@ where
     ///
     /// Tasks from the stream will be scheduled for execution concurrently, and an optional panic handler
     /// can be provided to manage errors during task execution.
-    pub fn new(max_concurrency: usize, rx: S, panic_handler: Option<Arc<PanicHandler>>) -> Self {
+    pub fn new(
+        pool_name: Arc<str>,
+        thread_name: Arc<str>,
+        max_concurrency: usize,
+        rx: S,
+        panic_handler: Option<Arc<PanicHandler>>,
+    ) -> Self {
         Self {
+            pool_name,
+            thread_name,
             max_concurrency,
             rx,
             tasks: Tasks::new(panic_handler),
@@ -141,6 +152,13 @@ where
         let mut this = self.project();
 
         loop {
+            // Report how many tasks this multiplexer is currently polling concurrently.
+            relay_statsd::metric!(
+                gauge(AsyncPoolGauges::AsyncPoolFuturesPerThread) = this.tasks.len() as u64,
+                pool_name = &this.pool_name,
+                thread_name = &this.thread_name
+            );
+
             this.tasks.as_mut().poll_tasks_until_pending(cx);
 
             // If we can't get any more tasks, and we don't have anything else to process, we report
@@ -195,7 +213,13 @@ mod tests {
     #[test]
     fn test_multiplexer_with_no_futures() {
         let (_, rx) = flume::bounded::<BoxFuture<'static, _>>(10);
-        futures::executor::block_on(Multiplexed::new(1, rx.into_stream(), None));
+        futures::executor::block_on(Multiplexed::new(
+            "my_pool".into(),
+            "my_thread".into(),
+            1,
+            rx.into_stream(),
+            None,
+        ));
     }
 
     #[test]
@@ -218,6 +242,8 @@ mod tests {
             panic_handler_called_clone.store(true, Ordering::SeqCst);
         };
         futures::executor::block_on(Multiplexed::new(
+            "my_pool".into(),
+            "my_thread".into(),
             1,
             rx.into_stream(),
             Some(Arc::new(panic_handler)),
@@ -243,7 +269,13 @@ mod tests {
         drop(tx);
 
         let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
-            futures::executor::block_on(Multiplexed::new(1, rx.into_stream(), None))
+            futures::executor::block_on(Multiplexed::new(
+                "my_pool".into(),
+                "my_thread".into(),
+                1,
+                rx.into_stream(),
+                None,
+            ))
         }));
 
         // The count is expected to have been incremented and the handler called.
@@ -264,7 +296,13 @@ mod tests {
 
         drop(tx);
 
-        futures::executor::block_on(Multiplexed::new(1, rx.into_stream(), None));
+        futures::executor::block_on(Multiplexed::new(
+            "my_pool".into(),
+            "my_thread".into(),
+            1,
+            rx.into_stream(),
+            None,
+        ));
 
         // The count is expected to have been incremented.
         assert_eq!(count.load(Ordering::SeqCst), 1);
@@ -285,7 +323,13 @@ mod tests {
 
         drop(tx);
 
-        futures::executor::block_on(Multiplexed::new(1, rx.into_stream(), None));
+        futures::executor::block_on(Multiplexed::new(
+            "my_pool".into(),
+            "my_thread".into(),
+            1,
+            rx.into_stream(),
+            None,
+        ));
 
         // The order of completion is expected to match the order of submission.
         assert_eq!(*entries.lock().unwrap(), (0..5).collect::<Vec<_>>());
@@ -304,7 +348,13 @@ mod tests {
 
         drop(tx);
 
-        futures::executor::block_on(Multiplexed::new(5, rx.into_stream(), None));
+        futures::executor::block_on(Multiplexed::new(
+            "my_pool".into(),
+            "my_thread".into(),
+            5,
+            rx.into_stream(),
+            None,
+        ));
 
         // The count is expected to have been incremented.
         assert_eq!(count.load(Ordering::SeqCst), 1);
@@ -325,7 +375,13 @@ mod tests {
 
         drop(tx);
 
-        futures::executor::block_on(Multiplexed::new(5, rx.into_stream(), None));
+        futures::executor::block_on(Multiplexed::new(
+            "my_pool".into(),
+            "my_thread".into(),
+            5,
+            rx.into_stream(),
+            None,
+        ));
 
         // The order of completion is expected to be the same as the order of submission.
         assert_eq!(*entries.lock().unwrap(), (0..5).collect::<Vec<_>>());
@@ -349,7 +405,13 @@ mod tests {
 
         drop(tx);
 
-        futures::executor::block_on(Multiplexed::new(5, rx.into_stream(), None));
+        futures::executor::block_on(Multiplexed::new(
+            "my_pool".into(),
+            "my_thread".into(),
+            5,
+            rx.into_stream(),
+            None,
+        ));
 
         // The order of completion is expected to be the same as the order of submission.
         assert_eq!(*entries.lock().unwrap(), (0..3).collect::<Vec<_>>());
@@ -379,7 +441,13 @@ mod tests {
 
         drop(tx);
 
-        futures::executor::block_on(Multiplexed::new(5, rx.into_stream(), None));
+        futures::executor::block_on(Multiplexed::new(
+            "my_pool".into(),
+            "my_thread".into(),
+            5,
+            rx.into_stream(),
+            None,
+        ));
 
         // The order of completion may vary; verify that all expected elements are present.
         let mut entries = entries.lock().unwrap();
diff --git a/relay-threading/src/pool.rs b/relay-threading/src/pool.rs
index a0163b19e10..6d6b332607d 100644
--- a/relay-threading/src/pool.rs
+++ b/relay-threading/src/pool.rs
@@ -1,21 +1,26 @@
 use std::future::Future;
 use std::io;
 use std::panic::AssertUnwindSafe;
-use std::sync::Arc;
+use std::sync::{Arc, LazyLock};
 
 use futures::future::BoxFuture;
 use futures::FutureExt;
 
 use crate::builder::AsyncPoolBuilder;
+use crate::metrics::AsyncPoolGauges;
 use crate::multiplexing::Multiplexed;
 use crate::PanicHandler;
 
+/// Default name of the pool.
+static DEFAULT_POOL_NAME: LazyLock<Arc<str>> = LazyLock::new(|| Arc::from("unnamed"));
+
 /// [`AsyncPool`] is a thread-based executor that runs asynchronous tasks on dedicated worker threads.
 ///
 /// The pool collects tasks through a bounded channel and distributes them among threads, each of which runs its own
 /// Tokio executor. This design enables controlled concurrency and efficient use of system resources.
 #[derive(Debug)]
 pub struct AsyncPool<F> {
+    name: Arc<str>,
     tx: flume::Sender<F>,
 }
 
@@ -31,18 +36,23 @@ where
     where
         S: ThreadSpawn,
     {
+        let pool_name = builder.pool_name.unwrap_or(DEFAULT_POOL_NAME.clone());
         let (tx, rx) = flume::bounded(builder.num_threads * 2);
 
-        for index in 0..builder.num_threads {
+        for thread_id in 0..builder.num_threads {
             let rx = rx.clone();
+            let thread_name: Option<Arc<str>> =
+                builder.thread_name.as_mut().map(|f| f(thread_id).into());
 
             let thread = Thread {
-                index,
+                id: thread_id,
                 max_concurrency: builder.max_concurrency,
-                name: builder.thread_name.as_mut().map(|f| f(index)),
+                name: thread_name.clone(),
                 runtime: builder.runtime.clone(),
                 panic_handler: builder.thread_panic_handler.clone(),
                 task: Multiplexed::new(
+                    pool_name.clone(),
+                    thread_name.unwrap_or(format!("thread-{}", thread_id).into()),
                     builder.max_concurrency,
                     rx.into_stream(),
                     builder.task_panic_handler.clone(),
@@ -53,7 +63,10 @@ where
             builder.spawn_handler.spawn(thread)?;
         }
 
-        Ok(Self { tx })
+        Ok(Self {
+            name: pool_name.clone(),
+            tx,
+        })
     }
 }
 
@@ -74,6 +87,8 @@ where
             self.tx.send(future).is_ok(),
             "failed to schedule task: all worker threads have terminated (either none were spawned or all have panicked)"
         );
+
+        self.track_queue_size()
     }
 
     /// Asynchronously enqueues a future for execution within the [`AsyncPool`].
@@ -89,14 +104,26 @@ where
             self.tx.send_async(future).await.is_ok(),
             "failed to schedule task: all worker threads have terminated (either none were spawned or all have panicked)"
         );
+
+        self.track_queue_size()
+    }
+
+    /// Tracks the number of elements in the queue containing all futures to be scheduled.
+    fn track_queue_size(&self) {
+        // Each time a future is scheduled, we report how many futures are waiting in the
+        // queue to be dispatched across all threads.
+        relay_statsd::metric!(
+            gauge(AsyncPoolGauges::AsyncPoolQueueSize) = self.tx.len() as u64,
+            pool_name = &self.name
+        );
     }
 }
 
 /// [`Thread`] represents a dedicated worker thread within an [`AsyncPool`] that executes scheduled tasks.
 pub struct Thread {
-    index: usize,
+    id: usize,
     max_concurrency: usize,
-    name: Option<String>,
+    name: Option<Arc<str>>,
     runtime: tokio::runtime::Handle,
     panic_handler: Option<Arc<PanicHandler>>,
     task: BoxFuture<'static, ()>,
@@ -106,8 +133,8 @@ impl Thread {
-    /// Returns the unique index assigned to this [`Thread`].
+    /// Returns the unique id assigned to this [`Thread`].
     ///
-    /// The index can help identify the thread during debugging or logging.
-    pub fn index(&self) -> usize {
-        self.index
+    /// The id can help identify the thread during debugging or logging.
+    pub fn id(&self) -> usize {
+        self.id
     }
 
     /// Returns the maximum number of concurrent tasks permitted on this [`Thread`].
@@ -356,9 +383,9 @@ mod tests {
         };
 
         Thread {
-            index: 0,
+            id: 0,
             max_concurrency: 1,
-            name: Some("test-thread".to_owned()),
+            name: Some("test-thread".into()),
             runtime: runtime.handle().clone(),
             panic_handler: Some(Arc::new(panic_handler)),
             task: async move {

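A note on how the pieces of this patch compose. The sketch below is illustrative
only: it assumes the builder is finalized with a `build()` method and that
`AsyncPool`/`AsyncPoolBuilder` are re-exported at the crate root, either of which
may differ from the crate's actual surface. At this point in the series,
`pool_name` still takes a `String`.

    use futures::future::BoxFuture;
    use futures::FutureExt;
    use relay_threading::{AsyncPool, AsyncPoolBuilder};

    fn main() -> std::io::Result<()> {
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()?;

        // Each worker thread drives up to `max_concurrency` futures pulled from
        // the pool's shared bounded channel.
        let pool: AsyncPool<BoxFuture<'static, ()>> =
            AsyncPoolBuilder::new(runtime.handle().clone())
                .pool_name("example".to_owned())
                .thread_name(|id| format!("example-{id}"))
                .num_threads(2)
                .max_concurrency(4)
                .build()?; // assumed finalizer name

        // Scheduling reports the `async_pool.queue_size` gauge, tagged with the
        // pool name.
        pool.spawn(async { println!("hello from the pool") }.boxed());
        Ok(())
    }
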
From 8729efa8595d68a5307e9dbfb5efca78371480f0 Mon Sep 17 00:00:00 2001
From: Riccardo Busetti <riccardob36@gmail.com>
Date: Mon, 24 Feb 2025 17:04:47 +0100
Subject: [PATCH 06/14] Use `&'static str` for pool names

---
 relay-server/src/utils/thread_pool.rs |  3 ++-
 relay-threading/src/builder.rs        |  6 +++---
 relay-threading/src/multiplexing.rs   | 26 +++++++++++++-------------
 relay-threading/src/pool.rs           | 19 +++++++++----------
 4 files changed, 27 insertions(+), 27 deletions(-)

diff --git a/relay-server/src/utils/thread_pool.rs b/relay-server/src/utils/thread_pool.rs
index 4295cef8eb7..57f95f02e79 100644
--- a/relay-server/src/utils/thread_pool.rs
+++ b/relay-server/src/utils/thread_pool.rs
@@ -66,9 +66,10 @@ impl ThreadPoolBuilder {
         F: Future<Output = ()> + Send + 'static,
     {
         AsyncPoolBuilder::new(self.runtime)
+            .pool_name(self.name)
+            .thread_name(move |id| format!("pool-{name}-{id}", name = self.name))
             .num_threads(self.num_threads)
             .max_concurrency(self.max_concurrency)
-            .thread_name(move |id| format!("pool-{name}-{id}", name = self.name))
             // In case of panic in a task sent to the pool, we catch it to continue the remaining
             // work and just log an error.
             .task_panic_handler(move |_panic| {
diff --git a/relay-threading/src/builder.rs b/relay-threading/src/builder.rs
index d4203e6d49d..f6565263f2d 100644
--- a/relay-threading/src/builder.rs
+++ b/relay-threading/src/builder.rs
@@ -16,7 +16,7 @@ pub(crate) type PanicHandler = dyn Fn(Box<dyn Any + Send>) + Send + Sync;
 /// and panic handling strategies.
 pub struct AsyncPoolBuilder<S = DefaultSpawn> {
     pub(crate) runtime: tokio::runtime::Handle,
-    pub(crate) pool_name: Option<Arc<str>>,
+    pub(crate) pool_name: Option<&'static str>,
     pub(crate) thread_name: Option<Box<dyn FnMut(usize) -> String>>,
     pub(crate) thread_panic_handler: Option<Arc<PanicHandler>>,
     pub(crate) task_panic_handler: Option<Arc<PanicHandler>>,
@@ -48,8 +48,8 @@ where
     S: ThreadSpawn,
 {
     /// Specifies a custom name for this pool.
-    pub fn pool_name(mut self, pool_name: String) -> Self {
-        self.pool_name = Some(pool_name.into());
+    pub fn pool_name(mut self, pool_name: &'static str) -> Self {
+        self.pool_name = Some(pool_name);
         self
     }
 
diff --git a/relay-threading/src/multiplexing.rs b/relay-threading/src/multiplexing.rs
index 1dcb5a7433a..c80b4d650ed 100644
--- a/relay-threading/src/multiplexing.rs
+++ b/relay-threading/src/multiplexing.rs
@@ -102,8 +102,8 @@ pin_project! {
     ///
     /// This multiplexer is primarily used by the [`AsyncPool`] to manage task execution on worker threads.
     pub struct Multiplexed<S, F> {
-        pool_name: Arc<str>,
-        thread_name: Arc<str>,
+        pool_name: &'static str,
+        thread_name: String,
         max_concurrency: usize,
         #[pin]
         rx: S,
@@ -121,8 +121,8 @@ where
     /// Tasks from the stream will be scheduled for execution concurrently, and an optional panic handler
     /// can be provided to manage errors during task execution.
     pub fn new(
-        pool_name: Arc<str>,
-        thread_name: Arc<str>,
+        pool_name: &'static str,
+        thread_name: String,
         max_concurrency: usize,
         rx: S,
         panic_handler: Option<Arc<PanicHandler>>,
@@ -214,7 +214,7 @@ mod tests {
     fn test_multiplexer_with_no_futures() {
         let (_, rx) = flume::bounded::<BoxFuture<'static, _>>(10);
         futures::executor::block_on(Multiplexed::new(
-            "my_pool".into(),
+            "my_pool",
             "my_thread".into(),
             1,
             rx.into_stream(),
@@ -242,7 +242,7 @@ mod tests {
             panic_handler_called_clone.store(true, Ordering::SeqCst);
         };
         futures::executor::block_on(Multiplexed::new(
-            "my_pool".into(),
+            "my_pool",
             "my_thread".into(),
             1,
             rx.into_stream(),
@@ -270,7 +270,7 @@ mod tests {
 
         let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
             futures::executor::block_on(Multiplexed::new(
-                "my_pool".into(),
+                "my_pool",
                 "my_thread".into(),
                 1,
                 rx.into_stream(),
@@ -297,7 +297,7 @@ mod tests {
         drop(tx);
 
         futures::executor::block_on(Multiplexed::new(
-            "my_pool".into(),
+            "my_pool",
             "my_thread".into(),
             1,
             rx.into_stream(),
@@ -324,7 +324,7 @@ mod tests {
         drop(tx);
 
         futures::executor::block_on(Multiplexed::new(
-            "my_pool".into(),
+            "my_pool",
             "my_thread".into(),
             1,
             rx.into_stream(),
@@ -349,7 +349,7 @@ mod tests {
         drop(tx);
 
         futures::executor::block_on(Multiplexed::new(
-            "my_pool".into(),
+            "my_pool",
             "my_thread".into(),
             5,
             rx.into_stream(),
@@ -376,7 +376,7 @@ mod tests {
         drop(tx);
 
         futures::executor::block_on(Multiplexed::new(
-            "my_pool".into(),
+            "my_pool",
             "my_thread".into(),
             5,
             rx.into_stream(),
@@ -406,7 +406,7 @@ mod tests {
         drop(tx);
 
         futures::executor::block_on(Multiplexed::new(
-            "my_pool".into(),
+            "my_pool",
             "my_thread".into(),
             5,
             rx.into_stream(),
@@ -442,7 +442,7 @@ mod tests {
         drop(tx);
 
         futures::executor::block_on(Multiplexed::new(
-            "my_pool".into(),
+            "my_pool",
             "my_thread".into(),
             5,
             rx.into_stream(),
diff --git a/relay-threading/src/pool.rs b/relay-threading/src/pool.rs
index 6d6b332607d..e2ae5b61f57 100644
--- a/relay-threading/src/pool.rs
+++ b/relay-threading/src/pool.rs
@@ -1,7 +1,7 @@
 use std::future::Future;
 use std::io;
 use std::panic::AssertUnwindSafe;
-use std::sync::{Arc, LazyLock};
+use std::sync::Arc;
 
 use futures::future::BoxFuture;
 use futures::FutureExt;
@@ -12,7 +12,7 @@ use crate::multiplexing::Multiplexed;
 use crate::PanicHandler;
 
 /// Default name of the pool.
-static DEFAULT_POOL_NAME: LazyLock<Arc<str>> = LazyLock::new(|| Arc::from("unnamed"));
+static DEFAULT_POOL_NAME: &str = "unnamed";
 
 /// [`AsyncPool`] is a thread-based executor that runs asynchronous tasks on dedicated worker threads.
 ///
@@ -20,7 +20,7 @@ static DEFAULT_POOL_NAME: LazyLock<Arc<str>> = LazyLock::new(|| Arc::from("unnam
 /// Tokio executor. This design enables controlled concurrency and efficient use of system resources.
 #[derive(Debug)]
 pub struct AsyncPool<F> {
-    name: Arc<str>,
+    name: &'static str,
     tx: flume::Sender<F>,
 }
 
@@ -36,13 +36,12 @@ where
     where
         S: ThreadSpawn,
     {
-        let pool_name = builder.pool_name.unwrap_or(DEFAULT_POOL_NAME.clone());
+        let pool_name = builder.pool_name.unwrap_or(DEFAULT_POOL_NAME);
         let (tx, rx) = flume::bounded(builder.num_threads * 2);
 
         for thread_id in 0..builder.num_threads {
             let rx = rx.clone();
-            let thread_name: Option<Arc<str>> =
-                builder.thread_name.as_mut().map(|f| f(thread_id).into());
+            let thread_name: Option<String> = builder.thread_name.as_mut().map(|f| f(thread_id));
 
             let thread = Thread {
                 id: thread_id,
@@ -51,8 +50,8 @@ where
                 runtime: builder.runtime.clone(),
                 panic_handler: builder.thread_panic_handler.clone(),
                 task: Multiplexed::new(
-                    pool_name.clone(),
-                    thread_name.unwrap_or(format!("thread-{}", thread_id).into()),
+                    pool_name,
+                    thread_name.unwrap_or(format!("thread-{}", thread_id)),
                     builder.max_concurrency,
                     rx.into_stream(),
                     builder.task_panic_handler.clone(),
@@ -64,7 +63,7 @@ where
         }
 
         Ok(Self {
-            name: pool_name.clone(),
+            name: pool_name,
             tx,
         })
     }
@@ -123,7 +122,7 @@ where
 pub struct Thread {
     id: usize,
     max_concurrency: usize,
-    name: Option<Arc<str>>,
+    name: Option<String>,
     runtime: tokio::runtime::Handle,
     panic_handler: Option<Arc<PanicHandler>>,
     task: BoxFuture<'static, ()>,

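Design note on the change above: a `&'static str` pool name is `Copy`, so it can
be stored in the pool and in every `Multiplexed` without reference counting, and
handed directly to the metrics macro. A sketch of the emission path, reusing
`AsyncPoolGauges` from this series; the helper function itself is illustrative:

    use crate::metrics::AsyncPoolGauges;

    /// Illustrative helper: with a `&'static str` pool name there is nothing to
    /// clone or drop when tagging each gauge emission.
    fn report_queue_size(pool_name: &'static str, queued: usize) {
        relay_statsd::metric!(
            gauge(AsyncPoolGauges::AsyncPoolQueueSize) = queued as u64,
            pool_name = pool_name
        );
    }
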
From d2b76013695ae05c766d7e0641e08b3aaecfc3e7 Mon Sep 17 00:00:00 2001
From: Riccardo Busetti <riccardob36@gmail.com>
Date: Mon, 24 Feb 2025 17:06:17 +0100
Subject: [PATCH 07/14] Make `DEFAULT_POOL_NAME` a const

---
 relay-threading/src/pool.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/relay-threading/src/pool.rs b/relay-threading/src/pool.rs
index e2ae5b61f57..04cd41dc1a3 100644
--- a/relay-threading/src/pool.rs
+++ b/relay-threading/src/pool.rs
@@ -12,7 +12,7 @@ use crate::multiplexing::Multiplexed;
 use crate::PanicHandler;
 
 /// Default name of the pool.
-static DEFAULT_POOL_NAME: &str = "unnamed";
+const DEFAULT_POOL_NAME: &str = "unnamed";
 
 /// [`AsyncPool`] is a thread-based executor that runs asynchronous tasks on dedicated worker threads.
 ///

From 51261dc5a42ba9e1bd5867d71ccab2332a9e255d Mon Sep 17 00:00:00 2001
From: Riccardo Busetti <riccardob36@gmail.com>
Date: Tue, 25 Feb 2025 10:16:26 +0100
Subject: [PATCH 08/14] Fix min concurrency

---
 relay-config/src/config.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs
index 50c3944b7ce..6476c16eddd 100644
--- a/relay-config/src/config.rs
+++ b/relay-config/src/config.rs
@@ -699,7 +699,7 @@ impl Default for Limits {
             max_replay_uncompressed_size: ByteSize::mebibytes(100),
             max_replay_message_size: ByteSize::mebibytes(15),
             max_thread_count: num_cpus::get(),
-            max_pool_concurrency: 0,
+            max_pool_concurrency: 1,
             query_timeout: 30,
             shutdown_timeout: 10,
             keepalive_timeout: 5,

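The default of `1` preserves the previous behavior: each worker thread drives a
single future to completion before pulling the next. A toy comparison of what
raising the concurrency buys for I/O-bound work, using `buffer_unordered` from
the `futures` crate as a stand-in for the pool's multiplexer; everything in this
sketch is illustrative:

    use std::time::Duration;

    use futures::stream::{self, StreamExt};

    #[tokio::main]
    async fn main() {
        let work = |i: u32| async move {
            tokio::time::sleep(Duration::from_millis(50)).await; // simulated I/O
            i
        };

        // Concurrency 1: the five sleeps run back to back (~250ms in total).
        let sequential: Vec<u32> = stream::iter(0..5)
            .map(work)
            .buffer_unordered(1)
            .collect()
            .await;

        // Concurrency 5: the five sleeps overlap (~50ms in total).
        let concurrent: Vec<u32> = stream::iter(0..5)
            .map(work)
            .buffer_unordered(5)
            .collect()
            .await;

        println!("{sequential:?} {concurrent:?}");
    }
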
From 5880f0fc85ad097df7b306c9e639bc4808433d6e Mon Sep 17 00:00:00 2001
From: Riccardo Busetti <riccardob36@gmail.com>
Date: Tue, 25 Feb 2025 10:18:25 +0100
Subject: [PATCH 09/14] Update changelog

---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index a7bd942b6b6..96086ee559d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,7 @@
 
 - Track a utilization metric for internal services. ([#4501](https://github.com/getsentry/relay/pull/4501))
 - Add new `relay-threading` crate with asynchronous thread pool. ([#4500](https://github.com/getsentry/relay/pull/4500))
+- Adopt new `AsyncPool` for the `EnvelopeProcessorService` and `StoreService`. ([#4520](https://github.com/getsentry/relay/pull/4520))
 
 ## 25.2.0
 

From 28b8666a1cf40dcfe9d25a71049bf0c48a964b10 Mon Sep 17 00:00:00 2001
From: Riccardo Busetti <riccardob36@gmail.com>
Date: Tue, 25 Feb 2025 11:24:31 +0100
Subject: [PATCH 10/14] Merge duplicate `impl` blocks in pool

---
 relay-threading/src/pool.rs | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/relay-threading/src/pool.rs b/relay-threading/src/pool.rs
index 04cd41dc1a3..b07cf4de40d 100644
--- a/relay-threading/src/pool.rs
+++ b/relay-threading/src/pool.rs
@@ -67,12 +67,7 @@ where
             tx,
         })
     }
-}
 
-impl<F> AsyncPool<F>
-where
-    F: Future<Output = ()>,
-{
     /// Schedules a future for execution within the [`AsyncPool`].
     ///
     /// The task is added to the pool's internal queue to be executed by an available worker thread.

From 0af7ec08c8a7aad6d259e4167a1abc4b6ab7d3cf Mon Sep 17 00:00:00 2001
From: Riccardo Busetti <riccardob36@gmail.com>
Date: Tue, 25 Feb 2025 13:16:36 +0100
Subject: [PATCH 11/14] Fix PR comments

---
 relay-config/src/config.rs             |  9 ++++++---
 relay-server/src/services/processor.rs |  9 ++-------
 relay-server/src/utils/thread_pool.rs  |  4 +++-
 relay-threading/src/multiplexing.rs    | 17 +++++++++--------
 4 files changed, 20 insertions(+), 19 deletions(-)

diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs
index 6476c16eddd..fce846d2f86 100644
--- a/relay-config/src/config.rs
+++ b/relay-config/src/config.rs
@@ -638,9 +638,12 @@ pub struct Limits {
     /// The total number of threads spawned will roughly be `2 * max_thread_count`. Defaults to
     /// the number of logical CPU cores on the host.
     pub max_thread_count: usize,
-    /// The maximum number of tasks to execute within each thread of the asynchronous pool.
+    /// Controls the maximum concurrency of each worker thread.
     ///
-    /// Each thread will be able to drive concurrently at most `max_pool_concurrency` futures.
+    /// Increasing the concurrency can lead to better utilization of worker threads by
+    /// increasing the amount of I/O performed concurrently.
+    ///
+    /// Defaults to `1`.
     pub max_pool_concurrency: usize,
     /// The maximum number of seconds a query is allowed to take across retries. Individual requests
     /// have lower timeouts. Defaults to 30 seconds.
@@ -2354,7 +2357,7 @@ impl Config {
         self.values.limits.max_thread_count
     }
 
-    /// Returns the number of tasks that can run concurrently.
+    /// Returns the number of tasks that can run concurrently in the worker pool.
     pub fn pool_concurrency(&self) -> usize {
         self.values.limits.max_pool_concurrency
     }
diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs
index 33aae4b7fcc..3dae2b70f40 100644
--- a/relay-server/src/services/processor.rs
+++ b/relay-server/src/services/processor.rs
@@ -3090,7 +3090,7 @@ impl EnvelopeProcessorService {
         self.inner.rate_limiter.is_some()
     }
 
-    async fn handle_message(&self, message: EnvelopeProcessor) {
+    fn handle_message(&self, message: EnvelopeProcessor) {
         let ty = message.variant();
         let feature_weights = self.feature_weights(&message);
 
@@ -3163,12 +3163,7 @@ impl Service for EnvelopeProcessorService {
             let service = self.clone();
             self.inner
                 .pool
-                .spawn_async(
-                    async move {
-                        service.handle_message(message).await;
-                    }
-                    .boxed(),
-                )
+                .spawn_async(async move { service.handle_message(message) }.boxed())
                 .await;
         }
     }
diff --git a/relay-server/src/utils/thread_pool.rs b/relay-server/src/utils/thread_pool.rs
index 57f95f02e79..4b05ac00e2e 100644
--- a/relay-server/src/utils/thread_pool.rs
+++ b/relay-server/src/utils/thread_pool.rs
@@ -76,7 +76,9 @@ impl ThreadPoolBuilder {
                 relay_log::error!(
                     "task in pool {name} panicked, other tasks will continue execution",
                     name = self.name
-                )
+                );
+                // We want to propagate the panic to Relay since the failure of a thread is unrecoverable.
+                std::process::exit(1);
             })
             // In case of panic in the thread, log it. After a panic in the thread, it will stop.
             .thread_panic_handler(move |_panic| {
diff --git a/relay-threading/src/multiplexing.rs b/relay-threading/src/multiplexing.rs
index c80b4d650ed..2dcf6365b8a 100644
--- a/relay-threading/src/multiplexing.rs
+++ b/relay-threading/src/multiplexing.rs
@@ -152,13 +152,6 @@ where
         let mut this = self.project();
 
         loop {
-            // We report how many tasks are being concurrently polled in this future.
-            relay_statsd::metric!(
-                gauge(AsyncPoolGauges::AsyncPoolFuturesPerThread) = this.tasks.len() as u64,
-                pool_name = &this.pool_name,
-                thread_name = &this.thread_name
-            );
-
             this.tasks.as_mut().poll_tasks_until_pending(cx);
 
             // If we can't get any more tasks, and we don't have anything else to process, we report
@@ -176,7 +169,15 @@ where
 
             // At this point, we are free to start driving another future.
             match this.rx.as_mut().poll_next(cx) {
-                Poll::Ready(Some(task)) => this.tasks.push(task),
+                Poll::Ready(Some(task)) => {
+                    this.tasks.push(task);
+                    // We report how many tasks are being concurrently polled in this future.
+                    relay_statsd::metric!(
+                        gauge(AsyncPoolGauges::AsyncPoolFuturesPerThread) = this.tasks.len() as u64,
+                        pool_name = &this.pool_name,
+                        thread_name = &this.thread_name
+                    );
+                }
                 // The stream is exhausted and there are no remaining tasks.
                 Poll::Ready(None) if this.tasks.is_empty() => return Poll::Ready(()),
                 // The stream is exhausted but tasks remain active. Now we need to make sure we

From 9ce102434c3e5ceab0bc555ffad5907630411d81 Mon Sep 17 00:00:00 2001
From: Riccardo Busetti <riccardob36@gmail.com>
Date: Tue, 25 Feb 2025 13:22:36 +0100
Subject: [PATCH 12/14] Fix PR comments

---
 relay-server/src/utils/thread_pool.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/relay-server/src/utils/thread_pool.rs b/relay-server/src/utils/thread_pool.rs
index 4b05ac00e2e..f8d48db73e2 100644
--- a/relay-server/src/utils/thread_pool.rs
+++ b/relay-server/src/utils/thread_pool.rs
@@ -77,12 +77,12 @@ impl ThreadPoolBuilder {
                     "task in pool {name} panicked, other tasks will continue execution",
                     name = self.name
                 );
-                // We want to propagate the panic to Relay since the failure of a thread is unrecoverable.
-                std::process::exit(1);
             })
             // In case of panic in the thread, log it. After a panic in the thread, it will stop.
-            .thread_panic_handler(move |_panic| {
-                relay_log::error!("thread in pool {name} panicked", name = self.name)
+            .thread_panic_handler(move |panic| {
+                relay_log::error!("thread in pool {name} panicked", name = self.name);
+                // We want to propagate the panic to Relay since the failure of a thread is unrecoverable.
+                std::panic::resume_unwind(panic);
             })
             .spawn_handler(|thread| {
                 let mut b = thread::Builder::new();

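`resume_unwind` re-raises the caught payload through the normal panic machinery,
so the process-level panic hook and the unwind/abort policy still apply, whereas
`std::process::exit(1)` would have bypassed both. A standalone sketch of the
pattern; the function names are illustrative:

    use std::panic::{self, AssertUnwindSafe};

    /// Illustrative: run a closure, log a panic, then propagate it unchanged.
    fn run_and_propagate(work: impl FnOnce()) {
        if let Err(payload) = panic::catch_unwind(AssertUnwindSafe(work)) {
            eprintln!("worker panicked, propagating");
            // Re-raise the original payload so callers observe the panic as usual.
            panic::resume_unwind(payload);
        }
    }

    fn main() {
        run_and_propagate(|| println!("ok"));
        run_and_propagate(|| panic!("boom")); // still terminates via the panic path
    }
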
From 1079e2f9c26edd50c42fe639bb67501b5f163ee5 Mon Sep 17 00:00:00 2001
From: Riccardo Busetti <riccardob36@gmail.com>
Date: Tue, 25 Feb 2025 13:24:31 +0100
Subject: [PATCH 13/14] Fix PR comments

---
 relay-server/src/utils/thread_pool.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/relay-server/src/utils/thread_pool.rs b/relay-server/src/utils/thread_pool.rs
index f8d48db73e2..e5bf8af2199 100644
--- a/relay-server/src/utils/thread_pool.rs
+++ b/relay-server/src/utils/thread_pool.rs
@@ -81,7 +81,6 @@ impl ThreadPoolBuilder {
             // In case of panic in the thread, log it. After a panic in the thread, it will stop.
             .thread_panic_handler(move |panic| {
                 relay_log::error!("thread in pool {name} panicked", name = self.name);
-                // We want to propagate the panic to Relay since the failure of a thread is unrecoverable.
                 std::panic::resume_unwind(panic);
             })
             .spawn_handler(|thread| {

From e71187a096e3913fb134a6ae5a4c5c9d4a9a964a Mon Sep 17 00:00:00 2001
From: Riccardo Busetti <riccardob36@gmail.com>
Date: Tue, 25 Feb 2025 16:36:00 +0100
Subject: [PATCH 14/14] Fix PR comments

---
 relay-threading/src/multiplexing.rs | 71 ++++-------------------------
 relay-threading/src/pool.rs         |  1 -
 2 files changed, 9 insertions(+), 63 deletions(-)

diff --git a/relay-threading/src/multiplexing.rs b/relay-threading/src/multiplexing.rs
index 2dcf6365b8a..e68dd051a26 100644
--- a/relay-threading/src/multiplexing.rs
+++ b/relay-threading/src/multiplexing.rs
@@ -103,7 +103,6 @@ pin_project! {
     /// This multiplexer is primarily used by the [`AsyncPool`] to manage task execution on worker threads.
     pub struct Multiplexed<S, F> {
         pool_name: &'static str,
-        thread_name: String,
         max_concurrency: usize,
         #[pin]
         rx: S,
@@ -122,14 +121,12 @@ where
     /// can be provided to manage errors during task execution.
     pub fn new(
         pool_name: &'static str,
-        thread_name: String,
         max_concurrency: usize,
         rx: S,
         panic_handler: Option<Arc<PanicHandler>>,
     ) -> Self {
         Self {
             pool_name,
-            thread_name,
             max_concurrency,
             rx,
             tasks: Tasks::new(panic_handler),
@@ -174,8 +171,7 @@ where
                     // We report how many tasks are being concurrently polled in this future.
                     relay_statsd::metric!(
                         gauge(AsyncPoolGauges::AsyncPoolFuturesPerThread) = this.tasks.len() as u64,
-                        pool_name = &this.pool_name,
-                        thread_name = &this.thread_name
+                        pool_name = &this.pool_name
                     );
                 }
                 // The stream is exhausted and there are no remaining tasks.
@@ -214,13 +210,7 @@ mod tests {
     #[test]
     fn test_multiplexer_with_no_futures() {
         let (_, rx) = flume::bounded::<BoxFuture<'static, _>>(10);
-        futures::executor::block_on(Multiplexed::new(
-            "my_pool",
-            "my_thread".into(),
-            1,
-            rx.into_stream(),
-            None,
-        ));
+        futures::executor::block_on(Multiplexed::new("my_pool", 1, rx.into_stream(), None));
     }
 
     #[test]
@@ -244,7 +234,6 @@ mod tests {
         };
         futures::executor::block_on(Multiplexed::new(
             "my_pool",
-            "my_thread".into(),
             1,
             rx.into_stream(),
             Some(Arc::new(panic_handler)),
@@ -270,13 +259,7 @@ mod tests {
         drop(tx);
 
         let result = std::panic::catch_unwind(AssertUnwindSafe(|| {
-            futures::executor::block_on(Multiplexed::new(
-                "my_pool",
-                "my_thread".into(),
-                1,
-                rx.into_stream(),
-                None,
-            ))
+            futures::executor::block_on(Multiplexed::new("my_pool", 1, rx.into_stream(), None))
         }));
 
         // The count is expected to have been incremented and the handler called.
@@ -297,13 +280,7 @@ mod tests {
 
         drop(tx);
 
-        futures::executor::block_on(Multiplexed::new(
-            "my_pool",
-            "my_thread".into(),
-            1,
-            rx.into_stream(),
-            None,
-        ));
+        futures::executor::block_on(Multiplexed::new("my_pool", 1, rx.into_stream(), None));
 
         // The count is expected to have been incremented.
         assert_eq!(count.load(Ordering::SeqCst), 1);
@@ -324,13 +301,7 @@ mod tests {
 
         drop(tx);
 
-        futures::executor::block_on(Multiplexed::new(
-            "my_pool",
-            "my_thread".into(),
-            1,
-            rx.into_stream(),
-            None,
-        ));
+        futures::executor::block_on(Multiplexed::new("my_pool", 1, rx.into_stream(), None));
 
         // The order of completion is expected to match the order of submission.
         assert_eq!(*entries.lock().unwrap(), (0..5).collect::<Vec<_>>());
@@ -349,13 +320,7 @@ mod tests {
 
         drop(tx);
 
-        futures::executor::block_on(Multiplexed::new(
-            "my_pool",
-            "my_thread".into(),
-            5,
-            rx.into_stream(),
-            None,
-        ));
+        futures::executor::block_on(Multiplexed::new("my_pool", 5, rx.into_stream(), None));
 
         // The count is expected to have been incremented.
         assert_eq!(count.load(Ordering::SeqCst), 1);
@@ -376,13 +341,7 @@ mod tests {
 
         drop(tx);
 
-        futures::executor::block_on(Multiplexed::new(
-            "my_pool",
-            "my_thread".into(),
-            5,
-            rx.into_stream(),
-            None,
-        ));
+        futures::executor::block_on(Multiplexed::new("my_pool", 5, rx.into_stream(), None));
 
         // The order of completion is expected to be the same as the order of submission.
         assert_eq!(*entries.lock().unwrap(), (0..5).collect::<Vec<_>>());
@@ -406,13 +365,7 @@ mod tests {
 
         drop(tx);
 
-        futures::executor::block_on(Multiplexed::new(
-            "my_pool",
-            "my_thread".into(),
-            5,
-            rx.into_stream(),
-            None,
-        ));
+        futures::executor::block_on(Multiplexed::new("my_pool", 5, rx.into_stream(), None));
 
         // The order of completion is expected to be the same as the order of submission.
         assert_eq!(*entries.lock().unwrap(), (0..3).collect::<Vec<_>>());
@@ -442,13 +395,7 @@ mod tests {
 
         drop(tx);
 
-        futures::executor::block_on(Multiplexed::new(
-            "my_pool",
-            "my_thread".into(),
-            5,
-            rx.into_stream(),
-            None,
-        ));
+        futures::executor::block_on(Multiplexed::new("my_pool", 5, rx.into_stream(), None));
 
         // The order of completion may vary; verify that all expected elements are present.
         let mut entries = entries.lock().unwrap();
diff --git a/relay-threading/src/pool.rs b/relay-threading/src/pool.rs
index b07cf4de40d..c72288679b3 100644
--- a/relay-threading/src/pool.rs
+++ b/relay-threading/src/pool.rs
@@ -51,7 +51,6 @@ where
                 panic_handler: builder.thread_panic_handler.clone(),
                 task: Multiplexed::new(
                     pool_name,
-                    thread_name.unwrap_or(format!("thread-{}", thread_id)),
                     builder.max_concurrency,
                     rx.into_stream(),
                     builder.task_panic_handler.clone(),
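
With the per-thread name gone, the final multiplexer signature is the one the
unit tests above exercise. A condensed version of that usage, runnable from
inside the crate where `Multiplexed` is visible (`futures` and `flume` are
already dependencies of `relay-threading`):

    use futures::future::BoxFuture;
    use futures::FutureExt;

    use crate::multiplexing::Multiplexed;

    fn demo() {
        let (tx, rx) = flume::bounded::<BoxFuture<'static, ()>>(10);
        tx.send(async { println!("task ran") }.boxed()).unwrap();
        // Closing the channel lets the multiplexer terminate once tasks drain.
        drop(tx);

        // Drive up to 5 tasks concurrently on the current thread; the gauge is
        // tagged with `pool_name = my_pool` only.
        futures::executor::block_on(Multiplexed::new("my_pool", 5, rx.into_stream(), None));
    }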