Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(actix): Update Store Actor [INGEST-1565] #1415

Merged
merged 17 commits into from
Aug 18, 2022
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions relay-server/src/actors/envelopes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ use crate::utils::{self, EnvelopeContext, FutureExt as _, Semaphore};

#[cfg(feature = "processing")]
use {
crate::actors::store::{StoreAddr, StoreEnvelope, StoreError, StoreForwarder},
crate::actors::store::{StoreEnvelope, StoreError, StoreForwarder},
futures::{FutureExt, TryFutureExt},
relay_system::Addr as ServiceAddr,
tokio::runtime::Runtime,
};

Expand Down Expand Up @@ -209,7 +210,7 @@ pub struct EnvelopeManager {
captures: BTreeMap<EventId, CapturedEnvelope>,
processor: Addr<EnvelopeProcessor>,
#[cfg(feature = "processing")]
store_forwarder: Option<StoreAddr<StoreEnvelope>>,
store_forwarder: Option<ServiceAddr<StoreForwarder>>,
#[cfg(feature = "processing")]
_runtime: Runtime,
}
Expand All @@ -222,11 +223,7 @@ impl EnvelopeManager {
) -> Result<Self, ServerError> {
// Enter the tokio runtime so we can start spawning tasks from the outside.
#[cfg(feature = "processing")]
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.enable_all()
.build()
.unwrap();
let runtime = utils::tokio_runtime_with_actix();

#[cfg(feature = "processing")]
let _guard = runtime.enter();
Expand Down
100 changes: 9 additions & 91 deletions relay-server/src/actors/healthcheck.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
use std::error::Error;
use std::fmt;
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

Expand All @@ -15,102 +12,23 @@ use relay_system::{compat, Controller};

use crate::actors::upstream::{IsAuthenticated, IsNetworkOutage, UpstreamRelay};
use crate::statsd::RelayGauges;
use relay_system::{Addr, Service, ServiceMessage};

/// Singleton of the `Healthcheck` service.
static ADDRESS: RwLock<Option<Addr<Healthcheck>>> = RwLock::new(None);

/// Our definition of a service.
///
/// Services are much like actors: they receive messages from an inbox and handles them one
/// by one. Services are free to concurrently process these messages or not, most probably
/// should.
///
/// Messages always have a response which will be sent once the message is handled by the
/// service.
pub trait Service {
/// The envelope is what is sent to the inbox of this service.
///
/// It is an enum of all the message types that can be handled by this service together
/// with the response [sender](oneshot::Sender) for each message.
type Envelope: Send + 'static;
}

/// A message which can be sent to a service.
///
/// Messages have an associated `Response` type and can be unconditionally converted into
/// the envelope type of their [`Service`].
pub trait ServiceMessage<S: Service> {
type Response: Send + 'static;

fn into_envelope(self) -> (S::Envelope, oneshot::Receiver<Self::Response>);
}

/// An error when [sending](Addr::send) a message to a service fails.
#[derive(Clone, Copy, Debug)]
pub struct SendError;

impl fmt::Display for SendError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "failed to send message to service")
}
}

impl Error for SendError {}

/// The address for a [`Service`].
///
/// The address of a [`Service`] allows you to [send](Addr::send) messages to the service as
/// long as the service is running. It can be freely cloned.
#[derive(Debug)]
pub struct Addr<S: Service> {
tx: mpsc::UnboundedSender<S::Envelope>,
}

// Manually derive clone since we do not require `S: Clone` and the Clone derive adds this
// constraint.
impl<S: Service> Clone for Addr<S> {
fn clone(&self) -> Self {
Self {
tx: self.tx.clone(),
}
}
}

impl<S: Service> Addr<S> {
/// Sends an asynchronous message to the service and waits for the response.
///
/// The result of the message does not have to be awaited. The message will be delivered and
/// handled regardless. The communication channel with the service is unbounded, so backlogs
/// could occur when sending too many messages.
///
/// Sending the message can fail with `Err(SendError)` if the service has shut down.
// Note: this is written as returning `impl Future` instead of `async fn` in order not
// to capture the lifetime of `&self` in the returned future.
pub fn send<M>(&self, message: M) -> impl Future<Output = Result<M::Response, SendError>>
where
M: ServiceMessage<S>,
{
let (envelope, response_rx) = message.into_envelope();
let res = self.tx.send(envelope);
async move {
res.map_err(|_| SendError)?;
response_rx.await.map_err(|_| SendError)
}
}
}

#[derive(Debug)]
pub struct Healthcheck {
is_shutting_down: AtomicBool,
config: Arc<Config>,
}

impl Service for Healthcheck {
type Envelope = HealthcheckEnvelope;
type Messages = HealthcheckMessages;
}

impl Healthcheck {
/// Returns the [`Addr`] of the [`Healthcheck`] actor.
/// Returns the [`Addr`] of the [`Healthcheck`] service.
///
/// Prior to using this, the service must be started using [`Healthcheck::start`].
///
Expand Down Expand Up @@ -167,7 +85,7 @@ impl Healthcheck {

/// Start this service, returning an [`Addr`] for communication.
pub fn start(self) -> Addr<Self> {
let (tx, mut rx) = mpsc::unbounded_channel::<HealthcheckEnvelope>();
let (tx, mut rx) = mpsc::unbounded_channel::<HealthcheckMessages>();

let addr = Addr { tx };
*ADDRESS.write() = Some(addr.clone());
Expand All @@ -180,7 +98,7 @@ impl Healthcheck {

tokio::spawn(async move {
match message {
HealthcheckEnvelope::IsHealthy(msg, response_tx) => {
HealthcheckMessages::IsHealthy(msg, response_tx) => {
let response = service.handle_is_healthy(msg).await;
response_tx.send(response).ok()
}
Expand Down Expand Up @@ -216,14 +134,14 @@ pub enum IsHealthy {
impl ServiceMessage<Healthcheck> for IsHealthy {
type Response = bool;

fn into_envelope(self) -> (HealthcheckEnvelope, oneshot::Receiver<Self::Response>) {
fn into_messages(self) -> (HealthcheckMessages, oneshot::Receiver<Self::Response>) {
let (tx, rx) = oneshot::channel();
(HealthcheckEnvelope::IsHealthy(self, tx), rx)
(HealthcheckMessages::IsHealthy(self, tx), rx)
}
}

/// All the message types which can be sent to the [`Healthcheck`] actor.
/// All the message types which can be sent to the [`Healthcheck`] service.
#[derive(Debug)]
pub enum HealthcheckEnvelope {
pub enum HealthcheckMessages {
IsHealthy(IsHealthy, oneshot::Sender<bool>),
}
Loading