From ff3d5decc1949e4008de05d9d01b4025d2345297 Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Thu, 14 Mar 2024 10:05:15 +1100 Subject: [PATCH] add no-copy overload of Stream.buffer to reduce allocation pressure (#100) --- .../main/java/org/davidmoten/kool/Stream.java | 19 +++++++- .../internal/operators/stream/Buffer.java | 25 ++++++++--- .../stream/ReplayableStreamIterator.java | 2 +- .../internal/operators/stream/SkipLast.java | 2 +- .../kool/internal/util/RingBuffer.java | 41 ++++++++++++++++- .../java/org/davidmoten/kool/StreamTest.java | 28 +++++++++++- .../kool/internal/util/RingBufferTest.java | 44 +++++++++++++++---- 7 files changed, 141 insertions(+), 20 deletions(-) diff --git a/kool/src/main/java/org/davidmoten/kool/Stream.java b/kool/src/main/java/org/davidmoten/kool/Stream.java index b13d6e4..4906b13 100644 --- a/kool/src/main/java/org/davidmoten/kool/Stream.java +++ b/kool/src/main/java/org/davidmoten/kool/Stream.java @@ -860,7 +860,24 @@ default Stream> buffer(int size) { } default Stream> buffer(int size, int step) { - return new Buffer(this, size, step); + return new Buffer(this, size, step, true); + } + + /** + * Buffers the stream into list chunks of given size and step. If and only if + * copy is set to false the actual buffer used internally will be emitted. This + * is a performance-oriented offering to reduce allocation pressure but has + * side-effects. You must consume the emitted list immediately because the next + * emitted buffer reuses that object. + * + * @param size buffer size + * @param step buffer step + * @param copy if false then the internal buffer will be emitted (but must be + * consumed immediately) + * @return stream of lists + */ + default Stream> buffer(int size, int step, boolean copy) { + return new Buffer(this, size, step, copy); } default Stream skip(long size) { diff --git a/kool/src/main/java/org/davidmoten/kool/internal/operators/stream/Buffer.java b/kool/src/main/java/org/davidmoten/kool/internal/operators/stream/Buffer.java index 918f2f0..3dcf622 100644 --- a/kool/src/main/java/org/davidmoten/kool/internal/operators/stream/Buffer.java +++ b/kool/src/main/java/org/davidmoten/kool/internal/operators/stream/Buffer.java @@ -6,6 +6,7 @@ import org.davidmoten.kool.Stream; import org.davidmoten.kool.StreamIterator; +import org.davidmoten.kool.internal.util.RingBuffer; import com.github.davidmoten.guavamini.Preconditions; @@ -14,12 +15,14 @@ public class Buffer implements Stream> { private final Stream stream; private final int size; private final int step; + private final boolean copy; - public Buffer(Stream stream, int size, int step) { + public Buffer(Stream stream, int size, int step, boolean copy) { Preconditions.checkArgument(step > 0, "step must be greater than 0"); this.stream = stream; this.size = size; this.step = step; + this.copy = copy; } @Override @@ -27,7 +30,8 @@ public StreamIterator> iterator() { return new StreamIterator>() { StreamIterator it = stream.iteratorNullChecked(); - List buffer = new ArrayList<>(size); + RingBuffer buffer = new RingBuffer<>(size); + boolean applyStep = false; @Override public boolean hasNext() { @@ -41,10 +45,12 @@ public List next() { if (buffer.isEmpty()) { throw new NoSuchElementException(); } else { - List list = buffer; - buffer = new ArrayList<>( - buffer.subList(Math.min(step, buffer.size()), buffer.size())); - return list; + applyStep = true; + if (copy) { + return new ArrayList<>(buffer); + } else { + return buffer; + } } } @@ -54,6 +60,13 @@ public void dispose() { } private void loadNext() { + if (applyStep) { + int n = Math.min(step, buffer.size()); + for (int i = 0; i < n; i++) { + buffer.poll(); + } + applyStep = false; + } while (buffer.size() < size && it.hasNext()) { buffer.add(it.nextNullChecked()); } diff --git a/kool/src/main/java/org/davidmoten/kool/internal/operators/stream/ReplayableStreamIterator.java b/kool/src/main/java/org/davidmoten/kool/internal/operators/stream/ReplayableStreamIterator.java index c098d1c..cff5e52 100644 --- a/kool/src/main/java/org/davidmoten/kool/internal/operators/stream/ReplayableStreamIterator.java +++ b/kool/src/main/java/org/davidmoten/kool/internal/operators/stream/ReplayableStreamIterator.java @@ -21,7 +21,7 @@ public boolean hasNext() { @Override public T next() { if (buffer.isEmpty()) { - buffer.add(it.next()); + buffer.offer(it.next()); } return buffer.poll(); } diff --git a/kool/src/main/java/org/davidmoten/kool/internal/operators/stream/SkipLast.java b/kool/src/main/java/org/davidmoten/kool/internal/operators/stream/SkipLast.java index 4092d66..e382d0c 100644 --- a/kool/src/main/java/org/davidmoten/kool/internal/operators/stream/SkipLast.java +++ b/kool/src/main/java/org/davidmoten/kool/internal/operators/stream/SkipLast.java @@ -45,7 +45,7 @@ public void dispose() { private void loadNext() { while (buffer.size() < size + 1 && it.hasNext()) { - buffer.add(it.next()); + buffer.offer(it.next()); } } }; diff --git a/kool/src/main/java/org/davidmoten/kool/internal/util/RingBuffer.java b/kool/src/main/java/org/davidmoten/kool/internal/util/RingBuffer.java index 5ed63b7..5e541f2 100644 --- a/kool/src/main/java/org/davidmoten/kool/internal/util/RingBuffer.java +++ b/kool/src/main/java/org/davidmoten/kool/internal/util/RingBuffer.java @@ -1,8 +1,12 @@ package org.davidmoten.kool.internal.util; +import java.util.AbstractList; +import java.util.RandomAccess; +import java.util.function.Consumer; + import org.davidmoten.kool.exceptions.BufferOverflowException; -public final class RingBuffer { +public final class RingBuffer extends AbstractList implements RandomAccess { private T[] buffer; private int start; @@ -17,7 +21,7 @@ public RingBuffer(int maxSize) { this.finish = 0; } - public RingBuffer add(T value) { + public RingBuffer offer(T value) { buffer[finish] = value; finish = (finish + 1) % buffer.length; if (finish == start) { @@ -36,6 +40,7 @@ public T poll() { } } + @Override public int size() { if (finish < start) { return finish + buffer.length - start; @@ -59,5 +64,37 @@ public RingBuffer replay(int count) { public boolean isEmpty() { return start == finish; } + + private int arrayIndex(int index) { + if (index < 0 || index >= size()) { + throw new ArrayIndexOutOfBoundsException(); + } + return (start + index) % buffer.length; + } + + public T get(int index) { + return buffer[arrayIndex(index)]; + } + + @Override + public boolean add(T e) { + offer(e); + return true; + } + + @Override + public T set(int index, T element) { + int i = arrayIndex(index); + T v = buffer[i]; + buffer[i] = element; + return v; + } + @Override + public void forEach(Consumer action) { + int size = size(); + for (int i = 0; i < size; i++) { + action.accept(get(i)); + } + } } diff --git a/kool/src/test/java/org/davidmoten/kool/StreamTest.java b/kool/src/test/java/org/davidmoten/kool/StreamTest.java index b1e88d1..4114e01 100644 --- a/kool/src/test/java/org/davidmoten/kool/StreamTest.java +++ b/kool/src/test/java/org/davidmoten/kool/StreamTest.java @@ -572,6 +572,32 @@ public void testBufferBadStepThrows() { public void testBufferDispose() { checkTrue(b -> Stream.of(1, 2).doOnDispose(() -> b.set(true)).buffer(1).first().get()); } + + @Test + public void testBufferNoCopyUsedCorrectly() { + String s = Stream // + .of(1, 2, 3) // + .buffer(2, 1, false) // + .map(x -> x.toString()) // + .join(",") // + .get(); + assertEquals("[1, 2],[2, 3],[3]", s); + } + + @Test + public void testBufferNoCopyWhenUsedIncorrectly() { + long count = Stream // + .of(1, 2, 3) // + .buffer(2, 1, false) // + // accumulate, which we should not do with no-copy buffer + .toList() // + .flatMap(list -> Stream.from(list)) // + .filter(list -> list.isEmpty()) // + .count() // + .get(); + // assert that 3 empty lists returned + assertEquals(3, count); + } @Test public void testAfterDispose() { @@ -2135,7 +2161,7 @@ public void testPublisher() { // queue empty assertEquals(3, p.count().get().intValue()); } - + public static void main(String[] args) throws MalformedURLException { URL url = new URL("https://doesnotexist.zz"); Stream.using(() -> url.openStream(), in -> Stream.bytes(in)) diff --git a/kool/src/test/java/org/davidmoten/kool/internal/util/RingBufferTest.java b/kool/src/test/java/org/davidmoten/kool/internal/util/RingBufferTest.java index 0559bd4..e14f8b4 100644 --- a/kool/src/test/java/org/davidmoten/kool/internal/util/RingBufferTest.java +++ b/kool/src/test/java/org/davidmoten/kool/internal/util/RingBufferTest.java @@ -4,6 +4,10 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.Arrays; +import java.util.stream.Collectors; import org.junit.Test; @@ -17,7 +21,7 @@ public void test() { assertEquals(0, r.size()); assertTrue(r.isEmpty()); assertNull(r.poll()); - r.add(1).add(2); + r.offer(1).offer(2); assertFalse(r.isEmpty()); assertEquals(2, r.size()); assertEquals(1, (int) r.poll()); @@ -31,7 +35,7 @@ public void test() { assertEquals(1, (int) r.poll()); assertEquals(2, (int) r.poll()); assertTrue(r.isEmpty()); - r.add(3); + r.offer(3); r.replay(1); assertEquals(2, r.size()); assertEquals(2, (int) r.poll()); @@ -41,32 +45,56 @@ public void test() { @Test public void testSize() { RingBuffer r = new RingBuffer<>(4); - r.add(1).add(2).add(3).add(4); + r.offer(1).offer(2).offer(3).offer(4); assertEquals(4, r.size()); assertEquals(1, (int) r.poll()); assertEquals(2, (int) r.poll()); assertEquals(3, (int) r.poll()); assertEquals(4, (int) r.poll()); assertEquals(0, r.size()); - r.add(5).add(6).add(7); + r.offer(5).offer(6).offer(7); assertEquals(3, r.size()); } @Test public void testAllocateMoreWhenFinishBeforeStart() { RingBuffer r = new RingBuffer<>(MAX_SIZE); - r.add(1).poll(); - r.add(2).add(3).add(4); + r.offer(1).poll(); + r.offer(2).offer(3).offer(4); r.replay(1); assertEquals(1, (int) r.poll()); assertEquals(2, (int) r.poll()); assertEquals(3, (int) r.poll()); - r.add(5).add(6).add(7).add(8).add(9).add(10).add(11).add(12); + r.offer(5).offer(6).offer(7).offer(8).offer(9).offer(10).offer(11).offer(12); + } + + @Test + public void testGetByIndex() { + RingBuffer r = new RingBuffer<>(MAX_SIZE); + r.offer(1).offer(2).offer(3).offer(4); + assertEquals(1, r.get(0).intValue()); + assertEquals(2, r.get(1).intValue()); + assertEquals(3, r.get(2).intValue()); + assertEquals(4, r.get(3).intValue()); + try { + r.get(4); + fail(); + } catch (ArrayIndexOutOfBoundsException e) { + // good + } + } + + @Test + public void testAsList() { + RingBuffer r = new RingBuffer<>(MAX_SIZE); + r.offer(1).offer(2).offer(3).offer(4); + assertEquals(Arrays.asList(1, 2, 3, 4), r); + assertEquals(Arrays.asList(1, 2, 3), r.subList(0, 3).stream().collect(Collectors.toList())); } @Test(expected = RuntimeException.class) public void addMoreThanMaxSize() { RingBuffer r = new RingBuffer<>(2); - r.add(1).add(2).add(3).add(4); + r.offer(1).offer(2).offer(3).offer(4); } }