Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a ThreadLocal approach instead of an ObjectPool #670

Merged
merged 4 commits into from
Oct 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import net.logstash.logback.decorate.JsonGeneratorDecorator;
import net.logstash.logback.decorate.NullJsonFactoryDecorator;
import net.logstash.logback.decorate.NullJsonGeneratorDecorator;
import net.logstash.logback.util.ObjectPool;
import net.logstash.logback.util.ProxyOutputStream;
import net.logstash.logback.util.ThreadLocalHolder;

import ch.qos.logback.access.spi.IAccessEvent;
import ch.qos.logback.classic.spi.ILoggingEvent;
Expand All @@ -50,10 +50,7 @@
* and then ends the JSON object ('}').
*
* <p>Jackson {@link JsonGenerator} are initially created with a "disconnected" output stream so they can be
* reused multiple times with different target output stream. They are kept in an internal pool whose
* size is technically unbounded. It will however never hold more entries than the number of concurrent
* threads accessing it. Entries are kept in the pool using soft references so they can be garbage
* collected by the JVM when running low in memory.
* reused multiple times with different target output stream.
*
* <p>{@link JsonGenerator} instances are *not* reused after they threw an exception. This is to prevent
* reusing an instance whose internal state may be unpredictable.
Expand Down Expand Up @@ -93,7 +90,7 @@ public abstract class AbstractCompositeJsonFormatter<Event extends DeferredProce

private volatile boolean started;

private ObjectPool<JsonFormatter> pool;
private ThreadLocalHolder<JsonFormatter> threadLocalJsonFormatter;


