diff --git a/CHANGELOG.md b/CHANGELOG.md index fba96cee122..9aec942a3d2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ - Refactor tokio-based Addr from healthcheck to be generic. ([#1405](https://github.com/relay/pull/1405)) - Defer dropping of projects to a background thread to speed up project cache eviction. ([#1410](https://github.com/getsentry/relay/pull/1410)) +- Update store service to use generic Addr and minor changes to generic Addr. ([#1415](https://github.com/getsentry/relay/pull/1415)) **Features**: diff --git a/relay-server/src/actors/envelopes.rs b/relay-server/src/actors/envelopes.rs index 3b00dee2606..294aeac5d2b 100644 --- a/relay-server/src/actors/envelopes.rs +++ b/relay-server/src/actors/envelopes.rs @@ -35,8 +35,9 @@ use crate::utils::{self, EnvelopeContext, FutureExt as _, Semaphore}; #[cfg(feature = "processing")] use { - crate::actors::store::{StoreAddr, StoreEnvelope, StoreError, StoreForwarder}, + crate::actors::store::{StoreEnvelope, StoreError, StoreForwarder}, futures::{FutureExt, TryFutureExt}, + relay_system::Addr as ServiceAddr, tokio::runtime::Runtime, }; @@ -209,7 +210,7 @@ pub struct EnvelopeManager { captures: BTreeMap, processor: Addr, #[cfg(feature = "processing")] - store_forwarder: Option>, + store_forwarder: Option>, #[cfg(feature = "processing")] _runtime: Runtime, } @@ -222,11 +223,7 @@ impl EnvelopeManager { ) -> Result { // Enter the tokio runtime so we can start spawning tasks from the outside. #[cfg(feature = "processing")] - let runtime = tokio::runtime::Builder::new_multi_thread() - .worker_threads(1) - .enable_all() - .build() - .unwrap(); + let runtime = utils::tokio_runtime_with_actix(); #[cfg(feature = "processing")] let _guard = runtime.enter(); diff --git a/relay-server/src/actors/healthcheck.rs b/relay-server/src/actors/healthcheck.rs index 60790a85fa1..6a8bf07e438 100644 --- a/relay-server/src/actors/healthcheck.rs +++ b/relay-server/src/actors/healthcheck.rs @@ -1,6 +1,3 @@ -use std::error::Error; -use std::fmt; -use std::future::Future; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -15,90 +12,11 @@ use relay_system::{compat, Controller}; use crate::actors::upstream::{IsAuthenticated, IsNetworkOutage, UpstreamRelay}; use crate::statsd::RelayGauges; +use relay_system::{Addr, Service, ServiceMessage}; /// Singleton of the `Healthcheck` service. static ADDRESS: RwLock>> = RwLock::new(None); -/// Our definition of a service. -/// -/// Services are much like actors: they receive messages from an inbox and handles them one -/// by one. Services are free to concurrently process these messages or not, most probably -/// should. -/// -/// Messages always have a response which will be sent once the message is handled by the -/// service. -pub trait Service { - /// The envelope is what is sent to the inbox of this service. - /// - /// It is an enum of all the message types that can be handled by this service together - /// with the response [sender](oneshot::Sender) for each message. - type Envelope: Send + 'static; -} - -/// A message which can be sent to a service. -/// -/// Messages have an associated `Response` type and can be unconditionally converted into -/// the envelope type of their [`Service`]. -pub trait ServiceMessage { - type Response: Send + 'static; - - fn into_envelope(self) -> (S::Envelope, oneshot::Receiver); -} - -/// An error when [sending](Addr::send) a message to a service fails. -#[derive(Clone, Copy, Debug)] -pub struct SendError; - -impl fmt::Display for SendError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "failed to send message to service") - } -} - -impl Error for SendError {} - -/// The address for a [`Service`]. -/// -/// The address of a [`Service`] allows you to [send](Addr::send) messages to the service as -/// long as the service is running. It can be freely cloned. -#[derive(Debug)] -pub struct Addr { - tx: mpsc::UnboundedSender, -} - -// Manually derive clone since we do not require `S: Clone` and the Clone derive adds this -// constraint. -impl Clone for Addr { - fn clone(&self) -> Self { - Self { - tx: self.tx.clone(), - } - } -} - -impl Addr { - /// Sends an asynchronous message to the service and waits for the response. - /// - /// The result of the message does not have to be awaited. The message will be delivered and - /// handled regardless. The communication channel with the service is unbounded, so backlogs - /// could occur when sending too many messages. - /// - /// Sending the message can fail with `Err(SendError)` if the service has shut down. - // Note: this is written as returning `impl Future` instead of `async fn` in order not - // to capture the lifetime of `&self` in the returned future. - pub fn send(&self, message: M) -> impl Future> - where - M: ServiceMessage, - { - let (envelope, response_rx) = message.into_envelope(); - let res = self.tx.send(envelope); - async move { - res.map_err(|_| SendError)?; - response_rx.await.map_err(|_| SendError) - } - } -} - #[derive(Debug)] pub struct Healthcheck { is_shutting_down: AtomicBool, @@ -106,11 +24,11 @@ pub struct Healthcheck { } impl Service for Healthcheck { - type Envelope = HealthcheckEnvelope; + type Messages = HealthcheckMessages; } impl Healthcheck { - /// Returns the [`Addr`] of the [`Healthcheck`] actor. + /// Returns the [`Addr`] of the [`Healthcheck`] service. /// /// Prior to using this, the service must be started using [`Healthcheck::start`]. /// @@ -167,7 +85,7 @@ impl Healthcheck { /// Start this service, returning an [`Addr`] for communication. pub fn start(self) -> Addr { - let (tx, mut rx) = mpsc::unbounded_channel::(); + let (tx, mut rx) = mpsc::unbounded_channel::(); let addr = Addr { tx }; *ADDRESS.write() = Some(addr.clone()); @@ -178,14 +96,7 @@ impl Healthcheck { while let Some(message) = rx.recv().await { let service = main_service.clone(); - tokio::spawn(async move { - match message { - HealthcheckEnvelope::IsHealthy(msg, response_tx) => { - let response = service.handle_is_healthy(msg).await; - response_tx.send(response).ok() - } - }; - }); + tokio::spawn(async move { service.handle_message(message).await }); } }); @@ -202,6 +113,15 @@ impl Healthcheck { addr } + + async fn handle_message(&self, message: HealthcheckMessages) { + match message { + HealthcheckMessages::IsHealthy(msg, response_tx) => { + let response = self.handle_is_healthy(msg).await; + response_tx.send(response).ok() + } + }; + } } #[derive(Clone, Copy, Debug)] @@ -216,14 +136,14 @@ pub enum IsHealthy { impl ServiceMessage for IsHealthy { type Response = bool; - fn into_envelope(self) -> (HealthcheckEnvelope, oneshot::Receiver) { + fn into_messages(self) -> (HealthcheckMessages, oneshot::Receiver) { let (tx, rx) = oneshot::channel(); - (HealthcheckEnvelope::IsHealthy(self, tx), rx) + (HealthcheckMessages::IsHealthy(self, tx), rx) } } -/// All the message types which can be sent to the [`Healthcheck`] actor. +/// All the message types which can be sent to the [`Healthcheck`] service. #[derive(Debug)] -pub enum HealthcheckEnvelope { +pub enum HealthcheckMessages { IsHealthy(IsHealthy, oneshot::Sender), } diff --git a/relay-server/src/actors/store.rs b/relay-server/src/actors/store.rs index 54d55112602..71047e79b93 100644 --- a/relay-server/src/actors/store.rs +++ b/relay-server/src/actors/store.rs @@ -1,8 +1,7 @@ -//! This module contains the actor that forwards events and attachments to the Sentry store. -//! The actor uses kafka topics to forward data to Sentry +//! This module contains the service that forwards events and attachments to the Sentry store. +//! The service uses kafka topics to forward data to Sentry use std::collections::BTreeMap; -use std::fmt; use std::sync::Arc; use std::time::Instant; @@ -22,6 +21,7 @@ use relay_log::LogError; use relay_metrics::{Bucket, BucketValue, MetricNamespace, MetricResourceIdentifier}; use relay_quotas::Scoping; use relay_statsd::metric; +use relay_system::{Addr, Service, ServiceMessage}; use crate::envelope::{AttachmentType, Envelope, Item, ItemType}; use crate::service::{ServerError, ServerErrorKind}; @@ -51,51 +51,6 @@ pub enum StoreError { NoEventId, } -// TODO(tobias): Still need to unify with the message in the Healthcheck actor -/// Internal wrapper of a message sent through an `StoreAddr` with return channel. -#[derive(Debug)] -pub struct StoreMessage { - data: T, - responder: oneshot::Sender>, -} - -/// An error when [sending](StoreAddr::send) a message to a service fails. -#[derive(Clone, Copy, Debug)] -pub struct SendError; - -impl fmt::Display for SendError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "failed to send message to service") - } -} - -impl std::error::Error for SendError {} - -// TODO(tobias): Still need to unify with the Addr in the Healthcheck actor -/// Channel for sending public messages into a service. -/// -/// To send a message, use [`StoreAddr::send`]. -#[derive(Clone, Debug)] -pub struct StoreAddr { - tx: mpsc::UnboundedSender>, -} - -impl StoreAddr { - /// Sends an asynchronous message to the service and waits for the response. - /// - /// The result of the message does not have to be awaited. The message will be delivered and - /// handled regardless. The communication channel with the service is unbounded, so backlogs - /// could occur when sending too many messages. - /// - /// Sending the message can fail with `Err(SendError)` if the service has shut down. - pub async fn send(&self, data: T) -> Result, SendError> { - let (responder, rx) = oneshot::channel(); - let message = StoreMessage { data, responder }; - self.tx.send(message).map_err(|_| SendError)?; - rx.await.map_err(|_| SendError) - } -} - type Producer = Arc; struct Producers { @@ -130,14 +85,54 @@ impl Producers { KafkaTopic::ReplayRecordings => Some(&self.replay_recordings), } } + + pub fn create(config: &Arc) -> Result { + let mut reused_producers = BTreeMap::new(); + let producers = Producers { + attachments: make_producer(&**config, &mut reused_producers, KafkaTopic::Attachments)?, + events: make_producer(&**config, &mut reused_producers, KafkaTopic::Events)?, + transactions: make_producer( + &**config, + &mut reused_producers, + KafkaTopic::Transactions, + )?, + sessions: make_producer(&**config, &mut reused_producers, KafkaTopic::Sessions)?, + metrics_sessions: make_producer( + &**config, + &mut reused_producers, + KafkaTopic::MetricsSessions, + )?, + metrics_transactions: make_producer( + &**config, + &mut reused_producers, + KafkaTopic::MetricsTransactions, + )?, + profiles: make_producer(&**config, &mut reused_producers, KafkaTopic::Profiles)?, + replay_recordings: make_producer( + &**config, + &mut reused_producers, + KafkaTopic::ReplayRecordings, + )?, + replay_events: make_producer( + &**config, + &mut reused_producers, + KafkaTopic::ReplayEvents, + )?, + }; + Ok(producers) + } } -/// Actor for publishing events to Sentry through kafka topics. +/// Service for publishing events to Sentry through kafka topics. pub struct StoreForwarder { config: Arc, producers: Producers, } +impl Service for StoreForwarder { + type Messages = StoreMessages; +} + fn make_distinct_id(s: &str) -> Uuid { s.parse() .unwrap_or_else(|_| Uuid::new_v5(&NAMESPACE_DID, s.as_bytes())) @@ -176,7 +171,38 @@ fn make_producer<'a>( } impl StoreForwarder { - fn handle_store_evelope(&self, message: StoreEnvelope) -> Result<(), StoreError> { + pub fn start(self) -> Addr { + relay_log::info!("store forwarder started"); + + let (tx, mut rx) = mpsc::unbounded_channel::(); + + tokio::spawn(async move { + while let Some(message) = rx.recv().await { + self.handle_message(message); + } + + relay_log::info!("store forwarder stopped"); + }); + + Addr { tx } + } + + pub fn create(config: Arc) -> Result { + let producers = Producers::create(&config)?; + + Ok(Self { config, producers }) + } + + fn handle_message(&self, message: StoreMessages) { + match message { + StoreMessages::StoreEnvelope(msg, responder_tx) => { + let response = self.handle_store_envelope(msg); + responder_tx.send(response).ok(); + } + } + } + + fn handle_store_envelope(&self, message: StoreEnvelope) -> Result<(), StoreError> { let StoreEnvelope { envelope, start_time, @@ -314,61 +340,6 @@ impl StoreForwarder { Ok(()) } - pub fn start(self) -> StoreAddr { - relay_log::info!("store forwarder started"); - - let (tx, mut rx) = mpsc::unbounded_channel::>(); - - let service = Arc::new(self); - tokio::spawn(async move { - while let Some(message) = rx.recv().await { - let service = service.clone(); - - tokio::spawn(async move { - let response = service.handle_store_evelope(message.data); - message.responder.send(response).ok(); - }); - } - - relay_log::info!("store forwarder stopped"); - }); - - StoreAddr { tx } - } - - pub fn create(config: Arc) -> Result { - let mut reused_producers = BTreeMap::new(); - let producers = Producers { - attachments: make_producer(&*config, &mut reused_producers, KafkaTopic::Attachments)?, - events: make_producer(&*config, &mut reused_producers, KafkaTopic::Events)?, - transactions: make_producer(&*config, &mut reused_producers, KafkaTopic::Transactions)?, - sessions: make_producer(&*config, &mut reused_producers, KafkaTopic::Sessions)?, - metrics_sessions: make_producer( - &*config, - &mut reused_producers, - KafkaTopic::MetricsSessions, - )?, - metrics_transactions: make_producer( - &*config, - &mut reused_producers, - KafkaTopic::MetricsTransactions, - )?, - profiles: make_producer(&*config, &mut reused_producers, KafkaTopic::Profiles)?, - replay_recordings: make_producer( - &*config, - &mut reused_producers, - KafkaTopic::ReplayRecordings, - )?, - replay_events: make_producer( - &*config, - &mut reused_producers, - KafkaTopic::ReplayEvents, - )?, - }; - - Ok(Self { config, producers }) - } - fn produce(&self, topic: KafkaTopic, message: KafkaMessage) -> Result<(), StoreError> { let serialized = message.serialize()?; metric!( @@ -1030,7 +1001,7 @@ impl KafkaMessage { } } -/// Message sent to the StoreForwarder containing an event +/// Message sent to the [`StoreForwarder`] containing an [`Envelope`]. #[derive(Clone, Debug)] pub struct StoreEnvelope { pub envelope: Envelope, @@ -1038,6 +1009,21 @@ pub struct StoreEnvelope { pub scoping: Scoping, } +/// All the message types which can be sent to the [`StoreForwarder`]. +#[derive(Debug)] +pub enum StoreMessages { + StoreEnvelope(StoreEnvelope, oneshot::Sender>), +} + +impl ServiceMessage for StoreEnvelope { + type Response = Result<(), StoreError>; + + fn into_messages(self) -> (StoreMessages, oneshot::Receiver) { + let (tx, rx) = oneshot::channel(); + (StoreMessages::StoreEnvelope(self, tx), rx) + } +} + /// Determines if the given item is considered slow. /// /// Slow items must be routed to the `Attachments` topic. diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index 2d14172e831..16d204d9fe9 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -8,7 +8,6 @@ use failure::{Backtrace, Context, Fail}; use listenfd::ListenFd; use relay_aws_extension::AwsExtension; -use relay_common::clone; use relay_config::Config; use relay_metrics::Aggregator; use relay_redis::RedisPool; @@ -22,10 +21,10 @@ use crate::actors::processor::EnvelopeProcessor; use crate::actors::project_cache::ProjectCache; use crate::actors::relays::RelayCache; use crate::actors::upstream::UpstreamRelay; -use crate::endpoints; use crate::middlewares::{ AddCommonHeaders, ErrorHandlers, Metrics, ReadRequestMiddleware, SentryMiddleware, }; +use crate::{endpoints, utils}; /// Common error type for the relay server. #[derive(Debug)] @@ -120,12 +119,7 @@ impl ServiceState { let system = System::current(); let registry = system.registry(); - let runtime = tokio::runtime::Builder::new_multi_thread() - .worker_threads(1) - .enable_all() - .on_thread_start(clone!(system, || System::set_current(system.clone()))) - .build() - .unwrap(); + let runtime = utils::tokio_runtime_with_actix(); // Enter the tokio runtime so we can start spawning tasks from the outside. let _guard = runtime.enter(); diff --git a/relay-server/src/utils/actix.rs b/relay-server/src/utils/actix.rs index 792cd4128b3..0335de6fcc7 100644 --- a/relay-server/src/utils/actix.rs +++ b/relay-server/src/utils/actix.rs @@ -2,6 +2,9 @@ use ::actix::dev::{MessageResponse, ResponseChannel}; use ::actix::fut::IntoActorFuture; use ::actix::prelude::*; use futures01::prelude::*; +use tokio::runtime::Runtime; + +use relay_common::clone; pub enum Response { Reply(Result), @@ -137,3 +140,22 @@ impl Response { } } } + +/// Constructs a single threaded tokio [`Runtime`] containing a clone of the actix [`System`]. +/// +/// This is required if you need to send messages from the tokio runtime to actix +/// actors. +/// +/// # Panics +/// +/// The calling thread must have the actix system enabled, panics if this is invoked +/// in a thread where actix is not enabled. +pub fn tokio_runtime_with_actix() -> Runtime { + let system = System::current(); + tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .on_thread_start(clone!(system, || System::set_current(system.clone()))) + .build() + .unwrap() +} diff --git a/relay-system/src/lib.rs b/relay-system/src/lib.rs index 59722917766..3c40c996b71 100644 --- a/relay-system/src/lib.rs +++ b/relay-system/src/lib.rs @@ -16,5 +16,7 @@ pub mod compat; mod controller; +mod service; pub use self::controller::*; +pub use self::service::*; diff --git a/relay-system/src/service.rs b/relay-system/src/service.rs new file mode 100644 index 00000000000..b68eee2aac7 --- /dev/null +++ b/relay-system/src/service.rs @@ -0,0 +1,88 @@ +use std::fmt; +use std::future::Future; + +use tokio::sync::{mpsc, oneshot}; + +/// Our definition of a service. +/// +/// Services are much like actors: they receive messages from an inbox and handles them one +/// by one. Services are free to concurrently process these messages or not, most probably +/// should. +/// +/// Messages always have a response which will be sent once the message is handled by the +/// service. +pub trait Service { + /// The messages is what is sent to the inbox of this service. + /// + /// It is an enum of all the message types that can be handled by this service together + /// with the response [sender](oneshot::Sender) for each message. + type Messages: Send + 'static; +} + +/// A message which can be sent to a service. +/// +/// Messages have an associated `Response` type and can be unconditionally converted into +/// the messages type of their [`Service`]. +pub trait ServiceMessage { + /// The type of the `Response`. + type Response: Send + 'static; + + /// Creates and returns a wrapper containing the message and a Transmitter, also returns the + /// corresponding Receiver. + fn into_messages(self) -> (S::Messages, oneshot::Receiver); +} + +/// An error when [sending](Addr::send) a message to a service fails. +#[derive(Clone, Copy, Debug)] +pub struct SendError; + +impl fmt::Display for SendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "failed to send message to service") + } +} + +impl std::error::Error for SendError {} + +/// The address for a [`Service`]. +/// +/// The address of a [`Service`] allows you to [send](Addr::send) messages to the service as +/// long as the service is running. It can be freely cloned. +#[derive(Debug)] +pub struct Addr { + /// The transmitter of the channel used to communicate with the Service + pub tx: mpsc::UnboundedSender, +} + +// Manually derive clone since we do not require `S: Clone` and the Clone derive adds this +// constraint. +impl Clone for Addr { + fn clone(&self) -> Self { + Self { + tx: self.tx.clone(), + } + } +} + +impl Addr { + /// Sends an asynchronous message to the service and waits for the response. + /// + /// The result of the message does not have to be awaited. The message will be delivered and + /// handled regardless. The communication channel with the service is unbounded, so backlogs + /// could occur when sending too many messages. + /// + /// Sending the message can fail with `Err(SendError)` if the service has shut down. + // Note: this is written as returning `impl Future` instead of `async fn` in order not + // to capture the lifetime of `&self` in the returned future. + pub fn send(&self, message: M) -> impl Future> + where + M: ServiceMessage, + { + let (messages, response_rx) = message.into_messages(); + let res = self.tx.send(messages); + async move { + res.map_err(|_| SendError)?; + response_rx.await.map_err(|_| SendError) + } + } +}