Skip to content

Commit

Permalink
Add capacity to list jobs by ID + make default (#307)
Browse files Browse the repository at this point in the history
Here, in addition to the new job list sort orders added by #304, add one
more for sorting by job ID, and make it the default.

This is another breaking change in the job list API, and I wouldn't
normally make it at this point, but our next release will contain a
number of breaking changes, including to the job list API, so the time
is now.

My rationale for the change:

* Listing and paginating by ID is an overwhelmingly common pattern in
  web and database APIs, and I think it being the default would be more
  intuitive for more people.

* Ordering by ID is more ergonomic because no `JobListParams.States`
  invocation needs to made. It's shorter, and especially when a fairly
  normal use case will be to iterate across all job rows, this is a
  minor nicety.

* Ordering by ID allows the entire job collection to be iterated
  regardless of job state. `JobListOrderByTime` requires a state, and
  some order is needed for cursors to work.

* The behavior of `JobListOrderByTime` is a little odd in that it
  changes dynamically based on the requested list states, and there's no
  way to intuit what the order will be without knowing a lot about River
  internals and thinking very carefully about it. Furthermore, the time
  that'd be chosen wasn't documented anywhere, so the only way to know
  for sure what it would be was to read River's source code.

* With the inclusion of #304, `JobListOrderByTime`'s behavior has gotten
  even a little more surprising because the state to be chosen to select
  a timestamp was the _first_ value sent to `JobListParams.States`, with
  any additional values sent ignored, also creating a somewhat nonsensical
  result (e.g. `States(running, available)` would select `attempted_at`,
  but would be `NULL` for any jobs in the `available` state). This
  behavior was not documented.

I also found a bug that was a hold over from #304 as more than one sort
order became available. The function `JobListCursorFromJob` took a sort
order, but would produce the wrong result unless the user remembered to
set the exact same sort order on their job list parameters. For example,
this would do the wrong thing:

    res, err = client.JobList(ctx, NewJobListParams().After(JobListCursorFromJob(job4, JobListOrderByScheduledAt)))

`JobListCursorFromJob` would extract `scheduled_at` from `job4`, but
then list using to the default job list order. Previously that was based
on time, so this result would've been wrong _unless_ the job list
parameters filtered to state `available`, `retryable`, or `scheduled` so
that `scheduled_at` was also used when comparing to other jobs.

A caller could compensate by specifying sort order in both places, but
this is pretty ugly, and there was no check to make sure that the list
paramaters and cursor shared the same order:

    res, err = client.JobList(ctx, NewJobListParams().OrderBy(JobListOrderByScheduledAt, SortOrderAsc).After(JobListCursorFromJob(job4, JobListOrderByScheduledAt)))

The corrected version of this doesn't use an order when initializing the
cursor, instead using the one from the job list params, meaning that the
same time field is always used between list query and what's extracted
from the cursor's job row.

    res, err = client.JobList(ctx, NewJobListParams().OrderBy(JobListOrderByScheduledAt, SortOrderAsc).After(JobListCursorFromJob(job4)))

This also makes the invocation shorter and more ergonomic to call.

Along with the above, we also make a few tweaks to documentation.
`JobListOrderByTime` now documents which timestamps it uses, and
indicates that only the first value sent to `JobListParams.States` will
be respected.
  • Loading branch information
brandur authored Apr 21, 2024
1 parent 6fc6d8e commit e791aef
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 64 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- **Breaking change:** JobList/JobListTx now support querying Jobs by a list of Job Kinds and States. Also allows for filtering by specific timestamp values. Thank you Jos Kraaijeveld (@thatjos)! 🙏🏻 [PR #236](https://github.com/riverqueue/river/pull/236).
- **Breaking change:** There are a number of small breaking changes in the job list API using `JobList`/`JobListTx`:
- Now support querying jobs by a list of Job Kinds and States. Also allows for filtering by specific timestamp values. Thank you Jos Kraaijeveld (@thatjos)! 🙏🏻 [PR #236](https://github.com/riverqueue/river/pull/236).
- Job listing now defaults to ordering by job ID (`JobListOrderByID`) instead of a job timestamp dependent on on requested job state. The previous ordering behavior is still available with `NewJobListParams().OrderBy(JobListOrderByTime, SortOrderAsc)`. [PR #XXX](https://github.com/riverqueue/river/pull/XXX).
- The function `JobListCursorFromJob` no longer needs a sort order parameter. Instead, sort order is determined based on the job list parameters that the cursor is subsequently used with. [PR #XXX](https://github.com/riverqueue/river/pull/XXX).
- **Breaking change:** Client `Insert` and `InsertTx` functions now return a `JobInsertResult` struct instead of a `JobRow`. This allows the result to include metadata like the new `UniqueSkippedAsDuplicate` property, so callers can tell whether an inserted job was skipped due to unique constraint. [PR #292](https://github.com/riverqueue/river/pull/292).
- **Breaking change:** Client `InsertMany` and `InsertManyTx` now return number of jobs inserted as `int` instead of `int64`. This change was made to make the type in use a little more idiomatic. [PR #293](https://github.com/riverqueue/river/pull/293).
- **Breaking change:** `river.JobState*` type aliases have been removed. All job state constants should be accessed through `rivertype.JobState*` instead. [PR #300](https://github.com/riverqueue/river/pull/300).
Expand Down
4 changes: 2 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1489,7 +1489,7 @@ func (c *Client[TTx]) JobList(ctx context.Context, params *JobListParams) (*JobL
}
res := &JobListResult{Jobs: jobs}
if len(jobs) > 0 {
res.LastCursor = JobListCursorFromJob(jobs[len(jobs)-1], params.sortField)
res.LastCursor = jobListCursorFromJobAndParams(jobs[len(jobs)-1], params)
}
return res, nil
}
Expand Down Expand Up @@ -1519,7 +1519,7 @@ func (c *Client[TTx]) JobListTx(ctx context.Context, tx TTx, params *JobListPara
}
res := &JobListResult{Jobs: jobs}
if len(jobs) > 0 {
res.LastCursor = JobListCursorFromJob(jobs[len(jobs)-1], params.sortField)
res.LastCursor = jobListCursorFromJobAndParams(jobs[len(jobs)-1], params)
}
return res, nil
}
Expand Down
109 changes: 91 additions & 18 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1500,7 +1500,24 @@ func Test_Client_JobList(t *testing.T) {
require.Equal(t, []int64{job3.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
})

t.Run("SortsAvailableRetryableAndScheduledJobsByScheduledAt", func(t *testing.T) {
t.Run("DefaultsToOrderingByID", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t)

job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{})
job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{})

res, err := client.JobList(ctx, NewJobListParams().OrderBy(JobListOrderByTime, SortOrderAsc))
require.NoError(t, err)
require.Equal(t, []int64{job1.ID, job2.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))

res, err = client.JobList(ctx, NewJobListParams().OrderBy(JobListOrderByTime, SortOrderDesc))
require.NoError(t, err)
require.Equal(t, []int64{job2.ID, job1.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
})

t.Run("OrderByTimeSortsAvailableRetryableAndScheduledJobsByScheduledAt", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t)
Expand All @@ -1516,7 +1533,7 @@ func Test_Client_JobList(t *testing.T) {
job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(state), ScheduledAt: &now})
job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(state), ScheduledAt: ptrutil.Ptr(now.Add(-5 * time.Second))})

res, err := client.JobList(ctx, NewJobListParams().States(state))
res, err := client.JobList(ctx, NewJobListParams().OrderBy(JobListOrderByTime, SortOrderAsc).States(state))
require.NoError(t, err)
require.Equal(t, []int64{job2.ID, job1.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))

Expand All @@ -1526,7 +1543,7 @@ func Test_Client_JobList(t *testing.T) {
}
})

t.Run("SortsCancelledCompletedAndDiscardedJobsByFinalizedAt", func(t *testing.T) {
t.Run("OrderByTimeSortsCancelledCompletedAndDiscardedJobsByFinalizedAt", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t)
Expand All @@ -1542,7 +1559,7 @@ func Test_Client_JobList(t *testing.T) {
job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(state), FinalizedAt: ptrutil.Ptr(now.Add(-10 * time.Second))})
job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(state), FinalizedAt: ptrutil.Ptr(now.Add(-15 * time.Second))})

res, err := client.JobList(ctx, NewJobListParams().States(state))
res, err := client.JobList(ctx, NewJobListParams().OrderBy(JobListOrderByTime, SortOrderAsc).States(state))
require.NoError(t, err)
require.Equal(t, []int64{job2.ID, job1.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))

Expand All @@ -1552,7 +1569,7 @@ func Test_Client_JobList(t *testing.T) {
}
})

