Skip to content

Commit 662cf54

Browse files
coeuvrecopybara-github
authored andcommitted
Remote: Fix an issue that a failed action could lead to RuntimeException caused by InterruptedException thrown when acquiring gRPC connections. #13239
When --keep_going is not enabled, Bazel will cancel other executing actions if an action failed. An action which is executing remotely could in the state of waiting for a lock available to acquire the gRPC connection. SharedConnectionFactory uses ReentrantLock#lockInterruptibly to acquire the lock and will throw InterruptedException when the thread is interrupted which happens when the action is cancelled by Bazel. However, this InterruptedException is wrapped inside a RuntimeException results in a build error. ReentrantLock was choosen initially to implement a hand-over-hand locking algorithem but it's no longer necessary after a few iterations. This change replaces ReentrantLock with `synchronized` keyword so we won't throw InterruptedException when acquiring gRPC connections. Call sites can still throw InterruptedException to cancel an action execution. PiperOrigin-RevId: 365170212
1 parent b1f1511 commit 662cf54

File tree

3 files changed

+37
-56
lines changed

3 files changed

+37
-56
lines changed

src/main/java/com/google/devtools/build/lib/remote/grpc/SharedConnectionFactory.java

+5-17
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.util.ArrayList;
2626
import java.util.List;
2727
import java.util.concurrent.atomic.AtomicReference;
28-
import java.util.concurrent.locks.ReentrantLock;
2928
import javax.annotation.Nullable;
3029
import javax.annotation.concurrent.GuardedBy;
3130

