-
Notifications
You must be signed in to change notification settings - Fork 613
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Segmented repartition #985
Segmented repartition #985
Conversation
I see, this makes sense. Thanks! |
You may want to add this as a test case with an infinite segment in the stream: |
@soumyarupsarkar Thanks for spotting that. It turns out that this is a limitation when working with infinite streams and pipelines that cause less than 1024 elements to be emitted. For example, here's a program that also hangs: Stream.segment(Segment.from(0L)).filter(_ < 2).take(2).toVector Conceptually, the |
@soumyarupsarkar On second thought, the following program will never emit any elements: Stream.segment(Segment.from(0).map(_.toInt)).repartition(x => Chunk(x + 2)).take(1).toVector Since each input element yields a single chunk, the carry value ends up representing a fold over all input elements and we never output anything. So while the example I gave earlier is in fact an issue with infinite streams, the repartition hang isn't an example of that issue. |
The PR builds off of #950 and implements
repartition
such that each segment in the underlying stream is scanned instead of unconsing single elements.Note I had to change the repartition function to return a
Chunk
instead of aSegment
but this is okay -- repartitioning a single element should not require an expansion in to an infinite segment.@soumyarupsarkar -- please take a look. This uses the double layer scanning algorithm I attempted to describe in one of the reviews of #950.