From 088f61f75ae63ca8d1b377f097f8603dc605903f Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Wed, 26 Feb 2025 14:32:03 +0100 Subject: [PATCH] Add the contract to runBlocking for shared JVM/Native code Additionally, on Native, make thread keepalive checks a bit more efficient. --- .../common/src/EventLoop.common.kt | 7 -- .../common/src/internal/Concurrent.common.kt | 5 +- .../concurrent/src/Builders.concurrent.kt | 50 ++++++++++- .../concurrent/test/RunBlockingTest.kt | 10 +++ kotlinx-coroutines-core/jvm/src/Builders.kt | 64 +------------ .../channels/testSendToChannel.txt | 4 +- .../jvm/test/RunBlockingJvmTest.kt | 16 ---- .../native/src/Builders.kt | 90 ++++--------------- 8 files changed, 86 insertions(+), 160 deletions(-) delete mode 100644 kotlinx-coroutines-core/jvm/test/RunBlockingJvmTest.kt diff --git a/kotlinx-coroutines-core/common/src/EventLoop.common.kt b/kotlinx-coroutines-core/common/src/EventLoop.common.kt index 84291a1b69..3c37159556 100644 --- a/kotlinx-coroutines-core/common/src/EventLoop.common.kt +++ b/kotlinx-coroutines-core/common/src/EventLoop.common.kt @@ -65,13 +65,6 @@ internal abstract class EventLoop : CoroutineDispatcher() { task.run() return true } - /** - * Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its context - * parameter should call [processNextEvent] for this event loop (otherwise, it will process thread-local one). - * By default, event loop implementation is thread-local and should not processed in the context - * (current thread's event loop should be processed instead). - */ - open fun shouldBeProcessedFromContext(): Boolean = false /** * Dispatches task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded] diff --git a/kotlinx-coroutines-core/common/src/internal/Concurrent.common.kt b/kotlinx-coroutines-core/common/src/internal/Concurrent.common.kt index 0be8a104db..72dd3ef834 100644 --- a/kotlinx-coroutines-core/common/src/internal/Concurrent.common.kt +++ b/kotlinx-coroutines-core/common/src/internal/Concurrent.common.kt @@ -1,9 +1,6 @@ package kotlinx.coroutines.internal -internal expect class ReentrantLock() { - fun tryLock(): Boolean - fun unlock() -} +internal expect class ReentrantLock() internal expect inline fun ReentrantLock.withLock(action: () -> T): T diff --git a/kotlinx-coroutines-core/concurrent/src/Builders.concurrent.kt b/kotlinx-coroutines-core/concurrent/src/Builders.concurrent.kt index 7c0581b9d9..5ca1acf830 100644 --- a/kotlinx-coroutines-core/concurrent/src/Builders.concurrent.kt +++ b/kotlinx-coroutines-core/concurrent/src/Builders.concurrent.kt @@ -1,6 +1,15 @@ +@file:JvmMultifileClass +@file:JvmName("BuildersKt") +@file:Suppress("LEAKED_IN_PLACE_LAMBDA", "WRONG_INVOCATION_KIND") + package kotlinx.coroutines +import kotlin.contracts.ExperimentalContracts +import kotlin.contracts.InvocationKind +import kotlin.contracts.contract import kotlin.coroutines.* +import kotlin.jvm.JvmMultifileClass +import kotlin.jvm.JvmName /** * Runs a new coroutine and **blocks** the current thread until its completion. @@ -20,5 +29,44 @@ import kotlin.coroutines.* * * Here, instead of releasing the thread on which `loadConfiguration` runs if `fetchConfigurationData` suspends, it will * block, potentially leading to thread starvation issues. + * + * The default [CoroutineDispatcher] for this builder is an internal implementation of event loop that processes continuations + * in this blocked thread until the completion of this coroutine. + * See [CoroutineDispatcher] for the other implementations that are provided by `kotlinx.coroutines`. + * + * When [CoroutineDispatcher] is explicitly specified in the [context], then the new coroutine runs in the context of + * the specified dispatcher while the current thread is blocked. If the specified dispatcher is an event loop of another `runBlocking`, + * then this invocation uses the outer event loop. + * + * If this blocked thread is interrupted (see `Thread.interrupt`), then the coroutine job is cancelled and + * this `runBlocking` invocation throws `InterruptedException`. + * + * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available + * for a newly created coroutine. + * + * @param context the context of the coroutine. The default value is an event loop on the current thread. + * @param block the coroutine code. */ -public expect fun runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T +@OptIn(ExperimentalContracts::class) +public fun runBlocking( + context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T +): T { + contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) } + val contextInterceptor = context[ContinuationInterceptor] + val eventLoop: EventLoop? + val newContext: CoroutineContext + if (contextInterceptor == null) { + // create or use private event loop if no dispatcher is specified + eventLoop = ThreadLocalEventLoop.eventLoop + newContext = GlobalScope.newCoroutineContext(context + eventLoop) + } else { + eventLoop = ThreadLocalEventLoop.currentOrNull() + newContext = GlobalScope.newCoroutineContext(context) + } + return runBlockingImpl(newContext, eventLoop, block) +} + +/** We can't inline it, because an `expect fun` can't have contracts. */ +internal expect fun runBlockingImpl( + newContext: CoroutineContext, eventLoop: EventLoop?, block: suspend CoroutineScope.() -> T +): T diff --git a/kotlinx-coroutines-core/concurrent/test/RunBlockingTest.kt b/kotlinx-coroutines-core/concurrent/test/RunBlockingTest.kt index 43f7976ffa..f4512e52ed 100644 --- a/kotlinx-coroutines-core/concurrent/test/RunBlockingTest.kt +++ b/kotlinx-coroutines-core/concurrent/test/RunBlockingTest.kt @@ -194,4 +194,14 @@ class RunBlockingTest : TestBase() { } } } + + /** Will not compile if [runBlocking] doesn't have the "runs exactly once" contract. */ + @Test + fun testContract() { + val rb: Int + runBlocking { + rb = 42 + } + rb.hashCode() // unused + } } diff --git a/kotlinx-coroutines-core/jvm/src/Builders.kt b/kotlinx-coroutines-core/jvm/src/Builders.kt index 3fed0ad9cb..3650e729ca 100644 --- a/kotlinx-coroutines-core/jvm/src/Builders.kt +++ b/kotlinx-coroutines-core/jvm/src/Builders.kt @@ -1,71 +1,15 @@ @file:JvmMultifileClass @file:JvmName("BuildersKt") -@file:OptIn(ExperimentalContracts::class) -@file:Suppress("LEAKED_IN_PLACE_LAMBDA", "WRONG_INVOCATION_KIND") package kotlinx.coroutines -import java.util.concurrent.locks.* -import kotlin.contracts.* import kotlin.coroutines.* -/** - * Runs a new coroutine and **blocks** the current thread _interruptibly_ until its completion. - * - * It is designed to bridge regular blocking code to libraries that are written in suspending style, to be used in - * `main` functions and in tests. - * - * Calling [runBlocking] from a suspend function is redundant. - * For example, the following code is incorrect: - * ``` - * suspend fun loadConfiguration() { - * // DO NOT DO THIS: - * val data = runBlocking { // <- redundant and blocks the thread, do not do that - * fetchConfigurationData() // suspending function - * } - * ``` - * - * Here, instead of releasing the thread on which `loadConfiguration` runs if `fetchConfigurationData` suspends, it will - * block, potentially leading to thread starvation issues. - * - * The default [CoroutineDispatcher] for this builder is an internal implementation of event loop that processes continuations - * in this blocked thread until the completion of this coroutine. - * See [CoroutineDispatcher] for the other implementations that are provided by `kotlinx.coroutines`. - * - * When [CoroutineDispatcher] is explicitly specified in the [context], then the new coroutine runs in the context of - * the specified dispatcher while the current thread is blocked. If the specified dispatcher is an event loop of another `runBlocking`, - * then this invocation uses the outer event loop. - * - * If this blocked thread is interrupted (see [Thread.interrupt]), then the coroutine job is cancelled and - * this `runBlocking` invocation throws [InterruptedException]. - * - * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available - * for a newly created coroutine. - * - * @param context the context of the coroutine. The default value is an event loop on the current thread. - * @param block the coroutine code. - */ @Throws(InterruptedException::class) -public actual fun runBlocking(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T { - contract { - callsInPlace(block, InvocationKind.EXACTLY_ONCE) - } - val currentThread = Thread.currentThread() - val contextInterceptor = context[ContinuationInterceptor] - val eventLoop: EventLoop? - val newContext: CoroutineContext - if (contextInterceptor == null) { - // create or use private event loop if no dispatcher is specified - eventLoop = ThreadLocalEventLoop.eventLoop - newContext = GlobalScope.newCoroutineContext(context + eventLoop) - } else { - // See if context's interceptor is an event loop that we shall use (to support TestContext) - // or take an existing thread-local event loop if present to avoid blocking it (but don't create one) - eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() } - ?: ThreadLocalEventLoop.currentOrNull() - newContext = GlobalScope.newCoroutineContext(context) - } - val coroutine = BlockingCoroutine(newContext, currentThread, eventLoop) +internal actual fun runBlockingImpl( + newContext: CoroutineContext, eventLoop: EventLoop?, block: suspend CoroutineScope.() -> T +): T { + val coroutine = BlockingCoroutine(newContext, Thread.currentThread(), eventLoop) coroutine.start(CoroutineStart.DEFAULT, coroutine, block) return coroutine.joinBlocking() } diff --git a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testSendToChannel.txt b/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testSendToChannel.txt index af6e564210..2d89c5fae2 100644 --- a/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testSendToChannel.txt +++ b/kotlinx-coroutines-core/jvm/test-resources/stacktraces/channels/testSendToChannel.txt @@ -15,6 +15,8 @@ Caused by: java.util.concurrent.CancellationException: Channel was cancelled at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt) at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt) at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt) - at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt) + at kotlinx.coroutines.BuildersKt__BuildersKt.runBlockingImpl(Builders.kt) + at kotlinx.coroutines.BuildersKt.runBlockingImpl(Unknown Source) + at kotlinx.coroutines.BuildersKt__Builders_concurrentKt.runBlocking(Builders.concurrent.kt) at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source) at kotlinx.coroutines.testing.TestBase.runTest(TestBase.kt) diff --git a/kotlinx-coroutines-core/jvm/test/RunBlockingJvmTest.kt b/kotlinx-coroutines-core/jvm/test/RunBlockingJvmTest.kt deleted file mode 100644 index cc2291e6c1..0000000000 --- a/kotlinx-coroutines-core/jvm/test/RunBlockingJvmTest.kt +++ /dev/null @@ -1,16 +0,0 @@ -package kotlinx.coroutines - -import kotlinx.coroutines.testing.* -import org.junit.* - -class RunBlockingJvmTest : TestBase() { - @Test - fun testContract() { - val rb: Int - runBlocking { - rb = 42 - } - rb.hashCode() // unused - } -} - diff --git a/kotlinx-coroutines-core/native/src/Builders.kt b/kotlinx-coroutines-core/native/src/Builders.kt index 4f94f19b53..58c33ada3e 100644 --- a/kotlinx-coroutines-core/native/src/Builders.kt +++ b/kotlinx-coroutines-core/native/src/Builders.kt @@ -1,101 +1,49 @@ -@file:OptIn(ExperimentalContracts::class, ObsoleteWorkersApi::class) -@file:Suppress("LEAKED_IN_PLACE_LAMBDA", "WRONG_INVOCATION_KIND") +@file:OptIn(ObsoleteWorkersApi::class) package kotlinx.coroutines -import kotlinx.cinterop.* -import kotlin.contracts.* import kotlin.coroutines.* import kotlin.native.concurrent.* -/** - * Runs a new coroutine and **blocks** the current thread _interruptibly_ until its completion. - * - * It is designed to bridge regular blocking code to libraries that are written in suspending style, to be used in - * `main` functions and in tests. - * - * Calling [runBlocking] from a suspend function is redundant. - * For example, the following code is incorrect: - * ``` - * suspend fun loadConfiguration() { - * // DO NOT DO THIS: - * val data = runBlocking { // <- redundant and blocks the thread, do not do that - * fetchConfigurationData() // suspending function - * } - * ``` - * - * Here, instead of releasing the thread on which `loadConfiguration` runs if `fetchConfigurationData` suspends, it will - * block, potentially leading to thread starvation issues. - * - * The default [CoroutineDispatcher] for this builder is an internal implementation of event loop that processes continuations - * in this blocked thread until the completion of this coroutine. - * See [CoroutineDispatcher] for the other implementations that are provided by `kotlinx.coroutines`. - * - * When [CoroutineDispatcher] is explicitly specified in the [context], then the new coroutine runs in the context of - * the specified dispatcher while the current thread is blocked. If the specified dispatcher is an event loop of another `runBlocking`, - * then this invocation uses the outer event loop. - * - * If this blocked thread is interrupted (see [Thread.interrupt]), then the coroutine job is cancelled and - * this `runBlocking` invocation throws [InterruptedException]. - * - * See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available - * for a newly created coroutine. - * - * @param context the context of the coroutine. The default value is an event loop on the current thread. - * @param block the coroutine code. - */ -public actual fun runBlocking(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T { - contract { - callsInPlace(block, InvocationKind.EXACTLY_ONCE) - } - val contextInterceptor = context[ContinuationInterceptor] - val eventLoop: EventLoop? - val newContext: CoroutineContext - if (contextInterceptor == null) { - // create or use private event loop if no dispatcher is specified - eventLoop = ThreadLocalEventLoop.eventLoop - newContext = GlobalScope.newCoroutineContext(context + eventLoop) - } else { - // See if context's interceptor is an event loop that we shall use (to support TestContext) - // or take an existing thread-local event loop if present to avoid blocking it (but don't create one) - eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() } - ?: ThreadLocalEventLoop.currentOrNull() - newContext = GlobalScope.newCoroutineContext(context) - } - val coroutine = BlockingCoroutine(newContext, eventLoop) - var completed = false - ThreadLocalKeepAlive.addCheck { !completed } +internal actual fun runBlockingImpl( + newContext: CoroutineContext, eventLoop: EventLoop?, block: suspend CoroutineScope.() -> T +): T { + val coroutine = BlockingCoroutine(newContext, Worker.current, eventLoop) + ThreadLocalKeepAlive.registerUsage() try { coroutine.start(CoroutineStart.DEFAULT, coroutine, block) return coroutine.joinBlocking() } finally { - completed = true + ThreadLocalKeepAlive.unregisterUsage() } } @ThreadLocal private object ThreadLocalKeepAlive { - /** If any of these checks passes, this means this [Worker] is still used. */ - private var checks = mutableListOf<() -> Boolean>() + /** If larger than 0, this means this [Worker] is still used. */ + private var usages = 0 /** Whether the worker currently tries to keep itself alive. */ private var keepAliveLoopActive = false - /** Adds another stopgap that must be passed before the [Worker] can be terminated. */ - fun addCheck(terminationForbidden: () -> Boolean) { - checks.add(terminationForbidden) + /** Ensure that the worker is kept alive until the matching [unregisterUsage] is called. */ + fun registerUsage() { + usages++ if (!keepAliveLoopActive) keepAlive() } + /** Undo [registerUsage]. */ + fun unregisterUsage() { + usages-- + } + /** * Send a ping to the worker to prevent it from terminating while this coroutine is running, * ensuring that continuations don't get dropped and forgotten. */ private fun keepAlive() { - // only keep the checks that still forbid the termination - checks = checks.filter { it() }.toMutableList() // if there are no checks left, we no longer keep the worker alive, it can be terminated - keepAliveLoopActive = checks.isNotEmpty() + keepAliveLoopActive = usages > 0 if (keepAliveLoopActive) { Worker.current.executeAfter(afterMicroseconds = 100_000) { keepAlive() @@ -106,9 +54,9 @@ private object ThreadLocalKeepAlive { private class BlockingCoroutine( parentContext: CoroutineContext, + private val joinWorker: Worker, private val eventLoop: EventLoop? ) : AbstractCoroutine(parentContext, true, true) { - private val joinWorker = Worker.current override val isScopedCoroutine: Boolean get() = true