Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spooler): Implement shutdown behavior in the spooler #3980

Merged
merged 20 commits into from
Sep 13, 2024
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
- Remove the `generate-schema` tool. Relay no longer exposes JSON schema for the event protocol. Consult the Rust type documentation of the `relay-event-schema` crate instead. ([#3974](https://github.com/getsentry/relay/pull/3974))
- Allow creation of `SqliteEnvelopeBuffer` from config, and load existing stacks from db on startup. ([#3967](https://github.com/getsentry/relay/pull/3967))
- Only tag `user.geo.subregion` on frontend and mobile projects. ([#4013](https://github.com/getsentry/relay/pull/4013), [#4023](https://github.com/getsentry/relay/pull/4023))
- Implement graceful shutdown mechanism in the `EnvelopeBuffer`. ([#3980](https://github.com/getsentry/relay/pull/3980))

## 24.8.0

Expand Down
35 changes: 30 additions & 5 deletions relay-server/src/services/buffer/envelope_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::cmp::Ordering;
use std::collections::BTreeSet;
use std::convert::Infallible;
use std::error::Error;
use std::mem;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::Arc;
Expand Down Expand Up @@ -55,9 +56,11 @@ impl PolymorphicEnvelopeBuffer {
memory_checker: MemoryChecker,
) -> Result<Self, EnvelopeBufferError> {
let buffer = if config.spool_envelopes_path().is_some() {
relay_log::trace!("PolymorphicEnvelopeBuffer: initializing sqlite envelope buffer");
let buffer = EnvelopeBuffer::<SqliteStackProvider>::new(config).await?;
Self::Sqlite(buffer)
} else {
relay_log::trace!("PolymorphicEnvelopeBuffer: initializing memory envelope buffer");
let buffer = EnvelopeBuffer::<MemoryStackProvider>::new(memory_checker);
Self::InMemory(buffer)
};
Expand Down Expand Up @@ -137,6 +140,20 @@ impl PolymorphicEnvelopeBuffer {
Self::InMemory(buffer) => buffer.has_capacity(),
}
}

/// Shuts down the [`PolymorphicEnvelopeBuffer`].
pub async fn shutdown(&mut self) -> bool {
// Currently, we want to flush the buffer only for disk, since the in memory implementation
// tries to not do anything and pop as many elements as possible within the shutdown
// timeout.
let Self::Sqlite(buffer) = self else {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decided to branch out here and not call any flush method on the memory impl, since I didn't want to make the flushing empty in the provider or stack, since it made no sense.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent choice!

nit: I would prefer an explicit match on all variants here. If we ever add, let's say, a CloudStorageEnvelopeBuffer, the compiler would then force us to implement the new match arm.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's a good point. I initially implemented it that way, but felt it was too verbose. But your argument makes it a more valid approach! Will fix

relay_log::trace!("PolymorphicEnvelopeBuffer: shutdown procedure not needed");
return false;
};
buffer.flush().await;

true
}
}

/// Error that occurs while interacting with the envelope buffer.
Expand Down Expand Up @@ -374,6 +391,19 @@ where
});
}

/// Returns `true` if the underlying storage has the capacity to store more envelopes.
pub fn has_capacity(&self) -> bool {
self.stack_provider.has_store_capacity()
}

/// Flushes the envelope buffer.
pub async fn flush(&mut self) {
let priority_queue = mem::take(&mut self.priority_queue);
self.stack_provider
.flush(priority_queue.into_iter().map(|(q, _)| q.value))
.await;
}

/// Pushes a new [`EnvelopeStack`] with the given [`Envelope`] inserted.
async fn push_stack(
&mut self,
Expand Down Expand Up @@ -413,11 +443,6 @@ where
Ok(())
}

/// Returns `true` if the underlying storage has the capacity to store more envelopes.
pub fn has_capacity(&self) -> bool {
self.stack_provider.has_store_capacity()
}

/// Pops an [`EnvelopeStack`] with the supplied [`EnvelopeBufferError`].
fn pop_stack(&mut self, project_key_pair: ProjectKeyPair) {
for project_key in project_key_pair.iter() {
Expand Down
4 changes: 4 additions & 0 deletions relay-server/src/services/buffer/envelope_stack/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,8 @@ impl EnvelopeStack for MemoryEnvelopeStack {
async fn pop(&mut self) -> Result<Option<Box<Envelope>>, Self::Error> {
Ok(self.0.pop())
}

fn flush(self) -> Vec<Box<Envelope>> {
self.0
}
}
4 changes: 4 additions & 0 deletions relay-server/src/services/buffer/envelope_stack/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,8 @@ pub trait EnvelopeStack: Send + std::fmt::Debug {

/// Pops the [`Envelope`] on top of the stack.
fn pop(&mut self) -> impl Future<Output = Result<Option<Box<Envelope>>, Self::Error>>;

/// Persists all envelopes in the [`EnvelopeStack`]s to external storage, if possible,
/// and consume the [`StackProvider`].
fn flush(self) -> Vec<Box<Envelope>>;
}
32 changes: 32 additions & 0 deletions relay-server/src/services/buffer/envelope_stack/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ impl EnvelopeStack for SqliteEnvelopeStack {

Ok(result)
}

fn flush(self) -> Vec<Box<Envelope>> {
self.batches_buffer.into_iter().flatten().collect()
}
}

#[cfg(test)]
Expand Down Expand Up @@ -461,4 +465,32 @@ mod tests {
}
assert_eq!(stack.batches_buffer_size, 0);
}

#[tokio::test]
async fn test_drain() {
let db = setup_db(true).await;
let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100));
let mut stack = SqliteEnvelopeStack::new(
envelope_store.clone(),
5,
1,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
true,
);

let envelopes = mock_envelopes(5);

// We push 5 envelopes and check that there is nothing on disk.
for envelope in envelopes.clone() {
assert!(stack.push(envelope).await.is_ok());
}
assert_eq!(stack.batches_buffer_size, 5);
assert_eq!(envelope_store.total_count().await.unwrap(), 0);

// We drain the stack and make sure nothing was spooled to disk.
let drained_envelopes = stack.flush();
assert_eq!(drained_envelopes.into_iter().collect::<Vec<_>>().len(), 5);
assert_eq!(envelope_store.total_count().await.unwrap(), 0);
}
}
71 changes: 57 additions & 14 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
//! Types for buffering envelopes.

use std::error::Error;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

use relay_base_schema::project::ProjectKey;
use relay_config::Config;
use relay_system::Request;
use relay_system::SendError;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, Service};
use relay_system::{Controller, Request, Shutdown};
use tokio::sync::watch;
use tokio::time::timeout;

