Skip to content

Commit 003e2d0

Browse files
coeuvrecopybara-github
authored andcommittedSep 22, 2021
Remote: Fixes a confusion that background upload counter could increase after build finished.
At the end of a build, the number of files waiting to be uploaded could increase as other ones finished. This PR fixes that. Also, changes to only emit profile block `upload outputs` for blocking uploads. Fixes bazelbuild#13655 (comment). Closes bazelbuild#13954. PiperOrigin-RevId: 398161750
1 parent de0c6bd commit 003e2d0

16 files changed

+158
-193
lines changed
 

‎src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java

+13-69
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,7 @@
3030
import com.google.common.util.concurrent.ListenableFuture;
3131
import com.google.common.util.concurrent.MoreExecutors;
3232
import com.google.common.util.concurrent.SettableFuture;
33-
import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
34-
import com.google.devtools.build.lib.actions.ActionUploadFinishedEvent;
35-
import com.google.devtools.build.lib.actions.ActionUploadStartedEvent;
3633
import com.google.devtools.build.lib.concurrent.ThreadSafety;
37-
import com.google.devtools.build.lib.events.ExtendedEventHandler;
3834
import com.google.devtools.build.lib.exec.SpawnProgressEvent;
3935
import com.google.devtools.build.lib.remote.common.BulkTransferException;
4036
import com.google.devtools.build.lib.remote.common.LazyFileOutputStream;
@@ -66,7 +62,6 @@
6662
import java.util.concurrent.atomic.AtomicLong;
6763
import java.util.regex.Matcher;
6864
import java.util.regex.Pattern;
69-
import javax.annotation.Nullable;
7065

