From 27454d1d8e88bb3443e5b917564cb1a31fe3475f Mon Sep 17 00:00:00 2001 From: Soumyarup Sarkar Date: Sat, 14 Oct 2017 21:11:05 -0700 Subject: [PATCH 1/6] Reintroduce repartition --- core/jvm/src/test/scala/fs2/StreamSpec.scala | 11 +++++++ core/shared/src/main/scala/fs2/Stream.scala | 30 ++++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/core/jvm/src/test/scala/fs2/StreamSpec.scala b/core/jvm/src/test/scala/fs2/StreamSpec.scala index 68a68df18a..ec9e510a63 100644 --- a/core/jvm/src/test/scala/fs2/StreamSpec.scala +++ b/core/jvm/src/test/scala/fs2/StreamSpec.scala @@ -100,6 +100,17 @@ class StreamSpec extends Fs2Spec with Inside { IndexedSeq.range(0, 100) } + "repartition" in { + Stream("Lore", "m ip", "sum dolo", "r sit amet").repartition(_.split(" ").toIndexedSeq).toList == + List("Lorem", "ipsum", "dolor", "sit", "amet") && + Stream("hel", "l", "o Wor", "ld").repartition(_.grouped(2).toVector).toList == + List("he", "ll", "o ", "Wo", "rl", "d") && + Stream(1, 2, 3, 4, 5).repartition(i => Vector(i, i)).toList == + List(1, 3, 6, 10, 15, 15) && + (Stream(): Stream[Nothing, String]).repartition(_ => Vector()).toList == List() && + Stream("hello").repartition(_ => Vector()).toList == List() + } + "translate" in forAll { (s: PureStream[Int]) => runLog(s.get.flatMap(i => Stream.eval(IO.pure(i))).translate(cats.arrow.FunctionK.id[IO])) shouldBe runLog(s.get) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 41e69a0036..54d93b89d4 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -1983,6 +1983,36 @@ object Stream { def reduceSemigroup(implicit S: Semigroup[O]): Stream[F, O] = self.reduce(S.combine(_, _)) + /** + * Repartitions the input with the function `p`. On each step `p` is applied + * to the input and all elements but the last of the resulting sequence + * are emitted. The last element is then appended to the next input using the + * Semigroup `S`. + * + * @example {{{ + * scala> import cats.implicits._ + * scala> Stream("Hel", "l", "o Wor", "ld").repartition(_.split(" ").toIndexedSeq).toList + * res0: List[String] = List(Hello, World) + * }}} + */ + def repartition(p: O => IndexedSeq[O])(implicit S: Semigroup[O]): Stream[F,O] = { + def go(carry: Option[O], s: Stream[F,O]): Pull[F,O,Unit] = { + s.pull.uncons1.flatMap { + case Some((hd, tl)) => + val next = carry.fold(hd)(c => S.combine(c, hd)) + val parts = p(next) + parts.size match { + case 0 => go(None, tl) + case 1 => go(Some(parts.head), tl) + case _ => Pull.output(Chunk.indexedSeq(parts.init)) >> go(Some(parts.last), tl) + } + case None => + carry.fold[Pull[F,O,Unit]](Pull.done)(c => Pull.output1(c)) + } + } + go(None, self).stream + } + /** * Repeatedly invokes `using`, running the resultant `Pull` each time, halting when a pull * returns `None` instead of `Some(nextStream)`. From ddbb96ba510e401e6e56f820c579b5f78aee2bcf Mon Sep 17 00:00:00 2001 From: Soumyarup Sarkar Date: Sat, 21 Oct 2017 13:24:34 -0700 Subject: [PATCH 2/6] Prefer uncons in repartition --- core/jvm/src/test/scala/fs2/StreamSpec.scala | 10 ++++---- core/shared/src/main/scala/fs2/Stream.scala | 25 ++++++++++++-------- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/core/jvm/src/test/scala/fs2/StreamSpec.scala b/core/jvm/src/test/scala/fs2/StreamSpec.scala index ec9e510a63..d8372327e0 100644 --- a/core/jvm/src/test/scala/fs2/StreamSpec.scala +++ b/core/jvm/src/test/scala/fs2/StreamSpec.scala @@ -101,14 +101,14 @@ class StreamSpec extends Fs2Spec with Inside { } "repartition" in { - Stream("Lore", "m ip", "sum dolo", "r sit amet").repartition(_.split(" ").toIndexedSeq).toList == + Stream("Lore", "m ip", "sum dolo", "r sit amet").repartition(s => Segment.array(s.split(" "))).toList == List("Lorem", "ipsum", "dolor", "sit", "amet") && - Stream("hel", "l", "o Wor", "ld").repartition(_.grouped(2).toVector).toList == + Stream("hel", "l", "o Wor", "ld").repartition(s => Segment.indexedSeq(s.grouped(2).toVector)).toList == List("he", "ll", "o ", "Wo", "rl", "d") && - Stream(1, 2, 3, 4, 5).repartition(i => Vector(i, i)).toList == + Stream(1, 2, 3, 4, 5).repartition(i => Segment.indexedSeq(Vector(i, i))).toList == List(1, 3, 6, 10, 15, 15) && - (Stream(): Stream[Nothing, String]).repartition(_ => Vector()).toList == List() && - Stream("hello").repartition(_ => Vector()).toList == List() + (Stream(): Stream[Nothing, String]).repartition(_ => Segment.empty).toList == List() && + Stream("hello").repartition(_ => Segment.empty).toList == List() } "translate" in forAll { (s: PureStream[Int]) => diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 54d93b89d4..1da9152081 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -1991,20 +1991,25 @@ object Stream { * * @example {{{ * scala> import cats.implicits._ - * scala> Stream("Hel", "l", "o Wor", "ld").repartition(_.split(" ").toIndexedSeq).toList + * scala> Stream("Hel", "l", "o Wor", "ld").repartition(s => Segment.array(s.split(" "))).toList * res0: List[String] = List(Hello, World) * }}} */ - def repartition(p: O => IndexedSeq[O])(implicit S: Semigroup[O]): Stream[F,O] = { + def repartition(p: O => Segment[O, Unit])(implicit S: Semigroup[O]): Stream[F,O] = { def go(carry: Option[O], s: Stream[F,O]): Pull[F,O,Unit] = { - s.pull.uncons1.flatMap { - case Some((hd, tl)) => - val next = carry.fold(hd)(c => S.combine(c, hd)) - val parts = p(next) - parts.size match { - case 0 => go(None, tl) - case 1 => go(Some(parts.head), tl) - case _ => Pull.output(Chunk.indexedSeq(parts.init)) >> go(Some(parts.last), tl) + s.pull.uncons.flatMap { + case Some((hds, tl)) => + S.combineAllOption(hds.toVector) match { + case Some(hd) => + val next = carry.fold(hd)(c => S.combine(c, hd)) + val parts = p(next).toVector + parts.size match { + case 0 => go(None, tl) + case 1 => go(Some(parts.head), tl) + case _ => Pull.output(Chunk.indexedSeq(parts.init)) >> go(Some(parts.last), tl) + } + case None => + go(None, tl) } case None => carry.fold[Pull[F,O,Unit]](Pull.done)(c => Pull.output1(c)) From fd23289c4116a26264d4f10f6ca825976b2539a0 Mon Sep 17 00:00:00 2001 From: Soumyarup Sarkar Date: Thu, 26 Oct 2017 01:43:16 -0700 Subject: [PATCH 3/6] Replace the outer .toVector with .uncons1 in repartition --- core/jvm/src/test/scala/fs2/StreamSpec.scala | 16 ++++++++-------- core/shared/src/main/scala/fs2/Stream.scala | 13 +++++++------ 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/core/jvm/src/test/scala/fs2/StreamSpec.scala b/core/jvm/src/test/scala/fs2/StreamSpec.scala index d8372327e0..30484282c7 100644 --- a/core/jvm/src/test/scala/fs2/StreamSpec.scala +++ b/core/jvm/src/test/scala/fs2/StreamSpec.scala @@ -101,14 +101,14 @@ class StreamSpec extends Fs2Spec with Inside { } "repartition" in { - Stream("Lore", "m ip", "sum dolo", "r sit amet").repartition(s => Segment.array(s.split(" "))).toList == - List("Lorem", "ipsum", "dolor", "sit", "amet") && - Stream("hel", "l", "o Wor", "ld").repartition(s => Segment.indexedSeq(s.grouped(2).toVector)).toList == - List("he", "ll", "o ", "Wo", "rl", "d") && - Stream(1, 2, 3, 4, 5).repartition(i => Segment.indexedSeq(Vector(i, i))).toList == - List(1, 3, 6, 10, 15, 15) && - (Stream(): Stream[Nothing, String]).repartition(_ => Segment.empty).toList == List() && - Stream("hello").repartition(_ => Segment.empty).toList == List() + Stream("Lore", "m ip", "sum dolo", "r sit amet").repartition(s => Segment.array(s.split(" "))).toList shouldBe + List("Lorem", "ipsum", "dolor", "sit", "amet") + Stream("hel", "l", "o Wor", "ld").repartition(s => Segment.indexedSeq(s.grouped(2).toVector)).toList shouldBe + List("he", "ll", "o ", "Wo", "rl", "d") + Stream(1, 2, 3, 4, 5).repartition(i => Segment.indexedSeq(Vector(i, i))).toList shouldBe + List(1, 3, 6, 10, 15, 15) + (Stream(): Stream[Nothing, String]).repartition(_ => Segment.empty).toList shouldBe List() + Stream("hello").repartition(_ => Segment.empty).toList shouldBe List() } "translate" in forAll { (s: PureStream[Int]) => diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 1da9152081..2537d8ed8f 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -1998,18 +1998,19 @@ object Stream { def repartition(p: O => Segment[O, Unit])(implicit S: Semigroup[O]): Stream[F,O] = { def go(carry: Option[O], s: Stream[F,O]): Pull[F,O,Unit] = { s.pull.uncons.flatMap { - case Some((hds, tl)) => - S.combineAllOption(hds.toVector) match { - case Some(hd) => - val next = carry.fold(hd)(c => S.combine(c, hd)) + case Some((ohd, otl)) => + ohd.uncons1 match { + case Right((hhd,htl)) => + val tl = Stream.segment(htl) ++ otl + val next = carry.fold(hhd)(c => S.combine(c, hhd)) val parts = p(next).toVector parts.size match { case 0 => go(None, tl) case 1 => go(Some(parts.head), tl) case _ => Pull.output(Chunk.indexedSeq(parts.init)) >> go(Some(parts.last), tl) } - case None => - go(None, tl) + case Left(_) => + carry.fold[Pull[F,O,Unit]](Pull.done)(c => Pull.output1(c)) } case None => carry.fold[Pull[F,O,Unit]](Pull.done)(c => Pull.output1(c)) From dd031b3c42a06692b11536006c19ae4c85a37bbd Mon Sep 17 00:00:00 2001 From: Soumyarup Sarkar Date: Sat, 28 Oct 2017 02:47:55 -0700 Subject: [PATCH 4/6] Remove toVector calls from repartition and add test case with infinite segment --- core/jvm/src/test/scala/fs2/StreamSpec.scala | 16 +++++++-- core/shared/src/main/scala/fs2/Stream.scala | 36 +++++++++----------- 2 files changed, 30 insertions(+), 22 deletions(-) diff --git a/core/jvm/src/test/scala/fs2/StreamSpec.scala b/core/jvm/src/test/scala/fs2/StreamSpec.scala index 30484282c7..a5467976a2 100644 --- a/core/jvm/src/test/scala/fs2/StreamSpec.scala +++ b/core/jvm/src/test/scala/fs2/StreamSpec.scala @@ -105,10 +105,20 @@ class StreamSpec extends Fs2Spec with Inside { List("Lorem", "ipsum", "dolor", "sit", "amet") Stream("hel", "l", "o Wor", "ld").repartition(s => Segment.indexedSeq(s.grouped(2).toVector)).toList shouldBe List("he", "ll", "o ", "Wo", "rl", "d") - Stream(1, 2, 3, 4, 5).repartition(i => Segment.indexedSeq(Vector(i, i))).toList shouldBe - List(1, 3, 6, 10, 15, 15) - (Stream(): Stream[Nothing, String]).repartition(_ => Segment.empty).toList shouldBe List() + Stream.empty.covaryOutput[String].repartition(_ => Segment.empty).toList shouldBe List() Stream("hello").repartition(_ => Segment.empty).toList shouldBe List() + + def input = Stream("ab").repeat + def ones(s: String) = Segment vector s.grouped(1).toVector + input.take(2).repartition(ones).toVector shouldBe Vector("a", "b", "a", "b") + input.take(4).repartition(ones).toVector shouldBe Vector("a", "b", "a", "b", "a", "b", "a", "b") + input.repartition(ones).take(2).toVector shouldBe Vector("a", "b") + input.repartition(ones).take(4).toVector shouldBe Vector("a", "b", "a", "b") + Stream.emits(input.take(4).toVector).repartition(ones).toVector shouldBe Vector("a", "b", "a", "b", "a", "b", "a", "b") + + Stream(1, 2, 3, 4, 5).repartition(i => Segment(i, i)).toList shouldBe List(1, 3, 6, 10, 15, 15) + + Stream(1, 10, 100).repartition(i => Segment.from(i).map(_.toInt)).take(4).toList shouldBe List(1, 2, 3, 4) } "translate" in forAll { (s: PureStream[Int]) => diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 2537d8ed8f..0291d9a3a1 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -1995,26 +1995,24 @@ object Stream { * res0: List[String] = List(Hello, World) * }}} */ - def repartition(p: O => Segment[O, Unit])(implicit S: Semigroup[O]): Stream[F,O] = { - def go(carry: Option[O], s: Stream[F,O]): Pull[F,O,Unit] = { - s.pull.uncons.flatMap { - case Some((ohd, otl)) => - ohd.uncons1 match { - case Right((hhd,htl)) => - val tl = Stream.segment(htl) ++ otl - val next = carry.fold(hhd)(c => S.combine(c, hhd)) - val parts = p(next).toVector - parts.size match { - case 0 => go(None, tl) - case 1 => go(Some(parts.head), tl) - case _ => Pull.output(Chunk.indexedSeq(parts.init)) >> go(Some(parts.last), tl) + def repartition(p: O => Segment[O,Unit])(implicit S: Semigroup[O]): Stream[F,O] = { + def go(carry: Option[O], s: Stream[F,O], partitions: Segment[O,Unit] = Segment.empty): Pull[F,O,Unit] = { + partitions.uncons1.fold({ _ => + s.pull.uncons1.flatMap { + case Some((hd, tl)) => + val next = carry.fold(hd)(c => S.combine(c, hd)) + val parts = p(next).uncons1 + val parts2 = parts.flatMap(_._2.uncons1) + (parts, parts2) match { + case (Left(()), _) => go(None, tl) + case (Right((phd, ptl)), Left(())) => go(Some(phd), tl) + case (Right((phd, ptl)), Right(_)) => Pull.output1(phd) >> go(None, tl, ptl) } - case Left(_) => - carry.fold[Pull[F,O,Unit]](Pull.done)(c => Pull.output1(c)) - } - case None => - carry.fold[Pull[F,O,Unit]](Pull.done)(c => Pull.output1(c)) - } + case None => + carry.fold[Pull[F, O, Unit]](Pull.done)(c => Pull.output1(c)) + } + }, + ps => ps._2.uncons1.fold(_ => go(Some(ps._1), s, ps._2), _ => Pull.output1(ps._1) >> go(None, s, ps._2))) } go(None, self).stream } From a3ee13bcf0028990d0ac54b424acc7ed168d8d54 Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Wed, 22 Nov 2017 12:22:49 -0500 Subject: [PATCH 5/6] Fixed merged conflict --- core/shared/src/main/scala/fs2/Stream.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index f95e1115d1..7c8f99cd79 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -2036,13 +2036,13 @@ object Stream { (parts, parts2) match { case (Left(()), _) => go(None, tl) case (Right((phd, ptl)), Left(())) => go(Some(phd), tl) - case (Right((phd, ptl)), Right(_)) => Pull.output1(phd) >> go(None, tl, ptl) + case (Right((phd, ptl)), Right(_)) => Pull.output1(phd) *> go(None, tl, ptl) } case None => carry.fold[Pull[F, O, Unit]](Pull.done)(c => Pull.output1(c)) } }, - ps => ps._2.uncons1.fold(_ => go(Some(ps._1), s, ps._2), _ => Pull.output1(ps._1) >> go(None, s, ps._2))) + ps => ps._2.uncons1.fold(_ => go(Some(ps._1), s, ps._2), _ => Pull.output1(ps._1) *> go(None, s, ps._2))) } go(None, self).stream } From 7924226103b880fd77a761b8818f88f69e3dad4e Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Wed, 22 Nov 2017 14:23:42 -0500 Subject: [PATCH 6/6] Modified implementation of repartition so that it operates on stream segments --- core/jvm/src/test/scala/fs2/StreamSpec.scala | 14 ++++----- core/shared/src/main/scala/fs2/Stream.scala | 33 +++++++------------- 2 files changed, 18 insertions(+), 29 deletions(-) diff --git a/core/jvm/src/test/scala/fs2/StreamSpec.scala b/core/jvm/src/test/scala/fs2/StreamSpec.scala index e2c7c9f7e9..c81dbcf274 100644 --- a/core/jvm/src/test/scala/fs2/StreamSpec.scala +++ b/core/jvm/src/test/scala/fs2/StreamSpec.scala @@ -108,24 +108,24 @@ class StreamSpec extends Fs2Spec with Inside { } "repartition" in { - Stream("Lore", "m ip", "sum dolo", "r sit amet").repartition(s => Segment.array(s.split(" "))).toList shouldBe + Stream("Lore", "m ip", "sum dolo", "r sit amet").repartition(s => Chunk.array(s.split(" "))).toList shouldBe List("Lorem", "ipsum", "dolor", "sit", "amet") - Stream("hel", "l", "o Wor", "ld").repartition(s => Segment.indexedSeq(s.grouped(2).toVector)).toList shouldBe + Stream("hel", "l", "o Wor", "ld").repartition(s => Chunk.indexedSeq(s.grouped(2).toVector)).toList shouldBe List("he", "ll", "o ", "Wo", "rl", "d") - Stream.empty.covaryOutput[String].repartition(_ => Segment.empty).toList shouldBe List() - Stream("hello").repartition(_ => Segment.empty).toList shouldBe List() + Stream.empty.covaryOutput[String].repartition(_ => Chunk.empty).toList shouldBe List() + Stream("hello").repartition(_ => Chunk.empty).toList shouldBe List() def input = Stream("ab").repeat - def ones(s: String) = Segment vector s.grouped(1).toVector + def ones(s: String) = Chunk vector s.grouped(1).toVector input.take(2).repartition(ones).toVector shouldBe Vector("a", "b", "a", "b") input.take(4).repartition(ones).toVector shouldBe Vector("a", "b", "a", "b", "a", "b", "a", "b") input.repartition(ones).take(2).toVector shouldBe Vector("a", "b") input.repartition(ones).take(4).toVector shouldBe Vector("a", "b", "a", "b") Stream.emits(input.take(4).toVector).repartition(ones).toVector shouldBe Vector("a", "b", "a", "b", "a", "b", "a", "b") - Stream(1, 2, 3, 4, 5).repartition(i => Segment(i, i)).toList shouldBe List(1, 3, 6, 10, 15, 15) + Stream(1, 2, 3, 4, 5).repartition(i => Chunk(i, i)).toList shouldBe List(1, 3, 6, 10, 15, 15) - Stream(1, 10, 100).repartition(i => Segment.from(i).map(_.toInt)).take(4).toList shouldBe List(1, 2, 3, 4) + Stream(1, 10, 100).repartition(i => Segment.from(i).map(_.toInt).take(1000).toChunk).take(4).toList shouldBe List(1, 2, 3, 4) } "translate" in forAll { (s: PureStream[Int]) => diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 7c8f99cd79..120b2bb025 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -2014,37 +2014,26 @@ object Stream { self.reduce(S.combine(_, _)) /** - * Repartitions the input with the function `p`. On each step `p` is applied + * Repartitions the input with the function `f`. On each step `f` is applied * to the input and all elements but the last of the resulting sequence * are emitted. The last element is then appended to the next input using the * Semigroup `S`. * * @example {{{ * scala> import cats.implicits._ - * scala> Stream("Hel", "l", "o Wor", "ld").repartition(s => Segment.array(s.split(" "))).toList + * scala> Stream("Hel", "l", "o Wor", "ld").repartition(s => Chunk.array(s.split(" "))).toList * res0: List[String] = List(Hello, World) * }}} */ - def repartition(p: O => Segment[O,Unit])(implicit S: Semigroup[O]): Stream[F,O] = { - def go(carry: Option[O], s: Stream[F,O], partitions: Segment[O,Unit] = Segment.empty): Pull[F,O,Unit] = { - partitions.uncons1.fold({ _ => - s.pull.uncons1.flatMap { - case Some((hd, tl)) => - val next = carry.fold(hd)(c => S.combine(c, hd)) - val parts = p(next).uncons1 - val parts2 = parts.flatMap(_._2.uncons1) - (parts, parts2) match { - case (Left(()), _) => go(None, tl) - case (Right((phd, ptl)), Left(())) => go(Some(phd), tl) - case (Right((phd, ptl)), Right(_)) => Pull.output1(phd) *> go(None, tl, ptl) - } - case None => - carry.fold[Pull[F, O, Unit]](Pull.done)(c => Pull.output1(c)) - } - }, - ps => ps._2.uncons1.fold(_ => go(Some(ps._1), s, ps._2), _ => Pull.output1(ps._1) *> go(None, s, ps._2))) - } - go(None, self).stream + def repartition(f: O => Chunk[O])(implicit S: Semigroup[O]): Stream[F,O] = { + pull.scanSegments(Option.empty[O]) { (carry, segment) => + segment.scan((Segment.empty[O], carry)) { case ((_, carry), o) => + val o2: O = carry.fold(o)(S.combine(_, o)) + val partitions: Chunk[O] = f(o2) + if (partitions.isEmpty) partitions -> None + else partitions.take(partitions.size - 1).voidResult -> Some(partitions.strict.last) + }.flatMap { case (out, carry) => out }.mapResult { case ((out, carry), unit) => carry } + }.flatMap { case Some(carry) => Pull.output1(carry); case None => Pull.done }.stream } /**