Skip to content

Commit fa81678

Browse files
authored
Merge pull request #472 from brenuart/gh461
Reduce memory allocations by writing directly into the output stream (#461)
2 parents ef9dced + 513da38 commit fa81678

12 files changed

+895
-260
lines changed

README.md

+4
Original file line numberDiff line numberDiff line change
@@ -1801,6 +1801,10 @@ The logstash-logback-encoder library contains many providers out-of-the-box,
18011801
and you can even plug-in your own by extending `JsonProvider`.
18021802
Each provider has its own configuration options to further customize it.
18031803

1804+
These encoders/layouts make use of an internal buffer to hold the JSON output during the rendering process.
1805+
The size of this buffer is set to `1024` bytes by default. A different size can be configured by setting the `minBufferSize` property to the desired value.
1806+
The buffer automatically grows above the `minBufferSize` when needed to accommodate larger events. However, only the first `minBufferSize` bytes will be reused by subsequent invocations. It is therefore strongly advised to set the minimum size at least equal to the average size of the encoded events to reduce unnecessary memory allocations and reduce pressure on the garbage collector.
1807+
18041808

18051809
#### Providers for LoggingEvents
18061810

src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java

+20-5
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@
4848
import net.logstash.logback.appender.destination.PreferPrimaryDestinationConnectionStrategy;
4949
import net.logstash.logback.appender.listener.TcpAppenderListener;
5050
import net.logstash.logback.encoder.SeparatorParser;
51+
import net.logstash.logback.encoder.StreamingEncoder;
52+
53+
import com.lmax.disruptor.EventHandler;
54+
import com.lmax.disruptor.LifecycleAware;
55+
import com.lmax.disruptor.RingBuffer;
5156

5257
import ch.qos.logback.core.encoder.Encoder;
5358
import ch.qos.logback.core.joran.spi.DefaultClass;
@@ -59,10 +64,6 @@
5964
import ch.qos.logback.core.util.CloseUtil;
6065
import ch.qos.logback.core.util.Duration;
6166

62-
import com.lmax.disruptor.EventHandler;
63-
import com.lmax.disruptor.LifecycleAware;
64-
import com.lmax.disruptor.RingBuffer;
65-
6667
/**
6768
* An {@link AsyncDisruptorAppender} appender that writes
6869
* events to a TCP {@link Socket} outputStream.
@@ -584,7 +585,7 @@ private void writeEvent(Socket socket, OutputStream outputStream, LogEvent<Event
584585
* This is a standard (non-keepAlive) event.
585586
* Therefore, we need to send the event.
586587
*/
587-
outputStream.write(encoder.encode(logEvent.event));
588+
encode(logEvent.event, outputStream);
588589
} else if (hasKeepAliveDurationElapsed(lastSendEndNanoTime, startNanoTime)) {
589590
/*
590591
* This is a keep alive event, and the keepAliveDuration has passed,
@@ -612,6 +613,20 @@ private void writeEvent(Socket socket, OutputStream outputStream, LogEvent<Event
612613
}
613614
}
614615

616+
617+
@SuppressWarnings("unchecked")
618+
private void encode(Event event, OutputStream outputStream) throws IOException {
619+
if (encoder instanceof StreamingEncoder) {
620+
((StreamingEncoder<Event>) encoder).encode(event, outputStream);
621+
} else {
622+
byte[] data = encoder.encode(event);
623+
if (data != null) {
624+
outputStream.write(data);
625+
}
626+
}
627+
}
628+
629+
615630
private boolean hasKeepAliveDurationElapsed(long lastSentNanoTime, long currentNanoTime) {
616631
return isKeepAliveEnabled()
617632
&& lastSentNanoTime + TimeUnit.MILLISECONDS.toNanos(keepAliveDuration.getMilliseconds()) < currentNanoTime;

src/main/java/net/logstash/logback/composite/CompositeJsonFormatter.java

+37-71
Original file line numberDiff line numberDiff line change
@@ -16,29 +16,26 @@
1616
import java.io.IOException;
1717
import java.io.OutputStream;
1818
import java.io.Writer;
19-
import java.lang.ref.SoftReference;
2019
import java.util.ServiceConfigurationError;
2120

2221
import net.logstash.logback.decorate.JsonFactoryDecorator;
2322
import net.logstash.logback.decorate.JsonGeneratorDecorator;
2423
import net.logstash.logback.decorate.NullJsonFactoryDecorator;
2524
import net.logstash.logback.decorate.NullJsonGeneratorDecorator;
26-
import ch.qos.logback.access.spi.IAccessEvent;
27-
import ch.qos.logback.classic.spi.ILoggingEvent;
28-
import ch.qos.logback.core.spi.ContextAware;
29-
import ch.qos.logback.core.spi.ContextAwareBase;
30-
import ch.qos.logback.core.spi.DeferredProcessingAware;
31-
import ch.qos.logback.core.spi.LifeCycle;
3225

3326
import com.fasterxml.jackson.core.JsonEncoding;
3427
import com.fasterxml.jackson.core.JsonFactory;
3528
import com.fasterxml.jackson.core.JsonGenerator;
36-
import com.fasterxml.jackson.core.io.SegmentedStringWriter;
37-
import com.fasterxml.jackson.core.util.BufferRecycler;
38-
import com.fasterxml.jackson.core.util.ByteArrayBuilder;
3929
import com.fasterxml.jackson.databind.ObjectMapper;
4030
import com.fasterxml.jackson.databind.SerializationFeature;
4131

32+
import ch.qos.logback.access.spi.IAccessEvent;
33+
import ch.qos.logback.classic.spi.ILoggingEvent;
34+
import ch.qos.logback.core.spi.ContextAware;
35+
import ch.qos.logback.core.spi.ContextAwareBase;
36+
import ch.qos.logback.core.spi.DeferredProcessingAware;
37+
import ch.qos.logback.core.spi.LifeCycle;
38+
4239
/**
4340
* Formats logstash Events as JSON using {@link JsonProvider}s.
4441
* <p>
@@ -52,18 +49,6 @@
5249
public abstract class CompositeJsonFormatter<Event extends DeferredProcessingAware>
5350
extends ContextAwareBase implements LifeCycle {
5451

55-
/**
56-
* This <code>ThreadLocal</code> contains a {@link java.lang.ref.SoftReference}
57-
* to a {@link BufferRecycler} used to provide a low-cost
58-
* buffer recycling between writer instances.
59-
*/
60-
private final ThreadLocal<SoftReference<BufferRecycler>> recycler = new ThreadLocal<SoftReference<BufferRecycler>>() {
61-
protected SoftReference<BufferRecycler> initialValue() {
62-
final BufferRecycler bufferRecycler = new BufferRecycler();
63-
return new SoftReference<BufferRecycler>(bufferRecycler);
64-
}
65-
};
66-
6752
/**
6853
* Used to create the necessary {@link JsonGenerator}s for generating JSON.
6954
*/
@@ -134,35 +119,7 @@ private JsonFactory createJsonFactory() {
134119
}
135120
}
136121

