Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature(gossipsub): switch internal async-channel, #570

Open
wants to merge 1 commit into
base: sigp-gossipsub
Choose a base branch
from

Conversation

jxs
Copy link
Member

@jxs jxs commented Mar 3, 2025

Description

This started with an attempt to solve libp2p#5751 using the previous internal async-channel.
After multiple ideas were discussed off band, replacing the async-channel with an internal more tailored priority queue seemed inevitable.
This priority queue allows us to implement the cancellation of in flight IDONTWANT's very cleanly with the retain_mut function.
Clearing the stale messages likwise becomes simpler as we also make use of retain_mut
And this has the added advantage of being able to only have a single priority queue and making the code simpler.
If a peer is not making progress we can assume it's not delivering High priority messages and we can penalize it.

Notes & open questions

I haven't performance tested this, but plan to do so with lighthouse if you agree this should be the path forward.
I am curious if iterating all the messages to remove the IDONTWANT'ed and stall ones affects the overall performance.
Will also add tests to the queue once the design is finished.

Change checklist

  • I have performed a self-review of my own code
  • I have made corresponding changes to the documentation
  • I have added tests that prove my fix is effective or that my feature works
  • A changelog entry has been made in the appropriate crates

Copy link
Member

@AgeManning AgeManning left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! I like the new queue being able to modify it from the behaviour side.

However I think there is a drawback to this approach and also I didn't understand how this queue can remove the priority/non-priority logic.

In the current version, we have a priority and a non-priority queue. The priority is queue is reserved for messages that simply cannot fail, and timing is not important. For example, GRAFT/PRUNE/SUBSCRIBE/UNSUBSCRIBE. It's fine if these messages go out late, but its not okay if we just have some internal error and we never send them.

For example, if we have PRUNED someone from our mesh, but never tell them, the bi-directionality of the mesh is broken and peers can now never know if we are in other's peoples mesh's and a lot of the principles of the network break down.

If I've understood this PR, we are now grouping these messages into the same queue as normal publish/forward messages and this queue is bounded. We can now drop these priority messages if for example the user is sending lots of messages. This wasn't possible before and I think this is a big problem. I think we still need the priority queue, which is unbounded and cannot fail, so that these very important messages always get sent, albiet they could be sent late.

The second drawback to this approach is that I dont think we can actually stop true in-flight messages. We can remove messages that are being sent from the behaviour and awaiting for the handler to send out, but for large messages that we have started sending, we can't cancel them in the behaviour. I don't think this is a big issue tho, maybe its the queue that is the concern and not the actual sending of the messages.
When we were discussing this problem, I was imagining the handler when calling:

