From 9946628667169f56c7e8a604313ed9fb1271fd94 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 5 Jan 2023 19:31:10 +1100 Subject: [PATCH 1/2] Re-order for consistent libp2p protocols and v50 upgrade --- .../lighthouse_network/src/rpc/handler.rs | 110 +++++++++--------- 1 file changed, 55 insertions(+), 55 deletions(-) diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs index 9d6229eb382..0f7f5cda813 100644 --- a/beacon_node/lighthouse_network/src/rpc/handler.rs +++ b/beacon_node/lighthouse_network/src/rpc/handler.rs @@ -327,61 +327,6 @@ where self.listen_protocol.clone() } - fn inject_fully_negotiated_inbound( - &mut self, - substream: >::Output, - _info: Self::InboundOpenInfo, - ) { - // only accept new peer requests when active - if !matches!(self.state, HandlerState::Active) { - return; - } - - let (req, substream) = substream; - let expected_responses = req.expected_responses(); - - // store requests that expect responses - if expected_responses > 0 { - if self.inbound_substreams.len() < MAX_INBOUND_SUBSTREAMS { - // Store the stream and tag the output. - let delay_key = self.inbound_substreams_delay.insert( - self.current_inbound_substream_id, - Duration::from_secs(RESPONSE_TIMEOUT), - ); - let awaiting_stream = InboundState::Idle(substream); - self.inbound_substreams.insert( - self.current_inbound_substream_id, - InboundInfo { - state: awaiting_stream, - pending_items: VecDeque::with_capacity(expected_responses as usize), - delay_key: Some(delay_key), - protocol: req.protocol(), - request_start_time: Instant::now(), - remaining_chunks: expected_responses, - }, - ); - } else { - self.events_out.push(Err(HandlerErr::Inbound { - id: self.current_inbound_substream_id, - proto: req.protocol(), - error: RPCError::HandlerRejected, - })); - return self.shutdown(None); - } - } - - // If we received a goodbye, shutdown the connection. - if let InboundRequest::Goodbye(_) = req { - self.shutdown(None); - } - - self.events_out.push(Ok(RPCReceived::Request( - self.current_inbound_substream_id, - req, - ))); - self.current_inbound_substream_id.0 += 1; - } - fn inject_fully_negotiated_outbound( &mut self, out: >::Output, @@ -438,6 +383,61 @@ where } } + fn inject_fully_negotiated_inbound( + &mut self, + substream: >::Output, + _info: Self::InboundOpenInfo, + ) { + // only accept new peer requests when active + if !matches!(self.state, HandlerState::Active) { + return; + } + + let (req, substream) = substream; + let expected_responses = req.expected_responses(); + + // store requests that expect responses + if expected_responses > 0 { + if self.inbound_substreams.len() < MAX_INBOUND_SUBSTREAMS { + // Store the stream and tag the output. + let delay_key = self.inbound_substreams_delay.insert( + self.current_inbound_substream_id, + Duration::from_secs(RESPONSE_TIMEOUT), + ); + let awaiting_stream = InboundState::Idle(substream); + self.inbound_substreams.insert( + self.current_inbound_substream_id, + InboundInfo { + state: awaiting_stream, + pending_items: VecDeque::with_capacity(std::cmp::min(expected_responses,128) as usize), + delay_key: Some(delay_key), + protocol: req.protocol(), + request_start_time: Instant::now(), + remaining_chunks: expected_responses, + }, + ); + } else { + self.events_out.push(Err(HandlerErr::Inbound { + id: self.current_inbound_substream_id, + proto: req.protocol(), + error: RPCError::HandlerRejected, + })); + return self.shutdown(None); + } + } + + // If we received a goodbye, shutdown the connection. + if let InboundRequest::Goodbye(_) = req { + self.shutdown(None); + } + + self.events_out.push(Ok(RPCReceived::Request( + self.current_inbound_substream_id, + req, + ))); + self.current_inbound_substream_id.0 += 1; + } + fn inject_event(&mut self, rpc_event: Self::InEvent) { match rpc_event { RPCSend::Request(id, req) => self.send_request(id, req), From d1cc8b4835ba01f1b99eefe6b30ebc7ce67303db Mon Sep 17 00:00:00 2001 From: Age Manning Date: Thu, 5 Jan 2023 19:37:14 +1100 Subject: [PATCH 2/2] Obligatory fmt --- beacon_node/lighthouse_network/src/rpc/handler.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/beacon_node/lighthouse_network/src/rpc/handler.rs b/beacon_node/lighthouse_network/src/rpc/handler.rs index 0f7f5cda813..a1743c15fb6 100644 --- a/beacon_node/lighthouse_network/src/rpc/handler.rs +++ b/beacon_node/lighthouse_network/src/rpc/handler.rs @@ -409,7 +409,10 @@ where self.current_inbound_substream_id, InboundInfo { state: awaiting_stream, - pending_items: VecDeque::with_capacity(std::cmp::min(expected_responses,128) as usize), + pending_items: VecDeque::with_capacity(std::cmp::min( + expected_responses, + 128, + ) as usize), delay_key: Some(delay_key), protocol: req.protocol(), request_start_time: Instant::now(),