Skip to content

Commit 3f23533

Browse files
authored
add an ability to suspend/resume a thread in a GC-safe way (#51489)
This exposes the GC "stop the world" API to the user, for causing a thread to quickly stop executing Julia code. This adds two APIs (that will need to be exported and documented later):
```
julia> @ccall jl_safepoint_suspend_thread(#=tid=#1::Cint, #=magicnumber=#2::Cint)::Cint # roughly tkill(1, SIGSTOP)
julia> @ccall jl_safepoint_resume_thread(#=tid=#1::Cint)::Cint # roughly tkill(1, SIGCONT)
```
You can even suspend yourself, if there is another task to resume you 10 seconds later:
```
julia> ccall(:jl_enter_threaded_region, Cvoid, ())
julia> t = @task let; Libc.systemsleep(10); print("\nhello from $(Threads.threadid())\n"); @ccall jl_safepoint_resume_thread(0::Cint)::Cint; end; ccall(:jl_set_task_tid, Cint, (Any, Cint), t, 1); schedule(t);
julia> @time @ccall jl_safepoint_suspend_thread(0::Cint, 2::Cint)::Cint
hello from 2
10 seconds (6 allocations: 264 bytes)
1
```
The meaning of the magic number is actually the kind of stop that you want:
```
// n.b. suspended threads may still run in the GC or GC safe regions
// but shouldn't be observable, depending on which enum the user picks (only 1 and 2 are typically recommended here)
// waitstate = 0 : do not wait for suspend to finish
// waitstate = 1 : wait for gc_state != 0 (JL_GC_STATE_WAITING or JL_GC_STATE_SAFE)
// waitstate = 2 : wait for gc_state != 0 (JL_GC_STATE_WAITING or JL_GC_STATE_SAFE) and that GC is not running on that thread
// waitstate = 3 : wait for full suspend (gc_state == JL_GC_STATE_WAITING) -- this may never happen if thread is sleeping currently
// if another thread comes along and calls jl_safepoint_resume, we also return early
// return new suspend count on success, 0 on failure
```
Only magic number 2 is currently meaningful to the user though. The difference between waitstate 1 and 2 is only relevant in C code which is calling this from JL_GC_STATE_SAFE, since otherwise it is a priori known that GC isn't running, else we too would be running the GC.
But the distinction of those states might be useful if we have a concurrent collector. Very important warning: if the stopped thread is holding any locks (e.g. for codegen or types) that you then attempt to acquire, your thread will deadlock. This is very likely, unless you are very careful. A future update to this API may try to change the waitstate to give the option to wait for the thread to release internal or known locks.
1 parent 0ab032a commit 3f23533

10 files changed

+231
-72
lines changed

src/gc.c

+2
Original file line numberDiff line numberDiff line change
@@ -3527,6 +3527,7 @@ JL_DLLEXPORT void jl_gc_collect(jl_gc_collection_t collection)
35273527
if (!jl_safepoint_start_gc()) {
35283528
// either another thread is running GC, or the GC got disabled just now.
35293529
jl_gc_state_set(ptls, old_state, JL_GC_STATE_WAITING);
3530+
jl_safepoint_wait_thread_resume(); // block in thread-suspend now if requested, after clearing the gc_state
35303531
return;
35313532
}
35323533

@@ -3580,6 +3581,7 @@ JL_DLLEXPORT void jl_gc_collect(jl_gc_collection_t collection)
35803581
jl_safepoint_end_gc();
35813582
jl_gc_state_set(ptls, old_state, JL_GC_STATE_WAITING);
35823583
JL_PROBE_GC_END();
3584+
jl_safepoint_wait_thread_resume(); // block in thread-suspend now if requested, after clearing the gc_state
35833585

35843586
// Only disable finalizers on current thread
35853587
// Doing this on all threads is racy (it's impossible to check

src/jl_exported_funcs.inc

+2
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,8 @@
416416
XX(jl_rethrow_other) \
417417
XX(jl_running_on_valgrind) \
418418
XX(jl_safe_printf) \
419+
XX(jl_safepoint_suspend_thread) \
420+
XX(jl_safepoint_resume_thread) \
419421
XX(jl_SC_CLK_TCK) \
420422
XX(jl_set_ARGS) \
421423
XX(jl_set_const) \

src/julia.h

+2
Original file line numberDiff line numberDiff line change
@@ -1054,6 +1054,8 @@ JL_DLLEXPORT void *jl_gc_managed_malloc(size_t sz);
10541054
JL_DLLEXPORT void *jl_gc_managed_realloc(void *d, size_t sz, size_t oldsz,
10551055
int isaligned, jl_value_t *owner);
10561056
JL_DLLEXPORT void jl_gc_safepoint(void);
1057+
JL_DLLEXPORT int jl_safepoint_suspend_thread(int tid, int waitstate);
1058+
JL_DLLEXPORT int jl_safepoint_resume_thread(int tid) JL_NOTSAFEPOINT;
10571059

10581060
void *mtarraylist_get(small_arraylist_t *_a, size_t idx) JL_NOTSAFEPOINT;
10591061
size_t mtarraylist_length(small_arraylist_t *_a) JL_NOTSAFEPOINT;

src/julia_internal.h

+7-6
Original file line numberDiff line numberDiff line change
@@ -892,7 +892,7 @@ extern char *jl_safepoint_pages;
892892
STATIC_INLINE int jl_addr_is_safepoint(uintptr_t addr)
893893
{
894894
uintptr_t safepoint_addr = (uintptr_t)jl_safepoint_pages;
895-
return addr >= safepoint_addr && addr < safepoint_addr + jl_page_size * 3;
895+
return addr >= safepoint_addr && addr < safepoint_addr + jl_page_size * 4;
896896
}
897897
extern _Atomic(uint32_t) jl_gc_running;
898898
extern _Atomic(uint32_t) jl_gc_disable_counter;
@@ -918,7 +918,8 @@ void jl_safepoint_end_gc(void);
918918
// Wait for the GC to finish
919919
// This function does **NOT** modify the `gc_state` to inform the GC thread
920920
// The caller should set it **BEFORE** calling this function.
921-
void jl_safepoint_wait_gc(void);
921+
void jl_safepoint_wait_gc(void) JL_NOTSAFEPOINT;
922+
void jl_safepoint_wait_thread_resume(void) JL_NOTSAFEPOINT;
922923

923924
// Set pending sigint and enable the mechanisms to deliver the sigint.
924925
void jl_safepoint_enable_sigint(void);
@@ -946,8 +947,7 @@ JL_DLLEXPORT void jl_pgcstack_getkey(jl_get_pgcstack_func **f, jl_pgcstack_key_t
946947
extern pthread_mutex_t in_signal_lock;
947948
#endif
948949

949-
#if !defined(__clang_gcanalyzer__) && !defined(_OS_DARWIN_)
950-
static inline void jl_set_gc_and_wait(void)
950+
static inline void jl_set_gc_and_wait(void) // n.b. not used on _OS_DARWIN_
951951
{
952952
jl_task_t *ct = jl_current_task;
953953
// reading own gc state doesn't need atomic ops since no one else
@@ -956,8 +956,8 @@ static inline void jl_set_gc_and_wait(void)
956956
jl_atomic_store_release(&ct->ptls->gc_state, JL_GC_STATE_WAITING);
957957
jl_safepoint_wait_gc();
958958
jl_atomic_store_release(&ct->ptls->gc_state, state);
959+
jl_safepoint_wait_thread_resume(); // block in thread-suspend now if requested, after clearing the gc_state
959960
}
960-
#endif
961961

962962
// Query if a Julia object is in a permalloc region (due to being part of a sys- or pkg-image)
963963
STATIC_INLINE size_t n_linkage_blobs(void) JL_NOTSAFEPOINT
@@ -1397,7 +1397,8 @@ extern jl_mutex_t typecache_lock;
13971397
extern JL_DLLEXPORT jl_mutex_t jl_codegen_lock;
13981398

13991399
#if defined(__APPLE__)
1400-
void jl_mach_gc_end(void);
1400+
void jl_mach_gc_end(void) JL_NOTSAFEPOINT;
1401+
void jl_safepoint_resume_thread_mach(jl_ptls_t ptls2, int16_t tid2) JL_NOTSAFEPOINT;
14011402
#endif
14021403

14031404
// -- smallintset.c -- //

src/julia_threads.h

+15-14
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ typedef struct _jl_tls_states_t {
211211
int16_t tid;
212212
int8_t threadpoolid;
213213
uint64_t rngseed;
214-
volatile size_t *safepoint;
214+
_Atomic(volatile size_t *) safepoint; // may be changed to the suspend page by any thread
215215
_Atomic(int8_t) sleep_check_state; // read/write from foreign threads
216216
// Whether it is safe to execute GC at the same time.
217217
#define JL_GC_STATE_WAITING 1
@@ -225,9 +225,9 @@ typedef struct _jl_tls_states_t {
225225
// statements is prohibited from certain
226226
// callbacks (such as generated functions)
227227
// as it may make compilation undecidable
228-
int8_t in_pure_callback;
229-
int8_t in_finalizer;
230-
int8_t disable_gc;
228+
int16_t in_pure_callback;
229+
int16_t in_finalizer;
230+
int16_t disable_gc;
231231
// Counter to disable finalizer **on the current thread**
232232
int finalizers_inhibited;
233233
jl_thread_heap_t heap; // this is very large, and the offset is baked into codegen
@@ -264,6 +264,7 @@ typedef struct _jl_tls_states_t {
264264
void *signal_stack;
265265
#endif
266266
jl_thread_t system_id;
267+
_Atomic(int16_t) suspend_count;
267268
arraylist_t finalizers;
268269
jl_gc_page_stack_t page_metadata_allocd;
269270
jl_gc_page_stack_t page_metadata_buffered;
@@ -333,17 +334,17 @@ void jl_sigint_safepoint(jl_ptls_t tls);
333334
// This triggers a SegFault when we are in GC
334335
// Assign it to a variable to make sure the compiler emits the load
335336
// and to avoid Clang warning for -Wunused-volatile-lvalue
336-
#define jl_gc_safepoint_(ptls) do { \
337-
jl_signal_fence(); \
338-
size_t safepoint_load = *ptls->safepoint; \
339-
jl_signal_fence(); \
340-
(void)safepoint_load; \
337+
#define jl_gc_safepoint_(ptls) do { \
338+
jl_signal_fence(); \
339+
size_t safepoint_load = jl_atomic_load_relaxed(&ptls->safepoint)[0]; \
340+
jl_signal_fence(); \
341+
(void)safepoint_load; \
341342
} while (0)
342-
#define jl_sigint_safepoint(ptls) do { \
343-
jl_signal_fence(); \
344-
size_t safepoint_load = ptls->safepoint[-1]; \
345-
jl_signal_fence(); \
346-
(void)safepoint_load; \
343+
#define jl_sigint_safepoint(ptls) do { \
344+
jl_signal_fence(); \
345+
size_t safepoint_load = jl_atomic_load_relaxed(&ptls->safepoint)[-1]; \
346+
jl_signal_fence(); \
347+
(void)safepoint_load; \
347348
} while (0)
348349
#endif
349350
STATIC_INLINE int8_t jl_gc_state_set(jl_ptls_t ptls, int8_t state,

src/rtutils.c

+4-7
Original file line numberDiff line numberDiff line change
@@ -275,15 +275,12 @@ JL_DLLEXPORT void jl_eh_restore_state(jl_handler_t *eh)
275275
}
276276
ct->world_age = eh->world_age;
277277
ct->ptls->defer_signal = eh->defer_signal;
278-
if (old_gc_state != eh->gc_state) {
278+
if (old_gc_state != eh->gc_state)
279279
jl_atomic_store_release(&ct->ptls->gc_state, eh->gc_state);
280-
if (old_gc_state) {
281-
jl_gc_safepoint_(ct->ptls);
282-
}
283-
}
284-
if (old_defer_signal && !eh->defer_signal) {
280+
if (!eh->gc_state)
281+
jl_gc_safepoint_(ct->ptls);
282+
if (old_defer_signal && !eh->defer_signal)
285283
jl_sigint_safepoint(ct->ptls);
286-
}
287284
if (jl_atomic_load_relaxed(&jl_gc_have_pending_finalizers) &&
288285
unlocks && eh->locks_len == 0) {
289286
jl_gc_run_pending_finalizers(ct);

src/safepoint.c

+125-10
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ char *jl_safepoint_pages = NULL;
3030
// so that both safepoint load and pending signal load falls in this page.
3131
// The initialization of the `safepoint` pointer is done `ti_initthread`
3232
// in `threading.c`.
33-
uint8_t jl_safepoint_enable_cnt[3] = {0, 0, 0};
33+
// The fourth page is the count of suspended threads
34+
uint16_t jl_safepoint_enable_cnt[4] = {0, 0, 0, 0};
3435

3536
// This lock should be acquired before enabling/disabling the safepoint
3637
// or accessing one of the following variables:
@@ -48,12 +49,12 @@ uv_cond_t safepoint_cond;
4849
static void jl_safepoint_enable(int idx) JL_NOTSAFEPOINT
4950
{
5051
// safepoint_lock should be held
51-
assert(0 <= idx && idx < 3);
52+
assert(0 <= idx && idx <= 3);
5253
if (jl_safepoint_enable_cnt[idx]++ != 0) {
5354
// We expect this to be enabled at most twice
5455
// one for the GC, one for SIGINT.
5556
// Update this if this is not the case anymore in the future.
56-
assert(jl_safepoint_enable_cnt[idx] <= 2);
57+
assert(jl_safepoint_enable_cnt[idx] <= (idx == 3 ? INT16_MAX : 2));
5758
return;
5859
}
5960
// Now that we are requested to mprotect the page and it wasn't already.
@@ -62,14 +63,15 @@ static void jl_safepoint_enable(int idx) JL_NOTSAFEPOINT
6263
DWORD old_prot;
6364
VirtualProtect(pageaddr, jl_page_size, PAGE_NOACCESS, &old_prot);
6465
#else
65-
mprotect(pageaddr, jl_page_size, PROT_NONE);
66+
int r = mprotect(pageaddr, jl_page_size, PROT_NONE);
67+
(void)r; //if (r) perror("mprotect");
6668
#endif
6769
}
6870

6971
static void jl_safepoint_disable(int idx) JL_NOTSAFEPOINT
7072
{
7173
// safepoint_lock should be held
72-
assert(0 <= idx && idx < 3);
74+
assert(0 <= idx && idx <= 3);
7375
if (--jl_safepoint_enable_cnt[idx] != 0) {
7476
assert(jl_safepoint_enable_cnt[idx] > 0);
7577
return;
@@ -81,7 +83,8 @@ static void jl_safepoint_disable(int idx) JL_NOTSAFEPOINT
8183
DWORD old_prot;
8284
VirtualProtect(pageaddr, jl_page_size, PAGE_READONLY, &old_prot);
8385
#else
84-
mprotect(pageaddr, jl_page_size, PROT_READ);
86+
int r = mprotect(pageaddr, jl_page_size, PROT_READ);
87+
(void)r; //if (r) perror("mprotect");
8588
#endif
8689
}
8790

@@ -92,9 +95,9 @@ void jl_safepoint_init(void)
9295
// jl_page_size isn't available yet.
9396
size_t pgsz = jl_getpagesize();
9497
#ifdef _OS_WINDOWS_
95-
char *addr = (char*)VirtualAlloc(NULL, pgsz * 3, MEM_COMMIT, PAGE_READONLY);
98+
char *addr = (char*)VirtualAlloc(NULL, pgsz * 4, MEM_COMMIT, PAGE_READONLY);
9699
#else
97-
char *addr = (char*)mmap(0, pgsz * 3, PROT_READ,
100+
char *addr = (char*)mmap(0, pgsz * 4, PROT_READ,
98101
MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
99102
if (addr == MAP_FAILED)
100103
addr = NULL;
@@ -104,6 +107,18 @@ void jl_safepoint_init(void)
104107
jl_gc_debug_critical_error();
105108
abort();
106109
}
110+
// // If we are able to skip past the faulting safepoint instruction conditionally,
111+
// // then we can make this safepoint page unconditional. But otherwise we
112+
// // only enable this page when required, though it gives us less
113+
// // fine-grained control over individual resume.
114+
// char *pageaddr = addr + pgsz * 3;
115+
//#ifdef _OS_WINDOWS_
116+
// DWORD old_prot;
117+
// VirtualProtect(pageaddr, pgsz, PAGE_NOACCESS, &old_prot);
118+
//#else
119+
// int r = mprotect(pageaddr, pgsz, PROT_NONE);
120+
// (void)r; //if (r) perror("mprotect");
121+
//#endif
107122
// The signal page is for the gc safepoint.
108123
// The page before it is the sigint pending flag.
109124
jl_safepoint_pages = addr;
@@ -113,6 +128,7 @@ int jl_safepoint_start_gc(void)
113128
{
114129
// The thread should have set this already
115130
assert(jl_atomic_load_relaxed(&jl_current_task->ptls->gc_state) == JL_GC_STATE_WAITING);
131+
jl_safepoint_wait_thread_resume(); // make sure we are permitted to run GC now (we might be required to stop instead)
116132
uv_mutex_lock(&safepoint_lock);
117133
// In case multiple threads enter the GC at the same time, only allow
118134
// one of them to actually run the collection. We can't just let the
@@ -148,15 +164,16 @@ void jl_safepoint_end_gc(void)
148164
jl_safepoint_disable(2);
149165
jl_safepoint_disable(1);
150166
jl_atomic_store_release(&jl_gc_running, 0);
151-
# ifdef __APPLE__
167+
# ifdef _OS_DARWIN_
152168
// This wakes up other threads on mac.
153169
jl_mach_gc_end();
154170
# endif
155171
uv_mutex_unlock(&safepoint_lock);
156172
uv_cond_broadcast(&safepoint_cond);
157173
}
158174

159-
void jl_safepoint_wait_gc(void)
175+
// this is the core of jl_set_gc_and_wait
176+
void jl_safepoint_wait_gc(void) JL_NOTSAFEPOINT
160177
{
161178
jl_task_t *ct = jl_current_task; (void)ct;
162179
JL_TIMING_SUSPEND_TASK(GC_SAFEPOINT, ct);
@@ -175,6 +192,104 @@ void jl_safepoint_wait_gc(void)
175192
}
176193
}
177194

195+
// equivalent to jl_set_gc_and_wait, but waiting on resume-thread lock instead
196+
void jl_safepoint_wait_thread_resume(void)
197+
{
198+
jl_task_t *ct = jl_current_task;
199+
// n.b. we do not permit a fast-path here that skips the lock acquire since
200+
// we otherwise have no synchronization point to ensure that this thread
201+
// will observe the change to the safepoint, even though the other thread
202+
// might have already observed our gc_state.
203+
// if (!jl_atomic_load_relaxed(&ct->ptls->suspend_count)) return;
204+
JL_TIMING_SUSPEND_TASK(USER, ct);
205+
int8_t state = jl_atomic_load_relaxed(&ct->ptls->gc_state);
206+
jl_atomic_store_release(&ct->ptls->gc_state, JL_GC_STATE_WAITING);
207+
uv_mutex_lock(&ct->ptls->sleep_lock);
208+
while (jl_atomic_load_relaxed(&ct->ptls->suspend_count))
209+
uv_cond_wait(&ct->ptls->wake_signal, &ct->ptls->sleep_lock);
210+
// must do this while still holding the mutex, so we know other threads in
211+
// jl_safepoint_suspend_thread will observe this thread in the correct GC
212+
// state, and not still stuck in JL_GC_STATE_WAITING
213+
jl_atomic_store_release(&ct->ptls->gc_state, state);
214+
uv_mutex_unlock(&ct->ptls->sleep_lock);
215+
}
216+
217+
// n.b. suspended threads may still run in the GC or GC safe regions
218+
// but shouldn't be observable, depending on which enum the user picks (only 1 and 2 are typically recommended here)
219+
// waitstate = 0 : do not wait for suspend to finish
220+
// waitstate = 1 : wait for gc_state != 0 (JL_GC_STATE_WAITING or JL_GC_STATE_SAFE)
221+
// waitstate = 2 : wait for gc_state != 0 (JL_GC_STATE_WAITING or JL_GC_STATE_SAFE) and that GC is not running on that thread
222+
// waitstate = 3 : wait for full suspend (gc_state == JL_GC_STATE_WAITING) -- this may never happen if thread is sleeping currently
223+
// if another thread comes along and calls jl_safepoint_resume_thread, we also return early
224+
// return new suspend count on success, 0 on failure
225+
int jl_safepoint_suspend_thread(int tid, int waitstate)
226+
{
227+
if (0 > tid || tid >= jl_atomic_load_acquire(&jl_n_threads))
228+
return 0;
229+
jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[tid];
230+
uv_mutex_lock(&ptls2->sleep_lock);
231+
int16_t suspend_count = jl_atomic_load_relaxed(&ptls2->suspend_count) + 1;
232+
jl_atomic_store_relaxed(&ptls2->suspend_count, suspend_count);
233+
if (suspend_count == 1) { // first to suspend
234+
jl_safepoint_enable(3);
235+
jl_atomic_store_relaxed(&ptls2->safepoint, (size_t*)(jl_safepoint_pages + jl_page_size * 3 + sizeof(void*)));
236+
}
237+
uv_mutex_unlock(&ptls2->sleep_lock);
238+
if (waitstate) {
239+
// wait for suspend (or another thread to call resume)
240+
if (waitstate >= 2) {
241+
// We currently cannot distinguish if a thread is helping run GC or
242+
// not, so assume it is running GC and wait for GC to finish first.
243+
// It will be unable to reenter helping with GC because we have
244+
// changed its safepoint page.
245+
jl_set_gc_and_wait();
246+
}
247+
while (jl_atomic_load_acquire(&ptls2->suspend_count) != 0) {
248+
int8_t state2 = jl_atomic_load_acquire(&ptls2->gc_state);
249+
if (waitstate <= 2 && state2 != 0)
250+
break;
251+
if (waitstate == 3 && state2 == JL_GC_STATE_WAITING)
252+
break;
253+
jl_cpu_pause(); // yield?
254+
}
255+
}
256+
return suspend_count;
257+
}
258+
259+
// return old suspend count on success, 0 on failure
260+
// n.b. threads often do not resume until after all suspended threads have been resumed!
261+
int jl_safepoint_resume_thread(int tid) JL_NOTSAFEPOINT
262+
{
263+
if (0 > tid || tid >= jl_atomic_load_acquire(&jl_n_threads))
264+
return 0;
265+
jl_ptls_t ptls2 = jl_atomic_load_relaxed(&jl_all_tls_states)[tid];
266+
# ifdef _OS_DARWIN_
267+
uv_mutex_lock(&safepoint_lock);
268+
# endif
269+
uv_mutex_lock(&ptls2->sleep_lock);
270+
int16_t suspend_count = jl_atomic_load_relaxed(&ptls2->suspend_count);
271+
if (suspend_count == 1) { // last to unsuspend
272+
if (tid == 0)
273+
jl_atomic_store_relaxed(&ptls2->safepoint, (size_t*)(jl_safepoint_pages + jl_page_size));
274+
else
275+
jl_atomic_store_relaxed(&ptls2->safepoint, (size_t*)(jl_safepoint_pages + jl_page_size * 2 + sizeof(void*)));
276+
uv_cond_signal(&ptls2->wake_signal);
277+
#ifdef _OS_DARWIN_
278+
jl_safepoint_resume_thread_mach(ptls2, tid);
279+
#endif
280+
}
281+
if (suspend_count != 0) {
282+
jl_atomic_store_relaxed(&ptls2->suspend_count, suspend_count - 1);
283+
if (suspend_count == 1)
284+
jl_safepoint_disable(3);
285+
}
286+
uv_mutex_unlock(&ptls2->sleep_lock);
287+
# ifdef _OS_DARWIN_
288+
uv_mutex_unlock(&safepoint_lock);
289+
# endif
290+
return suspend_count;
291+
}
292+
178293
void jl_safepoint_enable_sigint(void)
179294
{
180295
uv_mutex_lock(&safepoint_lock);

0 commit comments

Comments
 (0)