use crate::envelope::Envelope;
use crate::services::buffer::envelope_buffer::Peek;
Expand Down Expand Up @@ -186,18 +188,18 @@ impl EnvelopeBufferService {
&mut self,
buffer: &mut PolymorphicEnvelopeBuffer,
) -> Result<(), EnvelopeBufferError> {
relay_log::trace!("EnvelopeBufferService peek");
relay_log::trace!("EnvelopeBufferService: peeking the buffer");
match buffer.peek().await? {
Peek::Empty => {
relay_log::trace!("EnvelopeBufferService empty");
relay_log::trace!("EnvelopeBufferService: peek returned empty");
relay_statsd::metric!(
counter(RelayCounters::BufferTryPop) += 1,
peek_result = "empty"
);
self.sleep = Duration::MAX; // wait for reset by `handle_message`.
}
Peek::Ready(_) => {
relay_log::trace!("EnvelopeBufferService pop");
relay_log::trace!("EnvelopeBufferService: popping envelope");
relay_statsd::metric!(
counter(RelayCounters::BufferTryPop) += 1,
peek_result = "ready"
Expand All @@ -212,7 +214,7 @@ impl EnvelopeBufferService {
self.sleep = Duration::ZERO; // try next pop immediately
}
Peek::NotReady(stack_key, envelope) => {
relay_log::trace!("EnvelopeBufferService request update");
relay_log::trace!("EnvelopeBufferService: project(s) of envelope not ready, requesting project update");
relay_statsd::metric!(
counter(RelayCounters::BufferTryPop) += 1,
peek_result = "not_ready"
Expand Down Expand Up @@ -246,23 +248,56 @@ impl EnvelopeBufferService {
// projects was already triggered (see XXX).
// For better separation of concerns, this prefetch should be triggered from here
// once buffer V1 has been removed.
relay_log::trace!("EnvelopeBufferService push");
relay_log::trace!("EnvelopeBufferService: received push message");
self.push(buffer, envelope).await;
}
EnvelopeBuffer::NotReady(project_key, envelope) => {
relay_log::trace!("EnvelopeBufferService project not ready");
relay_log::trace!(
"EnvelopeBufferService: received project not ready message for project key {}",
&project_key
);
buffer.mark_ready(&project_key, false);
relay_statsd::metric!(counter(RelayCounters::BufferEnvelopesReturned) += 1);
self.push(buffer, envelope).await;
}
EnvelopeBuffer::Ready(project_key) => {
relay_log::trace!("EnvelopeBufferService project ready {}", &project_key);
relay_log::trace!(
"EnvelopeBufferService: received project ready message for project key {}",
&project_key
);
buffer.mark_ready(&project_key, true);
}
};
self.sleep = Duration::ZERO;
}

async fn handle_shutdown(
&mut self,
buffer: &mut PolymorphicEnvelopeBuffer,
message: Shutdown,
) -> bool {
// We gracefully shut down only if the shutdown has a timeout.
if let Some(shutdown_timeout) = message.timeout {
relay_log::trace!("EnvelopeBufferService: shutting down gracefully");

let shutdown_result =
timeout(shutdown_timeout, async { buffer.shutdown().await }).await;
match shutdown_result {
Ok(shutdown_result) => {
return shutdown_result;
}
Err(error) => {
relay_log::error!(
error = &error as &dyn Error,
"the envelope buffer didn't shut down in time, some envelopes might be lost",
);
}
}
}

false
}

async fn push(&mut self, buffer: &mut PolymorphicEnvelopeBuffer, envelope: Box<Envelope>) {
if let Err(e) = buffer.push(envelope).await {
relay_log::error!(
Expand Down Expand Up @@ -300,15 +335,17 @@ impl Service for EnvelopeBufferService {
};
buffer.initialize().await;

relay_log::info!("EnvelopeBufferService start");
let mut shutdown = Controller::shutdown_handle();

relay_log::info!("EnvelopeBufferService: starting");
let mut iteration = 0;
loop {
iteration += 1;
relay_log::trace!("EnvelopeBufferService loop iteration {iteration}");
relay_log::trace!("EnvelopeBufferService: loop iteration {iteration}");

tokio::select! {
// NOTE: we do not select a bias here.
// On the one hand, we might want to prioritize dequeing over enqueing
// On the one hand, we might want to prioritize dequeuing over enqueuing
// so we do not exceed the buffer capacity by starving the dequeue.
// on the other hand, prioritizing old messages violates the LIFO design.
Ok(()) = self.ready_to_pop(&mut buffer) => {
Expand All @@ -322,18 +359,24 @@ impl Service for EnvelopeBufferService {
Some(message) = rx.recv() => {
self.handle_message(&mut buffer, message).await;
}
shutdown = shutdown.notified() => {
// In case the shutdown was handled, we break out of the loop signaling that
// there is no need to process anymore envelopes.
if self.handle_shutdown(&mut buffer, shutdown).await {
break;
}
}
_ = global_config_rx.changed() => {
relay_log::trace!("EnvelopeBufferService received global config");
relay_log::trace!("EnvelopeBufferService: received global config");
self.sleep = Duration::ZERO; // Try to pop
}

else => break,
}

self.update_observable_state(&mut buffer);
}

relay_log::info!("EnvelopeBufferService stop");
relay_log::info!("EnvelopeBufferService: stopping");
});
}
}
Expand Down
8 changes: 8 additions & 0 deletions relay-server/src/services/buffer/stack_provider/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::services::buffer::stack_provider::{
InitializationState, StackCreationType, StackProvider,
};
use crate::utils::MemoryChecker;
use crate::EnvelopeStack;

#[derive(Debug)]
pub struct MemoryStackProvider {
Expand Down Expand Up @@ -41,4 +42,11 @@ impl StackProvider for MemoryStackProvider {
fn stack_type<'a>(&self) -> &'a str {
"memory"
}

async fn flush(&mut self, envelope_stacks: impl IntoIterator<Item = Self::Stack>) {
for envelope_stack in envelope_stacks {
// The flushed envelopes will be immediately dropped.
let _ = envelope_stack.flush();
}
}
}
6 changes: 6 additions & 0 deletions relay-server/src/services/buffer/stack_provider/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,10 @@ pub trait StackProvider: std::fmt::Debug {

/// Returns the string representation of the stack type offered by this [`StackProvider`].
fn stack_type<'a>(&self) -> &'a str;

/// Flushes the supplied [`EnvelopeStack`]s and consumes the [`StackProvider`].
fn flush(
&mut self,
envelope_stacks: impl IntoIterator<Item = Self::Stack>,
) -> impl Future<Output = ()>;
}
Loading
Loading