Skip to content

Commit e05345d

Browse files
Bencodescopybara-github
authored andcommitted
Add support for wrapping system streams in WorkRequestHandler
There are often [places](https://github.com/bazelbuild/bazel/blob/ea19c17075478092eb77580e6d3825d480126d3a/src/tools/android/java/com/google/devtools/build/android/ResourceProcessorBusyBox.java#L188) where persistent workers need to swap out the standard system streams to avoid tools poisoning the worker communication streams by writing logs/exceptions to it. This pull request extracts that pattern into an optional WorkerIO wrapper can be used to swap in and out the standard streams without the added boilerplate. Closes bazelbuild#14201. PiperOrigin-RevId: 498983983 Change-Id: Iefb956d38a5887d9e5bbf0821551eb0efa14fce9
1 parent fd93888 commit e05345d

File tree

3 files changed

+239
-24
lines changed

3 files changed

+239
-24
lines changed

src/main/java/com/google/devtools/build/lib/worker/WorkRequestHandler.java

+148-16
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,15 @@
1818
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse;
1919
import com.google.errorprone.annotations.CanIgnoreReturnValue;
2020
import com.sun.management.OperatingSystemMXBean;
21+
import java.io.ByteArrayInputStream;
22+
import java.io.ByteArrayOutputStream;
2123
import java.io.IOException;
24+
import java.io.InputStream;
2225
import java.io.PrintStream;
2326
import java.io.PrintWriter;
2427
import java.io.StringWriter;
2528
import java.lang.management.ManagementFactory;
29+
import java.nio.charset.StandardCharsets;
2630
import java.time.Duration;
2731
import java.util.List;
2832
import java.util.Optional;
@@ -317,8 +321,17 @@ public WorkRequestHandler build() {
317321
* then writing the corresponding {@link WorkResponse} to {@code out}. If there is an error
318322
* reading or writing the requests or responses, it writes an error message on {@code err} and
319323
* returns. If {@code in} reaches EOF, it also returns.
324+
*
325+
* <p>This function also wraps the system streams in a {@link WorkerIO} instance that prevents the
326+
* underlying tool from writing to {@link System#out} or reading from {@link System#in}, which
327+
* would corrupt the worker worker protocol. When the while loop exits, the original system
328+
* streams will be swapped back into {@link System}.
320329
*/
321330
public void processRequests() throws IOException {
331+
// Wrap the system streams into a WorkerIO instance to prevent unexpected reads and writes on
332+
// stdin/stdout.
333+
WorkerIO workerIO = WorkerIO.capture();
334+
322335
try {
323336
while (!shutdownWorker.get()) {
324337
WorkRequest request = messageProcessor.readWorkRequest();
@@ -328,31 +341,39 @@ public void processRequests() throws IOException {
328341
if (request.getCancel()) {
329342
respondToCancelRequest(request);
330343
} else {
331-
startResponseThread(request);
344+
startResponseThread(workerIO, request);
332345
}
333346
}
334347
} catch (IOException e) {
335348
stderr.println("Error reading next WorkRequest: " + e);
336349
e.printStackTrace(stderr);
337-
}
338-
// TODO(b/220878242): Give the outstanding requests a chance to send a "shutdown" response,
339-
// but also try to kill stuck threads. For now, we just interrupt the remaining threads.
340-
// We considered doing System.exit here, but that is hard to test and would deny the callers
341-
// of this method a chance to clean up. Instead, we initiate the cleanup of our resources here
342-
// and the caller can decide whether to wait for an orderly shutdown or now.
343-
for (RequestInfo ri : activeRequests.values()) {
344-
if (ri.thread.isAlive()) {
345-
try {
346-
ri.thread.interrupt();
347-
} catch (RuntimeException e) {
348-
// If we can't interrupt, we can't do much else.
350+
} finally {
351+
// TODO(b/220878242): Give the outstanding requests a chance to send a "shutdown" response,
352+
// but also try to kill stuck threads. For now, we just interrupt the remaining threads.
353+
// We considered doing System.exit here, but that is hard to test and would deny the callers
354+
// of this method a chance to clean up. Instead, we initiate the cleanup of our resources here
355+
// and the caller can decide whether to wait for an orderly shutdown or now.
356+
for (RequestInfo ri : activeRequests.values()) {
357+
if (ri.thread.isAlive()) {
358+
try {
359+
ri.thread.interrupt();
360+
} catch (RuntimeException e) {
361+
// If we can't interrupt, we can't do much else.
362+
}
349363
}
350364
}
365+
366+
try {
367+
// Unwrap the system streams placing the original streams back
368+
workerIO.close();
369+
} catch (Exception e) {
370+
stderr.println(e.getMessage());
371+
}
351372
}
352373
}
353374

354375
/** Starts a thread for the given request. */
355-
void startResponseThread(WorkRequest request) {
376+
void startResponseThread(WorkerIO workerIO, WorkRequest request) {
356377
Thread currentThread = Thread.currentThread();
357378
String threadName =
358379
request.getRequestId() > 0
@@ -381,7 +402,7 @@ void startResponseThread(WorkRequest request) {
381402
return;
382403
}
383404
try {
384-
respondToRequest(request, requestInfo);
405+
respondToRequest(workerIO, request, requestInfo);
385406
} catch (IOException e) {
386407
// IOExceptions here means a problem talking to the server, so we must shut down.
387408
if (!shutdownWorker.compareAndSet(false, true)) {
@@ -419,7 +440,8 @@ void startResponseThread(WorkRequest request) {
419440
* #callback} are reported with exit code 1.
420441
*/
421442
@VisibleForTesting
422-
void respondToRequest(WorkRequest request, RequestInfo requestInfo) throws IOException {
443+
void respondToRequest(WorkerIO workerIO, WorkRequest request, RequestInfo requestInfo)
444+
throws IOException {
423445
int exitCode;
424446
StringWriter sw = new StringWriter();
425447
try (PrintWriter pw = new PrintWriter(sw)) {
@@ -431,6 +453,16 @@ void respondToRequest(WorkRequest request, RequestInfo requestInfo) throws IOExc
431453
e.printStackTrace(pw);
432454
exitCode = 1;
433455
}
456+
457+
try {
458+
// Read out the captured string for the final WorkResponse output
459+
String captured = workerIO.readCapturedAsUtf8String().trim();
460+
if (!captured.isEmpty()) {
461+
pw.write(captured);
462+
}
463+
} catch (IOException e) {
464+
stderr.println(e.getMessage());
465+
}
434466
}
435467
Optional<WorkResponse.Builder> optBuilder = requestInfo.takeBuilder();
436468
if (optBuilder.isPresent()) {
@@ -541,4 +573,104 @@ private void maybePerformGc() {
541573
}
542574
}
543575
}
576+
577+
/**
578+
* Class that wraps the standard {@link System#in}, {@link System#out}, and {@link System#err}
579+
* with our own ByteArrayOutputStream that allows {@link WorkRequestHandler} to safely capture
580+
* outputs that can't be directly captured by the PrintStream associated with the work request.
581+
*
582+
* <p>This is most useful when integrating JVM tools that write exceptions and logs directly to
583+
* {@link System#out} and {@link System#err}, which would corrupt the persistent worker protocol.
584+
* We also redirect {@link System#in}, just in case a tool should attempt to read it.
585+
*
586+
* <p>WorkerIO implements {@link AutoCloseable} and will swap the original streams back into
587+
* {@link System} once close has been called.
588+
*/
589+
public static class WorkerIO implements AutoCloseable {
590+
private final InputStream originalInputStream;
591+
private final PrintStream originalOutputStream;
592+
private final PrintStream originalErrorStream;
593+
private final ByteArrayOutputStream capturedStream;
594+
private final AutoCloseable restore;
595+
596+
/**
597+
* Creates a new {@link WorkerIO} that allows {@link WorkRequestHandler} to capture standard
598+
* output and error streams that can't be directly captured by the PrintStream associated with
599+
* the work request.
600+
*/
601+
@VisibleForTesting
602+
WorkerIO(
603+
InputStream originalInputStream,
604+
PrintStream originalOutputStream,
605+
PrintStream originalErrorStream,
606+
ByteArrayOutputStream capturedStream,
607+
AutoCloseable restore) {
608+
this.originalInputStream = originalInputStream;
609+
this.originalOutputStream = originalOutputStream;
610+
this.originalErrorStream = originalErrorStream;
611+
this.capturedStream = capturedStream;
612+
this.restore = restore;
613+
}
614+
615+
/** Wraps the standard System streams and WorkerIO instance */
616+
public static WorkerIO capture() {
617+
// Save the original streams
618+
InputStream originalInputStream = System.in;
619+
PrintStream originalOutputStream = System.out;
620+
PrintStream originalErrorStream = System.err;
621+
622+
// Replace the original streams with our own instances
623+
ByteArrayOutputStream capturedStream = new ByteArrayOutputStream();
624+
PrintStream outputBuffer = new PrintStream(capturedStream, true);
625+
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(new byte[0]);
626+
System.setIn(byteArrayInputStream);
627+
System.setOut(outputBuffer);
628+
System.setErr(outputBuffer);
629+
630+
return new WorkerIO(
631+
originalInputStream,
632+
originalOutputStream,
633+
originalErrorStream,
634+
capturedStream,
635+
() -> {
636+
System.setIn(originalInputStream);
637+
System.setOut(originalOutputStream);
638+
System.setErr(originalErrorStream);
639+
outputBuffer.close();
640+
byteArrayInputStream.close();
641+
});
642+
}
643+
644+
/** Returns the original input stream most commonly provided by {@link System#in} */
645+
@VisibleForTesting
646+
InputStream getOriginalInputStream() {
647+
return originalInputStream;
648+
}
649+
650+
/** Returns the original output stream most commonly provided by {@link System#out} */
651+
@VisibleForTesting
652+
PrintStream getOriginalOutputStream() {
653+
return originalOutputStream;
654+
}
655+
656+
/** Returns the original error stream most commonly provided by {@link System#err} */
657+
@VisibleForTesting
658+
PrintStream getOriginalErrorStream() {
659+
return originalErrorStream;
660+
}
661+
662+
/** Returns the captured outputs as a UTF-8 string */
663+
@VisibleForTesting
664+
String readCapturedAsUtf8String() throws IOException {
665+
capturedStream.flush();
666+
String captureOutput = capturedStream.toString(StandardCharsets.UTF_8);
667+
capturedStream.reset();
668+
return captureOutput;
669+
}
670+
671+
@Override
672+
public void close() throws Exception {
673+
restore.close();
674+
}
675+
}
544676
}

src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java

+13-3
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public final class ExampleWorker {
5454
static final Pattern FLAG_FILE_PATTERN = Pattern.compile("(?:@|--?flagfile=)(.+)");
5555

5656
// A UUID that uniquely identifies this running worker process.
57-
static final UUID workerUuid = UUID.randomUUID();
57+
static final UUID WORKER_UUID = UUID.randomUUID();
5858

5959
// A counter that increases with each work unit processed.
6060
static int workUnitCounter = 1;
@@ -83,6 +83,9 @@ private static class InterruptableWorkRequestHandler extends WorkRequestHandler
8383
@Override
8484
@SuppressWarnings("SystemExitOutsideMain")
8585
public void processRequests() throws IOException {
86+
ByteArrayOutputStream captured = new ByteArrayOutputStream();
87+
WorkerIO workerIO = new WorkerIO(System.in, System.out, System.err, captured, captured);
88+
8689
while (true) {
8790
WorkRequest request = messageProcessor.readWorkRequest();
8891
if (request == null) {
@@ -100,12 +103,19 @@ public void processRequests() throws IOException {
100103
if (request.getCancel()) {
101104
respondToCancelRequest(request);
102105
} else {
103-
startResponseThread(request);
106+
startResponseThread(workerIO, request);
104107
}
105108
if (workerOptions.exitAfter > 0 && workUnitCounter > workerOptions.exitAfter) {
106109
System.exit(0);
107110
}
108111
}
112+
113+
try {
114+
// Unwrap the system streams placing the original streams back
115+
workerIO.close();
116+
} catch (Exception e) {
117+
workerIO.getOriginalErrorStream().println(e.getMessage());
118+
}
109119
}
110120
}
111121

@@ -241,7 +251,7 @@ private static void parseOptionsAndLog(List<String> args) throws Exception {
241251
List<String> outputs = new ArrayList<>();
242252

243253
if (options.writeUUID) {
244-
outputs.add("UUID " + workerUuid);
254+
outputs.add("UUID " + WORKER_UUID);
245255
}
246256

247257
if (options.writeCounter) {

0 commit comments

Comments
 (0)