Skip to content

Commit

Permalink
Add the contract to runBlocking for shared JVM/Native code
Browse files Browse the repository at this point in the history
Additionally, on Native, make thread keepalive checks a bit more
efficient.
  • Loading branch information
dkhalanskyjb committed Feb 26, 2025
1 parent d3f1f23 commit c038e59
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 160 deletions.
7 changes: 0 additions & 7 deletions kotlinx-coroutines-core/common/src/EventLoop.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,6 @@ internal abstract class EventLoop : CoroutineDispatcher() {
task.run()
return true
}
/**
* Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its context
* parameter should call [processNextEvent] for this event loop (otherwise, it will process thread-local one).
* By default, event loop implementation is thread-local and should not processed in the context
* (current thread's event loop should be processed instead).
*/
open fun shouldBeProcessedFromContext(): Boolean = false

/**
* Dispatches task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package kotlinx.coroutines.internal

internal expect class ReentrantLock() {
fun tryLock(): Boolean
fun unlock()
}
internal expect class ReentrantLock()

internal expect inline fun <T> ReentrantLock.withLock(action: () -> T): T

Expand Down
50 changes: 49 additions & 1 deletion kotlinx-coroutines-core/concurrent/src/Builders.concurrent.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
@file:JvmMultifileClass
@file:JvmName("BuildersKt")
@file:Suppress("LEAKED_IN_PLACE_LAMBDA", "WRONG_INVOCATION_KIND")

package kotlinx.coroutines

import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
import kotlin.coroutines.*
import kotlin.jvm.JvmMultifileClass
import kotlin.jvm.JvmName

/**
* Runs a new coroutine and **blocks** the current thread until its completion.
Expand All @@ -20,5 +29,44 @@ import kotlin.coroutines.*
*
* Here, instead of releasing the thread on which `loadConfiguration` runs if `fetchConfigurationData` suspends, it will
* block, potentially leading to thread starvation issues.
*
* The default [CoroutineDispatcher] for this builder is an internal implementation of event loop that processes continuations
* in this blocked thread until the completion of this coroutine.
* See [CoroutineDispatcher] for the other implementations that are provided by `kotlinx.coroutines`.
*
* When [CoroutineDispatcher] is explicitly specified in the [context], then the new coroutine runs in the context of
* the specified dispatcher while the current thread is blocked. If the specified dispatcher is an event loop of another `runBlocking`,
* then this invocation uses the outer event loop.
*
* If this blocked thread is interrupted (see `Thread.interrupt`), then the coroutine job is cancelled and
* this `runBlocking` invocation throws `InterruptedException`.
*
* See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available
* for a newly created coroutine.
*
* @param context the context of the coroutine. The default value is an event loop on the current thread.
* @param block the coroutine code.
*/
public expect fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T
@OptIn(ExperimentalContracts::class)
public fun <T> runBlocking(
context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T
): T {
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
val contextInterceptor = context[ContinuationInterceptor]
val eventLoop: EventLoop?
val newContext: CoroutineContext
if (contextInterceptor == null) {
// create or use private event loop if no dispatcher is specified
eventLoop = ThreadLocalEventLoop.eventLoop
newContext = GlobalScope.newCoroutineContext(context + eventLoop)
} else {
eventLoop = ThreadLocalEventLoop.currentOrNull()
newContext = GlobalScope.newCoroutineContext(context)
}
return runBlockingImpl(newContext, eventLoop, block)
}

/** We can't inline it, because an `expect fun` can't have contracts. */
internal expect fun <T> runBlockingImpl(
newContext: CoroutineContext, eventLoop: EventLoop?, block: suspend CoroutineScope.() -> T
): T
10 changes: 10 additions & 0 deletions kotlinx-coroutines-core/concurrent/test/RunBlockingTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -194,4 +194,14 @@ class RunBlockingTest : TestBase() {
}
}
}

/** Will not compile if [runBlocking] doesn't have the "runs exactly once" contract. */
@Test
fun testContract() {
val rb: Int
runBlocking {
rb = 42
}
rb.hashCode() // unused
}
}
64 changes: 4 additions & 60 deletions kotlinx-coroutines-core/jvm/src/Builders.kt
Original file line number Diff line number Diff line change
@@ -1,71 +1,15 @@
@file:JvmMultifileClass
@file:JvmName("BuildersKt")
@file:OptIn(ExperimentalContracts::class)
@file:Suppress("LEAKED_IN_PLACE_LAMBDA", "WRONG_INVOCATION_KIND")

