Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit d55ce29

Browse files
committedJul 10, 2021
First implementation of Async append blocking until an optional timeout
See logfellow#559
1 parent 68b8be1 commit d55ce29

File tree

3 files changed

+355
-18
lines changed

3 files changed

+355
-18
lines changed
 

‎pom.xml

+6
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,12 @@
211211
<version>${org.mockito.version}</version>
212212
<scope>test</scope>
213213
</dependency>
214+
<dependency>
215+
<groupId>org.awaitility</groupId>
216+
<artifactId>awaitility</artifactId>
217+
<version>4.1.0</version>
218+
<scope>test</scope>
219+
</dependency>
214220
</dependencies>
215221

216222
<build>

‎src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java

+165-15
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@
1919
import java.util.Arrays;
2020
import java.util.Formatter;
2121
import java.util.List;
22+
import java.util.Objects;
2223
import java.util.concurrent.BlockingQueue;
2324
import java.util.concurrent.ScheduledExecutorService;
2425
import java.util.concurrent.ScheduledThreadPoolExecutor;
2526
import java.util.concurrent.ThreadFactory;
2627
import java.util.concurrent.TimeUnit;
2728
import java.util.concurrent.atomic.AtomicInteger;
2829
import java.util.concurrent.atomic.AtomicLong;
30+
import java.util.concurrent.locks.LockSupport;
2931

3032
import net.logstash.logback.appender.listener.AppenderListener;
3133
import net.logstash.logback.status.LevelFilteringStatusListener;
@@ -37,6 +39,7 @@
3739
import ch.qos.logback.core.spi.DeferredProcessingAware;
3840
import ch.qos.logback.core.status.OnConsoleStatusListener;
3941
import ch.qos.logback.core.status.Status;
42+
import ch.qos.logback.core.util.Duration;
4043
import com.lmax.disruptor.BlockingWaitStrategy;
4144
import com.lmax.disruptor.EventFactory;
4245
import com.lmax.disruptor.EventHandler;
@@ -250,6 +253,41 @@ public abstract class AsyncDisruptorAppender<Event extends DeferredProcessingAwa
250253
*/
251254
protected final List<Listener> listeners = new ArrayList<>();
252255

