Skip to content

Commit

Permalink
add no-copy overload of Stream.buffer to reduce allocation pressure (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
davidmoten authored Mar 13, 2024
1 parent 3be56b6 commit ff3d5de
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 20 deletions.
19 changes: 18 additions & 1 deletion kool/src/main/java/org/davidmoten/kool/Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -860,7 +860,24 @@ default Stream<List<T>> buffer(int size) {
}

default Stream<List<T>> buffer(int size, int step) {
return new Buffer<T>(this, size, step);
return new Buffer<T>(this, size, step, true);
}

/**
* Buffers the stream into list chunks of given size and step. If and only if
* copy is set to false the actual buffer used internally will be emitted. This
* is a performance-oriented offering to reduce allocation pressure but has
* side-effects. You must consume the emitted list immediately because the next
* emitted buffer reuses that object.
*
* @param size buffer size
* @param step buffer step
* @param copy if false then the internal buffer will be emitted (but must be
* consumed immediately)
* @return stream of lists
*/
default Stream<List<T>> buffer(int size, int step, boolean copy) {
return new Buffer<T>(this, size, step, copy);
}

default Stream<T> skip(long size) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import org.davidmoten.kool.Stream;
import org.davidmoten.kool.StreamIterator;
import org.davidmoten.kool.internal.util.RingBuffer;

import com.github.davidmoten.guavamini.Preconditions;

Expand All @@ -14,20 +15,23 @@ public class Buffer<T> implements Stream<List<T>> {
private final Stream<T> stream;
private final int size;
private final int step;
private final boolean copy;

public Buffer(Stream<T> stream, int size, int step) {
public Buffer(Stream<T> stream, int size, int step, boolean copy) {
Preconditions.checkArgument(step > 0, "step must be greater than 0");
this.stream = stream;
this.size = size;
this.step = step;
this.copy = copy;
}

@Override
public StreamIterator<List<T>> iterator() {
return new StreamIterator<List<T>>() {

StreamIterator<T> it = stream.iteratorNullChecked();
List<T> buffer = new ArrayList<>(size);
RingBuffer<T> buffer = new RingBuffer<>(size);
boolean applyStep = false;

@Override
public boolean hasNext() {
Expand All @@ -41,10 +45,12 @@ public List<T> next() {
if (buffer.isEmpty()) {
throw new NoSuchElementException();
} else {
List<T> list = buffer;
buffer = new ArrayList<>(
buffer.subList(Math.min(step, buffer.size()), buffer.size()));
return list;
applyStep = true;
if (copy) {
return new ArrayList<>(buffer);
} else {
return buffer;
}
}
}

Expand All @@ -54,6 +60,13 @@ public void dispose() {
}

private void loadNext() {
if (applyStep) {
int n = Math.min(step, buffer.size());
for (int i = 0; i < n; i++) {
buffer.poll();
}
applyStep = false;
}
while (buffer.size() < size && it.hasNext()) {
buffer.add(it.nextNullChecked());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public boolean hasNext() {
@Override
public T next() {
if (buffer.isEmpty()) {
buffer.add(it.next());
buffer.offer(it.next());
}
return buffer.poll();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void dispose() {

private void loadNext() {
while (buffer.size() < size + 1 && it.hasNext()) {
buffer.add(it.next());
buffer.offer(it.next());
}
}
};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package org.davidmoten.kool.internal.util;

import java.util.AbstractList;
import java.util.RandomAccess;
import java.util.function.Consumer;

import org.davidmoten.kool.exceptions.BufferOverflowException;

public final class RingBuffer<T> {
public final class RingBuffer<T> extends AbstractList<T> implements RandomAccess {

private T[] buffer;
private int start;
Expand All @@ -17,7 +21,7 @@ public RingBuffer(int maxSize) {
this.finish = 0;
}

public RingBuffer<T> add(T value) {
public RingBuffer<T> offer(T value) {
buffer[finish] = value;
finish = (finish + 1) % buffer.length;
if (finish == start) {
Expand All @@ -36,6 +40,7 @@ public T poll() {
}
}

@Override
public int size() {
if (finish < start) {
return finish + buffer.length - start;
Expand All @@ -59,5 +64,37 @@ public RingBuffer<T> replay(int count) {
public boolean isEmpty() {
return start == finish;
}

private int arrayIndex(int index) {
if (index < 0 || index >= size()) {
throw new ArrayIndexOutOfBoundsException();
}
return (start + index) % buffer.length;
}

public T get(int index) {
return buffer[arrayIndex(index)];
}

@Override
public boolean add(T e) {
offer(e);
return true;
}

@Override
public T set(int index, T element) {
int i = arrayIndex(index);
T v = buffer[i];
buffer[i] = element;
return v;
}

@Override
public void forEach(Consumer<? super T> action) {
int size = size();
for (int i = 0; i < size; i++) {
action.accept(get(i));
}
}
}
28 changes: 27 additions & 1 deletion kool/src/test/java/org/davidmoten/kool/StreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,32 @@ public void testBufferBadStepThrows() {
public void testBufferDispose() {
checkTrue(b -> Stream.of(1, 2).doOnDispose(() -> b.set(true)).buffer(1).first().get());
}

@Test
public void testBufferNoCopyUsedCorrectly() {
String s = Stream //
.of(1, 2, 3) //
.buffer(2, 1, false) //
.map(x -> x.toString()) //
.join(",") //
.get();
assertEquals("[1, 2],[2, 3],[3]", s);
}

@Test
public void testBufferNoCopyWhenUsedIncorrectly() {
long count = Stream //
.of(1, 2, 3) //
.buffer(2, 1, false) //
// accumulate, which we should not do with no-copy buffer
.toList() //
.flatMap(list -> Stream.from(list)) //
.filter(list -> list.isEmpty()) //
.count() //
.get();
// assert that 3 empty lists returned
assertEquals(3, count);
}

@Test
public void testAfterDispose() {
Expand Down Expand Up @@ -2135,7 +2161,7 @@ public void testPublisher() {
// queue empty
assertEquals(3, p.count().get().intValue());
}

public static void main(String[] args) throws MalformedURLException {
URL url = new URL("https://doesnotexist.zz");
Stream.using(() -> url.openStream(), in -> Stream.bytes(in))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.Arrays;
import java.util.stream.Collectors;

import org.junit.Test;

Expand All @@ -17,7 +21,7 @@ public void test() {
assertEquals(0, r.size());
assertTrue(r.isEmpty());
assertNull(r.poll());
r.add(1).add(2);
r.offer(1).offer(2);
assertFalse(r.isEmpty());
assertEquals(2, r.size());
assertEquals(1, (int) r.poll());
Expand All @@ -31,7 +35,7 @@ public void test() {
assertEquals(1, (int) r.poll());
assertEquals(2, (int) r.poll());
assertTrue(r.isEmpty());
r.add(3);
r.offer(3);
r.replay(1);
assertEquals(2, r.size());
assertEquals(2, (int) r.poll());
Expand All @@ -41,32 +45,56 @@ public void test() {
@Test
public void testSize() {
RingBuffer<Integer> r = new RingBuffer<>(4);
r.add(1).add(2).add(3).add(4);
r.offer(1).offer(2).offer(3).offer(4);
assertEquals(4, r.size());
assertEquals(1, (int) r.poll());
assertEquals(2, (int) r.poll());
assertEquals(3, (int) r.poll());
assertEquals(4, (int) r.poll());
assertEquals(0, r.size());
r.add(5).add(6).add(7);
r.offer(5).offer(6).offer(7);
assertEquals(3, r.size());
}

@Test
public void testAllocateMoreWhenFinishBeforeStart() {
RingBuffer<Integer> r = new RingBuffer<>(MAX_SIZE);
r.add(1).poll();
r.add(2).add(3).add(4);
r.offer(1).poll();
r.offer(2).offer(3).offer(4);
r.replay(1);
assertEquals(1, (int) r.poll());
assertEquals(2, (int) r.poll());
assertEquals(3, (int) r.poll());
r.add(5).add(6).add(7).add(8).add(9).add(10).add(11).add(12);
r.offer(5).offer(6).offer(7).offer(8).offer(9).offer(10).offer(11).offer(12);
}

@Test
public void testGetByIndex() {
RingBuffer<Integer> r = new RingBuffer<>(MAX_SIZE);
r.offer(1).offer(2).offer(3).offer(4);
assertEquals(1, r.get(0).intValue());
assertEquals(2, r.get(1).intValue());
assertEquals(3, r.get(2).intValue());
assertEquals(4, r.get(3).intValue());
try {
r.get(4);
fail();
} catch (ArrayIndexOutOfBoundsException e) {
// good
}
}

@Test
public void testAsList() {
RingBuffer<Integer> r = new RingBuffer<>(MAX_SIZE);
r.offer(1).offer(2).offer(3).offer(4);
assertEquals(Arrays.asList(1, 2, 3, 4), r);
assertEquals(Arrays.asList(1, 2, 3), r.subList(0, 3).stream().collect(Collectors.toList()));
}

@Test(expected = RuntimeException.class)
public void addMoreThanMaxSize() {
RingBuffer<Integer> r = new RingBuffer<>(2);
r.add(1).add(2).add(3).add(4);
r.offer(1).offer(2).offer(3).offer(4);
}
}

0 comments on commit ff3d5de

Please sign in to comment.