Some(OutboundSubstreamState::PendingFlush(mut substream)) => {

If that message has been canceled, that we close the substream and stop sending the in-flight message. However, now that I think about it, closing the substream would constitute an error I think, so perhaps there is no actual way of stopping partially sent messages with the current gossipsub spec.

@@ -2046,9 +2050,8 @@ where
// before we add all the gossip from this heartbeat in order to gain a true measure of
// steady-state size of the queues.
if let Some(m) = &mut self.metrics {
for sender_queue in self.connected_peers.values().map(|v| &v.sender) {
m.observe_priority_queue_size(sender_queue.priority_queue_len());
m.observe_non_priority_queue_size(sender_queue.non_priority_queue_len());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR removes the priority/non-priority logic.

@@ -2858,7 +2862,7 @@ where
| RpcOut::Prune(_)
| RpcOut::Subscribe(_)
| RpcOut::Unsubscribe(_) => {
unreachable!("Channel for highpriority control messages is unbounded and should always be open.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is no longer the case, because we've merged the priority and non-priority logic.

/// Try to add an item to the Queue, return Err if the queue is full.
pub(crate) fn try_push(&mut self, item: T) -> Result<(), T> {
let mut shared = self.shared.lock().unwrap();
if self.capacity == shared.queue.len() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if self.capacity == shared.queue.len() {
if shared.queue.len() >= self.capacity {

I'm usually always over-cautious with these, just a nit


/// Try to add an item to the Queue, return Err if the queue is full.
pub(crate) fn try_push(&mut self, item: T) -> Result<(), T> {
let mut shared = self.shared.lock().unwrap();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe an expect() here?

/// In other words, remove all elements e for which f(&e) returns false. The elements are visited in unsorted (and unspecified) order.
/// Returns the cleared items.
pub(crate) fn retain_mut<F: FnMut(&mut T) -> bool>(&mut self, mut f: F) -> Vec<T> {
let mut shared = self.shared.lock().unwrap();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

expect here also?


impl<T> Drop for Queue<T> {
fn drop(&mut self) {
let mut shared = self.shared.lock().unwrap();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

expect?

topics: Default::default(),
dont_send: LinkedHashMap::new(),
messages: Queue::new(self.config.connection_handler_queue_len()),
});
// Add the new connection
connected_peer.connections.push(connection_id);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// This clones a reference to the Queue so any new handlers reference the same underlying queue. No data is actually cloned here.

peer.messages.retain_mut(|rpc| match rpc {
RpcOut::Publish { message_id, .. }
| RpcOut::Forward { message_id, .. } => {
message_ids.contains(message_id)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs the negation

Suggested change
message_ids.contains(message_id)
!message_ids.contains(message_id)

if let Poll::Ready(Some(mut message)) = self.send_queue.poll_next_unpin(cx) {
if let Poll::Ready(mut message) = Pin::new(&mut self.message_queue).poll_pop(cx)
{
// if let Poll::Ready(Some(mut message)) = self.send_queue.poll_next_unpin(cx) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// if let Poll::Ready(Some(mut message)) = self.send_queue.poll_next_unpin(cx) {

@AgeManning
Copy link
Member

I went back to look at this and realize the O(1) complexity in the binary heap for push(), which is really nice. It does the prioritization for us, negating the need for a second queue. 😍

The only thing I think we might need to modify is to allow priority messages to ignore the queue's capacity. We shouldn't be generating these messages in volumes that would cause significant memory concerns. If we wanted to cap the queue if this is a concern, we should drop the peer at some limit.

i.e We are never in a state where we are connected to a peer and threw away a priority message. If we are worried about memory, we should at worst case kick/drop/ban the peer before we throw away a priority message.

If we go this route, we should be able to bring back the unreachable statement regarding a failed priority message.

Copy link

@elenaf9 elenaf9 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the approach of filtering the IDONTWANT-ed messages from the queue directly!

But I am wondering if we really need a priority queue if only two priority levels are used.
What's the advantage of it, compared to having two separate FIFO queues for prio- and non-priority messages? The retain logic could still be implemented for them, but the push/pop operations would be faster, and we could directly use VecDequeue::retain_mut.

Comment on lines +414 to +423
// Remove stale messages from the queue.
let stale = self.message_queue.retain_mut(|rpc| match rpc {
RpcOut::Publish {
ref mut timeout, ..
}
| RpcOut::Forward {
ref mut timeout, ..
} => !timeout.poll_unpin(cx).is_ready(),
_ => true,
});
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know that this is existing logic, but do we really need to remove stale messages from the queue here?
It results the the whole internal binary heap of the queue being rebuild every single time the handler is polled. Isn't it enough that the handler simply checks the timeout before sending out stale messages?

I know it shrinks the queue length, but I wonder how often in the real world a message actually times out, and whether in that case there is a larger issue at hand anyway. Do you have any data on that?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good point.

In terms of data, we often see non-priority messages being timed out on slow machines, or if bandwidth isn't adequate. But to your point, we can probably drop them when we pop them, without any harm.

Comment on lines +31 to +39
/// An async priority queue used to dispatch messages from the `NetworkBehaviour`
/// to the `ConnectionHandler`. Inspired by loole and flume.
#[derive(Debug)]
pub(crate) struct Queue<T> {
shared: Arc<Mutex<Shared<T>>>,
capacity: usize,
id: usize,
count: Arc<AtomicUsize>,
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: splitting the Queue API into a Sender and Receiver side would make it a bit clearer that the behavior is always the sender and the handler always the receiver.

Comment on lines +374 to +380
Self::Publish {
message: l_message, ..
},
Self::Publish {
message: r_message, ..
},
) => l_message == r_message,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we just compare the message ids?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants