diff --git a/benchmark/src/main/scala/fs2/benchmark/ChannelBenchmark.scala b/benchmark/src/main/scala/fs2/benchmark/ChannelBenchmark.scala new file mode 100644 index 0000000000..89cc2efd06 --- /dev/null +++ b/benchmark/src/main/scala/fs2/benchmark/ChannelBenchmark.scala @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2013 Functional Streams for Scala + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of + * this software and associated documentation files (the "Software"), to deal in + * the Software without restriction, including without limitation the rights to + * use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of + * the Software, and to permit persons to whom the Software is furnished to do so, + * subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS + * FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR + * COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER + * IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package fs2 +package benchmark + +import cats.effect.IO +import cats.effect.unsafe.implicits.global +import org.openjdk.jmh.annotations.{Benchmark, Param, Scope, Setup, State} +import cats.syntax.all._ +import fs2.concurrent.Channel +import org.openjdk.jmh.infra.Blackhole + +@State(Scope.Thread) +class ChannelBenchmark { + @Param(Array("64", "1024", "16384")) + var size: Int = _ + + var list: List[Unit] = _ + var lists: List[List[Unit]] = _ + + @Setup + def setup() { + list = List.fill(size)(()) + val subList = List.fill(size / 8)(()) + lists = List.fill(8)(subList) + } + + @Benchmark + def sendPull(): Unit = + Channel + .bounded[IO, Unit](size / 8) + .flatMap { channel => + val action = sendAll(list, channel.send(()).void) *> channel.close + action.start *> channel.stream.through(blackHole).compile.drain + } + .unsafeRunSync() + + @Benchmark + def sendPullPar8(): Unit = + Channel + .bounded[IO, Unit](size / 8) + .flatMap { channel => + val action = lists.parTraverse_(sendAll(_, channel.send(()).void)) *> channel.close + action.start *> channel.stream.through(blackHole).compile.drain + } + .unsafeRunSync() + + @Benchmark + def sendPullParUnlimited(): Unit = + Channel + .bounded[IO, Unit](size / 8) + .flatMap { channel => + val action = sendAll(list, channel.send(()).start.void) + action *> channel.stream.take(size).through(blackHole).compile.drain + } + .unsafeRunSync() + + @inline + private def sendAll(list: List[Unit], action: IO[Unit]) = + list.foldLeft(IO.unit)((acc, _) => acc *> action) + + private def blackHole(s: Stream[IO, Unit]) = + s.repeatPull(_.uncons.flatMap { + case None => Pull.pure(None) + case Some((hd, tl)) => + val action = IO.delay(0.until(hd.size).foreach(_ => Blackhole.consumeCPU(100))) + Pull.eval(action).as(Some(tl)) + }) +} diff --git a/core/shared/src/main/scala/fs2/concurrent/Channel.scala b/core/shared/src/main/scala/fs2/concurrent/Channel.scala index f1c4a9647e..0117e62ecc 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Channel.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Channel.scala @@ -117,16 +117,20 @@ object Channel { def bounded[F[_], A](capacity: Int)(implicit F: Concurrent[F]): F[Channel[F, A]] = { case class State( - values: Vector[A], + values: List[A], size: Int, waiting: Option[Deferred[F, Unit]], - producers: Vector[(A, Deferred[F, Unit])], + producers: List[(A, Deferred[F, Unit])], closed: Boolean ) - val initial = State(Vector.empty, 0, None, Vector.empty, false) + val open = State(List.empty, 0, None, List.empty, closed = false) - (F.ref(initial), F.deferred[Unit]).mapN { (state, closedGate) => + def empty(isClosed: Boolean): State = + if (isClosed) State(List.empty, 0, None, List.empty, closed = true) + else open + + (F.ref(open), F.deferred[Unit]).mapN { (state, closedGate) => new Channel[F, A] { def sendAll: Pipe[F, A, Nothing] = { in => @@ -146,12 +150,12 @@ object Channel { case State(values, size, waiting, producers, closed @ false) => if (size < capacity) ( - State(values :+ a, size + 1, None, producers, false), + State(a :: values, size + 1, None, producers, false), notifyStream(waiting) ) else ( - State(values, size, None, producers :+ (a -> producer), false), + State(values, size, None, (a, producer) :: producers, false), notifyStream(waiting) <* waitOnBound(producer, poll) ) }.flatten @@ -183,39 +187,33 @@ object Channel { Pull.eval { F.deferred[Unit].flatMap { waiting => state - .modify { case State(values, size, ignorePreviousWaiting @ _, producers, closed) => - if (values.nonEmpty || producers.nonEmpty) { - var unblock_ = F.unit - var allValues_ = values - - producers.foreach { case (value, producer) => - allValues_ = allValues_ :+ value - unblock_ = unblock_ >> producer.complete(()).void - } - - val unblock = unblock_ - val allValues = allValues_ - - val toEmit = Chunk.vector(allValues) - - ( - State(Vector(), 0, None, Vector.empty, closed), - // unblock needs to execute in F, so we can make it uncancelable - unblock.as( - Pull.output(toEmit) >> consumeLoop - ) - ) - } else { - ( - State(values, size, waiting.some, producers, closed), + .modify { state => + if (shouldEmit(state)) (empty(state.closed), state) + else (state.copy(waiting = waiting.some), state) + } + .flatMap { + case s @ State(values, stateSize, ignorePreviousWaiting @ _, producers, closed) => + if (shouldEmit(s)) { + var size = stateSize + var allValues = values + var unblock = F.unit + + producers.foreach { case (value, producer) => + size += 1 + allValues = value :: allValues + unblock = unblock <* producer.complete(()) + } + + val toEmit = makeChunk(allValues, size) + + unblock.as(Pull.output(toEmit) >> consumeLoop) + } else { F.pure( if (closed) Pull.done - else (Pull.eval(waiting.get) >> consumeLoop) + else Pull.eval(waiting.get) >> consumeLoop ) - ) - } + } } - .flatten .uncancelable } }.flatten @@ -231,6 +229,20 @@ object Channel { } def signalClosure = closedGate.complete(()) + + @inline private def shouldEmit(s: State) = s.values.nonEmpty || s.producers.nonEmpty + + private def makeChunk(allValues: List[A], size: Int): Chunk[A] = { + val arr = new Array[Any](size) + var i = size - 1 + var values = allValues + while (i >= 0) { + arr(i) = values.head + values = values.tail + i -= 1 + } + Chunk.array(arr).asInstanceOf[Chunk[A]] + } } } }