Skip to content

Commit 23301a3

Browse files
committed
FuturesUnordered: Fix clear implementation
1 parent 02a4280 commit 23301a3

File tree

3 files changed

+43
-40
lines changed

3 files changed

+43
-40
lines changed

futures-util/src/stream/futures_unordered/mod.rs

+6-15
Original file line numberDiff line numberDiff line change
@@ -558,20 +558,7 @@ impl<Fut> Debug for FuturesUnordered<Fut> {
558558
impl<Fut> FuturesUnordered<Fut> {
559559
/// Clears the set, removing all futures.
560560
pub fn clear(&mut self) {
561-
self.clear_head_all();
562-
563-
// we just cleared all the tasks, and we have &mut self, so this is safe.
564-
unsafe { self.ready_to_run_queue.clear() };
565-
566-
self.is_terminated.store(false, Relaxed);
567-
}
568-
569-
fn clear_head_all(&mut self) {
570-
while !self.head_all.get_mut().is_null() {
571-
let head = *self.head_all.get_mut();
572-
let task = unsafe { self.unlink(head) };
573-
self.release_task(task);
574-
}
561+
*self = Self::new();
575562
}
576563
}
577564

@@ -581,7 +568,11 @@ impl<Fut> Drop for FuturesUnordered<Fut> {
581568
// associated with it. At the same time though there may be tons of
582569
// wakers flying around which contain `Task<Fut>` references
583570
// inside them. We'll let those naturally get deallocated.
584-
self.clear_head_all();
571+
while !self.head_all.get_mut().is_null() {
572+
let head = *self.head_all.get_mut();
573+
let task = unsafe { self.unlink(head) };
574+
self.release_task(task);
575+
}
585576

586577
// Note that at this point we could still have a bunch of tasks in the
587578
// ready to run queue. None of those tasks, however, have futures

futures-util/src/stream/futures_unordered/ready_to_run_queue.rs

+12-25
Original file line numberDiff line numberDiff line change
@@ -85,38 +85,25 @@ impl<Fut> ReadyToRunQueue<Fut> {
8585
pub(super) fn stub(&self) -> *const Task<Fut> {
8686
Arc::as_ptr(&self.stub)
8787
}
88-
89-
// Clear the queue of tasks.
90-
//
91-
// Note that each task has a strong reference count associated with it
92-
// which is owned by the ready to run queue. This method just pulls out
93-
// tasks and drops their refcounts.
94-
//
95-
// # Safety
96-
//
97-
// - All tasks **must** have had their futures dropped already (by FuturesUnordered::clear)
98-
// - The caller **must** guarantee unique access to `self`
99-
pub(crate) unsafe fn clear(&self) {
100-
loop {
101-
// SAFETY: We have the guarantee of mutual exclusion required by `dequeue`.
102-
match self.dequeue() {
103-
Dequeue::Empty => break,
104-
Dequeue::Inconsistent => abort("inconsistent in drop"),
105-
Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)),
106-
}
107-
}
108-
}
10988
}
11089

11190
impl<Fut> Drop for ReadyToRunQueue<Fut> {
11291
fn drop(&mut self) {
11392
// Once we're in the destructor for `Inner<Fut>` we need to clear out
11493
// the ready to run queue of tasks if there's anything left in there.
115-
116-
// All tasks have had their futures dropped already by the `FuturesUnordered`
117-
// destructor above, and we have &mut self, so this is safe.
94+
//
95+
// Note that each task has a strong reference count associated with it
96+
// which is owned by the ready to run queue. All tasks should have had
97+
// their futures dropped already by the `FuturesUnordered` destructor
98+
// above, so we're just pulling out tasks and dropping their refcounts.
11899
unsafe {
119-
self.clear();
100+
loop {
101+
match self.dequeue() {
102+
Dequeue::Empty => break,
103+
Dequeue::Inconsistent => abort("inconsistent in drop"),
104+
Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)),
105+
}
106+
}
120107
}
121108
}
122109
}

futures/tests/stream_futures_unordered.rs

+25
Original file line numberDiff line numberDiff line change
@@ -381,3 +381,28 @@ fn clear() {
381381
tasks.clear();
382382
assert!(!tasks.is_terminated());
383383
}
384+
385+
// https://github.com/rust-lang/futures-rs/issues/2529#issuecomment-997290279
386+
#[test]
387+
fn clear_in_loop() {
388+
const N: usize =
389+
if cfg!(miri) || option_env!("QEMU_LD_PREFIX").is_some() { 100 } else { 10_000 };
390+
futures::executor::block_on(async {
391+
async fn task() {
392+
let (s, r) = oneshot::channel();
393+
std::thread::spawn(|| {
394+
std::thread::sleep(std::time::Duration::from_micros(100));
395+
let _ = s.send(());
396+
});
397+
r.await.unwrap()
398+
}
399+
let mut futures = FuturesUnordered::new();
400+
for _ in 0..N {
401+
for _ in 0..24 {
402+
futures.push(task());
403+
}
404+
let _ = futures.next().await;
405+
futures.clear();
406+
}
407+
});
408+
}

0 commit comments

Comments
 (0)