feat(server): Implement an HTTP outcomes producer #592
Conversation
Thanks, the batching and HTTP implementation look fine.
Up for discussion: there is a tiny bit of "duplication" going on, since both implementations repeat the same internal fields and thus need to share their implementation. Would it be possible to rather compose the actors, or at least the structs?
Here is some shortened code to illustrate that using an Addr:
mod http {
    pub struct HttpOutcomeProducer {
        // http stuff
    }

    impl Actor for HttpOutcomeProducer { /* ... */ }
    impl Handler<TrackRawOutcome> for HttpOutcomeProducer { /* ... */ }
}

#[cfg(feature = "processing")]
mod processing {
    pub struct KafkaOutcomeProducer {
        http: Addr<HttpOutcomeProducer>,
        // kafka stuff
    }

    impl Actor for KafkaOutcomeProducer { /* ... */ }

    impl Handler<TrackRawOutcome> for KafkaOutcomeProducer {
        type Result = ();

        fn handle(&mut self, message: TrackRawOutcome, _context: &mut Self::Context) -> Self::Result {
            if self.config.processing_enabled() {
                /* kafka stuff */
            } else {
                self.http.do_send(message)
            }
        }
    }
}

#[cfg(feature = "processing")]
pub type OutcomeProducer = processing::KafkaOutcomeProducer;
#[cfg(not(feature = "processing"))]
pub type OutcomeProducer = http::HttpOutcomeProducer;
The other option would be to hold the HttpOutcomeProducer directly and call an HTTP send method on it.
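For comparison, a minimal sketch of that second option, assuming a hypothetical send_http_outcome method on HttpOutcomeProducer (this is only an illustration, not the code in the PR):

#[cfg(feature = "processing")]
mod processing {
    pub struct KafkaOutcomeProducer {
        config: Arc<Config>,
        // Held directly instead of behind an Addr.
        http: HttpOutcomeProducer,
        // kafka stuff
    }

    impl Actor for KafkaOutcomeProducer { /* ... */ }

    impl Handler<TrackRawOutcome> for KafkaOutcomeProducer {
        type Result = ();

        fn handle(&mut self, message: TrackRawOutcome, _context: &mut Self::Context) -> Self::Result {
            if self.config.processing_enabled() {
                /* kafka stuff */
            } else {
                // Plain method call instead of message passing;
                // send_http_outcome is a hypothetical method name.
                self.http.send_http_outcome(message);
            }
        }
    }
}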
I agree with Jan's comment about nesting the HTTP outcome producer into the other one.
relay-server/src/actors/outcome.rs (Outdated)

    self.unsent_outcomes.push(message);
    if self.unsent_outcomes.len() >= self.config.max_outcome_batch_size() {
        if let Some(send_outcomes_future) = self.send_outcomes_future {
            context.cancel_future(send_outcomes_future);
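For context, a rough reconstruction of the handler around the quoted lines, pieced together from this review thread; the scheduling of the delayed flush (the outcome_batch_interval accessor and the run_later call) is an assumption and may differ from the actual code:

fn handle(&mut self, message: TrackRawOutcome, context: &mut Self::Context) -> Self::Result {
    self.unsent_outcomes.push(message);

    if self.unsent_outcomes.len() >= self.config.max_outcome_batch_size() {
        // Batch is full: cancel the pending delayed flush, if any, and flush now.
        if let Some(send_outcomes_future) = self.send_outcomes_future {
            context.cancel_future(send_outcomes_future);
        }
        self.send_batch(context);
    } else if self.send_outcomes_future.is_none() {
        // First outcome of a new batch: schedule a delayed flush.
        self.send_outcomes_future = Some(context.run_later(
            self.config.outcome_batch_interval(),
            |slf, ctx| slf.send_batch(ctx),
        ));
    }
}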
I am not 100% sure whether this may cancel an HTTP request mid-flight and run the risk of double-sending outcomes. Even if this is an actor future, such a future only has exclusive access to self during a poll, not during the lifetime of the future.
Also, doesn't this generally cause us to drop a preceding batch of outcomes if we run into the batch size limit again? Seems like that would happen instantly under sustained traffic.
The SpawnHandle is removed from the struct in line 326, the moment send_batch runs and before an HTTP request is started. self.send_outcomes_future is only there to synchronize between the batch size limit and the batch delay. Once the request starts, it sends whatever is queued, which is guaranteed to be at most the batch size limit.
I think for those reasons, neither of the two cases applies. Can you double-check?
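In other words, a sketch of the flow being described (reconstructed from the discussion, not the exact code; send_http_request is a stand-in for the actual upstream call):

fn send_batch(&mut self, context: &mut Self::Context) {
    // The handle is cleared synchronously here (the "line 326" above), so
    // cancel_future can only ever cancel the delay timer, never a request
    // that is already in flight.
    self.send_outcomes_future = None;

    let outcomes = std::mem::replace(&mut self.unsent_outcomes, Vec::new());
    if outcomes.is_empty() {
        return;
    }

    // Only now does the HTTP request start; whatever is queued at this point
    // is at most max_outcome_batch_size outcomes.
    self.send_http_request(outcomes, context);
}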
I see, so self.send_outcomes_future is only Some while we're counting down the timeout, not while the request is in flight. That makes sense and you're right. Can we somehow make it explicit that nothing that may suspend the future (such as more IO) may be added before line 326?
Additionally, could we rename the field to something like pending_flush to make this clearer?
OK, changed send_outcomes_future to pending_flush_handle.
I didn't quite get what you are asking here: "Can we somehow make it explicit that nothing that may suspend the future (such as more IO) may be added before line 326?"
I think he meant adding a code comment stating that nothing should go before the check of pending_flush_handle in send_batch.
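Something like this would make the invariant explicit (the wording of the comment is only a suggestion):

fn send_batch(&mut self, context: &mut Self::Context) {
    // NOTE: This must remain the first statement. Nothing that can suspend
    // the actor (more IO, additional futures) may run before the pending
    // flush handle is cleared, otherwise cancel_future could interfere with
    // a batch that is already being sent.
    self.pending_flush_handle = None;

    // ...
}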
Actually, I wanted to request changes because I believe one of the points I made may be a critical bug.
G2G once the linter is fixed, thanks a lot!
Relay installations that use the full (all features) binary may be configured to operate with processing disabled. When operating with processing disabled, they should forward outcomes to the HTTP endpoint of the upstream instead of placing them on the outcomes Kafka topic.