Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(actix): New Register [INGEST-1494] #1421

Merged
merged 19 commits into from
Aug 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- Refactor tokio-based Addr from healthcheck to be generic. ([#1405](https://github.com/relay/pull/1405))
- Defer dropping of projects to a background thread to speed up project cache eviction. ([#1410](https://github.com/getsentry/relay/pull/1410))
- Update store service to use generic Addr and minor changes to generic Addr. ([#1415](https://github.com/getsentry/relay/pull/1415))
- Added new Register for the Services that is initialized later than the current. ([#1421](https://github.com/getsentry/relay/pull/1421))

**Features**:

Expand Down
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion relay-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,16 @@ clap = "2.33.1"
failure = "0.1.8"
flate2 = "1.0.19"
fragile = { version = "1.2.1", features = ["slab"] } # used for vendoring sentry-actix
futures01 = { version = "0.1.28", package = "futures" }
futures = { version = "0.3", package = "futures", features = ["compat"] }
futures01 = { version = "0.1.28", package = "futures" }
hashbrown = "0.12.3"
itertools = "0.8.2"
json-forensics = { version = "*", git = "https://github.com/getsentry/rust-json-forensics" }
lazy_static = "1.4.0"
listenfd = "0.3.3"
minidump = { version = "0.10.0", optional = true }
native-tls = { version = "0.2.4", optional = true }
once_cell = "1.13.1"
parking_lot = "0.12.1"
rdkafka = { version = "0.24", optional = true }
rdkafka-sys = { version = "2.1.0", optional = true }
Expand Down
8 changes: 2 additions & 6 deletions relay-server/src/actors/healthcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use actix::SystemService;
use parking_lot::RwLock;
use tokio::sync::{mpsc, oneshot};

use relay_config::{Config, RelayMode};
Expand All @@ -11,12 +10,10 @@ use relay_statsd::metric;
use relay_system::{compat, Controller};

use crate::actors::upstream::{IsAuthenticated, IsNetworkOutage, UpstreamRelay};
use crate::service::REGISTRY;
use crate::statsd::RelayGauges;
use relay_system::{Addr, Service, ServiceMessage};

/// Singleton of the `Healthcheck` service.
static ADDRESS: RwLock<Option<Addr<Healthcheck>>> = RwLock::new(None);

#[derive(Debug)]
pub struct Healthcheck {
is_shutting_down: AtomicBool,
Expand All @@ -36,7 +33,7 @@ impl Healthcheck {
///
/// Panics if the service was not started using [`Healthcheck::start`] prior to this being used.
pub fn from_registry() -> Addr<Self> {
Copy link
Member

@jan-auer jan-auer Aug 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These static from_registry functions were a workaround to replace the trait method on actix::SystemService. Now that all our services are expected to be in the registry, we could move this to a mandatory Service::from_registry trait method which enforces that this is implemented for every service in the same way. Also, this means that we no longer have to copy-paste the doc comment.

Alternatively, we might not even need from_registry anymore. It's now as simple as:

Registry::get().healthcheck.send(_);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having a ::from_registry() forces you to import the service type, this variation makes everyone import the Registry. The only argument I'm able to come up is that importing the service type makes it easier to discover in the code where the service is being used, so that would argue for keeping ::from_regsitry(). +1 to making it a trait method though.

ADDRESS.read().as_ref().unwrap().clone()
REGISTRY.get().unwrap().healthcheck.clone()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we could make this method generic on Service somehow. In C++ you could do std::get<T> to access a tuple by type, in Rust I would not know how to do it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah we wondered this as well but could not figure it out without dynamic dispatch (TypeId) but that would slow this down. I'm not sure how to do this at compile time, but it so feels like this should be possible.

}

/// Creates a new instance of the Healthcheck service.
Expand Down Expand Up @@ -88,7 +85,6 @@ impl Healthcheck {
let (tx, mut rx) = mpsc::unbounded_channel::<HealthcheckMessages>();

let addr = Addr { tx };
*ADDRESS.write() = Some(addr.clone());

let service = Arc::new(self);
let main_service = service.clone();
Expand Down
13 changes: 12 additions & 1 deletion relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ use actix_web::{server, App};
use failure::ResultExt;
use failure::{Backtrace, Context, Fail};
use listenfd::ListenFd;
use once_cell::race::OnceBox;

use relay_aws_extension::AwsExtension;
use relay_config::Config;
use relay_metrics::Aggregator;
use relay_redis::RedisPool;
use relay_system::Addr;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please merge this input with the one below.

use relay_system::{Configure, Controller};

use crate::actors::envelopes::{BufferGuard, EnvelopeManager};
Expand All @@ -26,6 +28,8 @@ use crate::middlewares::{
};
use crate::{endpoints, utils};

pub static REGISTRY: OnceBox<Registry> = OnceBox::new();
Copy link
Member

@jan-auer jan-auer Aug 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make this non-public and hide the implementation detail that this uses a OnceBox. Especially, this is considered infallible, and the unwrap should be hidden behind an accessor method:

impl Registry {
    fn get() -> &Self {
        // some comment on why we think this is safe (once it is safe)
        REGISTRY.get().unwrap()
    }
}


/// Common error type for the relay server.
#[derive(Debug)]
pub struct ServerError {
Expand Down Expand Up @@ -105,6 +109,11 @@ impl From<Context<ServerErrorKind>> for ServerError {
}
}

#[derive(Clone, Debug)]
pub struct Registry {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add documentation to this type.

pub healthcheck: Addr<Healthcheck>,
}

/// Server state.
#[derive(Clone)]
pub struct ServiceState {
Expand Down Expand Up @@ -150,7 +159,7 @@ impl ServiceState {
let project_cache = Arbiter::start(|_| project_cache);
registry.set(project_cache.clone());

Healthcheck::new(config.clone()).start(); // TODO(tobias): Registry is implicit
let healthcheck = Healthcheck::new(config.clone()).start();
registry.set(RelayCache::new(config.clone()).start());

let aggregator = Aggregator::new(config.aggregator_config(), project_cache.recipient());
Expand All @@ -162,6 +171,8 @@ impl ServiceState {
}
}

REGISTRY.set(Box::new(Registry { healthcheck })).unwrap();

Ok(ServiceState {
buffer_guard: buffer,
config,
Expand Down