7166
/**
7267
* A cache for storing artifacts (input and output) as well as the output of running an action.
@@ -85,7 +80,6 @@ public class RemoteCache extends AbstractReferenceCounted {
8580
private static final ListenableFuture<Void> COMPLETED_SUCCESS = immediateFuture(null);
8681
private static final ListenableFuture<byte[]> EMPTY_BYTES = immediateFuture(new byte[0]);
8782

88-
private final ExtendedEventHandler reporter;
8983
private final CountDownLatch closeCountDownLatch = new CountDownLatch(1);
9084
protected final AsyncTaskCache.NoResult<Digest> casUploadCache = AsyncTaskCache.NoResult.create();
9185

@@ -94,11 +88,9 @@ public class RemoteCache extends AbstractReferenceCounted {
9488
protected final DigestUtil digestUtil;
9589

9690
public RemoteCache(
97-
ExtendedEventHandler reporter,
9891
RemoteCacheClient cacheProtocol,
9992
RemoteOptions options,
10093
DigestUtil digestUtil) {
101-
this.reporter = reporter;
10294
this.cacheProtocol = cacheProtocol;
10395
this.options = options;
10496
this.digestUtil = digestUtil;
@@ -110,23 +102,6 @@ public CachedActionResult downloadActionResult(
110102
return getFromFuture(cacheProtocol.downloadActionResult(context, actionKey, inlineOutErr));
111103
}
112104

113-
private void postUploadStartedEvent(@Nullable ActionExecutionMetadata action, String resourceId) {
114-
if (action == null) {
115-
return;
116-
}
117-
118-
reporter.post(ActionUploadStartedEvent.create(action, resourceId));
119-
}
120-
121-
private void postUploadFinishedEvent(
122-
@Nullable ActionExecutionMetadata action, String resourceId) {
123-
if (action == null) {
124-
return;
125-
}
126-
127-
reporter.post(ActionUploadFinishedEvent.create(action, resourceId));
128-
}
129-
130105
/**
131106
* Returns a set of digests that the remote cache does not know about. The returned set is
132107
* guaranteed to be a subset of {@code digests}.
@@ -143,38 +118,14 @@ public ListenableFuture<ImmutableSet<Digest>> findMissingDigests(
143118
public ListenableFuture<Void> uploadActionResult(
144119
RemoteActionExecutionContext context, ActionKey actionKey, ActionResult actionResult) {
145120

146-
ActionExecutionMetadata action = context.getSpawnOwner();
147-
148121
Completable upload =
149-
Completable.using(
150-
() -> {
151-
String resourceId = "ac/" + actionKey.getDigest().getHash();
152-
postUploadStartedEvent(action, resourceId);
153-
return resourceId;
154-
},
155-
resourceId ->
156-
RxFutures.toCompletable(
157-
() -> cacheProtocol.uploadActionResult(context, actionKey, actionResult),
158-
directExecutor()),
159-
resourceId -> postUploadFinishedEvent(action, resourceId));
122+
RxFutures.toCompletable(
123+
() -> cacheProtocol.uploadActionResult(context, actionKey, actionResult),
124+
directExecutor());
160125

161126
return RxFutures.toListenableFuture(upload);
162127
}
163128

164-
private Completable doUploadFile(RemoteActionExecutionContext context, Digest digest, Path file) {
165-
ActionExecutionMetadata action = context.getSpawnOwner();
166-
return Completable.using(
167-
() -> {
168-
String resourceId = "cas/" + digest.getHash();
169-
postUploadStartedEvent(action, resourceId);
170-
return resourceId;
171-
},
172-
resourceId ->
173-
RxFutures.toCompletable(
174-
() -> cacheProtocol.uploadFile(context, digest, file), directExecutor()),
175-
resourceId -> postUploadFinishedEvent(action, resourceId));
176-
}
177-
178129
/**
179130
* Upload a local file to the remote cache.
180131
*
@@ -191,26 +142,15 @@ public final ListenableFuture<Void> uploadFile(
191142
return COMPLETED_SUCCESS;
192143
}
193144

194-
Completable upload = casUploadCache.executeIfNot(digest, doUploadFile(context, digest, file));
145+
Completable upload =
146+
casUploadCache.executeIfNot(
147+
digest,
148+
RxFutures.toCompletable(
149+
() -> cacheProtocol.uploadFile(context, digest, file), directExecutor()));
195150

196151
return RxFutures.toListenableFuture(upload);
197152
}
198153

199-
private Completable doUploadBlob(
200-
RemoteActionExecutionContext context, Digest digest, ByteString data) {
201-
ActionExecutionMetadata action = context.getSpawnOwner();
202-
return Completable.using(
203-
() -> {
204-
String resourceId = "cas/" + digest.getHash();
205-
postUploadStartedEvent(action, resourceId);
206-
return resourceId;
207-
},
208-
resourceId ->
209-
RxFutures.toCompletable(
210-
() -> cacheProtocol.uploadBlob(context, digest, data), directExecutor()),
211-
resourceId -> postUploadFinishedEvent(action, resourceId));
212-
}
213-
214154
/**
215155
* Upload sequence of bytes to the remote cache.
216156
*
@@ -227,7 +167,11 @@ public final ListenableFuture<Void> uploadBlob(
227167
return COMPLETED_SUCCESS;
228168
}
229169

230-
Completable upload = casUploadCache.executeIfNot(digest, doUploadBlob(context, digest, data));
170+
Completable upload =
171+
casUploadCache.executeIfNot(
172+
digest,
173+
RxFutures.toCompletable(
174+
() -> cacheProtocol.uploadBlob(context, digest, data), directExecutor()));
231175

232176
return RxFutures.toListenableFuture(upload);
233177
}

‎src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionCache.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.google.common.util.concurrent.Futures;
2323
import com.google.common.util.concurrent.ListenableFuture;
2424
import com.google.common.util.concurrent.MoreExecutors;
25-
import com.google.devtools.build.lib.events.ExtendedEventHandler;
2625
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
2726
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
2827
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
@@ -43,11 +42,10 @@
4342
public class RemoteExecutionCache extends RemoteCache {
4443

4544
public RemoteExecutionCache(
46-
ExtendedEventHandler reporter,
4745
RemoteCacheClient protocolImpl,
4846
RemoteOptions options,
4947
DigestUtil digestUtil) {
50-
super(reporter, protocolImpl, options, digestUtil);
48+
super(protocolImpl, options, digestUtil);
5149
}
5250

5351
/**

‎src/main/java/com/google/devtools/build/lib/remote/RemoteExecutionService.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
import com.google.devtools.build.lib.events.Reporter;
8585
import com.google.devtools.build.lib.exec.SpawnRunner.SpawnExecutionContext;
8686
import com.google.devtools.build.lib.profiler.Profiler;
87+
import com.google.devtools.build.lib.profiler.ProfilerTask;
8788
import com.google.devtools.build.lib.profiler.SilentCloseable;
8889
import com.google.devtools.build.lib.remote.RemoteExecutionService.ActionResultMetadata.DirectoryMetadata;
8990
import com.google.devtools.build.lib.remote.RemoteExecutionService.ActionResultMetadata.FileMetadata;
@@ -1070,7 +1071,8 @@ public void uploadOutputs(RemoteAction action, SpawnResult spawnResult)
10701071
Single.using(
10711072
remoteCache::retain,
10721073
remoteCache ->
1073-
manifest.uploadAsync(action.getRemoteActionExecutionContext(), remoteCache),
1074+
manifest.uploadAsync(
1075+
action.getRemoteActionExecutionContext(), remoteCache, reporter),
10741076
RemoteCache::release)
10751077
.subscribeOn(scheduler)
10761078
.subscribe(
@@ -1087,7 +1089,10 @@ public void onError(@NonNull Throwable e) {
10871089
}
10881090
});
10891091
} else {
1090-
manifest.upload(action.getRemoteActionExecutionContext(), remoteCache);
1092+
try (SilentCloseable c =
1093+
Profiler.instance().profile(ProfilerTask.UPLOAD_TIME, "upload outputs")) {
1094+
manifest.upload(action.getRemoteActionExecutionContext(), remoteCache, reporter);
1095+
}
10911096
}
10921097
} catch (IOException e) {
10931098
reportUploadError(e);

‎src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -234,8 +234,7 @@ private void initHttpAndDiskCache(
234234
handleInitFailure(env, e, Code.CACHE_INIT_FAILURE);
235235
return;
236236
}
237-
RemoteCache remoteCache =
238-
new RemoteCache(env.getReporter(), cacheClient, remoteOptions, digestUtil);
237+
RemoteCache remoteCache = new RemoteCache(cacheClient, remoteOptions, digestUtil);
239238
actionContextProvider =
240239
RemoteActionContextProvider.createForRemoteCaching(
241240
executorService, env, remoteCache, /* retryScheduler= */ null, digestUtil);
@@ -573,7 +572,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
573572
}
574573
execChannel.release();
575574
RemoteExecutionCache remoteCache =
576-
new RemoteExecutionCache(env.getReporter(), cacheClient, remoteOptions, digestUtil);
575+
new RemoteExecutionCache(cacheClient, remoteOptions, digestUtil);
577576
actionContextProvider =
578577
RemoteActionContextProvider.createForRemoteExecution(
579578
executorService,
@@ -609,8 +608,7 @@ public void beforeCommand(CommandEnvironment env) throws AbruptExitException {
609608
}
610609
}
611610

