Skip to content

Commit feefdb4

Browse files
committed
Track threads and allocated values
Track threads and allocated values. Dispose value when a thread dies. Dispose allocated values on close()
1 parent 306db61 commit feefdb4

12 files changed

+818
-410
lines changed

src/main/java/net/logstash/logback/composite/AbstractCompositeJsonFormatter.java

+14-7
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
import net.logstash.logback.decorate.JsonGeneratorDecorator;
2626
import net.logstash.logback.decorate.NullJsonFactoryDecorator;
2727
import net.logstash.logback.decorate.NullJsonGeneratorDecorator;
28-
import net.logstash.logback.util.ObjectPool;
2928
import net.logstash.logback.util.ProxyOutputStream;
29+
import net.logstash.logback.util.ThreadLocalHolder;
3030

3131
import ch.qos.logback.access.spi.IAccessEvent;
3232
import ch.qos.logback.classic.spi.ILoggingEvent;
@@ -93,7 +93,7 @@ public abstract class AbstractCompositeJsonFormatter<Event extends DeferredProce
9393

9494
private volatile boolean started;
9595

96-
private ObjectPool<JsonFormatter> pool;
96+
private ThreadLocalHolder<JsonFormatter> threadLocalJsonFormatter;
9797

9898

9999
public AbstractCompositeJsonFormatter(ContextAware declaredOrigin) {
@@ -119,14 +119,14 @@ public void start() {
119119
jsonProviders.setJsonFactory(jsonFactory);
120120
jsonProviders.start();
121121

122-
pool = new ObjectPool<>(this::createJsonFormatter);
122+
threadLocalJsonFormatter = new ThreadLocalHolder<>(this::createJsonFormatter);
123123
started = true;
124124
}
125125

126126
@Override
127127
public void stop() {
128128
if (isStarted()) {
129-
pool.clear();
129+
threadLocalJsonFormatter.close();
130130
jsonProviders.stop();
131131
jsonFactory = null;
132132
started = false;
@@ -152,7 +152,7 @@ public void writeEvent(Event event, OutputStream outputStream) throws IOExceptio
152152
throw new IllegalStateException("Formatter is not started");
153153
}
154154

155-
try (JsonFormatter formatter = this.pool.acquire()) {
155+
try (JsonFormatter formatter = this.threadLocalJsonFormatter.acquire()) {
156156
formatter.writeEvent(outputStream, event);
157157
}
158158
}
@@ -174,7 +174,7 @@ private JsonFormatter createJsonFormatter() {
174174

175175
}
176176

177-
private class JsonFormatter implements ObjectPool.Lifecycle, Closeable {
177+
private class JsonFormatter implements ThreadLocalHolder.Lifecycle, Closeable {
178178
private final JsonGenerator generator;
179179
private final DisconnectedOutputStream stream;
180180
private boolean recyclable = true;
@@ -206,11 +206,18 @@ public boolean recycle() {
206206
@Override
207207
public void dispose() {
208208
CloseUtil.closeQuietly(this.generator);
209+
210+
// Note:
211+
// The stream is disconnected at this point.
212+
// Closing the JsonGenerator may throw additional exception if it is flagged as not recyclable,
213+
// meaning it already threw a exception earlier during the writeEvent() method. The generator
214+
// is disposed here and won't be reused anymore - we can safely ignore these new exceptions
215+
// here.
209216
}
210217

211218
@Override
212219
public void close() throws IOException {
213-
AbstractCompositeJsonFormatter.this.pool.release(this);
220+
AbstractCompositeJsonFormatter.this.threadLocalJsonFormatter.release();
214221
}
215222
}
216223

src/main/java/net/logstash/logback/composite/JsonProviders.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@
4141
*/
4242
public class JsonProviders<Event extends DeferredProcessingAware> implements JsonFactoryAware {
4343

44-
private final List<JsonProvider<Event>> jsonProviders = new ArrayList<>();
45-
44+
private final ArrayList<JsonProvider<Event>> jsonProviders = new ArrayList<>();
45+
4646
public void start() {
4747
for (JsonProvider<Event> jsonProvider : jsonProviders) {
4848
jsonProvider.start();

src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import net.logstash.logback.decorate.JsonFactoryDecorator;
2626
import net.logstash.logback.decorate.JsonGeneratorDecorator;
2727
import net.logstash.logback.util.ReusableByteBuffer;
28-
import net.logstash.logback.util.ReusableByteBufferPool;
28+
import net.logstash.logback.util.ThreadLocalReusableByteBuffer;
2929

3030
import ch.qos.logback.core.encoder.Encoder;
3131
import ch.qos.logback.core.encoder.EncoderBase;
@@ -49,9 +49,9 @@ public abstract class CompositeJsonEncoder<Event extends DeferredProcessingAware
4949
private int minBufferSize = 1024;
5050

5151
/**
52-
* Pool of reusable byte buffers used when calling {@link #encode(DeferredProcessingAware)}
52+
* Per-thread {@link ReusableByteBuffer} instance used when calling {@link #encode(DeferredProcessingAware)}
5353
*/
54-
private ReusableByteBufferPool bufferPool;
54+
private ThreadLocalReusableByteBuffer threadLocalBuffer;
5555

5656
private Encoder<Event> prefix;
5757
private Encoder<Event> suffix;
@@ -86,7 +86,7 @@ public byte[] encode(Event event) {
8686
throw new IllegalStateException("Encoder is not started");
8787
}
8888

89-
ReusableByteBuffer buffer = bufferPool.acquire();
89+
ReusableByteBuffer buffer = threadLocalBuffer.acquire();
9090

9191
try {
9292
encode(buffer, event);
@@ -97,7 +97,7 @@ public byte[] encode(Event event) {
9797
return EMPTY_BYTES;
9898

9999
} finally {
100-
bufferPool.release(buffer);
100+
threadLocalBuffer.release();
101101
}
102102
}
103103

@@ -134,7 +134,7 @@ public void start() {
134134
startWrapped(prefix);
135135
startWrapped(suffix);
136136

137-
this.bufferPool = ReusableByteBufferPool.create(minBufferSize);
137+
this.threadLocalBuffer = new ThreadLocalReusableByteBuffer(minBufferSize);
138138
}
139139

140140
@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -178,7 +178,7 @@ public void stop() {
178178
stopWrapped(prefix);
179179
stopWrapped(suffix);
180180

181-
bufferPool = null;
181+
threadLocalBuffer = null;
182182
}
183183
}
184184

src/main/java/net/logstash/logback/layout/CompositeJsonLayout.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import net.logstash.logback.encoder.CompositeJsonEncoder;
2929
import net.logstash.logback.encoder.SeparatorParser;
3030
import net.logstash.logback.util.ReusableByteBuffer;
31-
import net.logstash.logback.util.ReusableByteBufferPool;
31+
import net.logstash.logback.util.ThreadLocalReusableByteBuffer;
3232

3333
import ch.qos.logback.core.Layout;
3434
import ch.qos.logback.core.LayoutBase;
@@ -64,9 +64,9 @@ public abstract class CompositeJsonLayout<Event extends DeferredProcessingAware>
6464
private int minBufferSize = 1024;
6565

6666
/**
67-
* Pool of reusable byte buffers
67+
* Per-thread {@link ReusableByteBuffer}
6868
*/
69-
private ReusableByteBufferPool bufferPool;
69+
private ThreadLocalReusableByteBuffer threadLocalBuffer;
7070

7171
private final AbstractCompositeJsonFormatter<Event> formatter;
7272

@@ -83,7 +83,7 @@ public String doLayout(Event event) {
8383
throw new IllegalStateException("Layout is not started");
8484
}
8585

86-
ReusableByteBuffer buffer = bufferPool.acquire();
86+
ReusableByteBuffer buffer = threadLocalBuffer.acquire();
8787
try {
8888
writeEvent(buffer, event);
8989
return new String(buffer.toByteArray());
@@ -93,7 +93,7 @@ public String doLayout(Event event) {
9393
return null;
9494

9595
} finally {
96-
bufferPool.release(buffer);
96+
threadLocalBuffer.release();
9797
}
9898
}
9999

@@ -136,7 +136,7 @@ public void start() {
136136
startWrapped(prefix);
137137
startWrapped(suffix);
138138

139-
this.bufferPool = ReusableByteBufferPool.create(minBufferSize);
139+
this.threadLocalBuffer = new ThreadLocalReusableByteBuffer(minBufferSize);
140140
}
141141

142142
private void startWrapped(Layout<Event> wrapped) {
@@ -170,7 +170,7 @@ public void stop() {
170170
stopWrapped(prefix);
171171
stopWrapped(suffix);
172172

173-
this.bufferPool = null;
173+
this.threadLocalBuffer = null;
174174
}
175175

176176
private void stopWrapped(Layout<Event> wrapped) {

0 commit comments

Comments
 (0)