public AbstractCompositeJsonFormatter(ContextAware declaredOrigin) {
Expand All @@ -119,14 +116,14 @@ public void start() {
jsonProviders.setJsonFactory(jsonFactory);
jsonProviders.start();

pool = new ObjectPool<>(this::createJsonFormatter);
threadLocalJsonFormatter = new ThreadLocalHolder<>(this::createJsonFormatter);
started = true;
}

@Override
public void stop() {
if (isStarted()) {
pool.clear();
threadLocalJsonFormatter.close();
jsonProviders.stop();
jsonFactory = null;
started = false;
Expand All @@ -152,7 +149,7 @@ public void writeEvent(Event event, OutputStream outputStream) throws IOExceptio
throw new IllegalStateException("Formatter is not started");
}

try (JsonFormatter formatter = this.pool.acquire()) {
try (JsonFormatter formatter = this.threadLocalJsonFormatter.acquire()) {
formatter.writeEvent(outputStream, event);
}
}
Expand All @@ -174,7 +171,7 @@ private JsonFormatter createJsonFormatter() {

}

private class JsonFormatter implements ObjectPool.Lifecycle, Closeable {
private class JsonFormatter implements ThreadLocalHolder.Lifecycle, Closeable {
private final JsonGenerator generator;
private final DisconnectedOutputStream stream;
private boolean recyclable = true;
Expand Down Expand Up @@ -206,11 +203,18 @@ public boolean recycle() {
@Override
public void dispose() {
CloseUtil.closeQuietly(this.generator);

// Note:
// The stream is disconnected at this point.
// Closing the JsonGenerator may throw additional exception if it is flagged as not recyclable,
// meaning it already threw a exception earlier during the writeEvent() method. The generator
// is disposed here and won't be reused anymore - we can safely ignore these new exceptions
// here.
}

@Override
public void close() throws IOException {
AbstractCompositeJsonFormatter.this.pool.release(this);
AbstractCompositeJsonFormatter.this.threadLocalJsonFormatter.release();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
public class JsonProviders<Event extends DeferredProcessingAware> implements JsonFactoryAware {

private final List<JsonProvider<Event>> jsonProviders = new ArrayList<>();

public void start() {
for (JsonProvider<Event> jsonProvider : jsonProviders) {
jsonProvider.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import net.logstash.logback.decorate.JsonFactoryDecorator;
import net.logstash.logback.decorate.JsonGeneratorDecorator;
import net.logstash.logback.util.ReusableByteBuffer;
import net.logstash.logback.util.ReusableByteBufferPool;
import net.logstash.logback.util.ThreadLocalReusableByteBuffer;

import ch.qos.logback.core.encoder.Encoder;
import ch.qos.logback.core.encoder.EncoderBase;
Expand All @@ -49,9 +49,9 @@ public abstract class CompositeJsonEncoder<Event extends DeferredProcessingAware
private int minBufferSize = 1024;

/**
* Pool of reusable byte buffers used when calling {@link #encode(DeferredProcessingAware)}
* Per-thread {@link ReusableByteBuffer} instance used when calling {@link #encode(DeferredProcessingAware)}
*/
private ReusableByteBufferPool bufferPool;
private ThreadLocalReusableByteBuffer threadLocalBuffer;

private Encoder<Event> prefix;
private Encoder<Event> suffix;
Expand Down Expand Up @@ -86,7 +86,7 @@ public byte[] encode(Event event) {
throw new IllegalStateException("Encoder is not started");
}

ReusableByteBuffer buffer = bufferPool.acquire();
ReusableByteBuffer buffer = threadLocalBuffer.acquire();

try {
encode(buffer, event);
Expand All @@ -97,7 +97,7 @@ public byte[] encode(Event event) {
return EMPTY_BYTES;

} finally {
bufferPool.release(buffer);
threadLocalBuffer.release();
}
}

Expand Down Expand Up @@ -134,7 +134,7 @@ public void start() {
startWrapped(prefix);
startWrapped(suffix);

this.bufferPool = ReusableByteBufferPool.create(minBufferSize);
this.threadLocalBuffer = new ThreadLocalReusableByteBuffer(minBufferSize);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
Expand Down Expand Up @@ -178,7 +178,7 @@ public void stop() {
stopWrapped(prefix);
stopWrapped(suffix);

bufferPool = null;
threadLocalBuffer = null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import net.logstash.logback.encoder.CompositeJsonEncoder;
import net.logstash.logback.encoder.SeparatorParser;
import net.logstash.logback.util.ReusableByteBuffer;
import net.logstash.logback.util.ReusableByteBufferPool;
import net.logstash.logback.util.ThreadLocalReusableByteBuffer;

import ch.qos.logback.core.Layout;
import ch.qos.logback.core.LayoutBase;
Expand Down Expand Up @@ -64,9 +64,9 @@ public abstract class CompositeJsonLayout<Event extends DeferredProcessingAware>
private int minBufferSize = 1024;

/**
* Pool of reusable byte buffers
* Per-thread {@link ReusableByteBuffer}
*/
private ReusableByteBufferPool bufferPool;
private ThreadLocalReusableByteBuffer threadLocalBuffer;

private final AbstractCompositeJsonFormatter<Event> formatter;

Expand All @@ -83,7 +83,7 @@ public String doLayout(Event event) {
throw new IllegalStateException("Layout is not started");
}

ReusableByteBuffer buffer = bufferPool.acquire();
ReusableByteBuffer buffer = threadLocalBuffer.acquire();
try {
writeEvent(buffer, event);
return new String(buffer.toByteArray());
Expand All @@ -93,7 +93,7 @@ public String doLayout(Event event) {
return null;

} finally {
bufferPool.release(buffer);
threadLocalBuffer.release();
}
}

Expand Down Expand Up @@ -136,7 +136,7 @@ public void start() {
startWrapped(prefix);
startWrapped(suffix);

this.bufferPool = ReusableByteBufferPool.create(minBufferSize);
this.threadLocalBuffer = new ThreadLocalReusableByteBuffer(minBufferSize);
}

private void startWrapped(Layout<Event> wrapped) {
Expand Down Expand Up @@ -170,7 +170,7 @@ public void stop() {
stopWrapped(prefix);
stopWrapped(suffix);

this.bufferPool = null;
this.threadLocalBuffer = null;
}

private void stopWrapped(Layout<Event> wrapped) {
Expand Down
Loading