Skip to content

Commit 5aef53a

Browse files
authored
Remote: Don't blocking-get when acquiring gRPC connections. (#14420)
With recent change to limit the max number of gRPC connections by default, acquiring a connection could suspend a thread if there is no available connection. gRPC calls are scheduled to a dedicated background thread pool. Workers in the thread pool are responsible to acquire the connection before starting the RPC call. There could be a race condition that a worker thread handles some gRPC calls and then switches to a new call which will acquire new connections. If the number of connections reaches the max, the worker thread is suspended and doesn't have a chance to switch to previous calls. The connections held by previous calls are, hence, never released. This PR changes to not use blocking get when acquiring gRPC connections. Fixes #14363. Closes #14416. PiperOrigin-RevId: 416282883
1 parent bfc2413 commit 5aef53a

11 files changed

+188
-166
lines changed

src/main/java/com/google/devtools/build/lib/remote/BUILD

+2-1
Original file line numberDiff line numberDiff line change
@@ -138,9 +138,10 @@ java_library(
138138
],
139139
deps = [
140140
"//src/main/java/com/google/devtools/build/lib/remote/grpc",
141+
"//src/main/java/com/google/devtools/build/lib/remote/util",
141142
"//third_party:guava",
142-
"//third_party:jsr305",
143143
"//third_party:netty",
144+
"//third_party:rxjava3",
144145
"//third_party/grpc:grpc-jar",
145146
],
146147
)

src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java

