Skip to content

Commit 31f5a51

Browse files
committed
WIP first go at sending outcomes to upstream
1 parent aafc06f commit 31f5a51

File tree

3 files changed

+89
-25
lines changed

3 files changed

+89
-25
lines changed

relay-server/src/actors/outcome.rs

+86-8
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ use std::sync::Arc;
99
use std::time::Instant;
1010

1111
use actix::prelude::*;
12+
use actix_web::http::Method;
1213
use chrono::SecondsFormat;
14+
use futures::future::Future;
1315
use serde::{Deserialize, Serialize};
1416

1517
use relay_common::ProjectId;
@@ -18,6 +20,8 @@ use relay_filter::FilterStatKey;
1820
use relay_general::protocol::EventId;
1921
use relay_quotas::{ReasonCode, Scoping};
2022

23+
use crate::actors::upstream::SendQuery;
24+
use crate::actors::upstream::{UpstreamQuery, UpstreamRelay};
2125
use crate::ServerError;
2226

2327
// Choose the outcome module implementation (either the real one or the fake, no-op one).
@@ -27,6 +31,35 @@ pub use self::processing::*;
2731
// No-op outcome implementation
2832
#[cfg(not(feature = "processing"))]
2933
pub use self::non_processing::*;
34+
use std::borrow::Cow;
35+
36+
const MAX_OUTCOME_BATCH_SIZE: usize = 1000;
37+
const MAX_OUTCOME_BATCH_INTERVAL_MILLSEC: u64 = 500;
38+
39+
/// Defines the structure of the HTTP outcomes requests
40+
#[derive(Deserialize, Serialize, Debug, Default)]
41+
#[serde(default)]
42+
pub struct SendOutcomes {
43+
pub outcomes: Vec<TrackRawOutcome>,
44+
}
45+
46+
impl UpstreamQuery for SendOutcomes {
47+
type Response = SendOutcomes;
48+
49+
fn method(&self) -> Method {
50+
Method::POST
51+
}
52+
53+
fn path(&self) -> Cow<'static, str> {
54+
Cow::Borrowed("/api/0/relays/outcomes/")
55+
}
56+
}
57+
58+
/// Defines the structure of the HTTP outcomes responses for successful requests
59+
#[derive(Serialize, Debug)]
60+
pub struct SendOutcomesResponse {
61+
// nothing yet, future features will go here
62+
}
3063