612-
RemoteCache remoteCache =
613-
new RemoteCache(env.getReporter(), cacheClient, remoteOptions, digestUtil);
611+
RemoteCache remoteCache = new RemoteCache(cacheClient, remoteOptions, digestUtil);
614612
actionContextProvider =
615613
RemoteActionContextProvider.createForRemoteCaching(
616614
executorService, env, remoteCache, retryScheduler, digestUtil);

‎src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnCache.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -198,9 +198,7 @@ public void store(SpawnResult result) throws ExecException, InterruptedException
198198
}
199199
}
200200

201-
try (SilentCloseable c = prof.profile(ProfilerTask.UPLOAD_TIME, "upload outputs")) {
202-
remoteExecutionService.uploadOutputs(action, result);
203-
}
201+
remoteExecutionService.uploadOutputs(action, result);
204202
}
205203

206204
@Override

‎src/main/java/com/google/devtools/build/lib/remote/RemoteSpawnRunner.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -575,9 +575,7 @@ SpawnResult execLocallyAndUpload(
575575
}
576576
}
577577

578-
try (SilentCloseable c = Profiler.instance().profile(UPLOAD_TIME, "upload outputs")) {
579-
remoteExecutionService.uploadOutputs(action, result);
580-
}
578+
remoteExecutionService.uploadOutputs(action, result);
581579
return result;
582580
}
583581