+19-11
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.bytestream.ByteStreamGrpc;
2525
import com.google.bytestream.ByteStreamGrpc.ByteStreamFutureStub;
2626
import com.google.bytestream.ByteStreamProto.QueryWriteStatusRequest;
27+
import com.google.bytestream.ByteStreamProto.QueryWriteStatusResponse;
2728
import com.google.bytestream.ByteStreamProto.WriteRequest;
2829
import com.google.bytestream.ByteStreamProto.WriteResponse;
2930
import com.google.common.annotations.VisibleForTesting;
@@ -374,7 +375,7 @@ public ReferenceCounted touch(Object o) {
374375
private static class AsyncUpload {
375376

376377
private final RemoteActionExecutionContext context;
377-
private final Channel channel;
378+
private final ReferenceCountedChannel channel;
378379
private final CallCredentialsProvider callCredentialsProvider;
379380
private final long callTimeoutSecs;
380381
private final Retrier retrier;
@@ -385,7 +386,7 @@ private static class AsyncUpload {
385386

386387
AsyncUpload(
387388
RemoteActionExecutionContext context,
388-
Channel channel,
389+
ReferenceCountedChannel channel,
389390
CallCredentialsProvider callCredentialsProvider,
390391
long callTimeoutSecs,
391392
Retrier retrier,
@@ -452,7 +453,7 @@ ListenableFuture<Void> start() {
452453
MoreExecutors.directExecutor());
453454
}
454455

455-
private ByteStreamFutureStub bsFutureStub() {
456+
private ByteStreamFutureStub bsFutureStub(Channel channel) {
456457
return ByteStreamGrpc.newFutureStub(channel)
457458
.withInterceptors(
458459
TracingMetadataUtils.attachMetadataInterceptor(context.getRequestMetadata()))
@@ -463,7 +464,10 @@ private ByteStreamFutureStub bsFutureStub() {
463464
private ListenableFuture<Void> callAndQueryOnFailure(
464465
AtomicLong committedOffset, ProgressiveBackoff progressiveBackoff) {
465466
return Futures.catchingAsync(
466-
call(committedOffset),
467+
Futures.transform(
468+
channel.withChannelFuture(channel -> call(committedOffset, channel)),
469+
written -> null,
470+
MoreExecutors.directExecutor()),
467471
Exception.class,
468472
(e) -> guardQueryWithSuppression(e, committedOffset, progressiveBackoff),
469473
MoreExecutors.directExecutor());
@@ -500,10 +504,14 @@ private ListenableFuture<Void> query(
500504
AtomicLong committedOffset, ProgressiveBackoff progressiveBackoff) {
501505
ListenableFuture<Long> committedSizeFuture =
502506
Futures.transform(
503-
bsFutureStub()
504-
.queryWriteStatus(
505-
QueryWriteStatusRequest.newBuilder().setResourceName(resourceName).build()),
506-
(response) -> response.getCommittedSize(),
507+
channel.withChannelFuture(
508+
channel ->
509+
bsFutureStub(channel)
510+
.queryWriteStatus(
511+
QueryWriteStatusRequest.newBuilder()
512+
.setResourceName(resourceName)
513+
.build())),
514+
QueryWriteStatusResponse::getCommittedSize,
507515
MoreExecutors.directExecutor());
508516
ListenableFuture<Long> guardedCommittedSizeFuture =
509517
Futures.catchingAsync(
@@ -533,14 +541,14 @@ private ListenableFuture<Void> query(
533541
MoreExecutors.directExecutor());
534542
}
535543

536-
private ListenableFuture<Void> call(AtomicLong committedOffset) {
544+
private ListenableFuture<Long> call(AtomicLong committedOffset, Channel channel) {
537545
CallOptions callOptions =
538546
CallOptions.DEFAULT
539547
.withCallCredentials(callCredentialsProvider.getCallCredentials())
540548
.withDeadlineAfter(callTimeoutSecs, SECONDS);
541549
call = channel.newCall(ByteStreamGrpc.getWriteMethod(), callOptions);
542550

543-
SettableFuture<Void> uploadResult = SettableFuture.create();
551+
SettableFuture<Long> uploadResult = SettableFuture.create();
544552
ClientCall.Listener<WriteResponse> callListener =
545553
new ClientCall.Listener<WriteResponse>() {
546554

@@ -568,7 +576,7 @@ public void onMessage(WriteResponse response) {
568576
@Override
569577
public void onClose(Status status, Metadata trailers) {
570578
if (status.isOk()) {
571-
uploadResult.set(null);
579+
uploadResult.set(committedOffset.get());
572580
} else {
573581
uploadResult.setException(status.asRuntimeException());
574582
}

src/main/java/com/google/devtools/build/lib/remote/ExperimentalGrpcRemoteExecutor.java

+30-14
Original file line numberDiff line numberDiff line change
@@ -35,12 +35,13 @@
3535
import com.google.longrunning.Operation;
3636
import com.google.longrunning.Operation.ResultCase;
3737
import com.google.rpc.Status;
38+
import io.grpc.Channel;
3839
import io.grpc.Status.Code;
3940
import io.grpc.StatusRuntimeException;
41+
import io.reactivex.rxjava3.functions.Function;
4042
import java.io.IOException;
4143
import java.util.Iterator;
4244
import java.util.concurrent.atomic.AtomicBoolean;
43-
import java.util.function.Supplier;
4445
import javax.annotation.Nullable;
4546

4647
/**
@@ -73,7 +74,7 @@ public ExperimentalGrpcRemoteExecutor(
7374
this.retrier = retrier;
7475
}
7576

76-
private ExecutionBlockingStub executionBlockingStub(RequestMetadata metadata) {
77+
private ExecutionBlockingStub executionBlockingStub(RequestMetadata metadata, Channel channel) {
7778
return ExecutionGrpc.newBlockingStub(channel)
7879
.withInterceptors(TracingMetadataUtils.attachMetadataInterceptor(metadata))
7980
.withCallCredentials(callCredentialsProvider.getCallCredentials())
@@ -90,7 +91,8 @@ private static class Execution {
9091
// Count retry times for WaitExecution() calls and is reset when we receive any response from
9192
// the server that is not an error.
9293
private final ProgressiveBackoff waitExecutionBackoff;
93-
private final Supplier<ExecutionBlockingStub> executionBlockingStubSupplier;
94+
private final Function<ExecuteRequest, Iterator<Operation>> executeFunction;
95+
private final Function<WaitExecutionRequest, Iterator<Operation>> waitExecutionFunction;
9496

9597
// Last response (without error) we received from server.
9698
private Operation lastOperation;
@@ -100,14 +102,16 @@ private static class Execution {
100102
OperationObserver observer,
101103
RemoteRetrier retrier,
102104
CallCredentialsProvider callCredentialsProvider,
103-
Supplier<ExecutionBlockingStub> executionBlockingStubSupplier) {
105+
Function<ExecuteRequest, Iterator<Operation>> executeFunction,
106+
Function<WaitExecutionRequest, Iterator<Operation>> waitExecutionFunction) {
104107
this.request = request;
105108
this.observer = observer;
106109
this.retrier = retrier;
107110
this.callCredentialsProvider = callCredentialsProvider;
108111
this.executeBackoff = this.retrier.newBackoff();
109112
this.waitExecutionBackoff = new ProgressiveBackoff(this.retrier::newBackoff);
110-
this.executionBlockingStubSupplier = executionBlockingStubSupplier;
113+
this.executeFunction = executeFunction;
114+
this.waitExecutionFunction = waitExecutionFunction;
111115
}
112116

113117
ExecuteResponse start() throws IOException, InterruptedException {
@@ -168,9 +172,9 @@ ExecuteResponse execute() throws IOException {
168172
Preconditions.checkState(lastOperation == null);
169173

170174
try {
171-
Iterator<Operation> operationStream = executionBlockingStubSupplier.get().execute(request);
175+
Iterator<Operation> operationStream = executeFunction.apply(request);
172176
return handleOperationStream(operationStream);
173-
} catch (StatusRuntimeException e) {
177+
} catch (Throwable e) {
174178
// If lastOperation is not null, we know the execution request is accepted by the server. In
175179
// this case, we will fallback to WaitExecution() loop when the stream is broken.
176180
if (lastOperation != null) {
@@ -188,17 +192,20 @@ ExecuteResponse waitExecution() throws IOException {
188192
WaitExecutionRequest request =
189193
WaitExecutionRequest.newBuilder().setName(lastOperation.getName()).build();
190194
try {
191-
Iterator<Operation> operationStream =
192-
executionBlockingStubSupplier.get().waitExecution(request);
195+
Iterator<Operation> operationStream = waitExecutionFunction.apply(request);
193196
return handleOperationStream(operationStream);
194-
} catch (StatusRuntimeException e) {
197+
} catch (Throwable e) {
195198
// A NOT_FOUND error means Operation was lost on the server, retry Execute().
196199
//
197200
// However, we only retry Execute() if executeBackoff should retry. Also increase the retry
198201
// counter at the same time (done by nextDelayMillis()).
199-
if (e.getStatus().getCode() == Code.NOT_FOUND && executeBackoff.nextDelayMillis(e) >= 0) {
200-
lastOperation = null;
201-
return null;
202+
if (e instanceof StatusRuntimeException) {
203+
StatusRuntimeException sre = (StatusRuntimeException) e;
204+
if (sre.getStatus().getCode() == Code.NOT_FOUND
205+
&& executeBackoff.nextDelayMillis(sre) >= 0) {
206+
lastOperation = null;
207+
return null;
208+
}
202209
}
203210
throw new IOException(e);
204211
}
@@ -321,7 +328,16 @@ public ExecuteResponse executeRemotely(
321328
observer,
322329
retrier,
323330
callCredentialsProvider,
324-
() -> this.executionBlockingStub(context.getRequestMetadata()));
331+
(req) ->
332+
channel.withChannelBlocking(
333+
channel ->
334+
this.executionBlockingStub(context.getRequestMetadata(), channel)
335+
.execute(req)),
336+
(req) ->
337+
channel.withChannelBlocking(
338+
channel ->
339+
this.executionBlockingStub(context.getRequestMetadata(), channel)
340+
.waitExecution(req)));
325341
return execution.start();
326342
}
327343

src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java

+42-21
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import com.google.devtools.build.lib.remote.zstd.ZstdDecompressingOutputStream;
5757
import com.google.devtools.build.lib.vfs.Path;
5858
import com.google.protobuf.ByteString;
59+
import io.grpc.Channel;
5960
import io.grpc.Status;
6061
import io.grpc.Status.Code;
6162
import io.grpc.StatusRuntimeException;
@@ -122,7 +123,8 @@ private int computeMaxMissingBlobsDigestsPerMessage() {
122123
return (options.maxOutboundMessageSize - overhead) / digestSize;
123124
}
124125

125-
private ContentAddressableStorageFutureStub casFutureStub(RemoteActionExecutionContext context) {
126+
private ContentAddressableStorageFutureStub casFutureStub(
127+
RemoteActionExecutionContext context, Channel channel) {
126128
return ContentAddressableStorageGrpc.newFutureStub(channel)
127129
.withInterceptors(
128130
TracingMetadataUtils.attachMetadataInterceptor(context.getRequestMetadata()),
@@ -131,7 +133,7 @@ private ContentAddressableStorageFutureStub casFutureStub(RemoteActionExecutionC
131133
.withDeadlineAfter(options.remoteTimeout.getSeconds(), TimeUnit.SECONDS);
132134
}
133135

134-
private ByteStreamStub bsAsyncStub(RemoteActionExecutionContext context) {
136+
private ByteStreamStub bsAsyncStub(RemoteActionExecutionContext context, Channel channel) {
135137
return ByteStreamGrpc.newStub(channel)
136138
.withInterceptors(
137139
TracingMetadataUtils.attachMetadataInterceptor(context.getRequestMetadata()),
@@ -140,7 +142,8 @@ private ByteStreamStub bsAsyncStub(RemoteActionExecutionContext context) {
140142
.withDeadlineAfter(options.remoteTimeout.getSeconds(), TimeUnit.SECONDS);
141143
}
142144

143-
private ActionCacheFutureStub acFutureStub(RemoteActionExecutionContext context) {
145+
private ActionCacheFutureStub acFutureStub(
146+
RemoteActionExecutionContext context, Channel channel) {
144147
return ActionCacheGrpc.newFutureStub(channel)
145148
.withInterceptors(
146149
TracingMetadataUtils.attachMetadataInterceptor(context.getRequestMetadata()),
@@ -222,7 +225,11 @@ public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(
222225
private ListenableFuture<FindMissingBlobsResponse> getMissingDigests(
223226
RemoteActionExecutionContext context, FindMissingBlobsRequest request) {
224227
return Utils.refreshIfUnauthenticatedAsync(
225-
() -> retrier.executeAsync(() -> casFutureStub(context).findMissingBlobs(request)),
228+
() ->
229+
retrier.executeAsync(
230+
() ->
231+
channel.withChannelFuture(
232+
channel -> casFutureStub(context, channel).findMissingBlobs(request))),
226233
callCredentialsProvider);
227234
}
228235

@@ -254,7 +261,10 @@ public ListenableFuture<CachedActionResult> downloadActionResult(
254261
return Utils.refreshIfUnauthenticatedAsync(
255262
() ->
256263
retrier.executeAsync(
257-
() -> handleStatus(acFutureStub(context).getActionResult(request))),
264+
() ->
265+
handleStatus(
266+
channel.withChannelFuture(
267+
channel -> acFutureStub(context, channel).getActionResult(request)))),
258268
callCredentialsProvider);
259269
}
260270

@@ -267,13 +277,15 @@ public ListenableFuture<Void> uploadActionResult(
267277
retrier.executeAsync(
268278
() ->
269279
Futures.catchingAsync(
270-
acFutureStub(context)
271-
.updateActionResult(
272-
UpdateActionResultRequest.newBuilder()
273-
.setInstanceName(options.remoteInstanceName)
274-
.setActionDigest(actionKey.getDigest())
275-
.setActionResult(actionResult)
276-
.build()),
280+
channel.withChannelFuture(
281+
channel ->
282+
acFutureStub(context, channel)
283+
.updateActionResult(
284+
UpdateActionResultRequest.newBuilder()
285+
.setInstanceName(options.remoteInstanceName)
286+
.setActionDigest(actionKey.getDigest())
287+
.setActionResult(actionResult)
288+
.build())),
277289
StatusRuntimeException.class,
278290
(sre) -> Futures.immediateFailedFuture(new IOException(sre)),
279291
MoreExecutors.directExecutor())),
@@ -317,18 +329,26 @@ private ListenableFuture<Void> downloadBlob(
317329
@Nullable Supplier<Digest> digestSupplier) {
318330
AtomicLong offset = new AtomicLong(0);
319331
ProgressiveBackoff progressiveBackoff = new ProgressiveBackoff(retrier::newBackoff);
320-
ListenableFuture<Void> downloadFuture =
332+
ListenableFuture<Long> downloadFuture =
321333
Utils.refreshIfUnauthenticatedAsync(
322334
() ->
323335
retrier.executeAsync(
324336
() ->
325-
requestRead(
326-
context, offset, progressiveBackoff, digest, out, digestSupplier),
337+
channel.withChannelFuture(
338+
channel ->
339+
requestRead(
340+
context,
341+
offset,
342+
progressiveBackoff,
343+
digest,
344+
out,
345+
digestSupplier,
346+
channel)),
327347
progressiveBackoff),
328348
callCredentialsProvider);
329349

330350
return Futures.catchingAsync(
331-
downloadFuture,
351+
Futures.transform(downloadFuture, bytesWritten -> null, MoreExecutors.directExecutor()),
332352
StatusRuntimeException.class,
333353
(e) -> Futures.immediateFailedFuture(new IOException(e)),
334354
MoreExecutors.directExecutor());
@@ -343,17 +363,18 @@ public static String getResourceName(String instanceName, Digest digest, boolean
343363
return resourceName + DigestUtil.toString(digest);
344364
}
345365

346-
private ListenableFuture<Void> requestRead(
366+
private ListenableFuture<Long> requestRead(
347367
RemoteActionExecutionContext context,
348368
AtomicLong offset,
349369
ProgressiveBackoff progressiveBackoff,
350370
Digest digest,
351371
CountingOutputStream out,
352-
@Nullable Supplier<Digest> digestSupplier) {
372+
@Nullable Supplier<Digest> digestSupplier,
373+
Channel channel) {
353374
String resourceName =
354375
getResourceName(options.remoteInstanceName, digest, options.cacheCompression);
355-
SettableFuture<Void> future = SettableFuture.create();
356-
bsAsyncStub(context)
376+
SettableFuture<Long> future = SettableFuture.create();
377+
bsAsyncStub(context, channel)
357378
.read(
358379
ReadRequest.newBuilder()
359380
.setResourceName(resourceName)
@@ -400,7 +421,7 @@ public void onCompleted() {
400421
Utils.verifyBlobContents(digest, digestSupplier.get());
401422
}
402423
out.flush();
403-
future.set(null);
424+
future.set(offset.get());
404425
} catch (IOException e) {
405426
future.setException(e);
406427
} catch (RuntimeException e) {

src/main/java/com/google/devtools/build/lib/remote/GrpcRemoteExecutor.java

+12-3
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.google.devtools.build.lib.remote.util.Utils;
3131
import com.google.longrunning.Operation;
3232
import com.google.rpc.Status;
33+
import io.grpc.Channel;
3334
import io.grpc.Status.Code;
3435
import io.grpc.StatusRuntimeException;
3536
import java.io.IOException;
@@ -57,7 +58,7 @@ public GrpcRemoteExecutor(
5758
this.retrier = retrier;
5859
}
5960

60-
private ExecutionBlockingStub execBlockingStub(RequestMetadata metadata) {
61+
private ExecutionBlockingStub execBlockingStub(RequestMetadata metadata, Channel channel) {
6162
return ExecutionGrpc.newBlockingStub(channel)
6263
.withInterceptors(TracingMetadataUtils.attachMetadataInterceptor(metadata))
6364
.withCallCredentials(callCredentialsProvider.getCallCredentials());
@@ -152,9 +153,17 @@ public ExecuteResponse executeRemotely(
152153
WaitExecutionRequest.newBuilder()
153154
.setName(operation.get().getName())
154155
.build();
155-
replies = execBlockingStub(context.getRequestMetadata()).waitExecution(wr);
156+
replies =
157+
channel.withChannelBlocking(
158+
channel ->
159+
execBlockingStub(context.getRequestMetadata(), channel)
160+
.waitExecution(wr));
156161
} else {
157-
replies = execBlockingStub(context.getRequestMetadata()).execute(request);
162+
replies =
163+
channel.withChannelBlocking(
164+
channel ->
165+
execBlockingStub(context.getRequestMetadata(), channel)
166+
.execute(request));
158167
}
159168
try {
160169
while (replies.hasNext()) {

0 commit comments

Comments
 (0)