Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit c02ab5f

Browse files
committed Jul 8, 2021
First implementation of Async append blocking until an optional timeout
See logfellow#559
1 parent b1ec1db commit c02ab5f

File tree

3 files changed

+362
-26
lines changed

3 files changed

+362
-26
lines changed
 

‎pom.xml

+6
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,12 @@
211211
<version>${org.mockito.version}</version>
212212
<scope>test</scope>
213213
</dependency>
214+
<dependency>
215+
<groupId>org.awaitility</groupId>
216+
<artifactId>awaitility</artifactId>
217+
<version>4.1.0</version>
218+
<scope>test</scope>
219+
</dependency>
214220
</dependencies>
215221

216222
<build>

‎src/main/java/net/logstash/logback/appender/AsyncDisruptorAppender.java

+166-15
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,20 @@
1919
import java.util.Arrays;
2020
import java.util.Formatter;
2121
import java.util.List;
22+
import java.util.Objects;
2223
import java.util.concurrent.BlockingQueue;
2324
import java.util.concurrent.ScheduledExecutorService;
2425
import java.util.concurrent.ScheduledThreadPoolExecutor;
2526
import java.util.concurrent.ThreadFactory;
2627
import java.util.concurrent.TimeUnit;
2728
import java.util.concurrent.atomic.AtomicInteger;
2829
import java.util.concurrent.atomic.AtomicLong;
30+
import java.util.concurrent.locks.LockSupport;
2931

3032
import ch.qos.logback.core.status.OnConsoleStatusListener;
3133
import ch.qos.logback.core.status.Status;
34+
import ch.qos.logback.core.util.Duration;
35+
3236
import net.logstash.logback.appender.listener.AppenderListener;
3337
import ch.qos.logback.access.spi.IAccessEvent;
3438
import ch.qos.logback.classic.AsyncAppender;
@@ -250,6 +254,41 @@ public abstract class AsyncDisruptorAppender<Event extends DeferredProcessingAwa
250254
*/
251255
protected final List<Listener> listeners = new ArrayList<>();
252256

257+
public enum AsyncMode {
    /**
     * Appender thread is blocked until space is available in the ring buffer
     * or the retry timeout expires.
     */
    BLOCK,

    /**
     * Event is dropped when the ring buffer is full
     */
    DROP
}

/**
 * Strategy used when attempting to enqueue an event while the ring buffer is full.
 * Defaults to {@link AsyncMode#DROP} (the historical behavior).
 */
private AsyncMode asyncMode = AsyncMode.DROP;

/**
 * Delay (in millis) between consecutive attempts to append an event in the ring buffer when full.
 * Applicable only when {@link #asyncMode} is set to {@link AsyncMode#BLOCK}.
 */
private long retryMillis = 100;

/**
 * Maximum time to wait for space in the ring buffer before dropping the event.
 * Applicable only when {@link #asyncMode} is set to {@link AsyncMode#BLOCK}.
 *
 * <p>Use {@code -1} for no timeout, i.e. block until space is available.
 */
private Duration retryTimeout = Duration.buildByMilliseconds(1000);

/**
 * How long to wait for in-flight events during shutdown.
 */
