Skip to content

Commit 5b3d0c5

Browse files
committed
Reduce GC pressure by writing directly into the outpustream instead of returning a byte array
Introduce a new (internal) StreamingEncoder interface to be implemented by Encoders that supports writing directly into the output stream instead of returning their results as a byte array. Update both the AbstractLogstashTcpSocketAppender and the CompositeJsonEncoder to support this new interface. This should hoppefully reduce the amount of short-lived byte arrays created for each log event. See logfellow#461 for more information.
1 parent 3fdc998 commit 5b3d0c5

File tree

4 files changed

+252
-116
lines changed

4 files changed

+252
-116
lines changed

src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java

+23-6
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@
4848
import net.logstash.logback.appender.destination.PreferPrimaryDestinationConnectionStrategy;
4949
import net.logstash.logback.appender.listener.TcpAppenderListener;
5050
import net.logstash.logback.encoder.SeparatorParser;
51+
import net.logstash.logback.encoder.StreamingEncoder;
52+
53+
import com.lmax.disruptor.EventHandler;
54+
import com.lmax.disruptor.LifecycleAware;
55+
import com.lmax.disruptor.RingBuffer;
5156

5257
import ch.qos.logback.core.encoder.Encoder;
5358
import ch.qos.logback.core.joran.spi.DefaultClass;
@@ -59,10 +64,6 @@
5964
import ch.qos.logback.core.util.CloseUtil;
6065
import ch.qos.logback.core.util.Duration;
6166

