Skip to content

Commit c07257f

Browse files
io: simplify io readiness logic (#6966)
1 parent d08578f commit c07257f

File tree

1 file changed

+23
-54
lines changed

1 file changed

+23
-54
lines changed

tokio/src/runtime/io/scheduled_io.rs

+23-54
Original file line numberDiff line numberDiff line change
@@ -206,43 +206,23 @@ impl ScheduledIo {
206206
/// specific tick.
207207
/// - `f`: a closure returning a new readiness value given the previous
208208
/// readiness.
209-
pub(super) fn set_readiness(&self, tick: Tick, f: impl Fn(Ready) -> Ready) {
210-
let mut current = self.readiness.load(Acquire);
211-
212-
// If the io driver is shut down, then you are only allowed to clear readiness.
213-
debug_assert!(SHUTDOWN.unpack(current) == 0 || matches!(tick, Tick::Clear(_)));
214-
215-
loop {
216-
// Mask out the tick bits so that the modifying function doesn't see
217-
// them.
218-
let current_readiness = Ready::from_usize(current);
219-
let new = f(current_readiness);
220-
221-
let new_tick = match tick {
222-
Tick::Set => {
223-
let current = TICK.unpack(current);
224-
current.wrapping_add(1) % (TICK.max_value() + 1)
225-
}
226-
Tick::Clear(t) => {
227-
if TICK.unpack(current) as u8 != t {
228-
// Trying to clear readiness with an old event!
229-
return;
230-
}
231-
232-
t as usize
233-
}
209+
pub(super) fn set_readiness(&self, tick_op: Tick, f: impl Fn(Ready) -> Ready) {
210+
let _ = self.readiness.fetch_update(AcqRel, Acquire, |curr| {
211+
// If the io driver is shut down, then you are only allowed to clear readiness.
212+
debug_assert!(SHUTDOWN.unpack(curr) == 0 || matches!(tick_op, Tick::Clear(_)));
213+
214+
const MAX_TICK: usize = TICK.max_value() + 1;
215+
let tick = TICK.unpack(curr);
216+
217+
let new_tick = match tick_op {
218+
// Trying to clear readiness with an old event!
219+
Tick::Clear(t) if tick as u8 != t => return None,
220+
Tick::Clear(t) => t as usize,
221+
Tick::Set => tick.wrapping_add(1) % MAX_TICK,
234222
};
235-
let next = TICK.pack(new_tick, new.as_usize());
236-
237-
match self
238-
.readiness
239-
.compare_exchange(current, next, AcqRel, Acquire)
240-
{
241-
Ok(_) => return,
242-
// we lost the race, retry!
243-
Err(actual) => current = actual,
244-
}
245-
}
223+
let ready = Ready::from_usize(READINESS.unpack(curr));
224+
Some(TICK.pack(new_tick, f(ready).as_usize()))
225+
});
246226
}
247227

248228
/// Notifies all pending waiters that have registered interest in `ready`.
@@ -335,22 +315,16 @@ impl ScheduledIo {
335315
if ready.is_empty() && !is_shutdown {
336316
// Update the task info
337317
let mut waiters = self.waiters.lock();
338-
let slot = match direction {
318+
let waker = match direction {
339319
Direction::Read => &mut waiters.reader,
340320
Direction::Write => &mut waiters.writer,
341321
};
342322

343323
// Avoid cloning the waker if one is already stored that matches the
344324
// current task.
345-
match slot {
346-
Some(existing) => {
347-
if !existing.will_wake(cx.waker()) {
348-
existing.clone_from(cx.waker());
349-
}
350-
}
351-
None => {
352-
*slot = Some(cx.waker().clone());
353-
}
325+
match waker {
326+
Some(waker) => waker.clone_from(cx.waker()),
327+
None => *waker = Some(cx.waker().clone()),
354328
}
355329

356330
// Try again, in case the readiness was changed while we were
@@ -465,12 +439,11 @@ impl Future for Readiness<'_> {
465439
State::Init => {
466440
// Optimistically check existing readiness
467441
let curr = scheduled_io.readiness.load(SeqCst);
468-
let ready = Ready::from_usize(READINESS.unpack(curr));
469442
let is_shutdown = SHUTDOWN.unpack(curr) != 0;
470443

471444
// Safety: `waiter.interest` never changes
472445
let interest = unsafe { (*waiter.get()).interest };
473-
let ready = ready.intersection(interest);
446+
let ready = Ready::from_usize(READINESS.unpack(curr)).intersection(interest);
474447

475448
if !ready.is_empty() || is_shutdown {
476449
// Currently ready!
@@ -538,10 +511,7 @@ impl Future for Readiness<'_> {
538511
*state = State::Done;
539512
} else {
540513
// Update the waker, if necessary.
541-
if !w.waker.as_ref().unwrap().will_wake(cx.waker()) {
542-
w.waker = Some(cx.waker().clone());
543-
}
544-
514+
w.waker.as_mut().unwrap().clone_from(cx.waker());
545515
return Poll::Pending;
546516
}
547517

@@ -566,8 +536,7 @@ impl Future for Readiness<'_> {
566536

567537
// The readiness state could have been cleared in the meantime,
568538
// but we allow the returned ready set to be empty.
569-
let curr_ready = Ready::from_usize(READINESS.unpack(curr));
570-
let ready = curr_ready.intersection(w.interest);
539+
let ready = Ready::from_usize(READINESS.unpack(curr)).intersection(w.interest);
571540

572541
return Poll::Ready(ReadyEvent {
573542
tick,

0 commit comments

Comments
 (0)