Skip to content

Commit 7e2fd07

Browse files
committed
Reduce GC pressure by writing directly into the outpustream instead of returning a byte array
Introduce a new (internal) StreamingEncoder interface to be implemented by Encoders that supports writing directly into the output stream instead of returning their results as a byte array. Update both the AbstractLogstashTcpSocketAppender and the CompositeJsonEncoder to support this new interface. This should hoppefully reduce the amount of short-lived byte arrays created for each log event. See logfellow#461 for more information.
1 parent 0c4f058 commit 7e2fd07

File tree

4 files changed

+259
-113
lines changed

4 files changed

+259
-113
lines changed

src/main/java/net/logstash/logback/appender/AbstractLogstashTcpSocketAppender.java

+23-6
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@
4848
import net.logstash.logback.appender.destination.PreferPrimaryDestinationConnectionStrategy;
4949
import net.logstash.logback.appender.listener.TcpAppenderListener;
5050
import net.logstash.logback.encoder.SeparatorParser;
51+
import net.logstash.logback.encoder.StreamingEncoder;
52+
53+
import com.lmax.disruptor.EventHandler;
54+
import com.lmax.disruptor.LifecycleAware;
55+
import com.lmax.disruptor.RingBuffer;
5156

5257
import ch.qos.logback.core.encoder.Encoder;
5358
import ch.qos.logback.core.joran.spi.DefaultClass;
@@ -59,10 +64,6 @@
5964
import ch.qos.logback.core.util.CloseUtil;
6065
import ch.qos.logback.core.util.Duration;
6166