62-
import com.lmax.disruptor.EventHandler;
63-
import com.lmax.disruptor.LifecycleAware;
64-
import com.lmax.disruptor.RingBuffer;
65-
6667
/**
6768
* An {@link AsyncDisruptorAppender} appender that writes
6869
* events to a TCP {@link Socket} outputStream.
@@ -594,8 +595,9 @@ private void writeEvent(Socket socket, OutputStream outputStream, LogEvent<Event
594595
* This is a standard (non-keepAlive) event.
595596
* Therefore, we need to send the event.
596597
*/
597-
outputStream.write(encoder.encode(logEvent.event));
598-
} else if (hasKeepAliveDurationElapsed(lastSendEndNanoTime, startNanoTime)){
598+
encode(logEvent.event, outputStream);
599+
}
600+
else if (hasKeepAliveDurationElapsed(lastSendEndNanoTime, startNanoTime)){
599601
/*
600602
* This is a keep alive event, and the keepAliveDuration has passed,
601603
* Therefore, we need to send the keepAliveMessage.
@@ -622,6 +624,21 @@ private void writeEvent(Socket socket, OutputStream outputStream, LogEvent<Event
622624
}
623625
}
624626

627+
628+
@SuppressWarnings("unchecked")
629+
private void encode(Event event, OutputStream outputStream) throws IOException {
630+
if (encoder instanceof StreamingEncoder) {
631+
((StreamingEncoder<Event>)encoder).encode(event, outputStream);
632+
}
633+
else {
634+
byte[] data = encoder.encode(event);
635+
if (data!=null) {
636+
outputStream.write(data);
637+
}
638+
}
639+
}
640+
641+
625642
private boolean hasKeepAliveDurationElapsed(long lastSentNanoTime, long currentNanoTime) {
626643
return isKeepAliveEnabled()
627644
&& lastSentNanoTime + TimeUnit.MILLISECONDS.toNanos(keepAliveDuration.getMilliseconds()) < currentNanoTime;

src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java

+39-41
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,22 @@
1515

1616
import java.io.ByteArrayOutputStream;
1717
import java.io.IOException;
18+
import java.io.OutputStream;
1819
import java.nio.charset.Charset;
1920

2021
import net.logstash.logback.composite.CompositeJsonFormatter;
2122
import net.logstash.logback.composite.JsonProviders;
2223
import net.logstash.logback.decorate.JsonFactoryDecorator;
2324
import net.logstash.logback.decorate.JsonGeneratorDecorator;
25+
2426
import ch.qos.logback.core.encoder.Encoder;
2527
import ch.qos.logback.core.encoder.EncoderBase;
2628
import ch.qos.logback.core.encoder.LayoutWrappingEncoder;
2729
import ch.qos.logback.core.pattern.PatternLayoutBase;
2830
import ch.qos.logback.core.spi.DeferredProcessingAware;
2931

3032
public abstract class CompositeJsonEncoder<Event extends DeferredProcessingAware>
31-
extends EncoderBase<Event> {
33+
extends EncoderBase<Event> implements StreamingEncoder<Event> {
3234

3335
private static final byte[] EMPTY_BYTES = new byte[0];
3436

@@ -60,54 +62,48 @@ public CompositeJsonEncoder() {
6062
protected abstract CompositeJsonFormatter<Event> createFormatter();
6163

6264
@Override
63-
public byte[] encode(Event event) {
65+
public void encode(Event event, OutputStream outputStream) throws IOException {
6466
if (!isStarted()) {
6567
throw new IllegalStateException("Encoder is not started");
6668
}
67-
byte[] prefixBytes = doEncodeWrappedToBytes(prefix, event);
68-
byte[] suffixBytes = doEncodeWrappedToBytes(suffix, event);
69+
if (!isStarted()) {
70+
throw new IllegalStateException("Encoder is not started.");
71+
}
6972

70-
ByteArrayOutputStream outputStream = new ByteArrayOutputStream(
71-
minBufferSize
72-
+ (prefixBytes == null ? 0 : prefixBytes.length)
73-
+ (suffixBytes == null ? 0 : suffixBytes.length)
74-
+ lineSeparatorBytes.length);
75-
try {
76-
if (prefixBytes != null) {
77-
outputStream.write(prefixBytes);
78-
}
79-
80-
formatter.writeEventToOutputStream(event, outputStream);
81-
82-
if (suffixBytes != null) {
83-
outputStream.write(suffixBytes);
84-
}
85-
86-
outputStream.write(lineSeparatorBytes);
87-
73+
encode(prefix, event, outputStream);
74+
formatter.writeEventToOutputStream(event, outputStream);
75+
encode(suffix, event, outputStream);
76+
77+
outputStream.write(lineSeparatorBytes);
78+
}
79+
80+
@Override
81+
public byte[] encode(Event event) {
82+
try(ByteArrayOutputStream outputStream = new ByteArrayOutputStream(getMinBufferSize())) {
83+
encode(event, outputStream);
8884
return outputStream.toByteArray();
89-
} catch (IOException e) {
90-
addWarn("Error encountered while encoding log event. "
91-
+ "Event: " + event, e);
92-
return EMPTY_BYTES;
93-
} finally {
94-
try {
95-
outputStream.close();
96-
} catch (IOException e) {
97-
throw new RuntimeException(e);
98-
}
9985
}
86+
catch (IOException e) {
87+
addWarn("Error encountered while encoding log event. Event: " + event, e);
88+
return EMPTY_BYTES;
89+
}
10090
}
10191

102-
private byte[] doEncodeWrappedToBytes(Encoder<Event> wrapped, Event event) {
103-
if (wrapped != null) {
104-
return wrapped.encode(event);
92+
private void encode(Encoder<Event> encoder, Event event, OutputStream outputStream) throws IOException {
93+
if (encoder!=null) {
94+
byte[] data = encoder.encode(event);
95+
if (data!=null) {
96+
outputStream.write(data);
97+
}
10598
}
106-
return EMPTY_BYTES;
10799
}
108100

109101
@Override
110102
public void start() {
103+
if (isStarted()) {
104+
return;
105+
}
106+
111107
super.start();
112108
formatter.setContext(getContext());
113109
formatter.start();
@@ -154,14 +150,16 @@ private void startWrapped(Encoder<Event> wrapped) {
154150

155151
@Override
156152
public void stop() {
157-
super.stop();
158-
formatter.stop();
159-
stopWrapped(prefix);
160-
stopWrapped(suffix);
153+
if (isStarted()) {
154+
super.stop();
155+
formatter.stop();
156+
stopWrapped(prefix);
157+
stopWrapped(suffix);
158+
}
161159
}
162160

163161
private void stopWrapped(Encoder<Event> wrapped) {
164-
if (wrapped != null && !wrapped.isStarted()) {
162+
if (wrapped != null && wrapped.isStarted()) {
165163
wrapped.stop();
166164
}
167165
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/**
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package net.logstash.logback.encoder;
15+
16+
import java.io.IOException;
17+
import java.io.OutputStream;
18+
19+
/**
20+
* Interface implemented by {@link Encoder} that supports writing directly into a {@link OutputStream}
21+
* instead of returning a byte array.
22+
*/
23+
public interface StreamingEncoder<Event> {
24+
25+
public void encode(Event event, OutputStream outputStream) throws IOException;
26+
27+
}

0 commit comments

Comments
 (0)