256+
public enum AsyncMode {
257+
/**
258+
* Appender thread is blocked until space is available in the ring buffer
259+
* or the retry timeout expires.
260+
*/
261+
BLOCK,
262+
263+
/**
264+
* Event is dropped when the ring buffer is full
265+
*/
266+
DROP
267+
}
268+
private AsyncMode asyncMode = AsyncMode.DROP;
269+
270+
/**
271+
* Delay (in millis) between consecutive attempts to append an event in the ring buffer when full.
272+
* Applicable only when {@link #asyncMode} is set to {@link AsyncMode#DROP}.
273+
*/
274+
private long retryMillis = 100;
275+
276+
/**
277+
* Maximum time to wait for space in the ring buffer before dropping the event.
278+
* Applicable only when {@link #asyncMode} is set to {@link AsyncMode#DROP}.
279+
*
280+
* <p>Use {@code -1} for no timeout, i.e. block until space is available.
281+
*/
282+
private Duration retryTimeout = Duration.buildByMilliseconds(1000);
283+
284+
/**
285+
* How long to wait for in-flight events during shutdown.
286+
*/
287+
private Duration shutdownGracePeriod = Duration.buildByMinutes(1);
288+
289+
290+
253291
/**
254292
* Event wrapper object used for each element of the {@link RingBuffer}.
255293
*/
@@ -422,57 +460,141 @@ public void stop() {
422460
if (!super.isStarted()) {
423461
return;
424462
}
463+
425464
/*
426465
* Don't allow any more events to be appended.
427466
*/
428467
super.stop();
468+
469+
470+
/*
471+
* Shutdown disruptor and executorService
472+
*/
473+
boolean errorDuringShutdown = false;
474+
long remainingTime = Math.max(0, getShutdownGracePeriod().getMilliseconds());
475+
long startTime = System.currentTimeMillis();
476+
429477
try {
430-
this.disruptor.shutdown(1, TimeUnit.MINUTES);
478+
this.disruptor.shutdown(remainingTime, TimeUnit.MILLISECONDS);
431479
} catch (TimeoutException e) {
432-
addWarn("Some queued events have not been logged due to requested shutdown");
480+
errorDuringShutdown = true;
433481
}
434482

435483
this.executorService.shutdown();
436484

437485
try {
438-
if (!this.executorService.awaitTermination(1, TimeUnit.MINUTES)) {
439-
addWarn("Some queued events have not been logged due to requested shutdown");
486+
remainingTime = Math.max(0, remainingTime - (System.currentTimeMillis() - startTime));
487+
if (!this.executorService.awaitTermination(remainingTime, TimeUnit.MILLISECONDS)) {
488+
errorDuringShutdown = true;
440489
}
441490
} catch (InterruptedException e) {
442-
addWarn("Some queued events have not been logged due to requested shutdown", e);
491+
errorDuringShutdown = true;
443492
}
493+
494+
if (errorDuringShutdown) {
495+
addWarn("Some queued events have not been logged due to requested shutdown");
496+
}
497+
498+
499+
/*
500+
* Notify listeners
501+
*/
444502
fireAppenderStopped();
445503
}
446504

505+
447506
@Override
448507
protected void append(Event event) {
449508
long startTime = System.nanoTime();
509+
450510
try {
451511
prepareForDeferredProcessing(event);
452512
} catch (RuntimeException e) {
453-
addWarn("Unable to prepare event for deferred processing. Event output might be missing data.", e);
513+
addWarn("Unable to prepare event for deferred processing. Event output might be missing data.", e);
454514
}
455515

456-
if (!this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event)) {
457-
long consecutiveDropped = this.consecutiveDroppedCount.incrementAndGet();
458-
if ((consecutiveDropped) % this.droppedWarnFrequency == 1) {
459-
addWarn("Dropped " + consecutiveDropped + " events (and counting...) due to ring buffer at max capacity [" + this.ringBufferSize + "]");
460-
}
461-
fireEventAppendFailed(event, RING_BUFFER_FULL_EXCEPTION);
462-
} else {
463-
long endTime = System.nanoTime();
516+
if (enqueueEvent(event)) {
517+
// Enqueue success - notify if we had errors previously
518+
//
464519
long consecutiveDropped = this.consecutiveDroppedCount.get();
465520
if (consecutiveDropped != 0 && this.consecutiveDroppedCount.compareAndSet(consecutiveDropped, 0L)) {
466521
addWarn("Dropped " + consecutiveDropped + " total events due to ring buffer at max capacity [" + this.ringBufferSize + "]");
467522
}
468-
fireEventAppended(event, endTime - startTime);
523+
524+
// Notify parties
525+
//
526+
fireEventAppended(event, System.nanoTime() - startTime);
527+
528+
} else {
529+
// Log a warning status about the failure
530+
//
531+
long consecutiveDropped = this.consecutiveDroppedCount.incrementAndGet();
532+
if ((consecutiveDropped % this.droppedWarnFrequency) == 1) {
533+
addWarn("Dropped " + consecutiveDropped + " events (and counting...) due to ring buffer at max capacity [" + this.ringBufferSize + "]");
534+
}
535+
536+
// Notify parties
537+
//
538+
fireEventAppendFailed(event, RING_BUFFER_FULL_EXCEPTION);
469539
}
470540
}
471541

472542
protected void prepareForDeferredProcessing(Event event) {
473543
event.prepareForDeferredProcessing();
474544
}
475545

546+
/**
547+
* Enqueue the given {@code event} in the ring buffer according to the configured {@link #asyncMode}.
548+
*
549+
* @param event the {@link Event} to enqueue
550+
* @return {@code true} when the even is successfully enqueued in the ring buffer
551+
*/
552+
protected boolean enqueueEvent(Event event) {
553+
if (this.asyncMode == AsyncMode.BLOCK) {
554+
return enqueueEventBlock(event);
555+
} else {
556+
return enqueueEventDrop(event);
557+
}
558+
}
559+
560+
/**
561+
* Enqueue the given {@code event} in the ring buffer, blocking until enough space
562+
* is available or the {@link #retryTimeout} expires (if configured).
563+
*
564+
* @param event the {@link Event} to enqueue
565+
* @return {@code true} when the even is successfully enqueued in the ring buffer
566+
*/
567+
private boolean enqueueEventBlock(Event event) {
568+
long timeout = this.retryTimeout.getMilliseconds() <= 0 ? Long.MAX_VALUE : System.currentTimeMillis() + this.retryTimeout.getMilliseconds();
569+
570+
while (isStarted() && !this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event)) {
571+
// Check for timeout
572+
//
573+
if (System.currentTimeMillis() >= timeout) {
574+
return false;
575+
}
576+
577+
// Wait before retry
578+
//
579+
long waitDuration = Math.min(this.retryMillis, System.currentTimeMillis() - timeout);
580+
if (waitDuration > 0) {
581+
LockSupport.parkNanos(waitDuration * 1_000_000L);
582+
}
583+
}
584+
585+
return true;
586+
}
587+
588+
/**
589+
* Attempt to enqueue the given {@code event} in the ring buffer without blocking. Drop the event
590+
* if the ring buffer is full.
591+
*
592+
* @param event the {@link Event} to enqueue
593+
* @return {@code true} when the even is successfully enqueued in the ring buffer
594+
*/
595+
private boolean enqueueEventDrop(Event event) {
596+
return this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event);
597+
}
476598

