Skip to content

Commit ba9e2f8

Browse files
benjaminpcopybara-github
authored andcommitted
Remove usage of gRPC Context cancellation in the remote execution client.
The gRPC remote execution client frequently "converts" gRPC calls into `ListenableFuture`s by setting a `SettableFuture` in the `onCompleted` or `onError` gRPC stub callbacks. If the future has direct executor callbacks, those callbacks will execute with the gRPC Context of the freshly completed call. That is problematic if the `Context` was canceled (canceling the call `Context` is good hygiene after completing a gRPC call), and the future callback goes to make further gRPC calls. Therefore, this change removes all usage of gRPC `Context` cancellation. It would be nice if there was instead some way to avoid leaking `Context`s between calls instead of having totally forswear `Context` cancellation. However, I can't see a good way to enforce proper isolation. Fixes bazelbuild#17298. Closes bazelbuild#17426. PiperOrigin-RevId: 507730469 Change-Id: Iea74acad4592952700e41d34672f6478de509d5e
1 parent 7a9a2f8 commit ba9e2f8

File tree

2 files changed

+96
-97
lines changed

2 files changed

+96
-97
lines changed

src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java

+13-21
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@
4242
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
4343
import com.google.devtools.build.lib.remote.util.Utils;
4444
import io.grpc.Channel;
45-
import io.grpc.Context;
46-
import io.grpc.Context.CancellableContext;
4745
import io.grpc.Status;
4846
import io.grpc.Status.Code;
4947
import io.grpc.StatusRuntimeException;
@@ -231,7 +229,6 @@ private ListenableFuture<Void> startAsyncUpload(
231229
ListenableFuture<Void> currUpload = newUpload.start();
232230
currUpload.addListener(
233231
() -> {
234-
newUpload.cancel();
235232
if (openedFilePermits != null) {
236233
openedFilePermits.release();
237234
}
@@ -249,7 +246,6 @@ private static final class AsyncUpload implements AsyncCallable<Long> {
249246
private final String resourceName;
250247
private final Chunker chunker;
251248
private final ProgressiveBackoff progressiveBackoff;
252-
private final CancellableContext grpcContext;
253249

254250
private long lastCommittedOffset = -1;
255251

@@ -269,7 +265,6 @@ private static final class AsyncUpload implements AsyncCallable<Long> {
269265
this.progressiveBackoff = new ProgressiveBackoff(retrier::newBackoff);
270266
this.resourceName = resourceName;
271267
this.chunker = chunker;
272-
this.grpcContext = Context.current().withCancellation();
273268
}
274269

275270
ListenableFuture<Void> start() {
@@ -369,13 +364,11 @@ private ListenableFuture<Long> query() {
369364
Futures.transform(
370365
channel.withChannelFuture(
371366
channel ->
372-
grpcContext.call(
373-
() ->
374-
bsFutureStub(channel)
375-
.queryWriteStatus(
376-
QueryWriteStatusRequest.newBuilder()
377-
.setResourceName(resourceName)
378-
.build()))),
367+
bsFutureStub(channel)
368+
.queryWriteStatus(
369+
QueryWriteStatusRequest.newBuilder()
370+
.setResourceName(resourceName)
371+
.build())),
379372
QueryWriteStatusResponse::getCommittedSize,
380373
MoreExecutors.directExecutor());
381374
return Futures.catchingAsync(
@@ -397,18 +390,10 @@ private ListenableFuture<Long> upload(long pos) {
397390
return channel.withChannelFuture(
398391
channel -> {
399392
SettableFuture<Long> uploadResult = SettableFuture.create();
400-
grpcContext.run(
401-
() ->
402-
bsAsyncStub(channel)
403-
.write(new Writer(resourceName, chunker, pos, uploadResult)));
393+
bsAsyncStub(channel).write(new Writer(resourceName, chunker, pos, uploadResult));
404394
return uploadResult;
405395
});
406396
}
407-
408-
void cancel() {
409-
grpcContext.cancel(
410-
Status.CANCELLED.withDescription("Cancelled by user").asRuntimeException());
411-
}
412397
}
413398

414399
private static final class Writer
@@ -432,6 +417,13 @@ private Writer(
432417
@Override
433418
public void beforeStart(ClientCallStreamObserver<WriteRequest> requestObserver) {
434419
this.requestObserver = requestObserver;
420+
uploadResult.addListener(
421+
() -> {
422+
if (uploadResult.isCancelled()) {
423+
requestObserver.cancel("cancelled by user", null);
424+
}
425+
},
426+
MoreExecutors.directExecutor());
435427
requestObserver.setOnReadyHandler(this);
436428
}
437429

src/main/java/com/google/devtools/build/lib/remote/GrpcCacheClient.java

+83-76
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import com.google.common.annotations.VisibleForTesting;
3535
import com.google.common.base.Ascii;
3636
import com.google.common.base.Preconditions;
37+
import com.google.common.base.VerifyException;
3738
import com.google.common.collect.ImmutableSet;
3839
import com.google.common.collect.Iterables;
3940
import com.google.common.flogger.GoogleLogger;
@@ -58,11 +59,11 @@
5859
import com.google.devtools.build.lib.vfs.Path;
5960
import com.google.protobuf.ByteString;
6061
import io.grpc.Channel;
61-
import io.grpc.Context;
6262
import io.grpc.Status;
6363
import io.grpc.Status.Code;
6464
import io.grpc.StatusRuntimeException;
65-
import io.grpc.stub.StreamObserver;
65+
import io.grpc.stub.ClientCallStreamObserver;
66+
import io.grpc.stub.ClientResponseObserver;
6667
import java.io.IOException;
6768
import java.io.OutputStream;
6869
import java.util.ArrayList;
@@ -371,81 +372,87 @@ private ListenableFuture<Long> requestRead(
371372
} catch (IOException e) {
372373
return Futures.immediateFailedFuture(e);
373374
}
374-
Context.CancellableContext grpcContext = Context.current().withCancellation();
375-
future.addListener(() -> grpcContext.cancel(null), MoreExecutors.directExecutor());
376-
grpcContext.run(
377-
() ->
378-
bsAsyncStub(context, channel)
379-
.read(
380-
ReadRequest.newBuilder()
381-
.setResourceName(resourceName)
382-
.setReadOffset(rawOut.getCount())
383-
.build(),
384-
new StreamObserver<ReadResponse>() {
385-
@Override
386-
public void onNext(ReadResponse readResponse) {
387-
ByteString data = readResponse.getData();
388-
try {
389-
data.writeTo(out);
390-
} catch (IOException e) {
391-
// Cancel the call.
392-
throw new RuntimeException(e);
393-
}
394-
// reset the stall backoff because we've made progress or been kept alive
395-
progressiveBackoff.reset();
396-
}
397-
398-
@Override
399-
public void onError(Throwable t) {
400-
if (rawOut.getCount() == digest.getSizeBytes()) {
401-
// If the file was fully downloaded, it doesn't matter if there was an
402-
// error at
403-
// the end of the stream.
404-
logger.atInfo().withCause(t).log(
405-
"ignoring error because file was fully received");
406-
onCompleted();
407-
return;
408-
}
409-
releaseOut();
410-
Status status = Status.fromThrowable(t);
411-
if (status.getCode() == Status.Code.NOT_FOUND) {
412-
future.setException(new CacheNotFoundException(digest));
413-
} else {
414-
future.setException(t);
415-
}
416-
}
417-
418-
@Override
419-
public void onCompleted() {
420-
try {
421-
try {
422-
out.flush();
423-
} finally {
424-
releaseOut();
425-
}
426-
if (digestSupplier != null) {
427-
Utils.verifyBlobContents(digest, digestSupplier.get());
428-
}
429-
} catch (IOException e) {
430-
future.setException(e);
431-
} catch (RuntimeException e) {
432-
logger.atWarning().withCause(e).log("Unexpected exception");
433-
future.setException(e);
434-
}
435-
future.set(rawOut.getCount());
375+
bsAsyncStub(context, channel)
376+
.read(
377+
ReadRequest.newBuilder()
378+
.setResourceName(resourceName)
379+
.setReadOffset(rawOut.getCount())
380+
.build(),
381+
new ClientResponseObserver<ReadRequest, ReadResponse>() {
382+
@Override
383+
public void beforeStart(ClientCallStreamObserver<ReadRequest> requestStream) {
384+
future.addListener(
385+
() -> {
386+
if (future.isCancelled()) {
387+
requestStream.cancel("canceled by user", null);
436388
}
437-
438-
private void releaseOut() {
439-
if (out instanceof ZstdDecompressingOutputStream) {
440-
try {
441-
((ZstdDecompressingOutputStream) out).closeShallow();
442-
} catch (IOException e) {
443-
logger.atWarning().withCause(e).log(
444-
"failed to cleanly close output stream");
445-
}
446-
}
447-
}
448-
}));
389+
},
390+
MoreExecutors.directExecutor());
391+
}
392+
393+
@Override
394+
public void onNext(ReadResponse readResponse) {
395+
ByteString data = readResponse.getData();
396+
try {
397+
data.writeTo(out);
398+
} catch (IOException e) {
399+
// Cancel the call.
400+
throw new VerifyException(e);
401+
}
402+
// reset the stall backoff because we've made progress or been kept alive
403+
progressiveBackoff.reset();
404+
}
405+
406+
@Override
407+
public void onError(Throwable t) {
408+
if (rawOut.getCount() == digest.getSizeBytes()) {
409+
// If the file was fully downloaded, it doesn't matter if there was an
410+
// error at
411+
// the end of the stream.
412+
logger.atInfo().withCause(t).log(
413+
"ignoring error because file was fully received");
414+
onCompleted();
415+
return;
416+
}
417+
releaseOut();
418+
Status status = Status.fromThrowable(t);
419+
if (status.getCode() == Status.Code.NOT_FOUND) {
420+
future.setException(new CacheNotFoundException(digest));
421+
} else {
422+
future.setException(t);
423+
}
424+
}
425+
426+
@Override
427+
public void onCompleted() {
428+
try {
429+
try {
430+
out.flush();
431+
} finally {
432+
releaseOut();
433+
}
434+
if (digestSupplier != null) {
435+
Utils.verifyBlobContents(digest, digestSupplier.get());
436+
}
437+
} catch (IOException e) {
438+
future.setException(e);
439+
} catch (RuntimeException e) {
440+
logger.atWarning().withCause(e).log("Unexpected exception");
441+
future.setException(e);
442+
}
443+
future.set(rawOut.getCount());
444+
}
445+
446+
private void releaseOut() {
447+
if (out instanceof ZstdDecompressingOutputStream) {
448+
try {
449+
((ZstdDecompressingOutputStream) out).closeShallow();
450+
} catch (IOException e) {
451+
logger.atWarning().withCause(e).log("failed to cleanly close output stream");
452+
}
453+
}
454+
}
455+
});
449456
return future;
450457
}
451458

0 commit comments

Comments
 (0)