-
Notifications
You must be signed in to change notification settings - Fork 613
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
(Proposal) Applicative, Functor, and Invariant instances for Signal #1139
Conversation
Right now it's rather anemic what we can actually do with Signals, when they could become a light-weight FRP-ish system in their own right.
These come for free since we already have map and imap defined
Initial thought: Does it pass laws? |
Yes. If there's interest I'll write up some property tests to demonstrate this, but for now I'll give some high level arguments, especially since my current implementation is really meant more as a proof of concept of an idea rather than definitively how I think it should be implemented. The first thing to establish is that I'll use a loose version of equivalence. Two signals are equal if, when no new updates come through, the two signals converge to the same value. By converge I mean that after a finite (hopefully very short) amount of time, both The reason for this laxer definition is that Let's do each of the three levels of the proposal in turn. Note that I'll focus on Functor: Just use the current Applicative: If we use the proposed high-level approach for an If you're familiar with the monoidal presentation of Applicatives, I'll just note that zipping nondeterministically or deterministically is monoidal. Monad: I'll leave off |
@changlinli thanks for sharing your thoughts. Could you then add tests with Laws that verify your observations ? |
Add Discipline tests that verify our immutable.Signal instances are lawful. Note that I haven't found thought of a good way to test mutable.Signal instances that doesn't effectively just test the immutable.Signal part.
@pchlupacek @ChristopherDavenport done. Take a look. There's a bit of gymnastics I have to do to get some of the right testing typeclass instances, but I think the end result is okay. |
FunctorTests.apply[SignalIO](immutable.Signal.signalIsFunctor).functor[String, Int, Double] | ||
) | ||
|
||
private implicit val testExecutionContext: ExecutionContextExecutor = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we either use the global EC or shut this one down at the end of all tests? Maybe TestUtil?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmmm... I'd like to keep as many factors constant when testing as possible and the global
will spin up threads based on the number of processors. If you don't think that's too big of a deal I'm happy to use the global one, but for now I'll just shutdown the executor. If you have preferences around what you'd like to move to TestUtil
let me know as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Never mind, see my next comment.
To prevent thread pools from hanging around for the life of the test suite.
@mpilquist Actually I just realized the I still feel a little uncomfortable about having the number of threads change based on the machine, but I suppose the number of physical cores changing might matter more anyways. |
This brings the Signal laws in line with the rest of the test code, e.g. ChunkSpec.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good -- added a few minor style comments to be more consistent with other parts of code base.
@@ -37,6 +38,49 @@ abstract class Signal[F[_], A] { self => | |||
} | |||
|
|||
object Signal { | |||
implicit def signalIsFunctor[F[_]: Functor]: Functor[({ type L[X] = Signal[F, X] })#L] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/signalIsFunctor/functorInstance/
@@ -37,6 +38,49 @@ abstract class Signal[F[_], A] { self => | |||
} | |||
|
|||
object Signal { | |||
implicit def signalIsFunctor[F[_]: Functor]: Functor[({ type L[X] = Signal[F, X] })#L] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use kind projector syntax here -- Functor[Signal[F, ?]]
override def map[A, B](fa: Signal[F, A])(f: A => B): Signal[F, B] = fa.map(f) | ||
} | ||
|
||
implicit def signalIsApplicative[F[_]]( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/signalIsApplicative/applicativeInstance/
} | ||
|
||
implicit def signalIsApplicative[F[_]]( | ||
implicit effectEv: Effect[F], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note on series/1.0, we'll likely be able to get rid of the EC requirement and downgrade Effect
to Concurrent
@@ -134,4 +140,10 @@ object Signal { | |||
} | |||
} | |||
} | |||
|
|||
implicit def signalIsInvariantFunctor[F[_]: Functor] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/signalIsInvariantFunctor/invariantFunctorInstance/ - also use kind project syntax below
Because we have conflicting Functor instances that might arise both from our independent Functor instance for Signal and our Applicative instance for Signal, we need to make sure that one of them wins.
This allows us to gain the efficiency advantage of Signal's Functor even if we're using an Applicative instance.
This prevents this method from being created anew on each instance of our Applicative
Move Signal instances to kind projector syntax, bringing it in line with the rest of the codebase as well as the naming. Also rename the LowPriority trait to bring it in line with how TranslateInterrupt is named.
I'm wary of inadvertently increasing the public API of fs2. I'd be more comfortable doing so in the 1.0 release.
Oh I hadn't realized it was going in without more discussion; I've added in some commits to clean up some of the ergonomics as a result. I'd also love to discuss your gist https://gist.github.com/mpilquist/9deef315f0aafc0de9b6756fe4146231 at some point because I think that something like that (for a newer version of fs2) would both be a useful method by itself, and would also enable a I also suspect this should deprecate I would also love to update the micro-site with the |
} | ||
} | ||
|
||
private def nondeterministicZip[F[_], A0, A1](xs: Stream[F, A0], ys: Stream[F, A1])( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 This could be added to Stream
directly as yip
, which we had pre-0.9.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just remark I would add this in separate PR + tests for it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I'm happy to leave it as a private method for now and then expose it as part of another PR if that's what you mean.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As a side note to yip
derived from this implementation. This does not handle any scopes, errors, recoveries etc. (I hope with signal it is safe to assume they are not issue).
In Stream version however, this will have these to be addressed, and likely we will need to use merge-like pattern, unless new Fold
will allow us to do something better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't really understand how scopes work (and I don't know in particular what's missing here scope-wise) so I'll need help there if I were to implement it :(, but I can try reading through the code some more. I'm not sure what you mean by errors and recovery though. As far I can tell this stream will fail if any of its constituent streams fail, and recovery never comes for free anyways; the standard bracket
and restart pattern should work here as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@changlinli no worries, lets focus on that in separate PRs. Thanks for this work.
Looks good to me. I could see a subsequent PR that defines Re: |
|
||
override def pure[A](x: A): Signal[F, A] = fs2.async.mutable.Signal.constant(x) | ||
|
||
override def ap[A, B](ff: Signal[F, A => B])(fa: Signal[F, A]): Signal[F, B] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am a bit curious on ff
. Eventhough I understand the need for this, I am sort of curious how to define it in reality. You will need for discrete signal (#55) to assure that new value is produced for each ff, and as such gurantee that ff1, ff2, ff3 are different instances. I am not saying this is impossible, but I am feeling a bit awkward here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think I understand what you mean by discrete signal here. I think you could construct one with Stream.unfold
and async.hold
?
I agree that ap
is probably not what most people would use. In fact I'm considering overriding Applicative.product
and then implementing ap
in terms of product
and map
. The intent here is that they would use Applicative.product
or Applicative.mapN
far more. E.g.
val someSignal: Signal[F, String] = ???
val otherSignal: Signal[F, String] = ???
(someSignal, otherSignal).mapN{ case (x, y) => (x ++ y).size}
In fact I wager that ap
is usually used with a pure
on the function rather than actually having a wrapped function (e.g. f.pure ap x
) and that mapN
probably has more usage than either.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might make sense for me to just add a bunch more docs as well to the micro-site, but I don't know if I want to do that here or in a separate PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My trouble is that function always persists its identtity, so :
val f1: Int => String = (i) => i.toString
val f2: Int => String = (i) => i.toString
assert(f1 == f1)
assert(f1 != f2)
assert(f2 == f2)
Now imagine signal of functions you need to produce. Essentially you will need to create always new function instance something like :
val signal: Signal[Int] = ???
val signalF: Signal[Int => String] = signal.map { i0 => val f1: Int => String = (i) => (i + i0).toString }
to produce signal, that will produce reliable discrete source of Int => String function.
As I said, I am not saying it is impossible, I just hardly believe this is something that users will do and is not "perfect" code.
In fact in 0.8 we had similar encodings for sinks and pipes, and we moved from it for several reasons, including performance bottleneck and resource safety.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't define ap
here with the expectation that users will use it. I define it purely because that's what is needed for an Applicative
which I do want because it gives me access to mapN
and product
which I am far more likely to use.
Stream
actually is in the exact same scenario. This compiles:
import cats.implicits._
Stream((i: Int) => i + 1).ap(Stream(0)) // Stream(1)
because Stream
is Applicative
, but I would be surprised if anyone ever used ap
. A similar situation arises here. I define ap
because that's what makes Signal
applicative, but I don't think anyone would ever use it directly.
So I agree that ap
is hard to use, but I don't see users using it. I see them using other interesting parts of the Applicative
instance. Instead, it makes things like this possible:
val s0: Signal[IO, Int] = ???
val s1: Signal[IO, Int] = ???
List(s0, s1).sequence // This gives you a Signal[IO, List[Int]]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, and agree. The reason why I am bringing it up is the fact that
nondeterministicZip(ff.discrete, fa.discrete).map { case (f, a) => f(a) }
assumes that ff.discrete
will change regularly, and actually it will likely only emit single value and then holds. Not sure of the impacts there, I mean what combinators for Applicative will this broke.
I hope that all Laws fro applicative covers that and we are safe there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The laws do cover it :).
In fact it's pretty common behavior. Most examples of direct ap
I've seen go basically something like
f.pure[F] ap x ap y ap z
where the f
is always wrapped with just a pure
.
Take the Applicative
instance of List
for example. It has the exact same problem, where people rarely make a list of functions, and you'd only end up with the singleton List
of a function most of the time if you used ap
directly, but it is what enables all the other Applicative
methods to work (such as producte
and mapN
).
FWIW, almost always the ff.discrete
is going to be generated off another non-singleton Signal
rather than user-defined since e.g. product
and map2
are defined as
override def product[A, B](fa: F[A], fb: F[B]): F[(A, B)] =
ap(map(fa)(a => (b: B) => (a, b)))(fb)
def map2[A, B, Z](fa: F[A], fb: F[B])(f: (A, B) => Z): F[Z] =
map(product(fa, fb))(f.tupled)
* straightforward way would cause the (non-deterministically) zipped | ||
* stream to be empty. | ||
*/ | ||
def discrete = Stream(a) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't believe this implementation is correct. discrete
is defined as Stream(a) ++ eval(F.never)
, actually.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one is interesting; what do you mean by correct? Is there another Signal
implementation I should look at? I see the linked pull request in another comment, but I'm not sure immediately how it relates. I'm assuming you're talking about invariants as they relate to Signal
s themselves rather than say the Applicative laws (since Discipline is already checking those).
Presumably there are series of invariants/laws that we intuitively expect all Signal
s to obey, although that's not spelled anywhere (perhaps they should be?).
My intuition is that there are the following invariants with some hand-waving around the word "equivalent" and "subsequence":
continuous
should be equivalent toStream.repeatEval(get)
, even if it is not implemented as such.discrete
should be a subsequence ofcontinuous
; that is given a sufficiently fast consumer, all the elements observed in adiscrete
stream should be observed in acontinuous
stream and in the same order (although thecontinuous
stream may have more elements).get
,continuous
, anddiscrete
should each be "side-effect-free;" that is even though there is an effectF
parameter which indicates non-determinism, callingget
,continuous
, ordiscrete
should not affect any other observers of the same signal (mySignal
generator I use for the tests does not follow this because this makes testing more difficult).
I suspect you have another invariant in mind, in analogy to how the included implementations of Topic
and Queue
work:
- Neither
continuous
nordiscrete
should ever terminate. Or perhaps this is a corollary of a deeper intuition that you have, that a downstream consumer should be unable to distinguish between aSignal
that is "done" and aSignal
which happens to be quiet for the moment, but could at any time receive a new change.
There's three things that are mildly annoying about this invariant. The first, superficial one is that it just makes testing much more difficult. Equivalence between Signal
s probably has to be defined with some notion of convergence and without termination, I think your tests tend to become timeouts with all the crazy non-determinism around them. This is less true for Applicative
where a sentinel value might still be enough (e.g. use noneTerminate
with async.hold
and wait on the discrete
stream until it hits None
and then check get
, A0], ys: Stream[F, A1])(). However, I strongly suspect that for the Monad
instance (I do think Signal
has a useful Monad
instance, I'm just not sure how to write the switchMap
that it depends on yet), you'll potentially have short-lived oscillation of values (e.g. with certain flatMap
configurations you'll have a Signal
that oscillates to None
then away and then back again), which makes sentinel-based testing very difficult.
The somewhat less superficial one is that I think that behavior is exactly what I would expect of a mutable.Signal
, but not necessarily of an immutable.Signal
. I would lump mutable.Signal
with all the other mutable
structures, but I view an immutable.Signal
as essentially just a Stream
that happens to have a notion of "current" state which I can query at any time. And in the same way that a Stream
can be infinite or finite, an immutable.Signal
can be infinite or finite as well.
There's a final backwards-compatibility issue where I'm already a bit wary of changing Signal.constant
's behavior to be a singleton Stream
. I was considering leaving it be and just making a custom pure
instead of reusing constant
, but in the end I hoped that nobody was using constant
in a way where changing it from an empty stream to a finite stream would make a difference since they both still terminate. Making it a non-terminating Stream
seems scarier because now potentially there's code that used to terminate but now might hang.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apologies for the wall of text :(
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed like you observed, the problem with implementation that def discrete = Stream(a)
is that that stream will terminate after a
is produced, and that should not be the case. In fact discrete stream shall never ever terminate (same as continuous) unless explicitly told so (i.e. with Pipe).
I am quite negative on having signals where one terminates discrete, while continuos
is not terminates, and other variants of signals that works differently.
I am not yet convinced by your arguments that we shall allow it. I am using signals for all tests in our code, and very, very rarely we need to rely on timeouts. Perhaps I am not understanding your point well.
Also just few more remarks :
discrete
may not produce all elements available incontinuous
. There is no queue-like guarantee.- the only difference between
discrete
andcontinuous
is thatdiscrete
will hold if there is no new value signalled, unlikecontinuous
that always produces value. - I don't think so there shall be any difference in behaviour of mutable/immutable signal. If we would allow it, then I think the
mutable
Signal must not extendsimmutable
signal variant. - I believe that current implementation of
mutable.Signal.constant#discrete
is incorrect (buggy), and it should in fact be Stream(a) ++ eval(F.never).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Yep I agree that
discrete
is a subsequence ofcontinuous
(and not the other way around). - I agree here, but it's very difficult to define what "hold" means in a rigorous way, especially since
mutable.Signal
has arefresh
method. I think the best one can do is just saydiscrete
is a subsequence ofcontinuous
and then just hope that it's not the trivial subsequence. - I view always infinite (
mutable
) as a subset of infinite or finite (immutable
) so it seems fine to me that even with this behaviormutable.Signal
extendsimmutable.Signal
. Liskov substitution holds just fine. - At this point
constant
's finiteness is a part of the public API and making it never terminate can potentially cause downstream code to hang. If you're firmly convinced this is a bug that might be fine, but I'm wary of the damage that'll cause, especially since if IIRC this has been its definition since 0.9.
My point about tests wasn't about using Signal
s in other tests, but testing Signal
s themselves. For example, my Discipline tests that validate the Applicative
laws for immutable.Signal
are more annoying to write in a deterministic fashion if none of my Signal
s ever terminate. They're still possible with some finagling. On the other hand, they become much harder if in the future someone writes a Monad
instance and wants to write tests that it passes the Monad
laws.
That being said, there's no reason Applicative.pure
has to be Signal.constant
, although that's probably non-intuitive. So you could modify them independently if you wanted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah ok, I see. any thoughts on this @mpilquist // @SystemFw
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only that we can't make that signature change in 0.10
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mpilquist @pchlupacek So can I push again for the Applicative
instance in 0.10 and then hold off on the non-termination until 1.0? It passes Applicative
laws either way (although it's a lot harder to show if you put non-termination in)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That sounds good to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool, I'm still blocked on your review; is there anything else you'd like me to fix up?
Overall looks promising. My biggest concern is the |
Running |
Stemming from a gitter conversation with @SystemFw.
This is a proposal to give
Signal
some useful cats typeclass instances. There's three "layers" to this proposal with two of them implemented here. Apologies for the large amount of text.Functor
instance forimmutable.Signal
and anInvariant
instance formutable.Signal
. This comes "for free" since ofmap
andimap
already are defined.Applicative
instance forimmutable.Signal
. I have a proof of concept here; it feels a bit weird not actually create a "real"Signal
and instead delegateget
anddiscrete
to different parts of the constituentSignal
s, but hopefully I've demonstrated it's not very difficult to do so.Monad
instance forimmutable.Signal
. This is harder. I suspect you need something like switchMap (which is defined here by @mpilquist for version 0.9) to define this. I didn't know how to do that well.I hope that
Functor
andInvariant
are pretty uncontroversial and basically boil down to whether you want the additional lines of code explicitly writing out the instances as a burden or not.Applicative
andMonad
are probably where things get more controversial.There's two central questions: why would would these instances (especially
Applicative
andMonad
) be useful and what are the semantics ofApplicative
andMonad
?The utility of these instances is for composing
Signal
s together. Right now I've found thatSignal
s that are emitted by fs2, such as thesize
of aQueue
are essentially useful only to immediately turn intoStream
s viadiscrete
orcontinuous
. As a motivating example, if I wanted to make a dashboard gadget that had the sum of all the sizes of all the activeQueue
s in my application that both showed the current value and dynamically updated it, it's very straightforward withApplicative
(if the number of activeQueue
s in my application remains constant over the lifetime of my application) orMonad
(if the number of activeQueue
s changes) by combiningSignal
s directly into a newSignal
, but it's a bit more cumbersome right now.The semantics of the
get
method onSignal
s constructed viaApplicative
andMonad
is pretty straightforward. Just get the current value of the underlying constituentSignal
s and applyap
ed orflatMap
ed functions as necessary.continuous
is justget
ing anytime there's a pull.discrete
is a bit harder. ForApplicative
it's basically a "non-deterministic zip" of the underlyingSignal
s. That is if you take Applicativezip
(i.e.ap
ing the tuple function)Signal[F, A]
andSignal[F, B]
, thediscrete
stream of the resultingSignal[F, (A, B)]
, is a stream that emits a new(A, B)
whenever either one of the underlyingSignal
sdiscrete
streams emits an element (as opposed toStream.zip
which only emits when both emit).The
discrete
that results from flattening aSignal[F, Signal[F, A]]
is a version of "flattening" aStream[F, Stream[F, A]]
where we run the inner stream until the outer stream emits a new inner stream and immediately throw away our current inner stream and start pulling down from the new inner stream. This is analogous to Monix's switchMap and @mpilquist had a proof of concept of this in fs2 0.9.So, if you got this far, what are people's thoughts?