Recorded job output, allow metadata updates upon job completion #758

Merged: 1 commit, Feb 20, 2025
CHANGELOG.md (6 additions, 0 deletions)
@@ -9,6 +9,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

⚠️ Version 0.18.0 has breaking changes for the `rivertest.Worker` type that was just introduced. While attempting to round out some edge cases in its design, we realized some of them simply couldn't be solved adequately without changing the overall design such that all tested jobs are inserted into the database. Given the short time since it was released (over a weekend), it's unlikely many users have adopted it, and it seemed best to rip off the bandaid and fix it before it gets widely used.

### Added

- Jobs can now store a recorded "output" value, a JSON-encoded payload set by the job during execution and stored in the job's metadata. The `river.RecordOutput` function makes it easy to use the job row to store transient values needed for introspection or by downstream jobs. The output can be accessed using the `JobRow.Output()` helper method.

  The output is stored in the same query that completes the job after execution, so recording it requires no additional database calls or overhead. The output can be anything that fits in a Postgres JSONB field, though for performance reasons it should be kept small. [PR #758](https://github.com/riverqueue/river/pull/758).
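
  A minimal sketch of how the new API might be used, assuming `RecordOutput` accepts the worker's context plus any JSON-marshalable value (the exact signature isn't shown in this PR, so treat this as illustrative):

  ```go
  package example

  import (
  	"context"

  	"github.com/riverqueue/river"
  )

  type ReportArgs struct{}

  func (ReportArgs) Kind() string { return "report" }

  type ReportWorker struct {
  	river.WorkerDefaults[ReportArgs]
  }

  func (w *ReportWorker) Work(ctx context.Context, job *river.Job[ReportArgs]) error {
  	// Do the real work here, then record a transient result on the job
  	// row. The value is JSON-encoded into the job's metadata as part of
  	// the completion query, so no extra round trip is needed.
  	return river.RecordOutput(ctx, map[string]any{"rows_processed": 1234})
  }
  ```

  A downstream job or introspection tool can then load the job row and call `jobRow.Output()` to read back the recorded JSON.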

### Changed

- **Breaking change:** The `rivertest.Worker` type now requires all jobs to be inserted into the database. The original design allowed workers to be tested without touching the database at all, but that made it hard to correctly simulate features like `JobCompleteTx`, and the other potential solutions seemed undesirable.
internal/jobcompleter/job_completer.go (26 additions, 22 deletions)
@@ -133,13 +133,14 @@ func (c *InlineCompleter) Start(ctx context.Context) error {

func setStateParamsToMany(params *riverdriver.JobSetStateIfRunningParams) *riverdriver.JobSetStateIfRunningManyParams {
return &riverdriver.JobSetStateIfRunningManyParams{
-		Attempt:           []*int{params.Attempt},
-		ErrData:           [][]byte{params.ErrData},
-		FinalizedAt:       []*time.Time{params.FinalizedAt},
-		ID:                []int64{params.ID},
-		ScheduledAt:       []*time.Time{params.ScheduledAt},
-		SnoozeDoIncrement: []bool{params.SnoozeDoIncrement},
-		State:             []rivertype.JobState{params.State},
+		Attempt:         []*int{params.Attempt},
+		ErrData:         [][]byte{params.ErrData},
+		FinalizedAt:     []*time.Time{params.FinalizedAt},
+		ID:              []int64{params.ID},
+		MetadataDoMerge: []bool{params.MetadataDoMerge},
+		MetadataUpdates: [][]byte{params.MetadataUpdates},
+		ScheduledAt:     []*time.Time{params.ScheduledAt},
+		State:           []rivertype.JobState{params.State},
}
}
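
Keeping one slice per column, rather than a slice of structs, lets a driver bind each slice to a single array parameter and have Postgres zip them back into rows. A hypothetical sketch of such a query, not River's actual driver SQL:

```go
// Hypothetical illustration only. Each column-wise parameter slice binds
// to one array argument, and unnest() recombines them into rows so the
// whole batch completes in a single UPDATE. Column and type names are
// assumptions for the sketch.
const jobSetStateManySQL = `
UPDATE river_job
SET finalized_at = updates.finalized_at,
    metadata = CASE WHEN updates.metadata_do_merge
                    THEN river_job.metadata || updates.metadata_updates
                    ELSE river_job.metadata END,
    state = updates.state::river_job_state
FROM (
    SELECT unnest($1::bigint[])      AS id,
           unnest($2::timestamptz[]) AS finalized_at,
           unnest($3::boolean[])     AS metadata_do_merge,
           unnest($4::jsonb[])       AS metadata_updates,
           unnest($5::text[])        AS state
) AS updates
WHERE river_job.id = updates.id
  AND river_job.state = 'running'`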

@@ -421,22 +422,24 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error {
// it's done this way to allocate as few new slices as necessary.
mapBatch := func(setStateBatch map[int64]*batchCompleterSetState) *riverdriver.JobSetStateIfRunningManyParams {
params := &riverdriver.JobSetStateIfRunningManyParams{
-			ID:                make([]int64, len(setStateBatch)),
-			Attempt:           make([]*int, len(setStateBatch)),
-			ErrData:           make([][]byte, len(setStateBatch)),
-			FinalizedAt:       make([]*time.Time, len(setStateBatch)),
-			ScheduledAt:       make([]*time.Time, len(setStateBatch)),
-			SnoozeDoIncrement: make([]bool, len(setStateBatch)),
-			State:             make([]rivertype.JobState, len(setStateBatch)),
+			ID:              make([]int64, len(setStateBatch)),
+			Attempt:         make([]*int, len(setStateBatch)),
+			ErrData:         make([][]byte, len(setStateBatch)),
+			FinalizedAt:     make([]*time.Time, len(setStateBatch)),
+			MetadataDoMerge: make([]bool, len(setStateBatch)),
+			MetadataUpdates: make([][]byte, len(setStateBatch)),
+			ScheduledAt:     make([]*time.Time, len(setStateBatch)),
+			State:           make([]rivertype.JobState, len(setStateBatch)),
}
var i int
for _, setState := range setStateBatch {
params.ID[i] = setState.Params.ID
params.Attempt[i] = setState.Params.Attempt
params.ErrData[i] = setState.Params.ErrData
params.FinalizedAt[i] = setState.Params.FinalizedAt
+			params.MetadataDoMerge[i] = setState.Params.MetadataDoMerge
+			params.MetadataUpdates[i] = setState.Params.MetadataUpdates
params.ScheduledAt[i] = setState.Params.ScheduledAt
-			params.SnoozeDoIncrement[i] = setState.Params.SnoozeDoIncrement
params.State[i] = setState.Params.State
i++
}
@@ -458,13 +461,14 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error {
for i := 0; i < len(setStateBatch); i += c.completionMaxSize {
endIndex := min(i+c.completionMaxSize, len(params.ID)) // beginning of next sub-batch or end of slice
subBatch := &riverdriver.JobSetStateIfRunningManyParams{
-				ID:                params.ID[i:endIndex],
-				Attempt:           params.Attempt[i:endIndex],
-				ErrData:           params.ErrData[i:endIndex],
-				FinalizedAt:       params.FinalizedAt[i:endIndex],
-				ScheduledAt:       params.ScheduledAt[i:endIndex],
-				SnoozeDoIncrement: params.SnoozeDoIncrement[i:endIndex],
-				State:             params.State[i:endIndex],
+				ID:              params.ID[i:endIndex],
+				Attempt:         params.Attempt[i:endIndex],
+				ErrData:         params.ErrData[i:endIndex],
+				FinalizedAt:     params.FinalizedAt[i:endIndex],
+				MetadataDoMerge: params.MetadataDoMerge[i:endIndex],
+				MetadataUpdates: params.MetadataUpdates[i:endIndex],
+				ScheduledAt:     params.ScheduledAt[i:endIndex],
+				State:           params.State[i:endIndex],
}
jobRowsSubBatch, err := completeSubBatch(subBatch)
if err != nil {
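
The loop above also caps how many rows any single statement touches by slicing the accumulated batch into sub-batches of at most `completionMaxSize`. The same windowing pattern in isolation, as a generic sketch:

```go
// Standalone sketch of the sub-batching pattern: walk a batch in
// fixed-size windows so no single query exceeds maxSize rows. maxSize
// must be positive; the min builtin requires Go 1.21+.
func forEachSubBatch(total, maxSize int, fn func(start, end int) error) error {
	for i := 0; i < total; i += maxSize {
		end := min(i+maxSize, total) // start of next sub-batch or end of slice
		if err := fn(i, end); err != nil {
			return err
		}
	}
	return nil
}
```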
internal/jobcompleter/job_completer_test.go (31 additions, 31 deletions)
@@ -98,7 +98,7 @@ func TestInlineJobCompleter_Complete(t *testing.T) {
t.Cleanup(completer.Stop)
completer.disableSleep = true

-	err := completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(1, time.Now()))
+	err := completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(1, time.Now(), nil))
if !errors.Is(err, expectedErr) {
t.Errorf("expected %v, got %v", expectedErr, err)
}
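
The new trailing `nil` argument threaded through these tests is the metadata-updates payload now accepted by each `JobSetState*` constructor. Inferred from the call sites in this diff rather than quoted from the River source, the completed-state constructor plausibly looks something like:

```go
// Inferred from the call sites above, not verbatim River source: the
// constructor gains a metadataUpdates parameter, nil when the job
// records no output or other metadata changes at completion time.
func JobSetStateCompleted(id int64, finalizedAt time.Time, metadataUpdates []byte) *JobSetStateIfRunningParams {
	return &JobSetStateIfRunningParams{
		ID:              id,
		FinalizedAt:     &finalizedAt,
		MetadataDoMerge: metadataUpdates != nil,
		MetadataUpdates: metadataUpdates,
		State:           rivertype.JobStateCompleted,
	}
}
```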
@@ -167,14 +167,14 @@ func TestAsyncJobCompleter_Complete(t *testing.T) {

// launch 4 completions, only 2 can be inline due to the concurrency limit:
for i := range int64(2) {
-		if err := completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(i, time.Now())); err != nil {
+		if err := completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(i, time.Now(), nil)); err != nil {
t.Errorf("expected nil err, got %v", err)
}
}
bgCompletionsStarted := make(chan struct{})
go func() {
for i := int64(2); i < 4; i++ {
-			if err := completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(i, time.Now())); err != nil {
+			if err := completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(i, time.Now(), nil)); err != nil {
t.Errorf("expected nil err, got %v", err)
}
}
@@ -267,7 +267,7 @@ func testCompleterSubscribe(t *testing.T, constructor func(riverdriver.Executor,
}()

for i := range 4 {
-		require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(int64(i), time.Now())))
+		require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(int64(i), time.Now(), nil)))
}

completer.Stop() // closes subscribeChan
@@ -312,7 +312,7 @@ func testCompleterWait(t *testing.T, constructor func(riverdriver.Executor, Subs
// launch 4 completions:
for i := range 4 {
go func() {
-			require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(int64(i), time.Now())))
+			require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(int64(i), time.Now(), nil)))
}()
<-completeStartedCh // wait for func to actually start
}
@@ -547,9 +547,9 @@ func testCompleter[TCompleter JobCompleter](
job3 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)})
)

-		require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job1.ID, finalizedAt1)))
-		require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job2.ID, finalizedAt2)))
-		require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job3.ID, finalizedAt3)))
+		require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job1.ID, finalizedAt1, nil)))
+		require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job2.ID, finalizedAt2, nil)))
+		require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job3.ID, finalizedAt3, nil)))

completer.Stop()

@@ -599,7 +599,7 @@ func testCompleter[TCompleter JobCompleter](
t.Cleanup(riverinternaltest.DiscardContinuously(bundle.subscribeCh))

for i := range jobs {
-			require.NoError(t, completer.JobSetStateIfRunning(ctx, &stats[i], riverdriver.JobSetStateCompleted(jobs[i].ID, time.Now())))
+			require.NoError(t, completer.JobSetStateIfRunning(ctx, &stats[i], riverdriver.JobSetStateCompleted(jobs[i].ID, time.Now(), nil)))
}

completer.Stop()
@@ -679,13 +679,13 @@ func testCompleter[TCompleter JobCompleter](
job7 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)})
)

-		require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCancelled(job1.ID, time.Now(), []byte("{}"))))
-		require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job2.ID, time.Now())))
-		require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateDiscarded(job3.ID, time.Now(), []byte("{}"))))
-		require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateErrorAvailable(job4.ID, time.Now(), []byte("{}"))))
-		require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateErrorRetryable(job5.ID, time.Now(), []byte("{}"))))
-		require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateSnoozed(job6.ID, time.Now(), 10)))
-		require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateSnoozedAvailable(job7.ID, time.Now(), 10)))
+		require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCancelled(job1.ID, time.Now(), []byte("{}"), nil)))
+		require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job2.ID, time.Now(), nil)))
+		require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateDiscarded(job3.ID, time.Now(), []byte("{}"), nil)))
+		require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateErrorAvailable(job4.ID, time.Now(), []byte("{}"), nil)))
+		require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateErrorRetryable(job5.ID, time.Now(), []byte("{}"), nil)))
+		require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateSnoozed(job6.ID, time.Now(), 10, nil)))
+		require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateSnoozedAvailable(job7.ID, time.Now(), 10, nil)))

completer.Stop()
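
Any of these call sites could pass a real payload instead of `nil`; a hypothetical example in the style of these tests:

```go
// Hypothetical values: merge recorded output into the job's metadata in
// the same completion query by passing a payload instead of nil.
metadataUpdates := []byte(`{"output":{"rows_processed":1234}}`)
require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{},
	riverdriver.JobSetStateCompleted(job.ID, time.Now(), metadataUpdates)))
```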

@@ -705,7 +705,7 @@ func testCompleter[TCompleter JobCompleter](

job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)})

-		require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now())))
+		require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now(), nil)))

completer.Stop()

@@ -722,7 +722,7 @@ func testCompleter[TCompleter JobCompleter](
{
job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)})

-			require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now())))
+			require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now(), nil)))

completer.Stop()

@@ -737,7 +737,7 @@ func testCompleter[TCompleter JobCompleter](

job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)})

-			require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now())))
+			require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now(), nil)))

completer.Stop()

@@ -777,7 +777,7 @@ func testCompleter[TCompleter JobCompleter](

job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)})

-		require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now())))
+		require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now(), nil)))

completer.Stop()

@@ -805,7 +805,7 @@ func testCompleter[TCompleter JobCompleter](

job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)})

-		err := completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now()))
+		err := completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now(), nil))

// The error returned will be nil for asynchronous completers, but
// returned immediately for synchronous ones.
@@ -838,7 +838,7 @@ func testCompleter[TCompleter JobCompleter](

job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)})

-		err := completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now()))
+		err := completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now(), nil))

// The error returned will be nil for asynchronous completers, but
// returned immediately for synchronous ones.
@@ -963,7 +963,7 @@ func benchmarkCompleter(
b.ResetTimer()

for i := range b.N {
-			err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateCompleted(bundle.jobs[i].ID, time.Now()))
+			err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateCompleted(bundle.jobs[i].ID, time.Now(), nil))
require.NoError(b, err)
}

@@ -978,31 +978,31 @@ for i := range b.N {
for i := range b.N {
switch i % 7 {
case 0:
-				err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateCancelled(bundle.jobs[i].ID, time.Now(), []byte("{}")))
+				err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateCancelled(bundle.jobs[i].ID, time.Now(), []byte("{}"), nil))
require.NoError(b, err)

case 1:
-				err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateCompleted(bundle.jobs[i].ID, time.Now()))
+				err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateCompleted(bundle.jobs[i].ID, time.Now(), nil))
require.NoError(b, err)

case 2:
-				err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateDiscarded(bundle.jobs[i].ID, time.Now(), []byte("{}")))
+				err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateDiscarded(bundle.jobs[i].ID, time.Now(), []byte("{}"), nil))
require.NoError(b, err)

case 3:
-				err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateErrorAvailable(bundle.jobs[i].ID, time.Now(), []byte("{}")))
+				err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateErrorAvailable(bundle.jobs[i].ID, time.Now(), []byte("{}"), nil))
require.NoError(b, err)

case 4:
-				err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateErrorRetryable(bundle.jobs[i].ID, time.Now(), []byte("{}")))
+				err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateErrorRetryable(bundle.jobs[i].ID, time.Now(), []byte("{}"), nil))
require.NoError(b, err)

case 5:
-				err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateSnoozed(bundle.jobs[i].ID, time.Now(), 10))
+				err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateSnoozed(bundle.jobs[i].ID, time.Now(), 10, nil))
require.NoError(b, err)

case 6:
-				err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateSnoozedAvailable(bundle.jobs[i].ID, time.Now(), 10))
+				err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateSnoozedAvailable(bundle.jobs[i].ID, time.Now(), 10, nil))
require.NoError(b, err)

default:
@@ -1043,7 +1043,7 @@ func doContinuousInsertionInterval(ctx context.Context, t *testing.T, completer

for {
job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)})
-		require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now())))
+		require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now(), nil)))
numInserted.Add(1)

select {