t.Run("SortsRunningJobsByAttemptedAt", func(t *testing.T) {
t.Run("OrderByTimeSortsRunningJobsByAttemptedAt", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t)
Expand All @@ -1561,7 +1578,7 @@ func Test_Client_JobList(t *testing.T) {
job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: &now})
job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(now.Add(-5 * time.Second))})

res, err := client.JobList(ctx, NewJobListParams().States(rivertype.JobStateRunning))
res, err := client.JobList(ctx, NewJobListParams().OrderBy(JobListOrderByTime, SortOrderAsc).States(rivertype.JobStateRunning))
require.NoError(t, err)
require.Equal(t, []int64{job2.ID, job1.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))

Expand All @@ -1583,11 +1600,70 @@ func Test_Client_JobList(t *testing.T) {

res, err := client.JobList(ctx, nil)
require.NoError(t, err)
// sort order is switched by ScheduledAt values:
require.Equal(t, []int64{job2.ID, job3.ID, job1.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
// sort order defaults to ID
require.Equal(t, []int64{job1.ID, job2.ID, job3.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
})

t.Run("PaginatesWithAfter_JobListOrderByID", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t)

job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{})
job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{})
job3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{})

res, err := client.JobList(ctx, NewJobListParams().After(JobListCursorFromJob(job1)))
require.NoError(t, err)
require.Equal(t, []int64{job2.ID, job3.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, JobListOrderByID, res.LastCursor.sortField)
require.Equal(t, job3.ID, res.LastCursor.id)

// No more results
res, err = client.JobList(ctx, NewJobListParams().After(JobListCursorFromJob(job3)))
require.NoError(t, err)
require.Equal(t, []int64{}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Nil(t, res.LastCursor)

// Descending
res, err = client.JobList(ctx, NewJobListParams().OrderBy(JobListOrderByID, SortOrderDesc).After(JobListCursorFromJob(job3)))
require.NoError(t, err)
require.Equal(t, []int64{job2.ID, job1.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, JobListOrderByID, res.LastCursor.sortField)
require.Equal(t, job1.ID, res.LastCursor.id)
})

t.Run("PaginatesWithAfter", func(t *testing.T) {
t.Run("PaginatesWithAfter_JobListOrderByScheduledAt", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t)

now := time.Now().UTC()
job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{ScheduledAt: &now})
job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{ScheduledAt: ptrutil.Ptr(now.Add(1 * time.Second))})
job3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{ScheduledAt: ptrutil.Ptr(now.Add(2 * time.Second))})

