Segmented repartition #985

Merged: 10 commits, Nov 23, 2017

Conversation

mpilquist
Member

This PR builds on #950 and implements repartition so that each segment in the underlying stream is scanned as a whole, instead of unconsing single elements.

Note that I had to change the function passed to repartition to return a Chunk instead of a Segment, but this is okay: repartitioning a single element should not require expansion into an infinite segment.

@soumyarupsarkar -- please take a look. This uses the double layer scanning algorithm I attempted to describe in one of the reviews of #950.
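
To make the two layers concrete, here is a minimal sketch in plain Scala, not the fs2 implementation. It assumes a stream can be modeled as a `List` of `Vector` segments, that the semigroup is passed as a plain `combine` function, and that the last partition of each step is carried forward and emitted at the end of the stream. The names `RepartitionSketch`, `scanSegment`, and `combine` are hypothetical.

```scala
object RepartitionSketch {
  // Inner layer: scan one segment, threading the carry through its elements.
  // Returns the elements emitted by this segment and the carry left over.
  def scanSegment[A](carry: Option[A], segment: Vector[A])(f: A => Vector[A])(
      combine: (A, A) => A): (Vector[A], Option[A]) =
    segment.foldLeft((Vector.empty[A], carry)) { case ((out, c), o) =>
      // Merge the pending carry (if any) with the next element, then repartition.
      val merged = c.fold(o)(combine(_, o))
      val parts = f(merged)
      if (parts.isEmpty) (out, None)
      else (out ++ parts.init, Some(parts.last)) // all but the last are emitted
    }

  // Outer layer: scan the segments, threading the carry from one to the next.
  def repartition[A](segments: List[Vector[A]])(f: A => Vector[A])(
      combine: (A, A) => A): Vector[A] = {
    val (out, carry) =
      segments.foldLeft((Vector.empty[A], Option.empty[A])) { case ((acc, c), seg) =>
        val (emitted, next) = scanSegment(c, seg)(f)(combine)
        (acc ++ emitted, next)
      }
    out ++ carry.toVector // assumption: the final carry is emitted at end of stream
  }
}
```

Under these assumptions, `RepartitionSketch.repartition(List(Vector("Hel", "l"), Vector("o Wor", "ld")))(s => s.split(" ").toVector)(_ + _)` returns `Vector("Hello", "World")`. Each segment is processed in one pass, and only the carry crosses segment boundaries.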

@soumyarupsarkar
Contributor

I see, this makes sense. Thanks!

@soumyarupsarkar
Contributor

soumyarupsarkar commented Nov 23, 2017

You may want to add this as a test case with an infinite segment in the stream:

Stream.segment(Segment.from(0).map(_.toInt)).repartition(x => Chunk(x, x)).take(4).toVector shouldBe Vector(0, 1, 3, 6)

@mpilquist
Member Author

@soumyarupsarkar Thanks for spotting that. It turns out that this is a limitation when working with infinite streams and pipelines that cause fewer than 1024 elements to be emitted. For example, here's a program that also hangs:

Stream.segment(Segment.from(0L)).filter(_ < 2).take(2).toVector

Conceptually, the take(2) is satisfied by the first two elements from the infinite segment, and so we should be able to terminate. However, the stream interpreter will wait for 1024 elements to be collected as a result of the filter before moving on, and the filter only ever passes two. To fix this, I think we'd need to change Segment#splitAt to take a maxSteps argument or something, which tracks the amount of work the segment is doing in creating the 1024 elements.
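
As a rough illustration of the step-budget idea, the sketch below models it over a plain `Iterator`. It is not fs2's `Segment#splitAt`. Only the name `maxSteps` comes from the comment above; `BoundedSplitSketch`, `splitAtBounded`, `Full`, and `Partial` are hypothetical, and the filter is folded into the split for brevity.

```scala
object BoundedSplitSketch {
  // Hypothetical result type: either n elements were collected, or the
  // collection stopped early (budget exhausted or source ended).
  sealed trait SplitResult[+A]
  final case class Full[A](elems: Vector[A]) extends SplitResult[A]
  final case class Partial[A](elems: Vector[A]) extends SplitResult[A]

  // Collect up to n elements satisfying p, inspecting at most maxSteps inputs.
  def splitAtBounded[A](source: Iterator[A], n: Int, maxSteps: Int)(
      p: A => Boolean): SplitResult[A] = {
    val buf = Vector.newBuilder[A]
    var collected = 0
    var steps = 0
    while (collected < n && steps < maxSteps && source.hasNext) {
      val a = source.next()
      steps += 1 // every inspected input counts as work, emitted or not
      if (p(a)) { buf += a; collected += 1 }
    }
    if (collected == n) Full(buf.result()) else Partial(buf.result())
  }
}
```

With an infinite source, `BoundedSplitSketch.splitAtBounded(Iterator.from(0).map(_.toLong), 1024, 10000)(_ < 2)` returns `Partial(Vector(0L, 1L))` after 10000 steps, so a downstream `take(2)` would get a chance to finish. Without the `steps < maxSteps` check the loop would never exit, which mirrors the hang described above.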

@mpilquist mpilquist merged commit 31c360d into typelevel:series/0.10 Nov 23, 2017
@mpilquist mpilquist deleted the segmented-repartition branch November 23, 2017 14:03
@mpilquist
Member Author

@soumyarupsarkar On second thought, the following program will never emit any elements:

Stream.segment(Segment.from(0).map(_.toInt)).repartition(x => Chunk(x + 2)).take(1).toVector

Since each input element yields a single chunk, the carry value ends up representing a fold over all input elements and we never output anything. So while the example I gave earlier is in fact an issue with infinite streams, the repartition hang isn't an example of that issue.
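
The carry behaviour can be traced on a finite prefix. The sketch below is plain Scala rather than fs2, and assumes the semigroup is `Int` addition and that a one-element result is carried forward whole with nothing emitted. `SingletonCarryTrace` and `carries` are hypothetical names.

```scala
object SingletonCarryTrace {
  // Carry value after each input when f always yields exactly one partition.
  // Nothing is emitted at any step; the single partition becomes the next carry.
  def carries(inputs: List[Int])(f: Int => Int): List[Int] =
    inputs match {
      case Nil    => Nil
      case h :: t => t.scanLeft(f(h))((carry, o) => f(carry + o))
    }
}
```

For inputs 0, 1, 2, 3 and `f = _ + 2`, `SingletonCarryTrace.carries(List(0, 1, 2, 3))(_ + 2)` gives `List(2, 5, 9, 14)`: the carry keeps accumulating and no element is ever output, so on an infinite input `take(1)` is never satisfied.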
