Skip to content

Commit

Permalink
DebouncedChan: When channel is firing continously, only fire once per…
Browse files Browse the repository at this point in the history
… period (#222)

This one's a follow up to #221. `DebouncedChan` previously had a bit of
a bug where if `Call` was being invoked continuously, it was firing more
often than expected: once for an initial `Call`, and then again when the
timer elapsed.

Here, we modify the implementation so that under a continuous fire
situation, `DebouncedChan` will fire once initially, and then once every
time the timer elapses at the end of each period. This reduces the
number of emits on `C` from 2N+1 to the more expected N+1 (the +1 being
the initial fire).

We accomplish this by entering a loop that waits on the timer when
receiving an initial `Call`, with the loop continuously resetting and
waiting on the timer after each time it fires, sending on `C` each time
the period elapses where a `Call` invocation came in. If a period
elapses without a new `Call` coming in, the timer stops, the loop
returns and `DebouncedChan` returns to its initial state.
  • Loading branch information
brandur authored Feb 22, 2024
1 parent ed7cb20 commit d3bdd56
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 18 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- Fix a problem in `DebouncedChan` where it would fire on its "out" channel too often when it was being signaled continuousy on its "in" channel. This would have caused work to be fetched more often than intended in busy systems. [PR #222](https://github.com/riverqueue/river/pull/222).

## [0.0.22] - 2024-02-19

### Fixed
Expand Down
44 changes: 38 additions & 6 deletions internal/util/chanutil/debounced_chan.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,19 @@ func NewDebouncedChan(ctx context.Context, cooldown time.Duration) *DebouncedCha

// C is the debounced channel. Multiple invocations to Call during the cooldown
// period will deduplicate to a single emission on this channel on the period's
// leading edge, and one more on the trailing edge.
// leading edge, and one more on the trailing edge for as many periods as
// invocations continue to come in.
func (d *DebouncedChan) C() <-chan struct{} {
return d.c
}

// Call invokes the debounced channel, and is the call which will be debounced.
// If multiple invocations of this function are made during the cooldown period,
// they'll be debounced to a single emission on C on the period's leading edge,
// and one more on the trailing edge.
// and then one fire on the trailing edge of each period for as long as Call
// continues to be invoked. If a timer period elapses without an invocation on
// Call, the timer is stopped and behavior resets the next time Call is invoked
// again.
func (d *DebouncedChan) Call() {
d.mu.Lock()
defer d.mu.Unlock()
Expand All @@ -69,7 +73,7 @@ func (d *DebouncedChan) Call() {
}
d.timerDone = false

go d.waitForTimer()
go d.waitForTimerLoop()
}

func (d *DebouncedChan) nonBlockingSendOnC() {
Expand All @@ -79,25 +83,53 @@ func (d *DebouncedChan) nonBlockingSendOnC() {
}
}

func (d *DebouncedChan) waitForTimer() {
// Waits for the timer to be fired, and loops as long as Call invocations come
// in. If a period elapses without a new Call coming in, the loop returns, and
// DebouncedChan returns to its initial state, waiting for a new Call.
//
// The loop also stops if context becomes done.
func (d *DebouncedChan) waitForTimerLoop() {
for {
if stopLoop := d.waitForTimerOnce(); stopLoop {
break
}
}
}

// Waits for the timer to fire once or context becomes done. Returns true if the
// caller should stop looping (i.e. don't wait on the timer again), and false
// otherwise.
func (d *DebouncedChan) waitForTimerOnce() bool {
select {
case <-d.ctxDone:
d.mu.Lock()
defer d.mu.Unlock()

if d.timer != nil {
if !d.timer.Stop() {
<-d.timer.C
}
d.timerDone = true
}

d.timerDone = true

case <-d.timer.C:
d.mu.Lock()
defer d.mu.Unlock()

if d.sendOnTimerExpired {
d.sendOnTimerExpired = false
d.nonBlockingSendOnC()

// Wait for another timer expiry, which will fire again if another
// Call comes in during that time. If no Call comes in, the timer
// will stop on the next cycle and we return to initial state.
d.timer.Reset(d.cooldown)
return false // do _not_ stop looping
}

d.timerDone = true
d.sendOnTimerExpired = false
}

return true // stop looping
}
19 changes: 7 additions & 12 deletions internal/util/chanutil/debounced_chan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,18 +125,13 @@ func TestDebouncedChan_ContinuousOperation(t *testing.T) {
}

// Expect number of signals equal to number of cooldown periods that fit
// into our total test time, multiplied by two, because the debounced chan
// fires at the beginning and end of a bounce period. +1 for the last period
// that fires on the leading edge, but which we don't give time for the
// timer to fully expire. (We've chosen numbers so that test time doesn't
// divide by cooldown perfectly.)
// into our total test time, and +1 for an initial fire.
//
// This usually lands right on the expected number, but allow a delta of
// +/-4 to allow the channel to be off by two cycles (again, one cycle
// signals once at leading edge of the period and once at trailing, so 2
// cycles * 2 signals = 4) in either direction. By running at `-count 1000`
// or so I can usually reproduce an off-by-one-or-two cycle.
expectedNumSignal := int(math.Round(float64(testTime)/float64(cooldown)))*2 + 1
// This almost always lands right on the expected number, but allow a delta
// of +/-2 to allow the channel to be off by two cycles in either direction.
// By running at `-count 1000` I can usually reproduce an off-by-one-or-two
// cycle.
expectedNumSignal := int(math.Round(float64(testTime)/float64(cooldown))) + 1
t.Logf("Expected: %d, actual: %d", expectedNumSignal, numSignals)
require.InDelta(t, expectedNumSignal, numSignals, 4)
require.InDelta(t, expectedNumSignal, numSignals, 2)
}

0 comments on commit d3bdd56

Please sign in to comment.