Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the contract to runBlocking for shared JVM/Native code #4368

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion integration-testing/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,15 @@ sourceSets {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutinesVersion")
}
}

create("javaConsumersTest") {
compileClasspath += sourceSets.test.get().runtimeClasspath
runtimeClasspath += sourceSets.test.get().runtimeClasspath

dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutinesVersion")
}
}
}

kotlin {
Expand Down Expand Up @@ -199,16 +208,23 @@ tasks {
classpath = sourceSet.runtimeClasspath
}

create<Test>("javaConsumersTest") {
val sourceSet = sourceSets[name]
testClassesDirs = sourceSet.output.classesDirs
classpath = sourceSet.runtimeClasspath
}

check {
dependsOn(
"jvmCoreTest",
"debugDynamicAgentTest",
"mavenTest",
"debugAgentTest",
"coreAgentTest",
"javaConsumersTest",
":jpmsTest:check",
"smokeTest:build",
"java8Test:check"
"java8Test:check",
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import kotlinx.coroutines.BuildersKt;
import kotlinx.coroutines.Dispatchers;
import org.junit.Test;
import org.junit.Assert;

public class RunBlockingJavaTest {
Boolean entered = false;

/** This code will not compile if `runBlocking` doesn't declare `@Throws(InterruptedException::class)` */
@Test
public void testRunBlocking() {
try {
BuildersKt.runBlocking(Dispatchers.getIO(), (scope, continuation) -> {
entered = true;
return null;
});
} catch (InterruptedException e) {
}
Assert.assertTrue(entered);
}
}
2 changes: 2 additions & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public final class kotlinx/coroutines/BuildersKt {
public static synthetic fun launch$default (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/Job;
public static final fun runBlocking (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Ljava/lang/Object;
public static synthetic fun runBlocking$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun runBlockingK (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Ljava/lang/Object;
public static synthetic fun runBlockingK$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun withContext (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

Expand Down
7 changes: 0 additions & 7 deletions kotlinx-coroutines-core/common/src/EventLoop.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,6 @@ internal abstract class EventLoop : CoroutineDispatcher() {
task.run()
return true
}
/**
* Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its context
* parameter should call [processNextEvent] for this event loop (otherwise, it will process thread-local one).
* By default, event loop implementation is thread-local and should not processed in the context
* (current thread's event loop should be processed instead).
*/
open fun shouldBeProcessedFromContext(): Boolean = false

/**
* Dispatches task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package kotlinx.coroutines.internal

internal expect class ReentrantLock() {
fun tryLock(): Boolean
fun unlock()
}
internal expect class ReentrantLock()

internal expect inline fun <T> ReentrantLock.withLock(action: () -> T): T

Expand Down
51 changes: 50 additions & 1 deletion kotlinx-coroutines-core/concurrent/src/Builders.concurrent.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
@file:Suppress("LEAKED_IN_PLACE_LAMBDA", "WRONG_INVOCATION_KIND")
@file:JvmMultifileClass
@file:JvmName("BuildersKt")

package kotlinx.coroutines

import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract
import kotlin.coroutines.*
import kotlin.jvm.JvmMultifileClass
import kotlin.jvm.JvmName

/**
* Runs a new coroutine and **blocks** the current thread until its completion.
Expand All @@ -20,5 +29,45 @@ import kotlin.coroutines.*
*
* Here, instead of releasing the thread on which `loadConfiguration` runs if `fetchConfigurationData` suspends, it will
* block, potentially leading to thread starvation issues.
*
* The default [CoroutineDispatcher] for this builder is an internal implementation of event loop that processes continuations
* in this blocked thread until the completion of this coroutine.
* See [CoroutineDispatcher] for the other implementations that are provided by `kotlinx.coroutines`.
*
* When [CoroutineDispatcher] is explicitly specified in the [context], then the new coroutine runs in the context of
* the specified dispatcher while the current thread is blocked. If the specified dispatcher is an event loop of another `runBlocking`,
* then this invocation uses the outer event loop.
*
* If this blocked thread is interrupted (see `Thread.interrupt`), then the coroutine job is cancelled and
* this `runBlocking` invocation throws `InterruptedException`.
*
* See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available
* for a newly created coroutine.
*
* @param context the context of the coroutine. The default value is an event loop on the current thread.
* @param block the coroutine code.
*/
public expect fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T
@OptIn(ExperimentalContracts::class)
@JvmName("runBlockingK")
public fun <T> runBlocking(
context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T
): T {
contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
val contextInterceptor = context[ContinuationInterceptor]
val eventLoop: EventLoop?
val newContext: CoroutineContext
if (contextInterceptor == null) {
// create or use private event loop if no dispatcher is specified
eventLoop = ThreadLocalEventLoop.eventLoop
newContext = GlobalScope.newCoroutineContext(context + eventLoop)
} else {
eventLoop = ThreadLocalEventLoop.currentOrNull()
newContext = GlobalScope.newCoroutineContext(context)
}
return runBlockingImpl(newContext, eventLoop, block)
}

/** We can't inline it, because an `expect fun` can't have contracts. */
internal expect fun <T> runBlockingImpl(
newContext: CoroutineContext, eventLoop: EventLoop?, block: suspend CoroutineScope.() -> T
): T
10 changes: 10 additions & 0 deletions kotlinx-coroutines-core/concurrent/test/RunBlockingTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -194,4 +194,14 @@ class RunBlockingTest : TestBase() {
}
}
}

/** Will not compile if [runBlocking] doesn't have the "runs exactly once" contract. */
@Test
fun testContract() {
val rb: Int
runBlocking {
rb = 42
}
rb.hashCode() // unused
}
}
72 changes: 16 additions & 56 deletions kotlinx-coroutines-core/jvm/src/Builders.kt
Original file line number Diff line number Diff line change
@@ -1,71 +1,31 @@
@file:JvmMultifileClass
@file:JvmName("BuildersKt")
@file:OptIn(ExperimentalContracts::class)
@file:Suppress("LEAKED_IN_PLACE_LAMBDA", "WRONG_INVOCATION_KIND")

package kotlinx.coroutines

import java.util.concurrent.locks.*
import kotlin.contracts.*
import kotlin.coroutines.*

/**
* Runs a new coroutine and **blocks** the current thread _interruptibly_ until its completion.
* The same as [runBlocking], but for consumption from Java.
* From Kotlin's point of view, this function has the exact same signature as the regular [runBlocking].
* This is done so that it can not be called from Kotlin, despite the fact that it is public.
*
* It is designed to bridge regular blocking code to libraries that are written in suspending style, to be used in
* `main` functions and in tests.
* We do not expose this [runBlocking] in the documentation, because it is not supposed to be used from Kotlin.
*
* Calling [runBlocking] from a suspend function is redundant.
* For example, the following code is incorrect:
* ```
* suspend fun loadConfiguration() {
* // DO NOT DO THIS:
* val data = runBlocking { // <- redundant and blocks the thread, do not do that
* fetchConfigurationData() // suspending function
* }
* ```
*
* Here, instead of releasing the thread on which `loadConfiguration` runs if `fetchConfigurationData` suspends, it will
* block, potentially leading to thread starvation issues.
*
* The default [CoroutineDispatcher] for this builder is an internal implementation of event loop that processes continuations
* in this blocked thread until the completion of this coroutine.
* See [CoroutineDispatcher] for the other implementations that are provided by `kotlinx.coroutines`.
*
* When [CoroutineDispatcher] is explicitly specified in the [context], then the new coroutine runs in the context of
* the specified dispatcher while the current thread is blocked. If the specified dispatcher is an event loop of another `runBlocking`,
* then this invocation uses the outer event loop.
*
* If this blocked thread is interrupted (see [Thread.interrupt]), then the coroutine job is cancelled and
* this `runBlocking` invocation throws [InterruptedException].
*
* See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging facilities that are available
* for a newly created coroutine.
*
* @param context the context of the coroutine. The default value is an event loop on the current thread.
* @param block the coroutine code.
* @suppress
*/
@Throws(InterruptedException::class)
public actual fun <T> runBlocking(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
val currentThread = Thread.currentThread()
val contextInterceptor = context[ContinuationInterceptor]
val eventLoop: EventLoop?
val newContext: CoroutineContext
if (contextInterceptor == null) {
// create or use private event loop if no dispatcher is specified
eventLoop = ThreadLocalEventLoop.eventLoop
newContext = GlobalScope.newCoroutineContext(context + eventLoop)
} else {
// See if context's interceptor is an event loop that we shall use (to support TestContext)
// or take an existing thread-local event loop if present to avoid blocking it (but don't create one)
eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
?: ThreadLocalEventLoop.currentOrNull()
newContext = GlobalScope.newCoroutineContext(context)
}
val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
@kotlin.internal.LowPriorityInOverloadResolution
public fun <T> runBlocking(
context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T
): T = runBlocking(context, block)

@Throws(InterruptedException::class)
internal actual fun <T> runBlockingImpl(
newContext: CoroutineContext, eventLoop: EventLoop?, block: suspend CoroutineScope.() -> T
): T {
val coroutine = BlockingCoroutine<T>(newContext, Thread.currentThread(), eventLoop)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
return coroutine.joinBlocking()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ Caused by: java.util.concurrent.CancellationException: Channel was cancelled
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt)
at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt)
at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt)
at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlockingImpl(Builders.kt)
at kotlinx.coroutines.BuildersKt.runBlockingImpl(Unknown Source)
at kotlinx.coroutines.BuildersKt__Builders_concurrentKt.runBlockingK(Builders.concurrent.kt)
at kotlinx.coroutines.BuildersKt.runBlockingK(Unknown Source)
at kotlinx.coroutines.testing.TestBase.runTest(TestBase.kt)
16 changes: 0 additions & 16 deletions kotlinx-coroutines-core/jvm/test/RunBlockingJvmTest.kt

This file was deleted.

Loading