‎src/main/java/com/google/devtools/build/lib/remote/UploadManifest.java

+80-10
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,20 @@
2828
import build.bazel.remote.execution.v2.Tree;
2929
import com.google.common.annotations.VisibleForTesting;
3030
import com.google.common.base.Preconditions;
31+
import com.google.common.collect.ImmutableList;
32+
import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
33+
import com.google.devtools.build.lib.actions.ActionUploadFinishedEvent;
34+
import com.google.devtools.build.lib.actions.ActionUploadStartedEvent;
3135
import com.google.devtools.build.lib.actions.ExecException;
3236
import com.google.devtools.build.lib.actions.UserExecException;
37+
import com.google.devtools.build.lib.events.ExtendedEventHandler;
3338
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
3439
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
3540
import com.google.devtools.build.lib.remote.common.RemoteCacheClient.ActionKey;
3641
import com.google.devtools.build.lib.remote.common.RemotePathResolver;
3742
import com.google.devtools.build.lib.remote.options.RemoteOptions;
3843
import com.google.devtools.build.lib.remote.util.DigestUtil;
44+
import com.google.devtools.build.lib.remote.util.RxUtils;
3945
import com.google.devtools.build.lib.server.FailureDetails.FailureDetail;
4046
import com.google.devtools.build.lib.server.FailureDetails.RemoteExecution;
4147
import com.google.devtools.build.lib.server.FailureDetails.RemoteExecution.Code;
@@ -56,6 +62,7 @@
5662
import java.util.HashMap;
5763
import java.util.List;
5864
import java.util.Map;
65+
import java.util.stream.Collectors;
5966
import javax.annotation.Nullable;
6067

6168
/** UploadManifest adds output metadata to a {@link ActionResult}. */
@@ -341,10 +348,11 @@ ActionResult getActionResult() {
341348
}
342349

