Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pool): Use new AsyncPool for the processor and store services #4520

Merged
merged 15 commits into from
Mar 3, 2025
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

- Track a utilization metric for internal services. ([#4501](https://github.com/getsentry/relay/pull/4501))
- Add new `relay-threading` crate with asynchronous thread pool. ([#4500](https://github.com/getsentry/relay/pull/4500))
- Adopt new `AsyncPool` for the `EnvelopeProcessorService` and `StoreService`. ([#4520](https://github.com/getsentry/relay/pull/4520))

## 25.2.0

Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 11 additions & 1 deletion relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,10 @@ pub struct Limits {
/// The total number of threads spawned will roughly be `2 * max_thread_count`. Defaults to
/// the number of logical CPU cores on the host.
pub max_thread_count: usize,
/// The maximum number of tasks to execute within each thread of the asynchronous pool.
///
/// Each thread will be able to drive at most `max_pool_concurrency` futures concurrently.
pub max_pool_concurrency: usize,
/// The maximum number of seconds a query is allowed to take across retries. Individual requests
/// have lower timeouts. Defaults to 30 seconds.
pub query_timeout: u64,
Expand All @@ -646,7 +650,7 @@ pub struct Limits {
pub shutdown_timeout: u64,
/// Server keep-alive timeout in seconds.
///
/// By default keep-alive is set to a 5 seconds.
/// By default, keep-alive is set to 5 seconds.
pub keepalive_timeout: u64,
/// Server idle timeout in seconds.
///
Expand Down Expand Up @@ -695,6 +699,7 @@ impl Default for Limits {
max_replay_uncompressed_size: ByteSize::mebibytes(100),
max_replay_message_size: ByteSize::mebibytes(15),
max_thread_count: num_cpus::get(),
max_pool_concurrency: 1,
query_timeout: 30,
shutdown_timeout: 10,
keepalive_timeout: 5,
Expand Down Expand Up @@ -2349,6 +2354,11 @@ impl Config {
self.values.limits.max_thread_count
}

/// Returns the number of tasks that can run concurrently.
pub fn pool_concurrency(&self) -> usize {
self.values.limits.max_pool_concurrency
}

/// Returns the maximum size of a project config query.
pub fn query_batch_size(&self) -> usize {
self.values.cache.batch_size
Expand Down
1 change: 1 addition & 0 deletions relay-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ relay-sampling = { workspace = true }
relay-spans = { workspace = true }
relay-statsd = { workspace = true }
relay-system = { workspace = true }
relay-threading = { workspace = true }
reqwest = { workspace = true, features = [
"gzip",
"hickory-dns",
Expand Down
19 changes: 10 additions & 9 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ use crate::services::health_check::{HealthCheck, HealthCheckService};
use crate::services::metrics::RouterService;
use crate::services::outcome::{OutcomeProducer, OutcomeProducerService, TrackOutcome};
use crate::services::outcome_aggregator::OutcomeAggregator;
use crate::services::processor::{self, EnvelopeProcessor, EnvelopeProcessorService};
use crate::services::processor::{
self, EnvelopeProcessor, EnvelopeProcessorService, EnvelopeProcessorServicePool,
};
use crate::services::projects::cache::{ProjectCacheHandle, ProjectCacheService};
use crate::services::projects::source::ProjectSource;
use crate::services::relays::{RelayCache, RelayCacheService};
use crate::services::stats::RelayStats;
#[cfg(feature = "processing")]
use crate::services::store::StoreService;
use crate::services::store::{StoreService, StoreServicePool};
use crate::services::test_store::{TestStore, TestStoreService};
use crate::services::upstream::{UpstreamRelay, UpstreamRelayService};
use crate::utils::{MemoryChecker, MemoryStat, ThreadKind};
Expand All @@ -28,7 +30,6 @@ use anyhow::Context;
use anyhow::Result;
use axum::extract::FromRequestParts;
use axum::http::request::Parts;
use rayon::ThreadPool;
use relay_cogs::Cogs;
use relay_config::Config;
#[cfg(feature = "processing")]
Expand Down Expand Up @@ -97,7 +98,7 @@ pub fn create_runtime(name: &'static str, threads: usize) -> relay_system::Runti
.build()
}

fn create_processor_pool(config: &Config) -> Result<ThreadPool> {
fn create_processor_pool(config: &Config) -> Result<EnvelopeProcessorServicePool> {
// Adjust thread count for small cpu counts to not have too many idle cores
// and distribute workload better.
let thread_count = match config.cpu_concurrency() {
Expand All @@ -107,17 +108,17 @@ fn create_processor_pool(config: &Config) -> Result<ThreadPool> {
};
relay_log::info!("starting {thread_count} envelope processing workers");

let pool = crate::utils::ThreadPoolBuilder::new("processor")
let pool = crate::utils::ThreadPoolBuilder::new("processor", tokio::runtime::Handle::current())
.num_threads(thread_count)
.max_concurrency(config.pool_concurrency())
.thread_kind(ThreadKind::Worker)
.runtime(tokio::runtime::Handle::current())
.build()?;

Ok(pool)
}

#[cfg(feature = "processing")]
fn create_store_pool(config: &Config) -> Result<ThreadPool> {
fn create_store_pool(config: &Config) -> Result<StoreServicePool> {
// Spawn a store worker for every 12 threads in the processor pool.
// This ratio was found empirically and may need adjustments in the future.
//
Expand All @@ -126,9 +127,9 @@ fn create_store_pool(config: &Config) -> Result<ThreadPool> {
let thread_count = config.cpu_concurrency().div_ceil(12);
relay_log::info!("starting {thread_count} store workers");

let pool = crate::utils::ThreadPoolBuilder::new("store")
let pool = crate::utils::ThreadPoolBuilder::new("store", tokio::runtime::Handle::current())
.num_threads(thread_count)
.runtime(tokio::runtime::Handle::current())
.max_concurrency(config.pool_concurrency())
.build()?;

Ok(pool)
Expand Down
26 changes: 18 additions & 8 deletions relay-server/src/services/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use bytes::Bytes;
use chrono::{DateTime, Utc};
use flate2::write::{GzEncoder, ZlibEncoder};
use flate2::Compression;
use futures::future::BoxFuture;
use futures::FutureExt;
use relay_base_schema::project::{ProjectId, ProjectKey};
use relay_cogs::{AppFeature, Cogs, FeatureWeights, ResourceId, Token};
use relay_common::time::UnixTimestamp;
Expand Down Expand Up @@ -60,10 +62,10 @@ use crate::services::upstream::{
};
use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers};
use crate::utils::{
self, InvalidProcessingGroupType, ManagedEnvelope, SamplingResult, ThreadPool, TypedEnvelope,
WorkerGroup,
self, InvalidProcessingGroupType, ManagedEnvelope, SamplingResult, TypedEnvelope,
};
use relay_base_schema::organization::OrganizationId;
use relay_threading::AsyncPool;
#[cfg(feature = "processing")]
use {
crate::services::store::{Store, StoreEnvelope},
Expand Down Expand Up @@ -1078,6 +1080,9 @@ impl FromMessage<SubmitClientReports> for EnvelopeProcessor {
}
}

/// The asynchronous thread pool used for scheduling processing tasks in the processor.
pub type EnvelopeProcessorServicePool = AsyncPool<BoxFuture<'static, ()>>;

/// Service implementing the [`EnvelopeProcessor`] interface.
///
/// This service handles messages in a worker pool with configurable concurrency.
Expand Down Expand Up @@ -1110,7 +1115,7 @@ impl Default for Addrs {
}

struct InnerProcessor {
workers: WorkerGroup,
pool: EnvelopeProcessorServicePool,
config: Arc<Config>,
global_config: GlobalConfigHandle,
project_cache: ProjectCacheHandle,
Expand All @@ -1130,7 +1135,7 @@ impl EnvelopeProcessorService {
/// Creates a multi-threaded envelope processor.
#[cfg_attr(feature = "processing", expect(clippy::too_many_arguments))]
pub fn new(
pool: ThreadPool,
pool: EnvelopeProcessorServicePool,
config: Arc<Config>,
global_config: GlobalConfigHandle,
project_cache: ProjectCacheHandle,
Expand Down Expand Up @@ -1160,7 +1165,7 @@ impl EnvelopeProcessorService {
};

let inner = InnerProcessor {
workers: WorkerGroup::new(pool),
pool,
global_config,
project_cache,
cogs,
Expand Down Expand Up @@ -3085,7 +3090,7 @@ impl EnvelopeProcessorService {
self.inner.rate_limiter.is_some()
}

fn handle_message(&self, message: EnvelopeProcessor) {
async fn handle_message(&self, message: EnvelopeProcessor) {
let ty = message.variant();
let feature_weights = self.feature_weights(&message);

Expand Down Expand Up @@ -3157,8 +3162,13 @@ impl Service for EnvelopeProcessorService {
while let Some(message) = rx.recv().await {
let service = self.clone();
self.inner
.workers
.spawn(move || service.handle_message(message))
.pool
.spawn_async(
async move {
service.handle_message(message).await;
}
.boxed(),
)
.await;
}
}
Expand Down
23 changes: 15 additions & 8 deletions relay-server/src/services/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ use std::sync::Arc;

use bytes::Bytes;
use chrono::{DateTime, Utc};
use futures::future::BoxFuture;
use futures::FutureExt;
use relay_base_schema::data_category::DataCategory;
use relay_base_schema::project::ProjectId;
use relay_common::time::UnixTimestamp;
use relay_config::Config;
use relay_event_schema::protocol::{EventId, VALID_PLATFORMS};

use crate::envelope::{AttachmentType, Envelope, Item, ItemType};
use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message};
use relay_metrics::{
Bucket, BucketView, BucketViewValue, BucketsView, ByNamespace, FiniteF64, GaugeValue,
Expand All @@ -24,20 +27,19 @@ use relay_metrics::{
use relay_quotas::Scoping;
use relay_statsd::metric;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
use relay_threading::AsyncPool;
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use serde_json::Deserializer;
use uuid::Uuid;

use crate::envelope::{AttachmentType, Envelope, Item, ItemType};

use crate::metrics::{ArrayEncoding, BucketEncoder, MetricOutcomes};
use crate::service::ServiceError;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::outcome::{DiscardReason, Outcome, TrackOutcome};
use crate::services::processor::Processed;
use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
use crate::utils::{FormDataIter, ThreadPool, TypedEnvelope, WorkerGroup};
use crate::utils::{FormDataIter, TypedEnvelope};

/// Fallback name used for attachment items without a `filename` header.
const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";
Expand Down Expand Up @@ -91,6 +93,9 @@ pub struct StoreMetrics {
pub retention: u16,
}

/// The asynchronous thread pool used for scheduling storing tasks in the envelope store.
pub type StoreServicePool = AsyncPool<BoxFuture<'static, ()>>;

/// Service interface for the [`StoreEnvelope`] message.
#[derive(Debug)]
pub enum Store {
Expand Down Expand Up @@ -128,7 +133,7 @@ impl FromMessage<StoreMetrics> for Store {

/// Service implementing the [`Store`] interface.
pub struct StoreService {
workers: WorkerGroup,
pool: StoreServicePool,
config: Arc<Config>,
global_config: GlobalConfigHandle,
outcome_aggregator: Addr<TrackOutcome>,
Expand All @@ -138,15 +143,15 @@ pub struct StoreService {

impl StoreService {
pub fn create(
pool: ThreadPool,
pool: StoreServicePool,
config: Arc<Config>,
global_config: GlobalConfigHandle,
outcome_aggregator: Addr<TrackOutcome>,
metric_outcomes: MetricOutcomes,
) -> anyhow::Result<Self> {
let producer = Producer::create(&config)?;
Ok(Self {
workers: WorkerGroup::new(pool),
pool,
config,
global_config,
outcome_aggregator,
Expand Down Expand Up @@ -1049,8 +1054,10 @@ impl Service for StoreService {

while let Some(message) = rx.recv().await {
let service = Arc::clone(&this);
this.workers
.spawn(move || service.handle_message(message))
// For now, we run each task synchronously; in the future, we might explore how to make
// the store async.
this.pool
.spawn_async(async move { service.handle_message(message) }.boxed())
.await;
}

Expand Down
9 changes: 4 additions & 5 deletions relay-server/src/testutils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ use crate::metrics::{MetricOutcomes, MetricStats};
use crate::service::create_redis_pools;
use crate::services::global_config::GlobalConfigHandle;
use crate::services::outcome::TrackOutcome;
use crate::services::processor::{self, EnvelopeProcessorService};
use crate::services::processor::{self, EnvelopeProcessorService, EnvelopeProcessorServicePool};
use crate::services::projects::cache::ProjectCacheHandle;
use crate::services::projects::project::ProjectInfo;
use crate::services::test_store::TestStore;
use crate::utils::{ThreadPool, ThreadPoolBuilder};
use crate::utils::ThreadPoolBuilder;

pub fn state_with_rule_and_condition(
sample_rate: Option<f64>,
Expand Down Expand Up @@ -174,10 +174,9 @@ pub fn processor_services() -> (Addr<TrackOutcome>, Addr<TestStore>) {
(outcome_aggregator, test_store)
}

fn create_processor_pool() -> ThreadPool {
ThreadPoolBuilder::new("processor")
fn create_processor_pool() -> EnvelopeProcessorServicePool {
ThreadPoolBuilder::new("processor", tokio::runtime::Handle::current())
.num_threads(1)
.runtime(tokio::runtime::Handle::current())
.build()
.unwrap()
}
Loading
Loading