62-
import com.lmax.disruptor.EventHandler;
63-
import com.lmax.disruptor.LifecycleAware;
64-
import com.lmax.disruptor.RingBuffer;
65-
6667
/**
6768
* An {@link AsyncDisruptorAppender} appender that writes
6869
* events to a TCP {@link Socket} outputStream.
@@ -594,8 +595,9 @@ private void writeEvent(Socket socket, OutputStream outputStream, LogEvent<Event
594595
* This is a standard (non-keepAlive) event.
595596
* Therefore, we need to send the event.
596597
*/
597-
outputStream.write(encoder.encode(logEvent.event));
598-
} else if (hasKeepAliveDurationElapsed(lastSendEndNanoTime, startNanoTime)){
598+
encode(logEvent.event, outputStream);
599+
}
600+
else if (hasKeepAliveDurationElapsed(lastSendEndNanoTime, startNanoTime)){
599601
/*
600602
* This is a keep alive event, and the keepAliveDuration has passed,
601603
* Therefore, we need to send the keepAliveMessage.
@@ -622,6 +624,21 @@ private void writeEvent(Socket socket, OutputStream outputStream, LogEvent<Event
622624
}
623625
}
624626

627+
628+
@SuppressWarnings("unchecked")
629+
private void encode(Event event, OutputStream outputStream) throws IOException {
630+
if (encoder instanceof StreamingEncoder) {
631+
((StreamingEncoder<Event>)encoder).encode(event, outputStream);
632+
}
633+
else {
634+
byte[] data = encoder.encode(event);
635+
if (data!=null) {
636+
outputStream.write(data);
637+
}
638+
}
639+
}
640+
641+
625642
private boolean hasKeepAliveDurationElapsed(long lastSentNanoTime, long currentNanoTime) {
626643
return isKeepAliveEnabled()
627644
&& lastSentNanoTime + TimeUnit.MILLISECONDS.toNanos(keepAliveDuration.getMilliseconds()) < currentNanoTime;

src/main/java/net/logstash/logback/encoder/CompositeJsonEncoder.java

+39-41
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,22 @@
1515

1616
import java.io.ByteArrayOutputStream;
1717
import java.io.IOException;
18+
import java.io.OutputStream;
1819
import java.nio.charset.Charset;
1920

2021
import net.logstash.logback.composite.CompositeJsonFormatter;
2122
import net.logstash.logback.composite.JsonProviders;
2223
import net.logstash.logback.decorate.JsonFactoryDecorator;
2324
import net.logstash.logback.decorate.JsonGeneratorDecorator;
25+
2426
import ch.qos.logback.core.encoder.Encoder;
2527
import ch.qos.logback.core.encoder.EncoderBase;
2628
import ch.qos.logback.core.encoder.LayoutWrappingEncoder;
2729
import ch.qos.logback.core.pattern.PatternLayoutBase;
2830
import ch.qos.logback.core.spi.DeferredProcessingAware;
2931

3032
public abstract class CompositeJsonEncoder<Event extends DeferredProcessingAware>
31-
extends EncoderBase<Event> {
33+
extends EncoderBase<Event> implements StreamingEncoder<Event> {
3234

3335
private static final byte[] EMPTY_BYTES = new byte[0];
3436

@@ -60,51 +62,45 @@ public CompositeJsonEncoder() {
6062
protected abstract CompositeJsonFormatter<Event> createFormatter();
6163

6264
@Override
63-
public byte[] encode(Event event) {
64-
byte[] prefixBytes = doEncodeWrappedToBytes(prefix, event);
65-
byte[] suffixBytes = doEncodeWrappedToBytes(suffix, event);
65+
public void encode(Event event, OutputStream outputStream) throws IOException {
66+
if (!isStarted()) {
67+
throw new IllegalStateException("Encoder is not started.");
68+
}
6669

67-
ByteArrayOutputStream outputStream = new ByteArrayOutputStream(
68-
minBufferSize
69-
+ (prefixBytes == null ? 0 : prefixBytes.length)
70-
+ (suffixBytes == null ? 0 : suffixBytes.length)
71-
+ lineSeparatorBytes.length);
72-
try {
73-
if (prefixBytes != null) {
74-
outputStream.write(prefixBytes);
75-
}
76-
77-
formatter.writeEventToOutputStream(event, outputStream);
78-
79-
if (suffixBytes != null) {
80-
outputStream.write(suffixBytes);
81-
}
82-
83-
outputStream.write(lineSeparatorBytes);
84-
70+
encode(prefix, event, outputStream);
71+
formatter.writeEventToOutputStream(event, outputStream);
72+
encode(suffix, event, outputStream);
73+
74+
outputStream.write(lineSeparatorBytes);
75+
}
76+
77+
@Override
78+
public byte[] encode(Event event) {
79+
try(ByteArrayOutputStream outputStream = new ByteArrayOutputStream(getMinBufferSize())) {
80+
encode(event, outputStream);
8581
return outputStream.toByteArray();
86-
} catch (IOException e) {
87-
addWarn("Error encountered while encoding log event. "
88-
+ "Event: " + event, e);
89-
return EMPTY_BYTES;
90-
} finally {
91-
try {
92-
outputStream.close();
93-
} catch (IOException e) {
94-
throw new RuntimeException(e);
95-
}
9682
}
83+
catch (IOException e) {
84+
addWarn("Error encountered while encoding log event. Event: " + event, e);
85+
return EMPTY_BYTES;
86+
}
9787
}
9888

99-
private byte[] doEncodeWrappedToBytes(Encoder<Event> wrapped, Event event) {
100-
if (wrapped != null) {
101-
return wrapped.encode(event);
89+
private void encode(Encoder<Event> encoder, Event event, OutputStream outputStream) throws IOException {
90+
if (encoder!=null) {
91+
byte[] data = encoder.encode(event);
92+
if (data!=null) {
93+
outputStream.write(data);
94+
}
10295
}
103-
return EMPTY_BYTES;
10496
}
10597

10698
@Override
10799
public void start() {
100+
if (isStarted()) {
101+
return;
102+
}
103+
108104
super.start();
109105
formatter.setContext(getContext());
110106
formatter.start();
@@ -151,14 +147,16 @@ private void startWrapped(Encoder<Event> wrapped) {
151147

152148
@Override
153149
public void stop() {
154-
super.stop();
155-
formatter.stop();
156-
stopWrapped(prefix);
157-
stopWrapped(suffix);
150+
if (isStarted()) {
151+
super.stop();
152+
formatter.stop();
153+
stopWrapped(prefix);
154+
stopWrapped(suffix);
155+
}
158156
}
159157

160158
private void stopWrapped(Encoder<Event> wrapped) {
161-
if (wrapped != null && !wrapped.isStarted()) {
159+
if (wrapped != null && wrapped.isStarted()) {
162160
wrapped.stop();
163161
}
164162
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/**
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package net.logstash.logback.encoder;
15+
16+
import java.io.IOException;
17+
import java.io.OutputStream;
18+
19+
/**
20+
* Interface implemented by {@link Encoder} that supports writing directly into a {@link OutputStream}
21+
* instead of returning a byte array.
22+
*/
23+
public interface StreamingEncoder<Event> {
24+
25+
public void encode(Event event, OutputStream outputStream) throws IOException;
26+
27+
}

0 commit comments

Comments
 (0)