package kotlinx.coroutines

import java.util.concurrent.locks.*
import kotlin.contracts.*
import kotlin.coroutines.*

/**
* Runs a new coroutine and **blocks** the current thread _interruptibly_ until its completion.
*
* It is designed to bridge regular blocking code to libraries that are written in suspending style, to be used in
* `main` functions and in tests.
*
* Calling [runBlocking] from a suspend function is redundant.
* For example, the following code is incorrect:
* ```
* suspend fun loadConfiguration() {
* // DO NOT DO THIS:
* val data = runBlocking { // <- redundant and blocks the thread, do not do that
* fetchConfigurationData() // suspending function
* }
* ```
*
* Here, instead of releasing the thread on which `loadConfiguration` runs if `fetchConfigurationData` suspends, it will
* block, potentially leading to thread starvation issues.
*
* The default [CoroutineDispatcher] for this builder is an internal implementation of event loop that processes continuations
* in this blocked thread until the completion of this coroutine.
* See [CoroutineDispatcher] for the other implementations that are provided by `kotlinx.coroutines`.
*
* When [CoroutineDispatcher] is explicitly specified in the [context], then the new coroutine runs in the context of
* the specified dispatcher while the current thread is blocked. If the specified dispatcher is an event loop of another `runBlocking`,
* then this invocation uses the outer event loop.
*
* If this blocked thread is interrupted (see [Thread.interrupt]), then the coroutine job is cancelled and
* this `runBlocking` invocation throws [InterruptedException].
*
* See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available
* for a newly created coroutine.
*
* @param context the context of the coroutine. The default value is an event loop on the current thread.
* @param block the coroutine code.
*/
@Throws(InterruptedException::class)
public actual fun <T> runBlocking(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
val currentThread = Thread.currentThread()
val contextInterceptor = context[ContinuationInterceptor]
val eventLoop: EventLoop?
val newContext: CoroutineContext
if (contextInterceptor == null) {
// create or use private event loop if no dispatcher is specified
eventLoop = ThreadLocalEventLoop.eventLoop
newContext = GlobalScope.newCoroutineContext(context + eventLoop)
} else {
// See if context's interceptor is an event loop that we shall use (to support TestContext)
// or take an existing thread-local event loop if present to avoid blocking it (but don't create one)
eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
?: ThreadLocalEventLoop.currentOrNull()
newContext = GlobalScope.newCoroutineContext(context)
}
val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
internal actual fun <T> runBlockingImpl(
newContext: CoroutineContext, eventLoop: EventLoop?, block: suspend CoroutineScope.() -> T
): T {
val coroutine = BlockingCoroutine<T>(newContext, Thread.currentThread(), eventLoop)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
return coroutine.joinBlocking()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ Caused by: java.util.concurrent.CancellationException: Channel was cancelled
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt)
at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt)
at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlockingImpl(Builders.kt)
at kotlinx.coroutines.BuildersKt.runBlockingImpl(Unknown Source)
at kotlinx.coroutines.BuildersKt__Builders_concurrentKt.runBlocking(Builders.concurrent.kt)
at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
at kotlinx.coroutines.testing.TestBase.runTest(TestBase.kt)
16 changes: 0 additions & 16 deletions kotlinx-coroutines-core/jvm/test/RunBlockingJvmTest.kt

This file was deleted.

90 changes: 19 additions & 71 deletions kotlinx-coroutines-core/native/src/Builders.kt
Original file line number Diff line number Diff line change
@@ -1,101 +1,49 @@
@file:OptIn(ExperimentalContracts::class, ObsoleteWorkersApi::class)
@file:Suppress("LEAKED_IN_PLACE_LAMBDA", "WRONG_INVOCATION_KIND")
@file:OptIn(ObsoleteWorkersApi::class)

package kotlinx.coroutines

import kotlinx.cinterop.*
import kotlin.contracts.*
import kotlin.coroutines.*
import kotlin.native.concurrent.*

