16
16
17
17
import static com .google .common .base .Preconditions .checkNotNull ;
18
18
import static com .google .common .base .Preconditions .checkState ;
19
+ import static java .lang .Math .min ;
19
20
20
21
import com .google .common .annotations .VisibleForTesting ;
21
22
import com .google .common .base .Throwables ;
22
23
import com .google .common .io .ByteStreams ;
23
24
import com .google .devtools .build .lib .actions .ActionInput ;
24
25
import com .google .devtools .build .lib .actions .ActionInputHelper ;
25
26
import com .google .devtools .build .lib .actions .cache .VirtualActionInput ;
27
+ import com .google .devtools .build .lib .remote .zstd .ZstdCompressingInputStream ;
26
28
import com .google .devtools .build .lib .vfs .Path ;
27
29
import com .google .protobuf .ByteString ;
28
30
import java .io .ByteArrayInputStream ;
29
- import java .io .EOFException ;
30
31
import java .io .IOException ;
31
32
import java .io .InputStream ;
33
+ import java .io .PushbackInputStream ;
32
34
import java .util .NoSuchElementException ;
33
35
import java .util .Objects ;
34
36
import java .util .function .Supplier ;
@@ -55,6 +57,10 @@ static int getDefaultChunkSize() {
55
57
return defaultChunkSize ;
56
58
}
57
59
60
+ public boolean isCompressed () {
61
+ return compressed ;
62
+ }
63
+
58
64
/** A piece of a byte[] blob. */
59
65
public static final class Chunk {
60
66
@@ -98,19 +104,22 @@ public int hashCode() {
98
104
private final int chunkSize ;
99
105
private final Chunk emptyChunk ;
100
106
101
- private InputStream data ;
107
+ private ChunkerInputStream data ;
102
108
private long offset ;
103
109
private byte [] chunkCache ;
104
110
111
+ private final boolean compressed ;
112
+
105
113
// Set to true on the first call to next(). This is so that the Chunker can open its data source
106
114
// lazily on the first call to next(), as opposed to opening it in the constructor or on reset().
107
115
private boolean initialized ;
108
116
109
- Chunker (Supplier <InputStream > dataSupplier , long size , int chunkSize ) {
117
+ Chunker (Supplier <InputStream > dataSupplier , long size , int chunkSize , boolean compressed ) {
110
118
this .dataSupplier = checkNotNull (dataSupplier );
111
119
this .size = size ;
112
120
this .chunkSize = chunkSize ;
113
121
this .emptyChunk = new Chunk (ByteString .EMPTY , 0 );
122
+ this .compressed = compressed ;
114
123
}
115
124
116
125
public long getOffset () {
@@ -127,13 +136,9 @@ public long getSize() {
127
136
* <p>Closes any open resources (file handles, ...).
128
137
*/
129
138
public void reset () throws IOException {
130
- if (data != null ) {
131
- data .close ();
132
- }
133
- data = null ;
139
+ close ();
134
140
offset = 0 ;
135
141
initialized = false ;
136
- chunkCache = null ;
137
142
}
138
143
139
144
/**
@@ -148,6 +153,9 @@ public void seek(long toOffset) throws IOException {
148
153
maybeInitialize ();
149
154
ByteStreams .skipFully (data , toOffset - offset );
150
155
offset = toOffset ;
156
+ if (data .finished ()) {
157
+ close ();
158
+ }
151
159
}
152
160
153
161
/**
@@ -157,6 +165,27 @@ public boolean hasNext() {
157
165
return data != null || !initialized ;
158
166
}
159
167
168
+ /** Closes the input stream and reset chunk cache */
169
+ private void close () throws IOException {
170
+ if (data != null ) {
171
+ data .close ();
172
+ data = null ;
173
+ }
174
+ chunkCache = null ;
175
+ }
176
+
177
+ /** Attempts reading at most a full chunk and stores it in the chunkCache buffer */
178
+ private int read () throws IOException {
179
+ int count = 0 ;
180
+ while (count < chunkCache .length ) {
181
+ int c = data .read (chunkCache , count , chunkCache .length - count );
182
+ if (c < 0 ) {
183
+ break ;
184
+ }
185
+ count += c ;
186
+ }
187
+ return count ;
188
+ }
160
189
/**
161
190
* Returns the next {@link Chunk} or throws a {@link NoSuchElementException} if no data is left.
162
191
*
@@ -178,46 +207,40 @@ public Chunk next() throws IOException {
178
207
return emptyChunk ;
179
208
}
180
209
181
- // The cast to int is safe, because the return value is capped at chunkSize.
182
- int bytesToRead = (int ) Math .min (bytesLeft (), chunkSize );
183
- if (bytesToRead == 0 ) {
210
+ if (data .finished ()) {
184
211
chunkCache = null ;
185
212
data = null ;
186
213
throw new NoSuchElementException ();
187
214
}
188
215
189
216
if (chunkCache == null ) {
217
+ // If the output is compressed we can't know how many bytes there are yet to read,
218
+ // so we allocate the whole chunkSize, otherwise we try to compute the smallest possible value
219
+ // The cast to int is safe, because the return value is capped at chunkSize.
220
+ int cacheSize = compressed ? chunkSize : (int ) min (getSize () - getOffset (), chunkSize );
190
221
// Lazily allocate it in order to save memory on small data.
191
222
// 1) bytesToRead < chunkSize: There will only ever be one next() call.
192
223
// 2) bytesToRead == chunkSize: chunkCache will be set to its biggest possible value.
193
224
// 3) bytestoRead > chunkSize: Not possible, due to Math.min above.
194
- chunkCache = new byte [bytesToRead ];
225
+ chunkCache = new byte [cacheSize ];
195
226
}
196
227
197
228
long offsetBefore = offset ;
198
- try {
199
- ByteStreams .readFully (data , chunkCache , 0 , bytesToRead );
200
- } catch (EOFException e ) {
201
- throw new IllegalStateException ("Reached EOF, but expected "
202
- + bytesToRead + " bytes." , e );
203
- }
204
- offset += bytesToRead ;
205
229
206
- ByteString blob = ByteString . copyFrom ( chunkCache , 0 , bytesToRead );
230
+ int bytesRead = read ( );
207
231
208
- if (bytesLeft () == 0 ) {
209
- data .close ();
210
- data = null ;
211
- chunkCache = null ;
232
+ ByteString blob = ByteString .copyFrom (chunkCache , 0 , bytesRead );
233
+
234
+ // This has to happen after actualSize has been updated
235
+ // or the guard in getActualSize won't work.
236
+ offset += bytesRead ;
237
+ if (data .finished ()) {
238
+ close ();
212
239
}
213
240
214
241
return new Chunk (blob , offsetBefore );
215
242
}
216
243
217
- public long bytesLeft () {
218
- return getSize () - getOffset ();
219
- }
220
-
221
244
private void maybeInitialize () throws IOException {
222
245
if (initialized ) {
223
246
return ;
@@ -226,7 +249,10 @@ private void maybeInitialize() throws IOException {
226
249
checkState (offset == 0 );
227
250
checkState (chunkCache == null );
228
251
try {
229
- data = dataSupplier .get ();
252
+ data =
253
+ compressed
254
+ ? new ChunkerInputStream (new ZstdCompressingInputStream (dataSupplier .get ()))
255
+ : new ChunkerInputStream (dataSupplier .get ());
230
256
} catch (RuntimeException e ) {
231
257
Throwables .propagateIfPossible (e .getCause (), IOException .class );
232
258
throw e ;
@@ -242,6 +268,7 @@ public static Builder builder() {
242
268
public static class Builder {
243
269
private int chunkSize = getDefaultChunkSize ();
244
270
private long size ;
271
+ private boolean compressed ;
245
272
private Supplier <InputStream > inputStream ;
246
273
247
274
public Builder setInput (byte [] data ) {
@@ -251,6 +278,11 @@ public Builder setInput(byte[] data) {
251
278
return this ;
252
279
}
253
280
281
+ public Builder setCompressed (boolean compressed ) {
282
+ this .compressed = compressed ;
283
+ return this ;
284
+ }
285
+
254
286
public Builder setInput (long size , InputStream in ) {
255
287
checkState (inputStream == null );
256
288
checkNotNull (in );
@@ -305,7 +337,22 @@ public Builder setChunkSize(int chunkSize) {
305
337
306
338
public Chunker build () {
307
339
checkNotNull (inputStream );
308
- return new Chunker (inputStream , size , chunkSize );
340
+ return new Chunker (inputStream , size , chunkSize , compressed );
341
+ }
342
+ }
343
+
344
+ static class ChunkerInputStream extends PushbackInputStream {
345
+ ChunkerInputStream (InputStream in ) {
346
+ super (in );
347
+ }
348
+
349
+ public boolean finished () throws IOException {
350
+ int c = super .read ();
351
+ if (c == -1 ) {
352
+ return true ;
353
+ }
354
+ super .unread (c );
355
+ return false ;
309
356
}
310
357
}
311
358
}
0 commit comments