private Duration shutdownGracePeriod = Duration.buildByMinutes(1);
289+
290+
291+
253292
/**
254293
* Event wrapper object used for each element of the {@link RingBuffer}.
255294
*/
@@ -422,57 +461,141 @@ public void stop() {
422461
if (!super.isStarted()) {
423462
return;
424463
}
464+
425465
/*
426466
* Don't allow any more events to be appended.
427467
*/
428468
super.stop();
469+
470+
471+
/*
472+
* Shutdown disruptor and executorService
473+
*/
474+
boolean errorDuringShutdown = false;
475+
long remainingTime = Math.max(0, getShutdownGracePeriod().getMilliseconds());
476+
long startTime = System.currentTimeMillis();
477+
429478
try {
430-
this.disruptor.shutdown(1, TimeUnit.MINUTES);
479+
this.disruptor.shutdown(remainingTime, TimeUnit.MILLISECONDS);
431480
} catch (TimeoutException e) {
432-
addWarn("Some queued events have not been logged due to requested shutdown");
481+
errorDuringShutdown = true;
433482
}
434483

435484
this.executorService.shutdown();
436485

437486
try {
438-
if (!this.executorService.awaitTermination(1, TimeUnit.MINUTES)) {
439-
addWarn("Some queued events have not been logged due to requested shutdown");
487+
remainingTime = Math.max(0, remainingTime - (System.currentTimeMillis() - startTime));
488+
if (!this.executorService.awaitTermination(remainingTime, TimeUnit.MILLISECONDS)) {
489+
errorDuringShutdown = true;
440490
}
441491
} catch (InterruptedException e) {
442-
addWarn("Some queued events have not been logged due to requested shutdown", e);
492+
errorDuringShutdown = true;
443493
}
494+
495+
if (errorDuringShutdown) {
496+
addWarn("Some queued events have not been logged due to requested shutdown");
497+
}
498+
499+
500+
/*
501+
* Notify listeners
502+
*/
444503
fireAppenderStopped();
445504
}
446505

506+
447507
@Override
448508
protected void append(Event event) {
449509
long startTime = System.nanoTime();
510+
450511
try {
451512
prepareForDeferredProcessing(event);
452513
} catch (RuntimeException e) {
453-
addWarn("Unable to prepare event for deferred processing. Event output might be missing data.", e);
514+
addWarn("Unable to prepare event for deferred processing. Event output might be missing data.", e);
454515
}
455516

456-
if (!this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event)) {
457-
long consecutiveDropped = this.consecutiveDroppedCount.incrementAndGet();
458-
if ((consecutiveDropped) % this.droppedWarnFrequency == 1) {
459-
addWarn("Dropped " + consecutiveDropped + " events (and counting...) due to ring buffer at max capacity [" + this.ringBufferSize + "]");
460-
}
461-
fireEventAppendFailed(event, RING_BUFFER_FULL_EXCEPTION);
462-
} else {
463-
long endTime = System.nanoTime();
517+
if (enqueueEvent(event)) {
518+
// Enqueue success - notify if we had errors previously
519+
//
464520
long consecutiveDropped = this.consecutiveDroppedCount.get();
465521
if (consecutiveDropped != 0 && this.consecutiveDroppedCount.compareAndSet(consecutiveDropped, 0L)) {
466522
addWarn("Dropped " + consecutiveDropped + " total events due to ring buffer at max capacity [" + this.ringBufferSize + "]");
467523
}
468-
fireEventAppended(event, endTime - startTime);
524+
525+
// Notify parties
526+
//
527+
fireEventAppended(event, System.nanoTime() - startTime);
528+
529+
} else {
530+
// Log a warning status about the failure
531+
//
532+
long consecutiveDropped = this.consecutiveDroppedCount.incrementAndGet();
533+
if ((consecutiveDropped % this.droppedWarnFrequency) == 1) {
534+
addWarn("Dropped " + consecutiveDropped + " events (and counting...) due to ring buffer at max capacity [" + this.ringBufferSize + "]");
535+
}
536+
537+
// Notify parties
538+
//
539+
fireEventAppendFailed(event, RING_BUFFER_FULL_EXCEPTION);
469540
}
470541
}
471542

472543
protected void prepareForDeferredProcessing(Event event) {
473544
event.prepareForDeferredProcessing();
474545
}
475546

547+
/**
548+
* Enqueue the given {@code event} in the ring buffer according to the configured {@link #asyncMode}.
549+
*
550+
* @param event the {@link Event} to enqueue
551+
* @return {@code true} when the even is successfully enqueued in the ring buffer
552+
*/
553+
protected boolean enqueueEvent(Event event) {
554+
if (this.asyncMode == AsyncMode.BLOCK) {
555+
return enqueueEventBlock(event);
556+
} else {
557+
return enqueueEventDrop(event);
558+
}
559+
}
560+
561+
/**
562+
* Enqueue the given {@code event} in the ring buffer, blocking until enough space
563+
* is available or the {@link #retryTimeout} expires (if configured).
564+
*
565+
* @param event the {@link Event} to enqueue
566+
* @return {@code true} when the even is successfully enqueued in the ring buffer
567+
*/
568+
private boolean enqueueEventBlock(Event event) {
569+
long timeout = this.retryTimeout.getMilliseconds() <= 0 ? Long.MAX_VALUE : System.currentTimeMillis() + this.retryTimeout.getMilliseconds();
570+
571+
while (isStarted() && !this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event)) {
572+
// Check for timeout
573+
//
574+
if (System.currentTimeMillis() >= timeout) {
575+
return false;
576+
}
577+
578+
// Wait before retry
579+
//
580+
long waitDuration = Math.min(this.retryMillis, System.currentTimeMillis() - timeout);
581+
if (waitDuration > 0) {
582+
LockSupport.parkNanos(waitDuration * 1_000_000L);
583+
}
584+
}
585+
586+
return true;
587+
}
588+
589+
/**
 * Attempt to enqueue the given {@code event} in the ring buffer without blocking. Drop the event
 * if the ring buffer is full.
 *
 * @param event the {@link Event} to enqueue
 * @return {@code true} when the event is successfully enqueued in the ring buffer
 */
