Skip to content

Commit ede18db

Browse files
authored
Make DelegatingAsyncDisruptorAppender more resilient to exceptions + flush Flushable appenders at end of batch (#457)
* Make DelegatingAsyncDisruptorAppender more resilient to exceptions thrown by child appenders - Wrap calls to appenders within a try/catch block and make sure exceptions thrown by one does not prevent other appenders from receiving the event (gh456). - Flush appenders at end of batch only if they are started (gh456). - OutputStreamAppender may occasionally return a null OutputStream (gh455) - Flush appenders implementing java.io.Flushable (gh454)
1 parent 5026ff0 commit ede18db

File tree

2 files changed

+291
-49
lines changed

2 files changed

+291
-49
lines changed

src/main/java/net/logstash/logback/appender/DelegatingAsyncDisruptorAppender.java

+66-21
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,24 @@
1313
*/
1414
package net.logstash.logback.appender;
1515

16+
import java.io.Flushable;
17+
import java.io.IOException;
18+
import java.io.OutputStream;
1619
import java.util.Iterator;
1720
import java.util.concurrent.BlockingQueue;
1821

19-
import ch.qos.logback.core.OutputStreamAppender;
20-
import net.logstash.logback.appender.listener.AppenderListener;
22+
import com.lmax.disruptor.EventHandler;
23+
import com.lmax.disruptor.RingBuffer;
24+
2125
import ch.qos.logback.access.spi.IAccessEvent;
2226
import ch.qos.logback.classic.spi.ILoggingEvent;
2327
import ch.qos.logback.core.Appender;
2428
import ch.qos.logback.core.AsyncAppenderBase;
29+
import ch.qos.logback.core.OutputStreamAppender;
2530
import ch.qos.logback.core.spi.AppenderAttachable;
2631
import ch.qos.logback.core.spi.AppenderAttachableImpl;
2732
import ch.qos.logback.core.spi.DeferredProcessingAware;
28-
29-
import com.lmax.disruptor.EventHandler;
30-
import com.lmax.disruptor.RingBuffer;
33+
import net.logstash.logback.appender.listener.AppenderListener;
3134

3235
/**
3336
* An {@link AsyncDisruptorAppender} that delegates appending of an event
@@ -37,6 +40,8 @@
3740
* <ul>
3841
* <li>it uses a {@link RingBuffer} instead of a {@link BlockingQueue}</li>
3942
* <li>it allows any number of delegate appenders, instead of just one</li>
43+
* <li>it flushes appenders of type {@link OutputStreamAppender} or {@link Flushable} at the end of a batch</li>
44+
* <li>it is resilient to exceptions and guarantees that all appenders are invoked</li>
4045
* </ul>
4146
*
4247
* @param <Event> type of event ({@link ILoggingEvent} or {@link IAccessEvent}).
@@ -49,28 +54,68 @@ public abstract class DelegatingAsyncDisruptorAppender<Event extends DeferredPro
4954
private final AppenderAttachableImpl<Event> appenders = new AppenderAttachableImpl<Event>();
5055

5156
private class DelegatingEventHandler implements EventHandler<LogEvent<Event>> {
57+
/**
58+
* Whether exceptions should be reported with a error status or not.
59+
*/
60+
private boolean silentError;
5261

5362
@Override
5463
public void onEvent(LogEvent<Event> logEvent, long sequence, boolean endOfBatch) throws Exception {
55-
appenders.appendLoopOnAppenders(logEvent.event);
56-
57-
/*
58-
* Optimization:
59-
*
60-
* If any of the delegate appenders are instances of OutputStreamAppender,
61-
* then flush the OutputStreams at the end of the batch.
62-
*/
63-
if (endOfBatch) {
64-
for (Iterator<Appender<Event>> iter = appenders.iteratorForAppenders(); iter.hasNext(); ) {
65-
Appender<Event> appender = iter.next();
66-
if (appender instanceof OutputStreamAppender) {
67-
OutputStreamAppender outputStreamAppender = (OutputStreamAppender) appender;
68-
if (!outputStreamAppender.isImmediateFlush()) {
69-
outputStreamAppender.getOutputStream().flush();
70-
}
64+
65+
boolean exceptionThrown = false;
66+
for(Iterator<Appender<Event>> it=appenders.iteratorForAppenders(); it.hasNext(); ) {
67+
Appender<Event> appender = it.next();
68+
69+
try {
70+
appender.doAppend(logEvent.event);
71+
72+
/*
73+
* Optimization:
74+
*
75+
* If any of the delegate appenders are instances of OutputStreamAppender or Flushable,
76+
* then flush them at the end of the batch.
77+
*/
78+
if (endOfBatch) {
79+
flushAppender(appender);
80+
}
81+
}
82+
catch(Exception e) {
83+
exceptionThrown = true;
84+
if (!this.silentError) {
85+
addError(String.format("Unable to forward event to appender [%s]: %s", appender.getName(), e.getMessage()), e);
7186
}
7287
}
7388
}
89+
90+
this.silentError = exceptionThrown;
91+
}
92+
93+
94+
private void flushAppender(Appender<Event> appender) throws IOException {
95+
// Similar to #doAppend() - don't flush if appender is stopped
96+
if (!appender.isStarted()) {
97+
return;
98+
}
99+
if (appender instanceof Flushable) {
100+
flushAppender((Flushable)appender);
101+
}
102+
else
103+
if (appender instanceof OutputStreamAppender) {
104+
flushAppender((OutputStreamAppender<Event>)appender);
105+
}
106+
}
107+
108+
private void flushAppender(OutputStreamAppender<Event> appender) throws IOException {
109+
if (!appender.isImmediateFlush()) {
110+
OutputStream os = appender.getOutputStream();
111+
if (os != null) {
112+
os.flush();
113+
}
114+
}
115+
}
116+
117+
private void flushAppender(Flushable appender) throws IOException {
118+
appender.flush();
74119
}
75120
}
76121

0 commit comments

Comments
 (0)