343350
/** Uploads outputs and action result (if exit code is 0) to remote cache. */
344-
public ActionResult upload(RemoteActionExecutionContext context, RemoteCache remoteCache)
351+
public ActionResult upload(
352+
RemoteActionExecutionContext context, RemoteCache remoteCache, ExtendedEventHandler reporter)
345353
throws IOException, InterruptedException {
346354
try {
347-
return uploadAsync(context, remoteCache).blockingGet();
355+
return uploadAsync(context, remoteCache, reporter).blockingGet();
348356
} catch (RuntimeException e) {
349357
throwIfInstanceOf(e.getCause(), InterruptedException.class);
350358
throwIfInstanceOf(e.getCause(), IOException.class);
@@ -368,29 +376,91 @@ private Completable upload(
368376
return toCompletable(() -> remoteCache.uploadBlob(context, digest, blob), directExecutor());
369377
}
370378

379+
private static void reportUploadStarted(
380+
ExtendedEventHandler reporter,
381+
@Nullable ActionExecutionMetadata action,
382+
String prefix,
383+
Iterable<Digest> digests) {
384+
if (action != null) {
385+
for (Digest digest : digests) {
386+
reporter.post(ActionUploadStartedEvent.create(action, prefix + digest.getHash()));
387+
}
388+
}
389+
}
390+
391+
private static void reportUploadFinished(
392+
ExtendedEventHandler reporter,
393+
@Nullable ActionExecutionMetadata action,
394+
String resourceIdPrefix,
395+
Iterable<Digest> digests) {
396+
if (action != null) {
397+
for (Digest digest : digests) {
398+
reporter.post(
399+
ActionUploadFinishedEvent.create(action, resourceIdPrefix + digest.getHash()));
400+
}
401+
}
402+
}
403+
371404
/**
372405
* Returns a {@link Single} which upon subscription will upload outputs and action result (if exit
373406
* code is 0) to remote cache.
374407
*/
375408
public Single<ActionResult> uploadAsync(
376-
RemoteActionExecutionContext context, RemoteCache remoteCache) {
409+
RemoteActionExecutionContext context,
410+
RemoteCache remoteCache,
411+
ExtendedEventHandler reporter) {
377412
Collection<Digest> digests = new ArrayList<>();
378413
digests.addAll(digestToFile.keySet());
379414
digests.addAll(digestToBlobs.keySet());
380415

381-
Completable uploadOutputs =
382-
mergeBulkTransfer(
383-
toSingle(() -> remoteCache.findMissingDigests(context, digests), directExecutor())
384-
.flatMapPublisher(Flowable::fromIterable)
385-
.flatMapSingle(digest -> toTransferResult(upload(context, remoteCache, digest))));
416+
ActionExecutionMetadata action = context.getSpawnOwner();
417+
418+
String outputPrefix = "cas/";
419+
Flowable<RxUtils.TransferResult> bulkTransfers =
420+
toSingle(() -> remoteCache.findMissingDigests(context, digests), directExecutor())
421+
.doOnSubscribe(d -> reportUploadStarted(reporter, action, outputPrefix, digests))
422+
.doOnError(error -> reportUploadFinished(reporter, action, outputPrefix, digests))
423+
.doOnDispose(() -> reportUploadFinished(reporter, action, outputPrefix, digests))
424+
.doOnSuccess(
425+
missingDigests -> {
426+
List<Digest> existedDigests =
427+
digests.stream()
428+
.filter(digest -> !missingDigests.contains(digest))
429+
.collect(Collectors.toList());
430+
reportUploadFinished(reporter, action, outputPrefix, existedDigests);
431+
})
432+
.flatMapPublisher(Flowable::fromIterable)
433+
.flatMapSingle(
434+
digest ->
435+
toTransferResult(upload(context, remoteCache, digest))
436+
.doFinally(
437+
() ->
438+
reportUploadFinished(
439+
reporter, action, outputPrefix, ImmutableList.of(digest))));
440+
Completable uploadOutputs = mergeBulkTransfer(bulkTransfers);
386441

387442
ActionResult actionResult = result.build();
388443
Completable uploadActionResult = Completable.complete();
389444
if (actionResult.getExitCode() == 0 && actionKey != null) {
445+
String actionResultPrefix = "ac/";
390446
uploadActionResult =
391447
toCompletable(
392-
() -> remoteCache.uploadActionResult(context, actionKey, actionResult),
393-
directExecutor());
448+
() -> remoteCache.uploadActionResult(context, actionKey, actionResult),
449+
directExecutor())
450+
.doOnSubscribe(
451+
d ->
452+
reportUploadStarted(
453+
reporter,
454+
action,
455+
actionResultPrefix,
456+
ImmutableList.of(actionKey.getDigest())))
457+
.doFinally(
458+
() ->
459+
reportUploadFinished(
460+
reporter,
461+
action,
462+
actionResultPrefix,
463+
ImmutableList.of(actionKey.getDigest())));
394464
}
395465

396466
return Completable.concatArray(uploadOutputs, uploadActionResult).toSingleDefault(actionResult);

0 commit comments

Comments
 (0)
Please sign in to comment.