/**
* Runs a new coroutine and **blocks** the current thread _interruptibly_ until its completion.
*
* It is designed to bridge regular blocking code to libraries that are written in suspending style, to be used in
* `main` functions and in tests.
*
* Calling [runBlocking] from a suspend function is redundant.
* For example, the following code is incorrect:
* ```
* suspend fun loadConfiguration() {
* // DO NOT DO THIS:
* val data = runBlocking { // <- redundant and blocks the thread, do not do that
* fetchConfigurationData() // suspending function
* }
* ```
*
* Here, instead of releasing the thread on which `loadConfiguration` runs if `fetchConfigurationData` suspends, it will
* block, potentially leading to thread starvation issues.
*
* The default [CoroutineDispatcher] for this builder is an internal implementation of event loop that processes continuations
* in this blocked thread until the completion of this coroutine.
* See [CoroutineDispatcher] for the other implementations that are provided by `kotlinx.coroutines`.
*
* When [CoroutineDispatcher] is explicitly specified in the [context], then the new coroutine runs in the context of
* the specified dispatcher while the current thread is blocked. If the specified dispatcher is an event loop of another `runBlocking`,
* then this invocation uses the outer event loop.
*
* If this blocked thread is interrupted (see [Thread.interrupt]), then the coroutine job is cancelled and
* this `runBlocking` invocation throws [InterruptedException].
*
* See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available
* for a newly created coroutine.
*
* @param context the context of the coroutine. The default value is an event loop on the current thread.
* @param block the coroutine code.
*/
public actual fun <T> runBlocking(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
val contextInterceptor = context[ContinuationInterceptor]
val eventLoop: EventLoop?
val newContext: CoroutineContext
if (contextInterceptor == null) {
// create or use private event loop if no dispatcher is specified
eventLoop = ThreadLocalEventLoop.eventLoop
newContext = GlobalScope.newCoroutineContext(context + eventLoop)
} else {
// See if context's interceptor is an event loop that we shall use (to support TestContext)
// or take an existing thread-local event loop if present to avoid blocking it (but don't create one)
eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
?: ThreadLocalEventLoop.currentOrNull()
newContext = GlobalScope.newCoroutineContext(context)
}
val coroutine = BlockingCoroutine<T>(newContext, eventLoop)
var completed = false
ThreadLocalKeepAlive.addCheck { !completed }
internal actual fun <T> runBlockingImpl(
newContext: CoroutineContext, eventLoop: EventLoop?, block: suspend CoroutineScope.() -> T
): T {
val coroutine = BlockingCoroutine<T>(newContext, Worker.current, eventLoop)
ThreadLocalKeepAlive.registerUsage()
try {
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
return coroutine.joinBlocking()
} finally {
completed = true
ThreadLocalKeepAlive.unregisterUsage()
}
}

@ThreadLocal
private object ThreadLocalKeepAlive {
/** If any of these checks passes, this means this [Worker] is still used. */
private var checks = mutableListOf<() -> Boolean>()
/** If larger than 0, this means this [Worker] is still used. */
private var usages = 0

/** Whether the worker currently tries to keep itself alive. */
private var keepAliveLoopActive = false

/** Adds another stopgap that must be passed before the [Worker] can be terminated. */
fun addCheck(terminationForbidden: () -> Boolean) {
checks.add(terminationForbidden)
/** Ensure that the worker is kept alive until the matching [unregisterUsage] is called. */
fun registerUsage() {
usages++
if (!keepAliveLoopActive) keepAlive()
}

/** Undo [registerUsage]. */
fun unregisterUsage() {
usages--
}

/**
* Send a ping to the worker to prevent it from terminating while this coroutine is running,
* ensuring that continuations don't get dropped and forgotten.
*/
private fun keepAlive() {
// only keep the checks that still forbid the termination
checks = checks.filter { it() }.toMutableList()
// if there are no checks left, we no longer keep the worker alive, it can be terminated
keepAliveLoopActive = checks.isNotEmpty()
keepAliveLoopActive = usages > 0
if (keepAliveLoopActive) {
Worker.current.executeAfter(afterMicroseconds = 100_000) {
keepAlive()
Expand All @@ -106,9 +54,9 @@ private object ThreadLocalKeepAlive {

private class BlockingCoroutine<T>(
parentContext: CoroutineContext,
private val joinWorker: Worker,
private val eventLoop: EventLoop?
) : AbstractCoroutine<T>(parentContext, true, true) {
private val joinWorker = Worker.current

override val isScopedCoroutine: Boolean get() = true

Expand Down

0 comments on commit c038e59

Please sign in to comment.