res, err := client.JobList(ctx, NewJobListParams().OrderBy(JobListOrderByScheduledAt, SortOrderAsc).After(JobListCursorFromJob(job1)))
require.NoError(t, err)
require.Equal(t, []int64{job2.ID, job3.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, JobListOrderByScheduledAt, res.LastCursor.sortField)
require.Equal(t, job3.ID, res.LastCursor.id)

// No more results
res, err = client.JobList(ctx, NewJobListParams().OrderBy(JobListOrderByScheduledAt, SortOrderAsc).After(JobListCursorFromJob(job3)))
require.NoError(t, err)
require.Equal(t, []int64{}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Nil(t, res.LastCursor)

// Descending
res, err = client.JobList(ctx, NewJobListParams().OrderBy(JobListOrderByScheduledAt, SortOrderDesc).After(JobListCursorFromJob(job3)))
require.NoError(t, err)
require.Equal(t, []int64{job2.ID, job1.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, JobListOrderByScheduledAt, res.LastCursor.sortField)
require.Equal(t, job1.ID, res.LastCursor.id)
})

t.Run("PaginatesWithAfter_JobListOrderByTime", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t)
Expand All @@ -1600,26 +1676,23 @@ func Test_Client_JobList(t *testing.T) {
job5 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), ScheduledAt: ptrutil.Ptr(now.Add(-7 * time.Second)), FinalizedAt: ptrutil.Ptr(now.Add(-5 * time.Second))})
job6 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), ScheduledAt: ptrutil.Ptr(now.Add(-7 * time.Second)), FinalizedAt: &now})

res, err := client.JobList(ctx, NewJobListParams().States(rivertype.JobStateAvailable).After(JobListCursorFromJob(job1, JobListOrderByTime)))
res, err := client.JobList(ctx, NewJobListParams().OrderBy(JobListOrderByTime, SortOrderAsc).States(rivertype.JobStateAvailable).After(JobListCursorFromJob(job1)))
require.NoError(t, err)
require.Equal(t, []int64{job2.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, JobListOrderByTime, res.LastCursor.sortField)
require.Equal(t, job2.ID, res.LastCursor.id)

res, err = client.JobList(ctx, NewJobListParams().States(rivertype.JobStateRunning).After(JobListCursorFromJob(job3, JobListOrderByTime)))
res, err = client.JobList(ctx, NewJobListParams().OrderBy(JobListOrderByTime, SortOrderAsc).States(rivertype.JobStateRunning).After(JobListCursorFromJob(job3)))
require.NoError(t, err)
require.Equal(t, []int64{job4.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, JobListOrderByTime, res.LastCursor.sortField)
require.Equal(t, job4.ID, res.LastCursor.id)

res, err = client.JobList(ctx, NewJobListParams().States(rivertype.JobStateCompleted).After(JobListCursorFromJob(job5, JobListOrderByTime)))
res, err = client.JobList(ctx, NewJobListParams().OrderBy(JobListOrderByTime, SortOrderAsc).States(rivertype.JobStateCompleted).After(JobListCursorFromJob(job5)))
require.NoError(t, err)
require.Equal(t, []int64{job6.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, JobListOrderByTime, res.LastCursor.sortField)
require.Equal(t, job6.ID, res.LastCursor.id)

res, err = client.JobList(ctx, NewJobListParams().OrderBy(JobListOrderByScheduledAt, SortOrderAsc).After(JobListCursorFromJob(job4, JobListOrderByScheduledAt)))
require.NoError(t, err)
require.Equal(t, []int64{job1.ID, job3.ID, job2.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, JobListOrderByScheduledAt, res.LastCursor.sortField)
require.Equal(t, job2.ID, res.LastCursor.id)
})

t.Run("MetadataOnly", func(t *testing.T) {
Expand Down
Loading

0 comments on commit e791aef

Please sign in to comment.