add ability to put metadata updates after job execution (#758)
Leverage this to update the snooze count in metadata without requiring a
sqlc param specifically for that purpose. This is more generalizable and
can be used to solve other problems too.
bgentry authored Feb 20, 2025
1 parent c7c7e45 commit 0f6987b
Showing 20 changed files with 983 additions and 185 deletions.
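As context for the diff below: the commit replaces the snooze-specific `SnoozeDoIncrement` parameter with generic `MetadataDoMerge`/`MetadataUpdates` fields that merge a JSON payload into the job's metadata as part of the same state-setting query. A minimal, hypothetical sketch of what that enables; the constructor arguments mirror the test call sites in this diff, while the return type and the `snoozes` key are illustrative assumptions, not the library's exact internals:

```go
package example

import (
	"encoding/json"
	"time"

	"github.com/riverqueue/river/riverdriver"
)

// snoozeParams builds set-state params for a snoozed job, carrying the
// snooze-count bump as a metadata merge payload instead of a dedicated
// sqlc parameter. The "snoozes" key is assumed for illustration.
func snoozeParams(jobID int64, scheduledAt time.Time, snoozes int) (*riverdriver.JobSetStateIfRunningParams, error) {
	metadataUpdates, err := json.Marshal(map[string]int{"snoozes": snoozes})
	if err != nil {
		return nil, err
	}
	// Mirrors riverdriver.JobSetStateSnoozed(id, scheduledAt, attempt,
	// metadataUpdates) as called in the tests changed by this commit.
	return riverdriver.JobSetStateSnoozed(jobID, scheduledAt, snoozes, metadataUpdates), nil
}
```

Because the metadata merge rides along with the state-setting query, it costs no extra database round trip.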
CHANGELOG.md: 6 additions & 0 deletions
@@ -9,6 +9,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

⚠️ Version 0.18.0 has breaking changes for the `rivertest.Worker` type that was just introduced. While attempting to round out some edge cases with its design, we realized some of them simply couldn't be solved adequately without changing the overall design such that all tested jobs are inserted into the database. Given the short duration since it was released (over a weekend), it's unlikely many users have adopted it, and it seemed best to rip off the bandaid to fix it before it gets widely used.

### Added

- Jobs can now store a recorded "output" value, a JSON-encoded payload set by the job during execution and stored in the job's metadata. The `river.RecordOutput` function makes it easy to use the job row to store transient/temporary values that are needed for introspection or for other downstream jobs. The output can be accessed using the `JobRow.Output()` helper method.

This output is stored at the same time as the job is completed following execution, so it does not require additional database calls or overhead. Output can be anything that can be stored in a Postgres JSONB field, though for performance reasons it should be limited in size. [PR #758](https://github.com/riverqueue/river/pull/758).
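A short sketch of the feature as the changelog entry describes it (the worker and args types here are made up for illustration, and `RecordOutput`'s exact signature should be checked against the released API):

```go
import (
	"context"

	"github.com/riverqueue/river"
)

type SendEmailArgs struct {
	To string `json:"to"`
}

func (SendEmailArgs) Kind() string { return "send_email" }

type SendEmailWorker struct {
	river.WorkerDefaults[SendEmailArgs]
}

func (w *SendEmailWorker) Work(ctx context.Context, job *river.Job[SendEmailArgs]) error {
	// ... send the email, getting back a provider message ID ...
	messageID := "msg_123"

	// Recorded into the job's metadata when the job row is completed,
	// with no extra database round trip.
	return river.RecordOutput(ctx, map[string]string{"message_id": messageID})
}
```

Downstream code can then read the stored JSON back via the `JobRow.Output()` helper mentioned above.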

### Changed

- **Breaking change:** The `rivertest.Worker` type now requires all jobs to be inserted into the database. The original design allowed workers to be tested without hitting the database at all. Ultimately this design made it hard to correctly simulate features like `JobCompleteTx`, and the other potential solutions seemed undesirable.
internal/jobcompleter/job_completer.go: 26 additions & 22 deletions
@@ -133,13 +133,14 @@ func (c *InlineCompleter) Start(ctx context.Context) error {

func setStateParamsToMany(params *riverdriver.JobSetStateIfRunningParams) *riverdriver.JobSetStateIfRunningManyParams {
return &riverdriver.JobSetStateIfRunningManyParams{
-Attempt: []*int{params.Attempt},
-ErrData: [][]byte{params.ErrData},
-FinalizedAt: []*time.Time{params.FinalizedAt},
-ID: []int64{params.ID},
-ScheduledAt: []*time.Time{params.ScheduledAt},
-SnoozeDoIncrement: []bool{params.SnoozeDoIncrement},
-State: []rivertype.JobState{params.State},
+Attempt: []*int{params.Attempt},
+ErrData: [][]byte{params.ErrData},
+FinalizedAt: []*time.Time{params.FinalizedAt},
+ID: []int64{params.ID},
+MetadataDoMerge: []bool{params.MetadataDoMerge},
+MetadataUpdates: [][]byte{params.MetadataUpdates},
+ScheduledAt: []*time.Time{params.ScheduledAt},
+State: []rivertype.JobState{params.State},
}
}

@@ -421,22 +422,24 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error {
// it's done this way to allocate as few new slices as necessary.
mapBatch := func(setStateBatch map[int64]*batchCompleterSetState) *riverdriver.JobSetStateIfRunningManyParams {
params := &riverdriver.JobSetStateIfRunningManyParams{
-ID: make([]int64, len(setStateBatch)),
-Attempt: make([]*int, len(setStateBatch)),
-ErrData: make([][]byte, len(setStateBatch)),
-FinalizedAt: make([]*time.Time, len(setStateBatch)),
-ScheduledAt: make([]*time.Time, len(setStateBatch)),
-SnoozeDoIncrement: make([]bool, len(setStateBatch)),
-State: make([]rivertype.JobState, len(setStateBatch)),
+ID: make([]int64, len(setStateBatch)),
+Attempt: make([]*int, len(setStateBatch)),
+ErrData: make([][]byte, len(setStateBatch)),
+FinalizedAt: make([]*time.Time, len(setStateBatch)),
+MetadataDoMerge: make([]bool, len(setStateBatch)),
+MetadataUpdates: make([][]byte, len(setStateBatch)),
+ScheduledAt: make([]*time.Time, len(setStateBatch)),
+State: make([]rivertype.JobState, len(setStateBatch)),
}
var i int
for _, setState := range setStateBatch {
params.ID[i] = setState.Params.ID
params.Attempt[i] = setState.Params.Attempt
params.ErrData[i] = setState.Params.ErrData
params.FinalizedAt[i] = setState.Params.FinalizedAt
+params.MetadataDoMerge[i] = setState.Params.MetadataDoMerge
+params.MetadataUpdates[i] = setState.Params.MetadataUpdates
params.ScheduledAt[i] = setState.Params.ScheduledAt
-params.SnoozeDoIncrement[i] = setState.Params.SnoozeDoIncrement
params.State[i] = setState.Params.State
i++
}
@@ -458,13 +461,14 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error {
for i := 0; i < len(setStateBatch); i += c.completionMaxSize {
endIndex := min(i+c.completionMaxSize, len(params.ID)) // beginning of next sub-batch or end of slice
subBatch := &riverdriver.JobSetStateIfRunningManyParams{
-ID: params.ID[i:endIndex],
-Attempt: params.Attempt[i:endIndex],
-ErrData: params.ErrData[i:endIndex],
-FinalizedAt: params.FinalizedAt[i:endIndex],
-ScheduledAt: params.ScheduledAt[i:endIndex],
-SnoozeDoIncrement: params.SnoozeDoIncrement[i:endIndex],
-State: params.State[i:endIndex],
+ID: params.ID[i:endIndex],
+Attempt: params.Attempt[i:endIndex],
+ErrData: params.ErrData[i:endIndex],
+FinalizedAt: params.FinalizedAt[i:endIndex],
+MetadataDoMerge: params.MetadataDoMerge[i:endIndex],
+MetadataUpdates: params.MetadataUpdates[i:endIndex],
+ScheduledAt: params.ScheduledAt[i:endIndex],
+State: params.State[i:endIndex],
}
jobRowsSubBatch, err := completeSubBatch(subBatch)
if err != nil {
internal/jobcompleter/job_completer_test.go: 31 additions & 31 deletions
@@ -98,7 +98,7 @@ func TestInlineJobCompleter_Complete(t *testing.T) {
t.Cleanup(completer.Stop)
completer.disableSleep = true

-err := completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(1, time.Now()))
+err := completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(1, time.Now(), nil))
if !errors.Is(err, expectedErr) {
t.Errorf("expected %v, got %v", expectedErr, err)
}
@@ -167,14 +167,14 @@ func TestAsyncJobCompleter_Complete(t *testing.T) {

// launch 4 completions, only 2 can be inline due to the concurrency limit:
for i := range int64(2) {
-if err := completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(i, time.Now())); err != nil {
+if err := completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(i, time.Now(), nil)); err != nil {
t.Errorf("expected nil err, got %v", err)
}
}
bgCompletionsStarted := make(chan struct{})
go func() {
for i := int64(2); i < 4; i++ {
-if err := completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(i, time.Now())); err != nil {
+if err := completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(i, time.Now(), nil)); err != nil {
t.Errorf("expected nil err, got %v", err)
}
}
@@ -267,7 +267,7 @@ func testCompleterSubscribe(t *testing.T, constructor func(riverdriver.Executor,
}()

for i := range 4 {
-require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(int64(i), time.Now())))
+require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(int64(i), time.Now(), nil)))
}

completer.Stop() // closes subscribeChan
@@ -312,7 +312,7 @@ func testCompleterWait(t *testing.T, constructor func(riverdriver.Executor, Subs
// launch 4 completions:
for i := range 4 {
go func() {
-require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(int64(i), time.Now())))
+require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(int64(i), time.Now(), nil)))
}()
<-completeStartedCh // wait for func to actually start
}
@@ -547,9 +547,9 @@ func testCompleter[TCompleter JobCompleter](
job3 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)})
)

-require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job1.ID, finalizedAt1)))
-require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job2.ID, finalizedAt2)))
-require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job3.ID, finalizedAt3)))
+require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job1.ID, finalizedAt1, nil)))
+require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job2.ID, finalizedAt2, nil)))
+require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job3.ID, finalizedAt3, nil)))

completer.Stop()

@@ -599,7 +599,7 @@ func testCompleter[TCompleter JobCompleter](
t.Cleanup(riverinternaltest.DiscardContinuously(bundle.subscribeCh))

for i := range jobs {
-require.NoError(t, completer.JobSetStateIfRunning(ctx, &stats[i], riverdriver.JobSetStateCompleted(jobs[i].ID, time.Now())))
+require.NoError(t, completer.JobSetStateIfRunning(ctx, &stats[i], riverdriver.JobSetStateCompleted(jobs[i].ID, time.Now(), nil)))
}

completer.Stop()
@@ -679,13 +679,13 @@ func testCompleter[TCompleter JobCompleter](
job7 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)})
)

-require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCancelled(job1.ID, time.Now(), []byte("{}"))))
-require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job2.ID, time.Now())))
-require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateDiscarded(job3.ID, time.Now(), []byte("{}"))))
-require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateErrorAvailable(job4.ID, time.Now(), []byte("{}"))))
-require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateErrorRetryable(job5.ID, time.Now(), []byte("{}"))))
-require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateSnoozed(job6.ID, time.Now(), 10)))
-require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateSnoozedAvailable(job7.ID, time.Now(), 10)))
+require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCancelled(job1.ID, time.Now(), []byte("{}"), nil)))
+require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job2.ID, time.Now(), nil)))
+require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateDiscarded(job3.ID, time.Now(), []byte("{}"), nil)))
+require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateErrorAvailable(job4.ID, time.Now(), []byte("{}"), nil)))
+require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateErrorRetryable(job5.ID, time.Now(), []byte("{}"), nil)))
+require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateSnoozed(job6.ID, time.Now(), 10, nil)))
+require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateSnoozedAvailable(job7.ID, time.Now(), 10, nil)))

completer.Stop()

@@ -705,7 +705,7 @@

job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)})

-require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now())))
+require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now(), nil)))

completer.Stop()

@@ -722,7 +722,7 @@ func testCompleter[TCompleter JobCompleter](
{
job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)})

-require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now())))
+require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now(), nil)))

completer.Stop()

@@ -737,7 +737,7 @@

job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)})

-require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now())))
+require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now(), nil)))

completer.Stop()

@@ -777,7 +777,7 @@ func testCompleter[TCompleter JobCompleter](

job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)})

-require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now())))
+require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now(), nil)))

completer.Stop()

@@ -805,7 +805,7 @@ func testCompleter[TCompleter JobCompleter](

job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)})

-err := completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now()))
+err := completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now(), nil))

// The error returned will be nil for asynchronous completers, but
// returned immediately for synchronous ones.
@@ -838,7 +838,7 @@ func testCompleter[TCompleter JobCompleter](

job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)})

-err := completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now()))
+err := completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now(), nil))

// The error returned will be nil for asynchronous completers, but
// returned immediately for synchronous ones.
@@ -963,7 +963,7 @@ func benchmarkCompleter(
b.ResetTimer()

for i := range b.N {
-err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateCompleted(bundle.jobs[i].ID, time.Now()))
+err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateCompleted(bundle.jobs[i].ID, time.Now(), nil))
require.NoError(b, err)
}

@@ -978,31 +978,31 @@ for i := range b.N {
for i := range b.N {
switch i % 7 {
case 0:
-err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateCancelled(bundle.jobs[i].ID, time.Now(), []byte("{}")))
+err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateCancelled(bundle.jobs[i].ID, time.Now(), []byte("{}"), nil))
require.NoError(b, err)

case 1:
-err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateCompleted(bundle.jobs[i].ID, time.Now()))
+err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateCompleted(bundle.jobs[i].ID, time.Now(), nil))
require.NoError(b, err)

case 2:
-err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateDiscarded(bundle.jobs[i].ID, time.Now(), []byte("{}")))
+err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateDiscarded(bundle.jobs[i].ID, time.Now(), []byte("{}"), nil))
require.NoError(b, err)

case 3:
-err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateErrorAvailable(bundle.jobs[i].ID, time.Now(), []byte("{}")))
+err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateErrorAvailable(bundle.jobs[i].ID, time.Now(), []byte("{}"), nil))
require.NoError(b, err)

case 4:
-err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateErrorRetryable(bundle.jobs[i].ID, time.Now(), []byte("{}")))
+err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateErrorRetryable(bundle.jobs[i].ID, time.Now(), []byte("{}"), nil))
require.NoError(b, err)

case 5:
-err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateSnoozed(bundle.jobs[i].ID, time.Now(), 10))
+err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateSnoozed(bundle.jobs[i].ID, time.Now(), 10, nil))
require.NoError(b, err)

case 6:
-err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateSnoozedAvailable(bundle.jobs[i].ID, time.Now(), 10))
+err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateSnoozedAvailable(bundle.jobs[i].ID, time.Now(), 10, nil))
require.NoError(b, err)

default:
@@ -1043,7 +1043,7 @@ func doContinuousInsertionInterval(ctx context.Context, t *testing.T, completer

for {
job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)})
-require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now())))
+require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now(), nil)))
numInserted.Add(1)

select {