137-
JsonFactory jsonFactory = objectMapper
138-
.getFactory()
139-
/*
140-
* When generators are flushed, don't flush the underlying outputStream.
141-
*
142-
* This allows some streaming optimizations when using an encoder.
143-
*
144-
* The encoder generally determines when the stream should be flushed
145-
* by an 'immediateFlush' property.
146-
*
147-
* The 'immediateFlush' property of the encoder can be set to false
148-
* when the appender performs the flushes at appropriate times
149-
* (such as the end of a batch in the AbstractLogstashTcpSocketAppender).
150-
*/
151-
.disable(JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM);
152-
153-
return this.jsonFactoryDecorator.decorate(jsonFactory);
154-
}
155-
156-
public byte[] writeEventAsBytes(Event event) throws IOException {
157-
ByteArrayBuilder outputStream = new ByteArrayBuilder(getBufferRecycler());
158-
159-
try {
160-
writeEventToOutputStream(event, outputStream);
161-
outputStream.flush();
162-
return outputStream.toByteArray();
163-
} finally {
164-
outputStream.release();
165-
}
122+
return this.jsonFactoryDecorator.decorate(objectMapper.getFactory());
166123
}
167124

168125
public void writeEventToOutputStream(Event event, OutputStream outputStream) throws IOException {
@@ -177,13 +134,9 @@ public void writeEventToOutputStream(Event event, OutputStream outputStream) thr
177134
*/
178135
}
179136

180-
public String writeEventAsString(Event event) throws IOException {
181-
SegmentedStringWriter writer = new SegmentedStringWriter(getBufferRecycler());
182-
137+
public void writeEventToWriter(Event event, Writer writer) throws IOException {
183138
try (JsonGenerator generator = createGenerator(writer)) {
184139
writeEventToGenerator(generator, event);
185-
writer.flush();
186-
return writer.getAndClear();
187140
}
188141
}
189142

@@ -203,23 +156,36 @@ protected void prepareForDeferredProcessing(Event event) {
203156
}
204157

205158
private JsonGenerator createGenerator(OutputStream outputStream) throws IOException {
206-
return this.jsonGeneratorDecorator.decorate(jsonFactory.createGenerator(outputStream, encoding));
159+
return decorateGenerator(jsonFactory.createGenerator(outputStream, encoding));
207160
}
208-
161+
209162
private JsonGenerator createGenerator(Writer writer) throws IOException {
210-
return this.jsonGeneratorDecorator.decorate(jsonFactory.createGenerator(writer));
211-
}
212-
213-
private BufferRecycler getBufferRecycler() {
214-
SoftReference<BufferRecycler> bufferRecyclerReference = recycler.get();
215-
BufferRecycler bufferRecycler = bufferRecyclerReference.get();
216-
if (bufferRecycler == null) {
217-
recycler.remove();
218-
return getBufferRecycler();
219-
}
220-
return bufferRecycler;
221-
}
222-
163+
return decorateGenerator(jsonFactory.createGenerator(writer));
164+
}
165+
166+
private JsonGenerator decorateGenerator(JsonGenerator generator) {
167+
return this.jsonGeneratorDecorator.decorate(generator)
168+
/*
169+
* When generators are flushed, don't flush the underlying outputStream.
170+
*
171+
* This allows some streaming optimizations when using an encoder.
172+
*
173+
* The encoder generally determines when the stream should be flushed
174+
* by an 'immediateFlush' property.
175+
*
176+
* The 'immediateFlush' property of the encoder can be set to false
177+
* when the appender performs the flushes at appropriate times
178+
* (such as the end of a batch in the AbstractLogstashTcpSocketAppender).
179+
*/
180+
.disable(JsonGenerator.Feature.FLUSH_PASSED_TO_STREAM)
181+
182+
/*
183+
* Don't let the json generator close the underlying outputStream and let the
184+
* encoder manage it.
185+
*/
186+
.disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
187+
}
188+
223189
public JsonFactory getJsonFactory() {
224190
return jsonFactory;
225191
}

0 commit comments

Comments
 (0)