3164
/// Tracks an outcome of an event.
3265
///
@@ -315,6 +348,8 @@ mod processing {
315348

316349
use crate::metrics::RelayCounters;
317350
use crate::service::ServerErrorKind;
351+
use crate::utils::run_later;
352+
use std::time::Duration;
318353

319354
type ThreadedProducer = rdkafka::producer::ThreadedProducer<DefaultProducerContext>;
320355

@@ -329,10 +364,16 @@ mod processing {
329364
pub struct OutcomeProducer {
330365
config: Arc<Config>,
331366
producer: Option<ThreadedProducer>,
367+
upstream: Addr<UpstreamRelay>,
368+
unsent_outcomes: Vec<TrackRawOutcome>,
369+
send_scheduled: bool,
332370
}
333371

334372
impl OutcomeProducer {
335-
pub fn create(config: Arc<Config>) -> Result<Self, ServerError> {
373+
pub fn create(
374+
config: Arc<Config>,
375+
upstream: Addr<UpstreamRelay>,
376+
) -> Result<Self, ServerError> {
336377
let future_producer = if config.processing_enabled() {
337378
let mut client_config = ClientConfig::new();
338379
for config_p in config.kafka_config() {
@@ -349,10 +390,13 @@ mod processing {
349390
Ok(Self {
350391
config,
351392
producer: future_producer,
393+
upstream,
394+
unsent_outcomes: Vec::new(),
395+
send_scheduled: false,
352396
})
353397
}
354398

355-
fn send_kafka_message(&mut self, message: TrackRawOutcome) -> Result<(), OutcomeError> {
399+
fn send_kafka_message(&self, message: TrackRawOutcome) -> Result<(), OutcomeError> {
356400
log::trace!("Tracking outcome: {:?}", message);
357401

358402
let producer = match self.producer {
@@ -385,8 +429,38 @@ mod processing {
385429
}
386430
}
387431

388-
fn send_http_message(&mut self, _message: TrackRawOutcome) -> Result<(), OutcomeError> {
389-
unimplemented!()
432+
fn send_batch(&mut self, context: &mut Context<Self>) {
433+
let request = SendOutcomes {
434+
outcomes: self.unsent_outcomes.drain(..).collect(),
435+
};
436+
self.send_scheduled = false;
437+
438+
self.upstream
439+
.send(SendQuery(request))
440+
.map(|_| ())
441+
.map_err(|_| ())
442+
.into_actor(self)
443+
.spawn(context);
444+
}
445+
446+
fn send_http_message(
447+
&mut self,
448+
message: TrackRawOutcome,
449+
context: &mut Context<Self>,
450+
) -> Result<(), OutcomeError> {
451+
self.unsent_outcomes.push(message);
452+
if self.unsent_outcomes.len() >= MAX_OUTCOME_BATCH_SIZE {
453+
self.send_batch(context)
454+
} else if !self.send_scheduled {
455+
self.send_scheduled = true;
456+
run_later(
457+
Duration::from_millis(MAX_OUTCOME_BATCH_INTERVAL_MILLSEC),
458+
Self::send_batch,
459+
)
460+
.spawn(context)
461+
}
462+
463+
Ok(())
390464
}
391465
}
392466

@@ -417,11 +491,11 @@ mod processing {
417491

418492
impl Handler<TrackRawOutcome> for OutcomeProducer {
419493
type Result = Result<(), OutcomeError>;
420-
fn handle(&mut self, message: TrackRawOutcome, _ctx: &mut Self::Context) -> Self::Result {
494+
fn handle(&mut self, message: TrackRawOutcome, ctx: &mut Self::Context) -> Self::Result {
421495
if self.config.processing_enabled() {
422496
self.send_kafka_message(message)
423497
} else if self.config.emit_outcomes() {
424-
self.send_http_message(message)
498+
self.send_http_message(message, ctx)
425499
} else {
426500
Ok(()) // processing not enabled and emit_outcomes disabled
427501
}
@@ -440,11 +514,15 @@ mod non_processing {
440514

441515
pub struct OutcomeProducer {
442516
config: Arc<Config>,
517+
upstream: Addr<UpstreamRelay>,
443518
}
444519

445520
impl OutcomeProducer {
446-
pub fn create(config: Arc<Config>) -> Result<Self, ServerError> {
447-
Ok(Self { config })
521+
pub fn create(
522+
config: Arc<Config>,
523+
upstream: Addr<UpstreamRelay>,
524+
) -> Result<Self, ServerError> {
525+
Ok(Self { config, upstream })
448526
}
449527
}
450528

relay-server/src/endpoints/outcomes.rs

+2-16
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,17 @@
11
use actix_web::HttpResponse;
2-
use serde::{Deserialize, Serialize};
32

4-
use crate::actors::outcome::TrackRawOutcome;
3+
use crate::actors::outcome::{SendOutcomes, SendOutcomesResponse};
54
use crate::extractors::{CurrentServiceState, SignedJson};
65
use crate::service::ServiceApp;
76

8-
/// Defines the structure of the HTTP outcomes requests
9-
#[derive(Deserialize, Debug, Default)]
10-
#[serde(default)]
11-
struct SendOutcomes {
12-
pub outcomes: Vec<TrackRawOutcome>,
13-
}
14-
15-
/// Defines the structure of the HTTP outcomes responses for successful requests
16-
#[derive(Serialize, Debug)]
17-
struct OutcomesResponse {
18-
// nothing yet, future features will go here
19-
}
20-
217
fn send_outcomes(state: CurrentServiceState, body: SignedJson<SendOutcomes>) -> HttpResponse {
228
if !body.relay.internal || !state.config().emit_outcomes() {
239
return HttpResponse::Forbidden().finish();
2410
}
2511
for outcome in body.inner.outcomes {
2612
state.outcome_producer().do_send(outcome);
2713
}
28-
HttpResponse::Accepted().json(OutcomesResponse {})
14+
HttpResponse::Accepted().json(SendOutcomesResponse {})
2915
}
3016

3117
pub fn configure_app(app: ServiceApp) -> ServiceApp {

relay-server/src/service.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ impl ServiceState {
121121
pub fn start(config: Arc<Config>) -> Result<Self, ServerError> {
122122
let upstream_relay = Arbiter::start(clone!(config, |_| UpstreamRelay::new(config)));
123123

124-
let outcome_producer = OutcomeProducer::create(config.clone())?;
124+
let outcome_producer = OutcomeProducer::create(config.clone(), upstream_relay.clone())?;
125125
let outcome_producer = Arbiter::start(move |_| outcome_producer);
126126

127127
let redis_pool = match config.redis() {

0 commit comments

Comments
 (0)