Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add no-copy overload of Stream.buffer to reduce allocation pressure #100

Merged
merged 2 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion kool/src/main/java/org/davidmoten/kool/Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,24 @@ default Stream<List<T>> buffer(int size) {
}

default Stream<List<T>> buffer(int size, int step) {
return new Buffer<T>(this, size, step);
return new Buffer<T>(this, size, step, true);
}

/**
* Buffers the stream into list chunks of given size and step. If and only if
* copy is set to false the actual buffer used internally will be emitted. This
* is a performance-oriented offering to reduce allocation pressure but has
* side-effects. You must consume the emitted list immediately because the next
* emitted buffer reuses that object.
*
* @param size buffer size
* @param step buffer step
* @param copy if false then the internal buffer will be emitted (but must be
* consumed immediately)
* @return stream of lists
*/
default Stream<List<T>> buffer(int size, int step, boolean copy) {
return new Buffer<T>(this, size, step, copy);
}

default Stream<T> skip(long size) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import org.davidmoten.kool.Stream;
import org.davidmoten.kool.StreamIterator;
import org.davidmoten.kool.internal.util.RingBuffer;

import com.github.davidmoten.guavamini.Preconditions;

Expand All @@ -14,20 +15,23 @@ public class Buffer<T> implements Stream<List<T>> {
private final Stream<T> stream;
private final int size;
private final int step;
private final boolean copy;

public Buffer(Stream<T> stream, int size, int step) {
public Buffer(Stream<T> stream, int size, int step, boolean copy) {
Preconditions.checkArgument(step > 0, "step must be greater than 0");
this.stream = stream;
this.size = size;
this.step = step;
this.copy = copy;
}

@Override
public StreamIterator<List<T>> iterator() {
return new StreamIterator<List<T>>() {

StreamIterator<T> it = stream.iteratorNullChecked();
List<T> buffer = new ArrayList<>(size);
RingBuffer<T> buffer = new RingBuffer<>(size);
boolean applyStep = false;

@Override
public boolean hasNext() {
Expand All @@ -41,10 +45,12 @@ public List<T> next() {
if (buffer.isEmpty()) {
throw new NoSuchElementException();
} else {
List<T> list = buffer;
buffer = new ArrayList<>(
buffer.subList(Math.min(step, buffer.size()), buffer.size()));
return list;
applyStep = true;
if (copy) {
return new ArrayList<>(buffer);
} else {
return buffer;
}
}
}

Expand All @@ -54,6 +60,13 @@ public void dispose() {
}

private void loadNext() {
if (applyStep) {
int n = Math.min(step, buffer.size());
for (int i = 0; i < n; i++) {
buffer.poll();
}
applyStep = false;
}
while (buffer.size() < size && it.hasNext()) {
buffer.add(it.nextNullChecked());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public boolean hasNext() {
@Override
public T next() {
if (buffer.isEmpty()) {
buffer.add(it.next());
buffer.offer(it.next());
}
return buffer.poll();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void dispose() {

private void loadNext() {
while (buffer.size() < size + 1 && it.hasNext()) {
buffer.add(it.next());
buffer.offer(it.next());
}
}
};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package org.davidmoten.kool.internal.util;

import java.util.AbstractList;
import java.util.RandomAccess;
import java.util.function.Consumer;

import org.davidmoten.kool.exceptions.BufferOverflowException;

public final class RingBuffer<T> {
public final class RingBuffer<T> extends AbstractList<T> implements RandomAccess {

private T[] buffer;
private int start;
Expand All @@ -17,7 +21,7 @@ public RingBuffer(int maxSize) {
this.finish = 0;
}

public RingBuffer<T> add(T value) {
public RingBuffer<T> offer(T value) {
buffer[finish] = value;
finish = (finish + 1) % buffer.length;
if (finish == start) {
Expand All @@ -36,6 +40,7 @@ public T poll() {
}
}

@Override
public int size() {
if (finish < start) {
return finish + buffer.length - start;
Expand All @@ -59,5 +64,37 @@ public RingBuffer<T> replay(int count) {
public boolean isEmpty() {
return start == finish;
}

private int arrayIndex(int index) {
if (index < 0 || index >= size()) {
throw new ArrayIndexOutOfBoundsException();
}
return (start + index) % buffer.length;
}

public T get(int index) {
return buffer[arrayIndex(index)];
}

@Override
public boolean add(T e) {
offer(e);
return true;
}

@Override
public T set(int index, T element) {
int i = arrayIndex(index);
T v = buffer[i];
buffer[i] = element;
return v;
}

@Override
public void forEach(Consumer<? super T> action) {
int size = size();
for (int i = 0; i < size; i++) {
action.accept(get(i));
}
}
}
28 changes: 27 additions & 1 deletion kool/src/test/java/org/davidmoten/kool/StreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,32 @@ public void testBufferBadStepThrows() {
public void testBufferDispose() {
checkTrue(b -> Stream.of(1, 2).doOnDispose(() -> b.set(true)).buffer(1).first().get());
}

@Test
public void testBufferNoCopyUsedCorrectly() {
String s = Stream //
.of(1, 2, 3) //
.buffer(2, 1, false) //
.map(x -> x.toString()) //
.join(",") //
.get();
assertEquals("[1, 2],[2, 3],[3]", s);
}

@Test
public void testBufferNoCopyWhenUsedIncorrectly() {
long count = Stream //
.of(1, 2, 3) //
.buffer(2, 1, false) //
// accumulate, which we should not do with no-copy buffer
.toList() //
.flatMap(list -> Stream.from(list)) //
.filter(list -> list.isEmpty()) //
.count() //
.get();
// assert that 3 empty lists returned
assertEquals(3, count);
}

@Test
public void testAfterDispose() {
Expand Down Expand Up @@ -2135,7 +2161,7 @@ public void testPublisher() {
// queue empty
assertEquals(3, p.count().get().intValue());
}

public static void main(String[] args) throws MalformedURLException {
URL url = new URL("https://doesnotexist.zz");
Stream.using(() -> url.openStream(), in -> Stream.bytes(in))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.Arrays;
import java.util.stream.Collectors;

import org.junit.Test;

Expand All @@ -17,7 +21,7 @@ public void test() {
assertEquals(0, r.size());
assertTrue(r.isEmpty());
assertNull(r.poll());
r.add(1).add(2);
r.offer(1).offer(2);
assertFalse(r.isEmpty());
assertEquals(2, r.size());
assertEquals(1, (int) r.poll());
Expand All @@ -31,7 +35,7 @@ public void test() {
assertEquals(1, (int) r.poll());
assertEquals(2, (int) r.poll());
assertTrue(r.isEmpty());
r.add(3);
r.offer(3);
r.replay(1);
assertEquals(2, r.size());
assertEquals(2, (int) r.poll());
Expand All @@ -41,32 +45,56 @@ public void test() {
@Test
public void testSize() {
RingBuffer<Integer> r = new RingBuffer<>(4);
r.add(1).add(2).add(3).add(4);
r.offer(1).offer(2).offer(3).offer(4);
assertEquals(4, r.size());
assertEquals(1, (int) r.poll());
assertEquals(2, (int) r.poll());
assertEquals(3, (int) r.poll());
assertEquals(4, (int) r.poll());
assertEquals(0, r.size());
r.add(5).add(6).add(7);
r.offer(5).offer(6).offer(7);
assertEquals(3, r.size());
}

@Test
public void testAllocateMoreWhenFinishBeforeStart() {
RingBuffer<Integer> r = new RingBuffer<>(MAX_SIZE);
r.add(1).poll();
r.add(2).add(3).add(4);
r.offer(1).poll();
r.offer(2).offer(3).offer(4);
r.replay(1);
assertEquals(1, (int) r.poll());
assertEquals(2, (int) r.poll());
assertEquals(3, (int) r.poll());
r.add(5).add(6).add(7).add(8).add(9).add(10).add(11).add(12);
r.offer(5).offer(6).offer(7).offer(8).offer(9).offer(10).offer(11).offer(12);
}

@Test
public void testGetByIndex() {
RingBuffer<Integer> r = new RingBuffer<>(MAX_SIZE);
r.offer(1).offer(2).offer(3).offer(4);
assertEquals(1, r.get(0).intValue());
assertEquals(2, r.get(1).intValue());
assertEquals(3, r.get(2).intValue());
assertEquals(4, r.get(3).intValue());
try {
r.get(4);
fail();
} catch (ArrayIndexOutOfBoundsException e) {
// good
}
}

@Test
public void testAsList() {
RingBuffer<Integer> r = new RingBuffer<>(MAX_SIZE);
r.offer(1).offer(2).offer(3).offer(4);
assertEquals(Arrays.asList(1, 2, 3, 4), r);
assertEquals(Arrays.asList(1, 2, 3), r.subList(0, 3).stream().collect(Collectors.toList()));
}

@Test(expected = RuntimeException.class)
public void addMoreThanMaxSize() {
RingBuffer<Integer> r = new RingBuffer<>(2);
r.add(1).add(2).add(3).add(4);
r.offer(1).offer(2).offer(3).offer(4);
}
}
Loading