477599
protected String calculateThreadName() {
478600
List<Object> threadNameFormatParams = getThreadNameFormatParams();
@@ -581,6 +703,34 @@ public void setWaitStrategyType(String waitStrategyType) {
581703
setWaitStrategy(WaitStrategyFactory.createWaitStrategyFromString(waitStrategyType));
582704
}
583705

706+
public AsyncMode getAsyncMode() {
707+
return asyncMode;
708+
}
709+
public void setAsyncMode(AsyncMode asyncMode) {
710+
this.asyncMode = asyncMode;
711+
}
712+
713+
public long getRetryMillis() {
714+
return retryMillis;
715+
}
716+
public void setRetryMillis(long retryMillis) {
717+
this.retryMillis = retryMillis;
718+
}
719+
720+
public Duration getRetryTimeout() {
721+
return retryTimeout;
722+
}
723+
public void setRetryTimeout(Duration retryTimeout) {
724+
this.retryTimeout = Objects.requireNonNull(retryTimeout);
725+
}
726+
727+
public void setShutdownGracePeriod(Duration shutdownGracePeriod) {
728+
this.shutdownGracePeriod = Objects.requireNonNull(shutdownGracePeriod);
729+
}
730+
public Duration getShutdownGracePeriod() {
731+
return shutdownGracePeriod;
732+
}
733+
584734
public ThreadFactory getThreadFactory() {
585735
return threadFactory;
586736
}

‎src/test/java/net/logstash/logback/appender/AsyncDisruptorAppenderTest.java

+184-3
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,34 @@
1616
package net.logstash.logback.appender;
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.awaitility.Awaitility.await;
1920
import static org.mockito.ArgumentMatchers.any;
2021
import static org.mockito.ArgumentMatchers.anyBoolean;
2122
import static org.mockito.ArgumentMatchers.anyLong;
2223
import static org.mockito.ArgumentMatchers.eq;
2324
import static org.mockito.Mockito.doAnswer;
2425
import static org.mockito.Mockito.doThrow;
2526
import static org.mockito.Mockito.timeout;
27+
import static org.mockito.Mockito.times;
2628
import static org.mockito.Mockito.verify;
2729
import static org.mockito.Mockito.when;
2830

31+
import java.time.Duration;
32+
import java.util.ArrayList;
33+
import java.util.List;
2934
import java.util.concurrent.CountDownLatch;
35+
import java.util.concurrent.ExecutorService;
36+
import java.util.concurrent.Executors;
37+
import java.util.concurrent.Future;
3038
import java.util.concurrent.TimeUnit;
3139
import java.util.concurrent.atomic.AtomicReference;
3240

41+
import net.logstash.logback.appender.AsyncDisruptorAppender.AsyncMode;
3342
import net.logstash.logback.appender.AsyncDisruptorAppender.LogEvent;
3443
import net.logstash.logback.appender.listener.AppenderListener;
3544

3645
import ch.qos.logback.classic.spi.ILoggingEvent;
46+
import ch.qos.logback.core.BasicStatusManager;
3747
import ch.qos.logback.core.Context;
3848
import ch.qos.logback.core.status.Status;
3949
import ch.qos.logback.core.status.StatusManager;
@@ -46,6 +56,7 @@
4656
import org.mockito.ArgumentCaptor;
4757
import org.mockito.InjectMocks;
4858
import org.mockito.Mock;
59+
import org.mockito.Spy;
4960
import org.mockito.invocation.InvocationOnMock;
5061
import org.mockito.junit.jupiter.MockitoExtension;
5162
import org.mockito.stubbing.Answer;
@@ -64,8 +75,8 @@ public class AsyncDisruptorAppenderTest {
6475
@Mock(lenient = true)
6576
private Context context;
6677

67-
@Mock
68-
private StatusManager statusManager;
78+
@Spy
79+
private StatusManager statusManager = new BasicStatusManager();
6980

7081
@Mock
7182
private ILoggingEvent event1;
@@ -198,7 +209,6 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
198209
Assertions.assertTrue(statusCaptor.getValue().getMessage().startsWith("Dropped"));
199210

200211
eventHandlerWaiter.countDown();
201-
202212
}
203213

204214
@SuppressWarnings("unchecked")
@@ -220,4 +230,175 @@ public void testEventHandlerThrowsException() throws Exception {
220230

221231
}
222232

233+
/*
234+
* Appender is configured to block indefinitely when ring buffer is full
235+
*/
236+
@Test
237+
public void appendBlockingWhenFull() {
238+
CountDownLatch eventHandlerWaiter = new CountDownLatch(1);
239+
240+
try {
241+
TestEventHandler eventHandler = new TestEventHandler(eventHandlerWaiter);
242+
appender.setRingBufferSize(1);
243+
appender.setAsyncMode(AsyncMode.BLOCK);
244+
appender.setEventHandler(eventHandler);
245+
appender.start();
246+
247+
/*
248+
* First event blocks the ring buffer until eventHandlerWaiter is released
249+
*/
250+
appender.append(event1);
251+
await().until(() -> eventHandlerWaiter.getCount()==1); // wait until the handler is actually invoked before going any further
252+
253+
/*
254+
* Publishing the second event is blocked until the first is released (buffer full)
255+
*/
256+
Future<?> future = execute(() -> appender.append(event2));
257+
258+
/*
259+
* Release the handler -> both events are now unblocked
260+
*/
261+
eventHandlerWaiter.countDown();
262+
263+
await().untilAsserted(() -> assertThat(eventHandler.getEvents()).containsExactly(event1, event2));
264+
assertThat(future).isDone();
265+
assertThat(statusManager.getCopyOfStatusList()).isEmpty();
266+
267+
} finally {
268+
eventHandlerWaiter.countDown();
269+
}
270+
}
271+
272+
273+
/*
274+
* Appender configured in async "block" mode -> assert appending threads are blocked for the
275+
* configured timeout.
276+
*/
277+
@Test
278+
public void appendBlockingWithTimeout() throws Exception {
279+
// Block for the specified timeout
280+
final Duration timeout = Duration.ofMillis(150);
281+
282+
final CountDownLatch eventHandlerWaiter = new CountDownLatch(1);
283+
284+
try {
285+
TestEventHandler eventHandler = new TestEventHandler(eventHandlerWaiter);
286+
appender.setRingBufferSize(1);
287+
appender.setAsyncMode(AsyncMode.BLOCK);
288+
appender.setRetryTimeout(toLogback(timeout));
289+
appender.setEventHandler(eventHandler);
290+
appender.start();
291+
292+
/*
293+
* First event blocks the ring buffer until eventHandlerWaiter is released
294+
*/
295+
appender.append(event1);
296+
await().until(() -> eventHandlerWaiter.getCount()==1); // wait until the handler is actually invoked before going any further
297+
298+
299+
/*
300+
* Second event is blocked until the first is released (buffer full) - but no more than the configured timeout
301+
*/
302+
Future<?> future = execute(() -> appender.append(event2));
303+
304+
// wait for the timeout
305+
await().atLeast(timeout).and().atMost(timeout.plusMillis(100)).until(future::isDone);
306+
307+
// a WARN status is logged
308+
assertThat(statusManager.getCopyOfStatusList())
309+
.hasSize(1)
310+
.allMatch(s -> s.getMessage().startsWith("Dropped"));
311+
312+
// listeners invoked with appendFailed
313+
verify(listener).eventAppendFailed(eq(appender), eq(event2), any());
314+
315+
316+
/*
317+
* Unlock the handler and assert only the first event went through
318+
*/
319+
eventHandlerWaiter.countDown();
320+
await().untilAsserted( () -> assertThat(eventHandler.getEvents()).containsExactly(event1) );
321+
322+
} finally {
323+
eventHandlerWaiter.countDown();
324+
}
325+
}
326+
327+
328+
/*
329+
* Appender configured in async "block" mode -> assert threads blocked waiting for free space are
330+
* released when the appender is stopped
331+
*/
332+
@Test
333+
public void appendBlockingReleasedOnStop() {
334+
final CountDownLatch eventHandlerWaiter = new CountDownLatch(1);
335+
336+
try {
337+
TestEventHandler eventHandler = new TestEventHandler(eventHandlerWaiter);
338+
appender.setRingBufferSize(1);
339+
appender.setAsyncMode(AsyncMode.BLOCK);
340+
appender.setShutdownGracePeriod(toLogback(Duration.ofMillis(0))); // don't want to wait for inflight events...
341+
appender.setEventHandler(eventHandler);
342+
appender.start();
343+
344+
/*
345+
* First event will block the ring buffer until eventHandlerWaiter is released
346+
*/
347+
appender.append(event1);
348+
await().until(() -> eventHandlerWaiter.getCount()==1); // wait until the handler is actually invoked before going any further
349+
350+
/*
351+
* Publishing the second event is blocked until the first is released (buffer full)
352+
*/
353+
Future<?> future = execute(() -> appender.append(event2));
354+
355+
/*
356+
* Stop appender
357+
*/
358+
appender.stop();
359+
360+
// appending thread is released
361+
await().until(future::isDone);
362+
363+
// no events handled
364+
assertThat(eventHandler.getEvents()).isEmpty();
365+
366+
// no listener invoked
367+
verify(listener, times(0)).eventAppendFailed(eq(appender), eq(event2), any());
368+
369+
} finally {
370+
eventHandlerWaiter.countDown();
371+
}
372+
}
373+
374+
375+
private ExecutorService executorService = Executors.newCachedThreadPool();
376+
377+
private Future<?> execute(Runnable runnable) {
378+
return executorService.submit(runnable);
379+
}
380+
381+
private static class TestEventHandler implements EventHandler<LogEvent<ILoggingEvent>> {
382+
private final List<ILoggingEvent> events = new ArrayList<>();
383+
private final CountDownLatch waiter;
384+
385+
public TestEventHandler(CountDownLatch waiter) {
386+
this.waiter = waiter;
387+
}
388+
@Override
389+
public void onEvent(LogEvent<ILoggingEvent> event, long sequence, boolean endOfBatch) throws Exception {
390+
if (waiter != null ) {
391+
waiter.await();
392+
}
393+
this.events.add(event.event);
394+
}
395+
396+
public List<ILoggingEvent> getEvents() {
397+
return events;
398+
}
399+
}
400+
401+
private static ch.qos.logback.core.util.Duration toLogback(Duration duration) {
402+
return ch.qos.logback.core.util.Duration.buildByMilliseconds(duration.toMillis());
403+
}
223404
}

0 commit comments

Comments
 (0)
Please sign in to comment.