@@ -44,10 +43,9 @@ public class SharedConnectionFactory implements ConnectionPool {
4443
private final ConnectionFactory factory;
4544

4645
@Nullable
47-
@GuardedBy("connectionLock")
46+
@GuardedBy("this")
4847
private AsyncSubject<Connection> connectionAsyncSubject = null;
4948

50-
private final ReentrantLock connectionLock = new ReentrantLock();
5149
private final AtomicReference<Disposable> connectionCreationDisposable =
5250
new AtomicReference<>(null);
5351

@@ -70,9 +68,7 @@ public void close() throws IOException {
7068
d.dispose();
7169
}
7270

73-
try {
74-
connectionLock.lockInterruptibly();
75-
71+
synchronized (this) {
7672
if (connectionAsyncSubject != null) {
7773
Connection connection = connectionAsyncSubject.getValue();
7874
if (connection != null) {
@@ -83,16 +79,11 @@ public void close() throws IOException {
8379
connectionAsyncSubject.onError(new IllegalStateException("closed"));
8480
}
8581
}
86-
} catch (InterruptedException e) {
87-
throw new IOException(e);
88-
} finally {
89-
connectionLock.unlock();
9082
}
9183
}
9284

93-
private AsyncSubject<Connection> createUnderlyingConnectionIfNot() throws InterruptedException {
94-
connectionLock.lockInterruptibly();
95-
try {
85+
private AsyncSubject<Connection> createUnderlyingConnectionIfNot() {
86+
synchronized (this) {
9687
if (connectionAsyncSubject == null || connectionAsyncSubject.hasThrowable()) {
9788
connectionAsyncSubject =
9889
factory
@@ -103,14 +94,11 @@ private AsyncSubject<Connection> createUnderlyingConnectionIfNot() throws Interr
10394
}
10495

10596
return connectionAsyncSubject;
106-
} finally {
107-
connectionLock.unlock();
10897
}
10998
}
11099

111100
private Single<? extends Connection> acquireConnection() {
112-
return Single.fromCallable(this::createUnderlyingConnectionIfNot)
113-
.flatMap(Single::fromObservable);
101+
return Single.fromObservable(createUnderlyingConnectionIfNot());
114102
}
115103

116104
/**

src/test/java/com/google/devtools/build/lib/remote/grpc/BUILD

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ java_test(
2222
deps = [
2323
"//src/main/java/com/google/devtools/build/lib/remote/grpc",
2424
"//src/test/java/com/google/devtools/build/lib:test_runner",
25+
"//src/test/java/com/google/devtools/build/lib/remote/util",
2526
"//third_party:guava",
2627
"//third_party:junit4",
2728
"//third_party:mockito",

src/test/java/com/google/devtools/build/lib/remote/grpc/SharedConnectionFactoryTest.java

+31-39
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,14 @@
2020
import static org.mockito.Mockito.when;
2121

2222
import com.google.devtools.build.lib.remote.grpc.SharedConnectionFactory.SharedConnection;
23+
import com.google.devtools.build.lib.remote.util.RxNoGlobalErrorsRule;
2324
import io.reactivex.rxjava3.core.Single;
2425
import io.reactivex.rxjava3.observers.TestObserver;
25-
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
2626
import java.io.IOException;
2727
import java.util.concurrent.Semaphore;
2828
import java.util.concurrent.atomic.AtomicBoolean;
2929
import java.util.concurrent.atomic.AtomicInteger;
3030
import java.util.concurrent.atomic.AtomicReference;
31-
import org.junit.After;
3231
import org.junit.Before;
3332
import org.junit.Rule;
3433
import org.junit.Test;
@@ -42,28 +41,16 @@
4241
@RunWith(JUnit4.class)
4342
public class SharedConnectionFactoryTest {
4443
@Rule public final MockitoRule mockito = MockitoJUnit.rule();
45-
46-
private final AtomicReference<Throwable> rxGlobalThrowable = new AtomicReference<>(null);
44+
@Rule public final RxNoGlobalErrorsRule rxNoGlobalErrorsRule = new RxNoGlobalErrorsRule();
4745

4846
@Mock private Connection connection;
4947
@Mock private ConnectionFactory connectionFactory;
5048

5149
@Before
5250
public void setUp() {
53-
RxJavaPlugins.setErrorHandler(rxGlobalThrowable::set);
54-
5551
when(connectionFactory.create()).thenAnswer(invocation -> Single.just(connection));
5652
}
5753

58-
@After
59-
public void tearDown() throws Throwable {
60-
// Make sure rxjava didn't receive global errors
61-
Throwable t = rxGlobalThrowable.getAndSet(null);
62-
if (t != null) {
63-
throw t;
64-
}
65-
}
66-
6754
@Test
6855
public void create_smoke() {
6956
SharedConnectionFactory factory = new SharedConnectionFactory(connectionFactory, 1);
@@ -125,32 +112,37 @@ public void create_belowMaxConcurrency_shareConnections() {
125112

126113
@Test
127114
public void create_concurrentCreate_shareConnections() throws InterruptedException {
128-
SharedConnectionFactory factory = new SharedConnectionFactory(connectionFactory, 2);
129-
Semaphore semaphore = new Semaphore(0);
130-
AtomicBoolean finished = new AtomicBoolean(false);
131-
Thread t =
132-
new Thread(
133-
() -> {
134-
factory
135-
.create()
136-
.doOnSuccess(
137-
conn -> {
138-
assertThat(conn.getUnderlyingConnection()).isEqualTo(connection);
139-
semaphore.release();
140-
Thread.sleep(Integer.MAX_VALUE);
141-
finished.set(true);
142-
})
143-
.blockingSubscribe();
144-
145-
finished.set(true);
146-
});
147-
t.start();
148-
semaphore.acquire();
115+
int maxConcurrency = 10;
116+
SharedConnectionFactory factory =
117+
new SharedConnectionFactory(connectionFactory, maxConcurrency);
118+
AtomicReference<Throwable> error = new AtomicReference<>(null);
119+
Runnable runnable =
120+
() -> {
121+
try {
122+
TestObserver<SharedConnection> observer = factory.create().test();
123+
124+
observer
125+
.assertNoErrors()
126+
.assertValue(conn -> conn.getUnderlyingConnection() == connection)
127+
.assertComplete();
128+
} catch (Throwable e) {
129+
error.set(e);
130+
}
131+
};
132+
Thread[] threads = new Thread[maxConcurrency];
133+
for (int i = 0; i < threads.length; ++i) {
134+
threads[i] = new Thread(runnable);
135+
}
149136

150-
TestObserver<SharedConnection> observer = factory.create().test();
137+
for (Thread thread : threads) {
138+
thread.start();
139+
}
140+
for (Thread thread : threads) {
141+
thread.join();
142+
}
151143

152-
observer.assertValue(conn -> conn.getUnderlyingConnection() == connection).assertComplete();
153-
assertThat(finished.get()).isFalse();
144+
assertThat(error.get()).isNull();
145+
verify(connectionFactory, times(1)).create();
154146
}
155147

156148
@Test

0 commit comments

Comments
 (0)