private boolean enqueueEventDrop(Event event) {
    return this.disruptor.getRingBuffer().tryPublishEvent(this.eventTranslator, event);
}
476599

477600
protected String calculateThreadName() {
478601
List<Object> threadNameFormatParams = getThreadNameFormatParams();
@@ -581,6 +704,34 @@ public void setWaitStrategyType(String waitStrategyType) {
581704
setWaitStrategy(WaitStrategyFactory.createWaitStrategyFromString(waitStrategyType));
582705
}
583706

707+
/**
 * Returns the strategy used when attempting to enqueue an event while the ring buffer is full.
 */
public AsyncMode getAsyncMode() {
    return asyncMode;
}
/**
 * Sets the strategy to use when the ring buffer is full: {@link AsyncMode#BLOCK} to wait
 * for space (up to the retry timeout), or {@link AsyncMode#DROP} to discard the event.
 */
public void setAsyncMode(AsyncMode asyncMode) {
    this.asyncMode = asyncMode;
}
713+
714+
/**
 * Returns the delay (in millis) between consecutive attempts to append an event
 * when the ring buffer is full. Used only in {@link AsyncMode#BLOCK} mode.
 */
public long getRetryMillis() {
    return retryMillis;
}
/**
 * Sets the delay (in millis) between consecutive append attempts when the ring buffer is full.
 */
public void setRetryMillis(long retryMillis) {
    this.retryMillis = retryMillis;
}
720+
721+
/**
 * Returns the maximum time to wait for space in the ring buffer before dropping the event.
 * Used only in {@link AsyncMode#BLOCK} mode.
 */
public Duration getRetryTimeout() {
    return retryTimeout;
}
/**
 * Sets the maximum time to wait for space in the ring buffer before dropping the event.
 *
 * @throws NullPointerException if {@code retryTimeout} is {@code null}
 */
public void setRetryTimeout(Duration retryTimeout) {
    this.retryTimeout = Objects.requireNonNull(retryTimeout);
}
727+
728+
/**
 * Sets how long to wait for in-flight events to be processed during shutdown.
 *
 * @throws NullPointerException if {@code shutdownGracePeriod} is {@code null}
 */
public void setShutdownGracePeriod(Duration shutdownGracePeriod) {
    this.shutdownGracePeriod = Objects.requireNonNull(shutdownGracePeriod);
}
/**
 * Returns how long to wait for in-flight events during shutdown.
 */
public Duration getShutdownGracePeriod() {
    return shutdownGracePeriod;
}
734+
584735
/**
 * Returns the configured {@link ThreadFactory}.
 */
public ThreadFactory getThreadFactory() {
    return threadFactory;
}

0 commit comments

Comments
 (0)
Please sign in to comment.