Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DebouncedChan: When channel is firing continously, only fire once per period #222

Merged
merged 1 commit into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- Fix a problem in `DebouncedChan` where it would fire on its "out" channel too often when it was being signaled continuousy on its "in" channel. This would have caused work to be fetched more often than intended in busy systems. [PR #222](https://github.com/riverqueue/river/pull/222).

## [0.0.22] - 2024-02-19

### Fixed
Expand Down
44 changes: 38 additions & 6 deletions internal/util/chanutil/debounced_chan.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,19 @@ func NewDebouncedChan(ctx context.Context, cooldown time.Duration) *DebouncedCha

// C is the debounced channel. Multiple invocations to Call during the cooldown
// period will deduplicate to a single emission on this channel on the period's
// leading edge, and one more on the trailing edge.
// leading edge, and one more on the trailing edge for as many periods as
// invocations continue to come in.
func (d *DebouncedChan) C() <-chan struct{} {
return d.c
}

// Call invokes the debounced channel, and is the call which will be debounced.
// If multiple invocations of this function are made during the cooldown period,
// they'll be debounced to a single emission on C on the period's leading edge,
// and one more on the trailing edge.
// and then one fire on the trailing edge of each period for as long as Call
// continues to be invoked. If a timer period elapses without an invocation on
// Call, the timer is stopped and behavior resets the next time Call is invoked
// again.
func (d *DebouncedChan) Call() {
d.mu.Lock()
defer d.mu.Unlock()
Expand All @@ -69,7 +73,7 @@ func (d *DebouncedChan) Call() {
}
d.timerDone = false

go d.waitForTimer()
go d.waitForTimerLoop()
}

func (d *DebouncedChan) nonBlockingSendOnC() {
Expand All @@ -79,25 +83,53 @@ func (d *DebouncedChan) nonBlockingSendOnC() {
}
}

func (d *DebouncedChan) waitForTimer() {
// Waits for the timer to be fired, and loops as long as Call invocations come
// in. If a period elapses without a new Call coming in, the loop returns, and
// DebouncedChan returns to its initial state, waiting for a new Call.
//
// The loop also stops if context becomes done.
func (d *DebouncedChan) waitForTimerLoop() {
for {
if stopLoop := d.waitForTimerOnce(); stopLoop {
break
}
}
}

// Waits for the timer to fire once or context becomes done. Returns true if the
// caller should stop looping (i.e. don't wait on the timer again), and false
// otherwise.
func (d *DebouncedChan) waitForTimerOnce() bool {
select {
case <-d.ctxDone:
d.mu.Lock()
defer d.mu.Unlock()

if d.timer != nil {
if !d.timer.Stop() {
<-d.timer.C
}
d.timerDone = true
}

d.timerDone = true

case <-d.timer.C:
d.mu.Lock()
defer d.mu.Unlock()

if d.sendOnTimerExpired {
d.sendOnTimerExpired = false
d.nonBlockingSendOnC()

// Wait for another timer expiry, which will fire again if another
// Call comes in during that time. If no Call comes in, the timer
// will stop on the next cycle and we return to initial state.
d.timer.Reset(d.cooldown)
return false // do _not_ stop looping
}

d.timerDone = true
d.sendOnTimerExpired = false
}

return true // stop looping
}
19 changes: 7 additions & 12 deletions internal/util/chanutil/debounced_chan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,18 +125,13 @@ func TestDebouncedChan_ContinuousOperation(t *testing.T) {
}

// Expect number of signals equal to number of cooldown periods that fit
// into our total test time, multiplied by two, because the debounced chan
// fires at the beginning and end of a bounce period. +1 for the last period
// that fires on the leading edge, but which we don't give time for the
// timer to fully expire. (We've chosen numbers so that test time doesn't
// divide by cooldown perfectly.)
// into our total test time, and +1 for an initial fire.
//
// This usually lands right on the expected number, but allow a delta of
// +/-4 to allow the channel to be off by two cycles (again, one cycle
// signals once at leading edge of the period and once at trailing, so 2
// cycles * 2 signals = 4) in either direction. By running at `-count 1000`
// or so I can usually reproduce an off-by-one-or-two cycle.
expectedNumSignal := int(math.Round(float64(testTime)/float64(cooldown)))*2 + 1
// This almost always lands right on the expected number, but allow a delta
// of +/-2 to allow the channel to be off by two cycles in either direction.
// By running at `-count 1000` I can usually reproduce an off-by-one-or-two
// cycle.
expectedNumSignal := int(math.Round(float64(testTime)/float64(cooldown))) + 1
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the key test change here: notice how we drop the *2 expected invocations.

t.Logf("Expected: %d, actual: %d", expectedNumSignal, numSignals)
require.InDelta(t, expectedNumSignal, numSignals, 4)
require.InDelta(t, expectedNumSignal, numSignals, 2)
}
Loading