-
Notifications
You must be signed in to change notification settings - Fork 613
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Queue#peek1 and timedPeek1 #996
Conversation
} | ||
|
||
private def peek1Impl: F[Either[Ref[F, A], A]] = ref[F, A].flatMap { ref => | ||
qref.modify2 { state => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For consistency, I'd rather this follow the pattern of dealing with state and output separately, using modify
. This will also allow for some simplification
} | ||
|
||
def dequeue1: F[A] = cancellableDequeue1.flatMap { _._1 } | ||
|
||
def peek1: F[A] = peek1Impl.flatMap { | ||
case Left(ref) => ref.get |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this can be just a fold
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean like .flatMap(_.fold(_.get, F.pure))
? Pattern matching is supposedly faster; but if fold
is preferred in this codebase, I can change it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it will make a big difference (unless I see numbers), but I also don't think is something to get hung up on if you don't like the change (and yes, .flatMap(_.fold(_.get, F.pure))
). The only important points are modify
vs modify2
, and whether we should have a cancellablePeek
or not
// queue was empty, we had waiting dequeuers | ||
c.previous.deq.head.setAsyncPure(Chunk.singleton(a)) | ||
} | ||
val pk = if (c.previous.peek.isEmpty) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you want, you can fold
or map(...).getOrElse
on c.previous.peek
here, since it's an Option. Not a big deal though
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've used isEmpty
/get
to conform to the already existing code there (which uses isEmpty
/head
).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fair enough :)
final case class State( | ||
queue: Vector[A], | ||
deq: Vector[Ref[F,Chunk[A]]], | ||
peek: Option[Ref[F, A]] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding a @param
for this as well would be nice, for consistency
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.
} | ||
}.map(_._2) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally speaking I like the approach here, not sure whether we should go the full route of having a cancellablePeek1
like for dequeue
or not
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll think about it.
Generally I like the approach, left a few comments. From an implementation point of view |
I've pushed a commit addressing (hopefully) all comments, except the one about cancellable peek. I'll have to think about that one. (Actually I wish there was a general way of cancelling async stuff, not implementing separately one variant for every method...) |
else state | ||
}.map { change => | ||
if (change.previous.queue.isEmpty) Left(change.now.peek.get) | ||
else Right(change.now.queue.head) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One super minor nitpick (sorry). change.previous.queue.head
for symmetry with the isEmpty
(since the queue hasn't changed between previous and now)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fair enough.
Apart from one extra little comment, it's ready. Thank you very much for this :) |
@@ -16,3 +16,4 @@ Also see the [GitHub contributor stats](https://github.com/functional-streams-fo | |||
- Rúnar Ó. Bjarnason ([@runarorama](https://github.com/runarorama)): Various processes and combinators, circular buffers, compression. I'm an ideas man, Michael. | |||
- Jed Wesley-Smith ([@jedws](https://github.com/jedws)): really very minor tweaks, cleanups and pestering, hardly worth the mention | |||
- Michael Pilquist ([@mpilquist](https://github.com/mpilquist)): 0.9 redesign work, maintenance | |||
- Daniel Urban ([@durban](https://github.com/durban)): queue peek implementation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add to build.sbt too so it shows up in the generated POM.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lol I need to remember to do that as well at some point
Looks great, thanks |
Queue#peek1
andtimedPeek1
(fixes #983).I'm unsure about the semantics for
synchronous
(and consequently forsynchronousNoneTerminated
). If I understand correctly, that queue is (conceptually) always empty. That would suggest, thatpeek1
should never finish. However, it is in fact possible (sometimes) to dequeue an element from such a queue. Thus, peek could also work. This seems more useful, so this is what this PR currently implements.