diff --git a/src/client/legacy/client.rs b/src/client/legacy/client.rs index d3d56057..a560821d 100644 --- a/src/client/legacy/client.rs +++ b/src/client/legacy/client.rs @@ -834,6 +834,10 @@ where fn can_share(&self) -> bool { self.is_http2() } + + fn mark_as_reused(&mut self) { + self.conn_info.reuse(); + } } enum ClientConnectError { diff --git a/src/client/legacy/connect/mod.rs b/src/client/legacy/connect/mod.rs index 3309dbbb..f96ea773 100644 --- a/src/client/legacy/connect/mod.rs +++ b/src/client/legacy/connect/mod.rs @@ -101,6 +101,7 @@ pub struct Connected { pub(super) is_proxied: bool, pub(super) extra: Option, pub(super) poisoned: PoisonPill, + pub(super) is_reused: bool, } #[derive(Clone)] @@ -151,6 +152,7 @@ impl Connected { is_proxied: false, extra: None, poisoned: PoisonPill::healthy(), + is_reused: false, } } @@ -220,6 +222,16 @@ impl Connected { ); } + /// Determines if the connection is reused. + pub fn is_reused(&self) -> bool { + self.is_reused + } + + /// Mark the connection as reused. + pub fn reuse(&mut self) { + self.is_reused = true; + } + // Don't public expose that `Connected` is `Clone`, unsure if we want to // keep that contract... pub(super) fn clone(&self) -> Connected { @@ -228,6 +240,7 @@ impl Connected { is_proxied: self.is_proxied, extra: self.extra.clone(), poisoned: self.poisoned.clone(), + is_reused: self.is_reused, } } } diff --git a/src/client/legacy/pool.rs b/src/client/legacy/pool.rs index c57b7ff9..ba007f7c 100644 --- a/src/client/legacy/pool.rs +++ b/src/client/legacy/pool.rs @@ -41,6 +41,8 @@ pub trait Poolable: Unpin + Send + Sized + 'static { /// Allows for HTTP/2 to return a shared reservation. fn reserve(self) -> Reservation; fn can_share(&self) -> bool; + /// Mark the connection as reused. + fn mark_as_reused(&mut self); } pub trait Key: Eq + Hash + Clone + Debug + Unpin + Send + 'static {} @@ -266,7 +268,7 @@ impl Pool { } } - fn reuse(&self, key: &K, value: T) -> Pooled { + fn reuse(&self, key: &K, mut value: T) -> Pooled { debug!("reuse idle connection for {:?}", key); // TODO: unhack this // In Pool::pooled(), which is used for inserting brand new connections, @@ -283,6 +285,8 @@ impl Pool { } } + value.mark_as_reused(); + Pooled { is_reused: true, key: key.clone(), @@ -864,6 +868,8 @@ mod tests { fn can_share(&self) -> bool { false } + + fn mark_as_reused(&mut self) {} } fn c(key: K) -> Connecting { @@ -1074,6 +1080,8 @@ mod tests { fn can_share(&self) -> bool { false } + + fn mark_as_reused(&mut self) {} } #[test] @@ -1090,4 +1098,71 @@ mod tests { assert!(!pool.locked().idle.contains_key(&key)); } + + #[derive(Debug, PartialEq, Eq, Clone)] + struct CanMarkReused { + #[allow(unused)] + val: i32, + is_reused: bool, + } + + impl Poolable for CanMarkReused { + fn is_open(&self) -> bool { + true + } + + fn reserve(self) -> Reservation { + Reservation::Unique(self) + } + + fn can_share(&self) -> bool { + false + } + + fn mark_as_reused(&mut self) { + self.is_reused = true; + } + } + + #[tokio::test] + async fn mark_conn_as_reused() { + let pool = pool_no_timer(); + let key = host_key("foo"); + pool.pooled( + c(key.clone()), + CanMarkReused { + val: 42, + is_reused: false, + }, + ); + + let idle = pool + .locked() + .idle + .get(&key) + .expect("the just pooled connection should be idle") + .split_first() + .expect("should get the pooled connection") + .0 + .value + .clone(); + assert_eq!( + idle, + CanMarkReused { + val: 42, + is_reused: false + } + ); + + match pool.checkout(key).await { + Ok(pooled) => assert_eq!( + *pooled, + CanMarkReused { + val: 42, + is_reused: true + } + ), + Err(_) => panic!("not ready"), + }; + } } diff --git a/tests/legacy_client.rs b/tests/legacy_client.rs index 0f11d773..70ac2707 100644 --- a/tests/legacy_client.rs +++ b/tests/legacy_client.rs @@ -978,3 +978,73 @@ fn connection_poisoning() { assert_eq!(num_conns.load(Ordering::SeqCst), 2); assert_eq!(num_requests.load(Ordering::SeqCst), 5); } + +#[cfg(not(miri))] +#[test] +fn get_conn_reuse_info_via_connected() { + let server = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server.local_addr().unwrap(); + let rt = runtime(); + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + thread::spawn(move || { + let mut sock = server.accept().unwrap().0; + sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); + sock.set_write_timeout(Some(Duration::from_secs(5))) + .unwrap(); + + let mut buf = [0; 4096]; + let mut n = sock.read(&mut buf).expect("read 1"); + assert_ne!(n, 0); + let mut http_headline = "GET /first HTTP/1.1\r\n"; + assert_eq!(s(&buf[..http_headline.len()]), http_headline); + + let resp = b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"; + sock.write_all(resp).expect("write 1"); + let _ = tx1.send(()); + + n = sock.read(&mut buf).expect("read 2"); + assert_ne!(n, 0); + http_headline = "GET /second HTTP/1.1\r\n"; + assert_eq!(s(&buf[..http_headline.len()]), http_headline); + + sock.write_all(resp).unwrap(); + let _ = tx2.send(()); + + // The stream is dropped and server exits... + }); + + let connector = DebugConnector::new(); + let connects = connector.connects.clone(); + let client = Client::builder(TokioExecutor::new()).build(connector); + let mut req = Request::builder() + .uri(format!("http://{addr}/first")) + .body(Empty::::new()) + .unwrap(); + let captured = capture_connection(&mut req); + + let res = client.request(req); + rt.block_on(future::join(res, rx1).map(|r| r.0)).unwrap(); + + let connected = captured.connection_metadata(); + // First request should establish a new connection. + assert_eq!(connects.load(Ordering::SeqCst), 1); + assert!(!connected.as_ref().is_some_and(|c| c.is_reused())); + + // sleep real quick to let the threadpool put connection in ready + // state and back into client pool + thread::sleep(Duration::from_millis(50)); + + let mut req = Request::builder() + .uri(format!("http://{addr}/second")) + .body(Empty::::new()) + .unwrap(); + let captured = capture_connection(&mut req); + let res = client.request(req); + rt.block_on(future::join(res, rx2).map(|r| r.0)).unwrap(); + + let connected = captured.connection_metadata(); + // Second request should reuse the connection. + assert_eq!(connects.load(Ordering::SeqCst), 1); + assert!(connected.as_ref().is_some_and(|c| c.is_reused())); +}