Add docs on task-specific buffering using multithreading #48542

Merged
base/threadingconstructs.jl (4 changes: 2 additions & 2 deletions)
@@ -246,8 +246,8 @@ For example, the above conditions imply that:
- Communicating between iterations using blocking primitives like `Channel`s is incorrect.
- Write only to locations not shared across iterations (unless a lock or atomic operation is
used).
-- The value of [`threadid()`](@ref Threads.threadid) may change even within a single
-  iteration. See [`Task Migration`](@ref man-task-migration)
+- Unless the `:static` schedule is used, the value of [`threadid()`](@ref Threads.threadid)
+  may change even within a single iteration. See [`Task Migration`](@ref man-task-migration).
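
For instance, a loop that satisfies these conditions writes only to its own slot of a preallocated output and never relies on `threadid()`. The following is a minimal sketch, not part of the diff; `threaded_map!` and `f` are illustrative names.

```julia
using Base.Threads

# Each iteration writes only to its own index, so no location is shared
# across iterations and no lock, atomic, or threadid()-based indexing is needed.
function threaded_map!(f, out, xs)
    @threads for i in eachindex(out, xs)
        out[i] = f(xs[i])
    end
    return out
end

# e.g. threaded_map!(sqrt, zeros(1_000), collect(1.0:1_000))
```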

## Schedulers

doc/src/manual/multi-threading.md (72 changes: 68 additions & 4 deletions)
@@ -239,6 +239,68 @@ julia> a

Note that [`Threads.@threads`](@ref) does not have an optional reduction parameter like [`@distributed`](@ref).
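
To make that difference concrete, here is a minimal sketch (assuming the `Distributed` stdlib is loaded; not a benchmark): `@distributed` accepts a reduction operator directly, whereas with `@threads` the combination step has to be written by hand, as in the pattern introduced below.

```julia
using Distributed

# `@distributed` folds the per-iteration results with the given reducer:
total = @distributed (+) for i in 1:1_000_000
    i
end

# `Threads.@threads` has no reducer parameter, so a threaded sum must
# combine partial results itself, e.g. via the chunk-and-fetch pattern
# shown in the next section.
```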

### Using `@threads` without data races

Take the example of a naive sum:

```julia-repl
julia> function sum_single(a)
           s = 0
           for i in a
               s += i
           end
           s
       end
sum_single (generic function with 1 method)

julia> sum_single(1:1_000_000)
500000500000
```

Simply adding `@threads` exposes a data race with multiple threads reading and writing `s` at the same time.
```julia-repl
julia> function sum_multi_bad(a)
           s = 0
           Threads.@threads for i in a
               s += i
           end
           s
       end
sum_multi_bad (generic function with 1 method)

julia> sum_multi_bad(1:1_000_000)
70140554652
```

Note that the result is not `500000500000` as it should be, and will most likely change with each evaluation.

To fix this, buffers that are specific to each task may be used to segment the sum into race-free chunks.
Here `sum_single` is reused, with its own internal buffer `s`, and the input `a` is split into `nthreads()`
chunks for parallel work via `nthreads()` `@spawn`-ed tasks.

```julia-repl
julia> function sum_multi_good(a)
           chunks = Iterators.partition(a, length(a) ÷ Threads.nthreads())
           tasks = map(chunks) do chunk
               Threads.@spawn sum_single(chunk)
           end
           chunk_sums = fetch.(tasks)
           return sum_single(chunk_sums)
       end
sum_multi_good (generic function with 1 method)

julia> sum_multi_good(1:1_000_000)
500000500000
```

> **@torfjelde** commented on Feb 6, 2023:
>
> I'm not an expert on multi-threaded parallelism, so I might just be wrong.
>
> But this doesn't seem like it'll be particularly performant, as the values of the `WeakKeyDict` won't be contiguous in memory, and so the reduction at the end needs to gather these from likely different parts of the heap => slow reduction. In fact, when I tried running something like this on an example of mine it was incredibly slow; probably partially due to what I just mentioned, and partially due to the slowness of `WeakKeyDict` (I believe you can also use an `IdDict`, which at least should be faster than `WeakKeyDict`).
>
> An alternative is `Atomic` as mentioned below, or, even better, you can use a `Vector{Atomic{T}}` of some `buffer_length`. In the latter scenario you can then just randomly pick an index for each `Task` and act on the corresponding `Atomic{T}` atomically, e.g. `atomic_add!(buffer[rand(1:buffer_size)], i)`. If the work within each of the tasks is fairly uniform, then this random picking of the index to add to should result in fairly even congestion across the different buffers.
>
> I'm sure there are much, much better ways of doing this, but the above is a quick-and-easy way to implement a much faster version of this kind of map-reduce approach using a buffer.

!!! note
    Buffers should not be managed based on `threadid()`, i.e. `buffers = zeros(Threads.nthreads())`, because concurrent tasks
    can yield, meaning multiple concurrent tasks may use the same buffer on a given thread, introducing the risk of data races.
    Further, when more than one thread is available, tasks may change thread at yield points, which is known as
    [task migration](@ref man-task-migration).
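
For concreteness, the discouraged pattern looks roughly like this (a sketch of what *not* to do; the function name is illustrative):

```julia
using Base.Threads

# UNSAFE pattern warned about above: indexing a buffer by threadid().
# Tasks can migrate between threads and several tasks can run on the same
# thread, so distinct tasks may read and write the same buffer slot.
function sum_multi_threadid_unsafe(a)
    buffers = zeros(Int, nthreads())
    @threads for i in a
        buffers[threadid()] += i   # threadid() is not stable for the task
    end
    return sum(buffers)
end
```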

Another option is to use atomic operations on variables shared across tasks/threads, which may be more performant
depending on the characteristics of the operations.
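
As a rough sketch of that alternative (and of the sharded variant suggested in the review comment above), the sum can be accumulated with `Threads.Atomic` and `atomic_add!`; the function names and the shard count are illustrative choices, not benchmarked recommendations.

```julia
using Base.Threads

# Single shared atomic accumulator: correct, but every iteration
# contends on the same memory location.
function sum_multi_atomic(a)
    s = Atomic{Int}(0)
    @threads for i in a
        atomic_add!(s, i)
    end
    return s[]
end

# Sharded variant: each iteration adds to a randomly chosen shard,
# spreading the contention, and the shards are reduced at the end.
function sum_multi_sharded(a; nshards = 4 * nthreads())
    shards = [Atomic{Int}(0) for _ in 1:nshards]
    @threads for i in a
        atomic_add!(shards[rand(1:nshards)], i)
    end
    return sum(s[] for s in shards)
end
```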

## Atomic Operations

Julia supports accessing and modifying values *atomically*, that is, in a thread-safe way to avoid
@@ -390,11 +452,13 @@ threads in Julia:

## [Task Migration](@id man-task-migration)

-After a task starts running on a certain thread (e.g. via [`@spawn`](@ref Threads.@spawn) or
-[`@threads`](@ref Threads.@threads)), it may move to a different thread if the task yields.
+After a task starts running on a certain thread it may move to a different thread if the task yields.
+
+Such tasks may have been started with [`@spawn`](@ref Threads.@spawn) or [`@threads`](@ref Threads.@threads),
+although the `:static` schedule option for `@threads` does freeze the threadid.

-This means that [`threadid()`](@ref Threads.threadid) should not be treated as constant within a task, and therefore
-should not be used to index into a vector of buffers or stateful objects.
+This means that in most cases [`threadid()`](@ref Threads.threadid) should not be treated as constant within a task,
+and therefore should not be used to index into a vector of buffers or stateful objects.
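
As a minimal sketch of how migration can be observed (assuming Julia was started with more than one thread; whether migration actually happens on any given run is not guaranteed):

```julia
using Base.Threads

# A task may report different threadid() values before and after a yield
# point (here, sleep). Migration is possible, not guaranteed.
t = @spawn begin
    before = threadid()
    sleep(0.1)             # yield point where the task may migrate
    after = threadid()
    (before, after)
end
fetch(t)                   # e.g. (2, 3) if the task migrated, or equal values if not
```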

!!! compat "Julia 1.7"
Task migration was introduced in Julia 1.7. Before this tasks always remained on the same thread that they were