Skip to content

Commit 90965b0

Browse files
authored
Stop remote blob upload if upload is complete. (bazelbuild#14467)
If a ByteStream/Write RPC fails, but ByteStream/QueryWriteStatus reveals the upload is in fact complete, avoid a NullPointerException. This CL is the dual fix of 78b89a0 for uploads. On bazel-6.0.0-pre.20211117.1, I observed: ``` java.lang.NullPointerException at com.google.devtools.build.lib.remote.Chunker.seek(Chunker.java:156) at com.google.devtools.build.lib.remote.ByteStreamUploader$AsyncUpload.lambda$start$0(ByteStreamUploader.java:416) at com.google.devtools.build.lib.remote.Retrier.executeAsync(Retrier.java:277) at com.google.devtools.build.lib.remote.Retrier.lambda$onExecuteAsyncFailure$1(Retrier.java:293) at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleAsyncTask.runInterruptibly(TrustedListenableFutureTask.java:160) at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleAsyncTask.runInterruptibly(TrustedListenableFutureTask.java:143) at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69) at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125) at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69) at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) at java.base/java.util.concurrent.FutureTask.run(Unknown Source) at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ``` Closes bazelbuild#14464. PiperOrigin-RevId: 417795715
1 parent 7deb940 commit 90965b0

File tree

2 files changed

+67
-5
lines changed

2 files changed

+67
-5
lines changed

src/main/java/com/google/devtools/build/lib/remote/ByteStreamUploader.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ public ListenableFuture<Void> uploadBlobAsync(
248248
checkState(!isShutdown, "Must not call uploadBlobs after shutdown.");
249249

250250
if (!forceUpload && uploadedBlobs.contains(HashCode.fromString(digest.getHash()))) {
251-
return Futures.immediateFuture(null);
251+
return immediateVoidFuture();
252252
}
253253

254254
ListenableFuture<Void> inProgress = uploadsInProgress.get(digest);
@@ -410,7 +410,7 @@ ListenableFuture<Void> start() {
410410
() ->
411411
retrier.executeAsync(
412412
() -> {
413-
if (chunker.getSize() == 0) {
413+
if (chunker.getSize() == committedOffset.get()) {
414414
return immediateVoidFuture();
415415
}
416416
try {
@@ -426,7 +426,7 @@ ListenableFuture<Void> start() {
426426
if (chunker.hasNext()) {
427427
return callAndQueryOnFailure(committedOffset, progressiveBackoff);
428428
}
429-
return Futures.immediateFuture(null);
429+
return immediateVoidFuture();
430430
},
431431
progressiveBackoff),
432432
callCredentialsProvider);
@@ -448,7 +448,7 @@ ListenableFuture<Void> start() {
448448
return Futures.immediateFailedFuture(new IOException(message));
449449
}
450450
}
451-
return Futures.immediateFuture(null);
451+
return immediateVoidFuture();
452452
},
453453
MoreExecutors.directExecutor());
454454
}
@@ -536,7 +536,7 @@ private ListenableFuture<Void> query(
536536
progressiveBackoff.reset();
537537
}
538538
committedOffset.set(committedSize);
539-
return Futures.immediateFuture(null);
539+
return immediateVoidFuture();
540540
},
541541
MoreExecutors.directExecutor());
542542
}

src/test/java/com/google/devtools/build/lib/remote/ByteStreamUploaderTest.java

+62
Original file line numberDiff line numberDiff line change
@@ -1517,6 +1517,68 @@ public void onCompleted() {
15171517
blockUntilInternalStateConsistent(uploader);
15181518
}
15191519

1520+
@Test
1521+
public void failureAfterUploadCompletes() throws Exception {
1522+
AtomicInteger numUploads = new AtomicInteger();
1523+
RemoteRetrier retrier =
1524+
TestUtils.newRemoteRetrier(
1525+
() -> mockBackoff, e -> e instanceof StatusRuntimeException, retryService);
1526+
ByteStreamUploader uploader =
1527+
new ByteStreamUploader(
1528+
INSTANCE_NAME,
1529+
new ReferenceCountedChannel(channelConnectionFactory),
1530+
CallCredentialsProvider.NO_CREDENTIALS,
1531+
/* callTimeoutSecs= */ 60,
1532+
retrier);
1533+
1534+
byte[] blob = new byte[CHUNK_SIZE - 1];
1535+
new Random().nextBytes(blob);
1536+
1537+
serviceRegistry.addService(
1538+
new ByteStreamImplBase() {
1539+
@Override
1540+
public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
1541+
numUploads.incrementAndGet();
1542+
return new StreamObserver<WriteRequest>() {
1543+
@Override
1544+
public void onNext(WriteRequest writeRequest) {}
1545+
1546+
@Override
1547+
public void onError(Throwable throwable) {
1548+
fail("onError should never be called.");
1549+
}
1550+
1551+
@Override
1552+
public void onCompleted() {
1553+
streamObserver.onNext(
1554+
WriteResponse.newBuilder().setCommittedSize(blob.length).build());
1555+
streamObserver.onError(Status.UNAVAILABLE.asException());
1556+
}
1557+
};
1558+
}
1559+
1560+
@Override
1561+
public void queryWriteStatus(
1562+
QueryWriteStatusRequest request, StreamObserver<QueryWriteStatusResponse> response) {
1563+
response.onNext(
1564+
QueryWriteStatusResponse.newBuilder()
1565+
.setCommittedSize(blob.length)
1566+
.setComplete(true)
1567+
.build());
1568+
response.onCompleted();
1569+
}
1570+
});
1571+
1572+
Chunker chunker = Chunker.builder().setInput(blob).setChunkSize(CHUNK_SIZE).build();
1573+
HashCode hash = HashCode.fromString(DIGEST_UTIL.compute(blob).getHash());
1574+
1575+
uploader.uploadBlob(context, hash, chunker, true);
1576+
1577+
blockUntilInternalStateConsistent(uploader);
1578+
1579+
assertThat(numUploads.get()).isEqualTo(1);
1580+
}
1581+
15201582
@Test
15211583
public void testCompressedUploads() throws Exception {
15221584
RemoteRetrier retrier =

0 commit comments

Comments
 (0)