13
13
// limitations under the License.
14
14
package com .google .devtools .build .lib .remote ;
15
15
16
- import static com .google .common .base .Preconditions .checkArgument ;
17
- import static com .google .common .base .Preconditions .checkState ;
16
+ import static com .google .common .collect .ImmutableList .toImmutableList ;
17
+ import static com .google .common .util .concurrent .Futures .immediateFailedFuture ;
18
+ import static com .google .common .util .concurrent .Futures .immediateFuture ;
18
19
import static com .google .common .util .concurrent .MoreExecutors .directExecutor ;
19
20
import static com .google .devtools .build .lib .remote .util .RxFutures .toCompletable ;
20
21
import static com .google .devtools .build .lib .remote .util .RxFutures .toSingle ;
25
26
import build .bazel .remote .execution .v2 .Digest ;
26
27
import build .bazel .remote .execution .v2 .Directory ;
27
28
import com .google .common .base .Throwables ;
29
+ import com .google .common .collect .ImmutableList ;
28
30
import com .google .common .collect .ImmutableSet ;
29
- import com .google .common .util .concurrent .Futures ;
30
31
import com .google .common .util .concurrent .ListenableFuture ;
32
+ import com .google .devtools .build .lib .profiler .Profiler ;
33
+ import com .google .devtools .build .lib .profiler .SilentCloseable ;
31
34
import com .google .devtools .build .lib .remote .common .RemoteActionExecutionContext ;
32
35
import com .google .devtools .build .lib .remote .common .RemoteCacheClient ;
33
36
import com .google .devtools .build .lib .remote .merkletree .MerkleTree ;
36
39
import com .google .devtools .build .lib .remote .util .DigestUtil ;
37
40
import com .google .devtools .build .lib .remote .util .RxUtils .TransferResult ;
38
41
import com .google .protobuf .Message ;
42
+ import io .reactivex .rxjava3 .annotations .NonNull ;
39
43
import io .reactivex .rxjava3 .core .Completable ;
44
+ import io .reactivex .rxjava3 .core .CompletableObserver ;
40
45
import io .reactivex .rxjava3 .core .Flowable ;
46
+ import io .reactivex .rxjava3 .core .Maybe ;
47
+ import io .reactivex .rxjava3 .core .Observable ;
41
48
import io .reactivex .rxjava3 .core .Single ;
49
+ import io .reactivex .rxjava3 .core .SingleEmitter ;
50
+ import io .reactivex .rxjava3 .disposables .Disposable ;
42
51
import io .reactivex .rxjava3 .subjects .AsyncSubject ;
43
52
import java .io .IOException ;
44
- import java .util .HashSet ;
53
+ import java .util .List ;
45
54
import java .util .Map ;
46
- import java .util .Set ;
47
- import java .util .concurrent .atomic .AtomicBoolean ;
48
- import javax .annotation .concurrent .GuardedBy ;
55
+ import java .util .concurrent .atomic .AtomicReference ;
49
56
50
57
/** A {@link RemoteCache} with additional functionality needed for remote execution. */
51
58
public class RemoteExecutionCache extends RemoteCache {
@@ -85,13 +92,10 @@ public void ensureInputsPresent(
85
92
return ;
86
93
}
87
94
88
- MissingDigestFinder missingDigestFinder = new MissingDigestFinder (context , allDigests .size ());
89
95
Flowable <TransferResult > uploads =
90
- Flowable .fromIterable (allDigests )
91
- .flatMapSingle (
92
- digest ->
93
- uploadBlobIfMissing (
94
- context , merkleTree , additionalInputs , force , missingDigestFinder , digest ));
96
+ createUploadTasks (context , merkleTree , additionalInputs , allDigests , force )
97
+ .flatMap (uploadTasks -> findMissingBlobs (context , uploadTasks ))
98
+ .flatMapPublisher (this ::waitForUploadTasks );
95
99
96
100
try {
97
101
mergeBulkTransfer (uploads ).blockingAwait ();
@@ -105,36 +109,6 @@ public void ensureInputsPresent(
105
109
}
106
110
}
107
111
108
- private Single <TransferResult > uploadBlobIfMissing (
109
- RemoteActionExecutionContext context ,
110
- MerkleTree merkleTree ,
111
- Map <Digest , Message > additionalInputs ,
112
- boolean force ,
113
- MissingDigestFinder missingDigestFinder ,
114
- Digest digest ) {
115
- Completable upload =
116
- casUploadCache .execute (
117
- digest ,
118
- Completable .defer (
119
- () ->
120
- // Only reach here if the digest is missing and is not being uploaded.
121
- missingDigestFinder
122
- .registerAndCount (digest )
123
- .flatMapCompletable (
124
- missingDigests -> {
125
- if (missingDigests .contains (digest )) {
126
- return toCompletable (
127
- () -> uploadBlob (context , digest , merkleTree , additionalInputs ),
128
- directExecutor ());
129
- } else {
130
- return Completable .complete ();
131
- }
132
- })),
133
- /* onIgnored= */ missingDigestFinder ::count ,
134
- force );
135
- return toTransferResult (upload );
136
- }
137
-
138
112
private ListenableFuture <Void > uploadBlob (
139
113
RemoteActionExecutionContext context ,
140
114
Digest digest ,
@@ -158,99 +132,159 @@ private ListenableFuture<Void> uploadBlob(
158
132
return cacheProtocol .uploadBlob (context , digest , message .toByteString ());
159
133
}
160
134
161
- return Futures . immediateFailedFuture (
135
+ return immediateFailedFuture (
162
136
new IOException (
163
137
format (
164
138
"findMissingDigests returned a missing digest that has not been requested: %s" ,
165
139
digest )));
166
140
}
167
141
168
- /**
169
- * A missing digest finder that initiates the request when the internal counter reaches an
170
- * expected count.
171
- */
172
- class MissingDigestFinder {
173
- private final int expectedCount ;
174
-
175
- private final AsyncSubject <ImmutableSet <Digest >> digestsSubject ;
176
- private final Single <ImmutableSet <Digest >> resultSingle ;
142
+ static class UploadTask {
143
+ Digest digest ;
144
+ AtomicReference <Disposable > disposable ;
145
+ SingleEmitter <Boolean > continuation ;
146
+ Completable completion ;
147
+ }
177
148
178
- @ GuardedBy ("this" )
179
- private final Set <Digest > digests ;
149
+ private Single <List <UploadTask >> createUploadTasks (
150
+ RemoteActionExecutionContext context ,
151
+ MerkleTree merkleTree ,
152
+ Map <Digest , Message > additionalInputs ,
153
+ Iterable <Digest > allDigests ,
154
+ boolean force ) {
155
+ return Single .using (
156
+ () -> Profiler .instance ().profile ("collect digests" ),
157
+ ignored ->
158
+ Flowable .fromIterable (allDigests )
159
+ .flatMapMaybe (
160
+ digest ->
161
+ maybeCreateUploadTask (context , merkleTree , additionalInputs , digest , force ))
162
+ .collect (toImmutableList ()),
163
+ SilentCloseable ::close );
164
+ }
180
165
181
- @ GuardedBy ("this" )
182
- private int currentCount = 0 ;
166
+ private Maybe <UploadTask > maybeCreateUploadTask (
167
+ RemoteActionExecutionContext context ,
168
+ MerkleTree merkleTree ,
169
+ Map <Digest , Message > additionalInputs ,
170
+ Digest digest ,
171
+ boolean force ) {
172
+ return Maybe .create (
173
+ emitter -> {
174
+ AsyncSubject <Void > completion = AsyncSubject .create ();
175
+ UploadTask uploadTask = new UploadTask ();
176
+ uploadTask .digest = digest ;
177
+ uploadTask .disposable = new AtomicReference <>();
178
+ uploadTask .completion =
179
+ Completable .fromObservable (
180
+ completion .doOnDispose (
181
+ () -> {
182
+ Disposable d = uploadTask .disposable .getAndSet (null );
183
+ if (d != null ) {
184
+ d .dispose ();
185
+ }
186
+ }));
187
+ Completable upload =
188
+ casUploadCache .execute (
189
+ digest ,
190
+ Single .<Boolean >create (
191
+ continuation -> {
192
+ uploadTask .continuation = continuation ;
193
+ emitter .onSuccess (uploadTask );
194
+ })
195
+ .flatMapCompletable (
196
+ shouldUpload -> {
197
+ if (!shouldUpload ) {
198
+ return Completable .complete ();
199
+ }
183
200
184
- MissingDigestFinder (RemoteActionExecutionContext context , int expectedCount ) {
185
- checkArgument (expectedCount > 0 , "expectedCount should be greater than 0" );
186
- this .expectedCount = expectedCount ;
187
- this .digestsSubject = AsyncSubject .create ();
188
- this .digests = new HashSet <>();
201
+ return toCompletable (
202
+ () ->
203
+ uploadBlob (
204
+ context , uploadTask .digest , merkleTree , additionalInputs ),
205
+ directExecutor ());
206
+ }),
207
+ /* onAlreadyRunning= */ () -> emitter .onSuccess (uploadTask ),
208
+ /* onAlreadyFinished= */ emitter ::onComplete ,
209
+ force );
210
+ upload .subscribe (
211
+ new CompletableObserver () {
212
+ @ Override
213
+ public void onSubscribe (@ NonNull Disposable d ) {
214
+ uploadTask .disposable .set (d );
215
+ }
189
216
190
- AtomicBoolean findMissingDigestsCalled = new AtomicBoolean (false );
191
- this .resultSingle =
192
- Single .fromObservable (
193
- digestsSubject
194
- .flatMapSingle (
195
- digests -> {
196
- boolean wasCalled = findMissingDigestsCalled .getAndSet (true );
197
- // Make sure we don't have re-subscription caused by refCount() below.
198
- checkState (!wasCalled , "FindMissingDigests is called more than once" );
199
- return toSingle (
200
- () -> findMissingDigests (context , digests ), directExecutor ());
201
- })
202
- // Use replay here because we could have a race condition that downstream hasn't
203
- // been added to the subscription list (to receive the upstream result) while
204
- // upstream is completed.
205
- .replay (1 )
206
- .refCount ());
207
- }
217
+ @ Override
218
+ public void onComplete () {
219
+ completion .onComplete ();
220
+ }
208
221
209
- /**
210
- * Register the {@code digest} and increase the counter.
211
- *
212
- * <p>Returned Single cannot be subscribed more than once.
213
- *
214
- * @return Single that emits the result of the {@code FindMissingDigest} request.
215
- */
216
- Single <ImmutableSet <Digest >> registerAndCount (Digest digest ) {
217
- AtomicBoolean subscribed = new AtomicBoolean (false );
218
- // count() will potentially trigger the findMissingDigests call. Adding and counting before
219
- // returning the Single could introduce a race that the result of findMissingDigests is
220
- // available but the consumer doesn't get it because it hasn't subscribed the returned
221
- // Single. In this case, it subscribes after upstream is completed resulting a re-run of
222
- // findMissingDigests (due to refCount()).
223
- //
224
- // Calling count() inside doOnSubscribe to ensure the consumer already subscribed to the
225
- // returned Single to avoid a re-execution of findMissingDigests.
226
- return resultSingle .doOnSubscribe (
227
- d -> {
228
- boolean wasSubscribed = subscribed .getAndSet (true );
229
- checkState (!wasSubscribed , "Single is subscribed more than once" );
230
- synchronized (this ) {
231
- digests .add (digest );
232
- }
233
- count ();
234
- });
235
- }
222
+ @ Override
223
+ public void onError (@ NonNull Throwable e ) {
224
+ Disposable d = uploadTask .disposable .get ();
225
+ if (d != null && d .isDisposed ()) {
226
+ return ;
227
+ }
236
228
237
- /** Increase the counter. */
238
- void count () {
239
- ImmutableSet <Digest > digestsResult = null ;
229
+ completion .onError (e );
230
+ }
231
+ });
232
+ });
233
+ }
240
234
241
- synchronized (this ) {
242
- if (currentCount < expectedCount ) {
243
- currentCount ++;
244
- if (currentCount == expectedCount ) {
245
- digestsResult = ImmutableSet .copyOf (digests );
246
- }
247
- }
248
- }
235
+ private Single <List <UploadTask >> findMissingBlobs (
236
+ RemoteActionExecutionContext context , List <UploadTask > uploadTasks ) {
237
+ return Single .using (
238
+ () -> Profiler .instance ().profile ("findMissingDigests" ),
239
+ ignored ->
240
+ Single .fromObservable (
241
+ Observable .fromSingle (
242
+ toSingle (
243
+ () -> {
244
+ ImmutableList <Digest > digestsToQuery =
245
+ uploadTasks .stream ()
246
+ .filter (uploadTask -> uploadTask .continuation != null )
247
+ .map (uploadTask -> uploadTask .digest )
248
+ .collect (toImmutableList ());
249
+ if (digestsToQuery .isEmpty ()) {
250
+ return immediateFuture (ImmutableSet .of ());
251
+ }
252
+ return findMissingDigests (context , digestsToQuery );
253
+ },
254
+ directExecutor ())
255
+ .map (
256
+ missingDigests -> {
257
+ for (UploadTask uploadTask : uploadTasks ) {
258
+ if (uploadTask .continuation != null ) {
259
+ uploadTask .continuation .onSuccess (
260
+ missingDigests .contains (uploadTask .digest ));
261
+ }
262
+ }
263
+ return uploadTasks ;
264
+ }))
265
+ // Use AsyncSubject so that if downstream is disposed, the
266
+ // findMissingDigests call is not cancelled (because it may be needed by
267
+ // other
268
+ // threads).
269
+ .subscribeWith (AsyncSubject .create ()))
270
+ .doOnDispose (
271
+ () -> {
272
+ for (UploadTask uploadTask : uploadTasks ) {
273
+ Disposable d = uploadTask .disposable .getAndSet (null );
274
+ if (d != null ) {
275
+ d .dispose ();
276
+ }
277
+ }
278
+ }),
279
+ SilentCloseable ::close );
280
+ }
249
281
250
- if (digestsResult != null ) {
251
- digestsSubject .onNext (digestsResult );
252
- digestsSubject .onComplete ();
253
- }
254
- }
282
+ private Flowable <TransferResult > waitForUploadTasks (List <UploadTask > uploadTasks ) {
283
+ return Flowable .using (
284
+ () -> Profiler .instance ().profile ("upload" ),
285
+ ignored ->
286
+ Flowable .fromIterable (uploadTasks )
287
+ .flatMapSingle (uploadTask -> toTransferResult (uploadTask .completion )),
288
+ SilentCloseable ::close );
255
289
}
256
290
}
0 commit comments