Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

internal/task: use a lock-free queue #2290

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 63 additions & 76 deletions src/internal/task/queue.go
Original file line number Diff line number Diff line change
@@ -1,69 +1,63 @@
package task

import "runtime/interrupt"
import (
"sync/atomic"
"unsafe"
)

const asserts = false

// Queue is a FIFO container of tasks.
// The zero value is an empty queue.
type Queue struct {
head, tail *Task
// in is a stack used to buffer incoming tasks.
in Stack

// out is a singly linked list of tasks in oldest-first order.
// Once empty, it is refilled by dumping and flipping the input stack.
out *Task
}

// Push a task onto the queue.
// This is atomic.
func (q *Queue) Push(t *Task) {
i := interrupt.Disable()
if asserts && t.Next != nil {
interrupt.Restore(i)
panic("runtime: pushing a task to a queue with a non-nil Next pointer")
}
if q.tail != nil {
q.tail.Next = t
}
q.tail = t
t.Next = nil
if q.head == nil {
q.head = t
}
interrupt.Restore(i)
q.in.Push(t)
}

// Pop a task off of the queue.
// This cannot be called concurrently.
func (q *Queue) Pop() *Task {
i := interrupt.Disable()
t := q.head
if t == nil {
interrupt.Restore(i)
return nil
}
q.head = t.Next
if q.tail == t {
q.tail = nil
}
t.Next = nil
interrupt.Restore(i)
return t
}
next := q.out
if next == nil {
// Dump the input stack.
s := q.in.dump()

// Flip it.
var prev *Task
for t := s.top; t != nil; {
next := t.Next
t.Next = prev
prev = t
t = next
}
if prev == nil {
// The queue is empty.
return nil
}

// Append pops the contents of another queue and pushes them onto the end of this queue.
func (q *Queue) Append(other *Queue) {
i := interrupt.Disable()
if q.head == nil {
q.head = other.head
} else {
q.tail.Next = other.head
// Save it in the output list.
next = prev
}
q.tail = other.tail
other.head, other.tail = nil, nil
interrupt.Restore(i)

q.out = next.Next
next.Next = nil
return next
}

// Empty checks if the queue is empty.
// This cannot be called concurrently with Pop.
func (q *Queue) Empty() bool {
i := interrupt.Disable()
empty := q.head == nil
interrupt.Restore(i)
return empty
return q.out == nil && atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&q.in.top))) == nil
}

// Stack is a LIFO container of tasks.
Expand All @@ -74,51 +68,44 @@ type Stack struct {
}

// Push a task onto the stack.
// This is atomic.
func (s *Stack) Push(t *Task) {
i := interrupt.Disable()
if asserts && t.Next != nil {
interrupt.Restore(i)
panic("runtime: pushing a task to a stack with a non-nil Next pointer")
}
s.top, t.Next = t, s.top
interrupt.Restore(i)
topPtr := (*unsafe.Pointer)(unsafe.Pointer(&s.top))
doPush:
top := atomic.LoadPointer(topPtr)
t.Next = (*Task)(top)
if !atomic.CompareAndSwapPointer(topPtr, top, unsafe.Pointer(t)) {
goto doPush
}
}

// Pop a task off of the stack.
// This is atomic.
func (s *Stack) Pop() *Task {
i := interrupt.Disable()
t := s.top
if t != nil {
s.top = t.Next
t.Next = nil
}
interrupt.Restore(i)
return t
}

// tail follows the chain of tasks.
// If t is nil, returns nil.
// Otherwise, returns the task in the chain where the Next field is nil.
func (t *Task) tail() *Task {
if t == nil {
topPtr := (*unsafe.Pointer)(unsafe.Pointer(&s.top))
doPop:
top := atomic.LoadPointer(topPtr)
if top == nil {
return nil
}
for t.Next != nil {
t = t.Next
t := (*Task)(top)
next := atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&t.Next)))
if !atomic.CompareAndSwapPointer(topPtr, top, next) {
goto doPop
}
t.Next = nil
return t
}

// Queue moves the contents of the stack into a queue.
// Elements can be popped from the queue in the same order that they would be popped from the stack.
func (s *Stack) Queue() Queue {
i := interrupt.Disable()
head := s.top
s.top = nil
q := Queue{
head: head,
tail: head.tail(),
// dump the contents of the stack to another stack.
func (s *Stack) dump() Stack {
return Stack{
top: (*Task)(atomic.SwapPointer(
(*unsafe.Pointer)(unsafe.Pointer(&s.top)),
nil,
)),
}
interrupt.Restore(i)
return q
}