Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change JobList ergnomics - add filtering by states, return cursor from JobList function #236

Closed
32 changes: 28 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1351,6 +1351,13 @@ func validateQueueName(queueName string) error {
return nil
}

// JobListResult is the result of a job list operation. It contains a list of
// jobs and a cursor for fetching the next page of results.
type JobListResult struct {
Jobs []*rivertype.JobRow
Cursor *JobListCursor
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure it makes sense to output a single Cursor field here. There could be a cursor for any of the entries in the Jobs list, so with this name it's not clear if it's the start cursor or end cursor.

I doubt there's much use case for the start cursor, and people can always use the JobListCursorFromJob function to create one if they want. Maybe let's just name this LastCursor to be unambiguous?

}

// JobList returns a paginated list of jobs matching the provided filters. The
// provided context is used for the underlying Postgres query and can be used to
// cancel the operation or apply a timeout.
Expand All @@ -1360,7 +1367,7 @@ func validateQueueName(queueName string) error {
// if err != nil {
// // handle error
// }
func (c *Client[TTx]) JobList(ctx context.Context, params *JobListParams) ([]*rivertype.JobRow, error) {
func (c *Client[TTx]) JobList(ctx context.Context, params *JobListParams) (*JobListResult, error) {
if !c.driver.HasPool() {
return nil, errNoDriverDBPool
}
Expand All @@ -1373,7 +1380,15 @@ func (c *Client[TTx]) JobList(ctx context.Context, params *JobListParams) ([]*ri
return nil, err
}

return dblist.JobList(ctx, c.driver.GetExecutor(), dbParams)
jobs, err := dblist.JobList(ctx, c.driver.GetExecutor(), dbParams)
if err != nil {
return nil, err
}
res := &JobListResult{Jobs: jobs}
if len(jobs) > 0 {
res.Cursor = JobListCursorFromJob(jobs[len(jobs)-1], params.sortField)
}
return res, nil
}

// JobListTx returns a paginated list of jobs matching the provided filters. The
Expand All @@ -1385,14 +1400,23 @@ func (c *Client[TTx]) JobList(ctx context.Context, params *JobListParams) ([]*ri
// if err != nil {
// // handle error
// }
func (c *Client[TTx]) JobListTx(ctx context.Context, tx TTx, params *JobListParams) ([]*rivertype.JobRow, error) {
func (c *Client[TTx]) JobListTx(ctx context.Context, tx TTx, params *JobListParams) (*JobListResult, error) {
if params == nil {
params = NewJobListParams()
}

dbParams, err := params.toDBParams()
if err != nil {
return nil, err
}

return dblist.JobList(ctx, c.driver.UnwrapExecutor(tx), dbParams)
jobs, err := dblist.JobList(ctx, c.driver.UnwrapExecutor(tx), dbParams)
if err != nil {
return nil, err
}
res := &JobListResult{Jobs: jobs}
if len(jobs) > 0 {
res.Cursor = JobListCursorFromJob(jobs[len(jobs)-1], params.sortField)
}
return res, nil
}
101 changes: 55 additions & 46 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,9 +639,9 @@ func Test_Client_Stop(t *testing.T) {

require.NoError(t, client.Stop(ctx))

runningJobs, err := client.JobList(ctx, NewJobListParams().State(rivertype.JobStateRunning))
res, err := client.JobList(ctx, NewJobListParams().States(JobStateRunning))
require.NoError(t, err)
require.Empty(t, runningJobs, "expected no jobs to be left running")
require.Empty(t, res.Jobs, "expected no jobs to be left running")
})

t.Run("WithSubscriber", func(t *testing.T) {
Expand Down Expand Up @@ -1386,14 +1386,14 @@ func Test_Client_JobList(t *testing.T) {
job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr("test_kind_1")})
job3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Kind: ptrutil.Ptr("test_kind_2")})

jobs, err := client.JobList(ctx, NewJobListParams().Kinds("test_kind_1"))
res, err := client.JobList(ctx, NewJobListParams().Kinds("test_kind_1"))
require.NoError(t, err)
// jobs ordered by ScheduledAt ASC by default
require.Equal(t, []int64{job1.ID, job2.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, []int64{job1.ID, job2.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))

jobs, err = client.JobList(ctx, NewJobListParams().Kinds("test_kind_2"))
res, err = client.JobList(ctx, NewJobListParams().Kinds("test_kind_2"))
require.NoError(t, err)
require.Equal(t, []int64{job3.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, []int64{job3.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
})

t.Run("FiltersByQueue", func(t *testing.T) { //nolint:dupl
Expand All @@ -1405,14 +1405,14 @@ func Test_Client_JobList(t *testing.T) {
job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue_1")})
job3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Queue: ptrutil.Ptr("queue_2")})

jobs, err := client.JobList(ctx, NewJobListParams().Queues("queue_1"))
res, err := client.JobList(ctx, NewJobListParams().Queues("queue_1"))
require.NoError(t, err)
// jobs ordered by ScheduledAt ASC by default
require.Equal(t, []int64{job1.ID, job2.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, []int64{job1.ID, job2.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))

jobs, err = client.JobList(ctx, NewJobListParams().Queues("queue_2"))
res, err = client.JobList(ctx, NewJobListParams().Queues("queue_2"))
require.NoError(t, err)
require.Equal(t, []int64{job3.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, []int64{job3.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
})

t.Run("FiltersByState", func(t *testing.T) {
Expand All @@ -1424,14 +1424,14 @@ func Test_Client_JobList(t *testing.T) {
job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateAvailable)})
job3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)})

jobs, err := client.JobList(ctx, NewJobListParams().State(JobStateAvailable))
res, err := client.JobList(ctx, NewJobListParams().States(JobStateAvailable))
require.NoError(t, err)
// jobs ordered by ScheduledAt ASC by default
require.Equal(t, []int64{job1.ID, job2.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, []int64{job1.ID, job2.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))

jobs, err = client.JobList(ctx, NewJobListParams().State(JobStateRunning))
res, err = client.JobList(ctx, NewJobListParams().States(JobStateRunning))
require.NoError(t, err)
require.Equal(t, []int64{job3.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, []int64{job3.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
})

t.Run("SortsAvailableRetryableAndScheduledJobsByScheduledAt", func(t *testing.T) {
Expand All @@ -1450,13 +1450,13 @@ func Test_Client_JobList(t *testing.T) {
job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(dbState), ScheduledAt: &now})
job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(dbState), ScheduledAt: ptrutil.Ptr(now.Add(-5 * time.Second))})

jobs, err := client.JobList(ctx, NewJobListParams().State(state))
res, err := client.JobList(ctx, NewJobListParams().States(state))
require.NoError(t, err)
require.Equal(t, []int64{job2.ID, job1.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, []int64{job2.ID, job1.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))

jobs, err = client.JobList(ctx, NewJobListParams().State(state).OrderBy(JobListOrderByTime, SortOrderDesc))
res, err = client.JobList(ctx, NewJobListParams().States(state).OrderBy(JobListOrderByTime, SortOrderDesc))
require.NoError(t, err)
require.Equal(t, []int64{job1.ID, job2.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, []int64{job1.ID, job2.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
}
})

Expand All @@ -1476,13 +1476,13 @@ func Test_Client_JobList(t *testing.T) {
job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(dbState), FinalizedAt: ptrutil.Ptr(now.Add(-10 * time.Second))})
job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(dbState), FinalizedAt: ptrutil.Ptr(now.Add(-15 * time.Second))})

jobs, err := client.JobList(ctx, NewJobListParams().State(state))
res, err := client.JobList(ctx, NewJobListParams().States(state))
require.NoError(t, err)
require.Equal(t, []int64{job2.ID, job1.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, []int64{job2.ID, job1.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))

jobs, err = client.JobList(ctx, NewJobListParams().State(state).OrderBy(JobListOrderByTime, SortOrderDesc))
res, err = client.JobList(ctx, NewJobListParams().States(state).OrderBy(JobListOrderByTime, SortOrderDesc))
require.NoError(t, err)
require.Equal(t, []int64{job1.ID, job2.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, []int64{job1.ID, job2.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
}
})

Expand All @@ -1495,30 +1495,30 @@ func Test_Client_JobList(t *testing.T) {
job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: &now})
job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(now.Add(-5 * time.Second))})

jobs, err := client.JobList(ctx, NewJobListParams().State(JobStateRunning))
res, err := client.JobList(ctx, NewJobListParams().States(JobStateRunning))
require.NoError(t, err)
require.Equal(t, []int64{job2.ID, job1.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, []int64{job2.ID, job1.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))

jobs, err = client.JobList(ctx, NewJobListParams().State(JobStateRunning).OrderBy(JobListOrderByTime, SortOrderDesc))
res, err = client.JobList(ctx, NewJobListParams().States(JobStateRunning).OrderBy(JobListOrderByTime, SortOrderDesc))
require.NoError(t, err)
// Sort order was explicitly reversed:
require.Equal(t, []int64{job1.ID, job2.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, []int64{job1.ID, job2.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
})

t.Run("WithNilParamsFiltersToAvailableByDefault", func(t *testing.T) {
t.Run("WithNilParamsFiltersToAllStatesByDefault", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t)

now := time.Now().UTC()
job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateAvailable), ScheduledAt: &now})
job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateAvailable), ScheduledAt: ptrutil.Ptr(now.Add(-5 * time.Second))})
_ = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)})
job3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning), ScheduledAt: ptrutil.Ptr(now.Add(-2 * time.Second))})

jobs, err := client.JobList(ctx, nil)
res, err := client.JobList(ctx, nil)
require.NoError(t, err)
// sort order is switched by ScheduledAt values:
require.Equal(t, []int64{job2.ID, job1.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, []int64{job2.ID, job3.ID, job1.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
})

t.Run("PaginatesWithAfter", func(t *testing.T) {
Expand All @@ -1529,22 +1529,31 @@ func Test_Client_JobList(t *testing.T) {
now := time.Now().UTC()
job1 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateAvailable), ScheduledAt: ptrutil.Ptr(now.Add(-5 * time.Second))})
job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateAvailable), ScheduledAt: &now})
job3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: ptrutil.Ptr(now.Add(-5 * time.Second))})
job4 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning), AttemptedAt: &now})
job5 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), FinalizedAt: ptrutil.Ptr(now.Add(-5 * time.Second))})
job6 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), FinalizedAt: &now})
job3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning), ScheduledAt: ptrutil.Ptr(now.Add(-5 * time.Second)), AttemptedAt: ptrutil.Ptr(now.Add(-5 * time.Second))})
job4 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning), ScheduledAt: ptrutil.Ptr(now.Add(-6 * time.Second)), AttemptedAt: &now})
job5 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), ScheduledAt: ptrutil.Ptr(now.Add(-7 * time.Second)), FinalizedAt: ptrutil.Ptr(now.Add(-5 * time.Second))})
job6 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateCompleted), ScheduledAt: ptrutil.Ptr(now.Add(-7 * time.Second)), FinalizedAt: &now})

jobs, err := client.JobList(ctx, NewJobListParams().After(JobListCursorFromJob(job1)))
res, err := client.JobList(ctx, NewJobListParams().States(rivertype.JobStateAvailable).After(JobListCursorFromJob(job1, JobListOrderByTime)))
require.NoError(t, err)
require.Equal(t, []int64{job2.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, []int64{job2.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, job2.ID, res.Cursor.id)

jobs, err = client.JobList(ctx, NewJobListParams().State(rivertype.JobStateRunning).After(JobListCursorFromJob(job3)))
res, err = client.JobList(ctx, NewJobListParams().States(rivertype.JobStateRunning).After(JobListCursorFromJob(job3, JobListOrderByTime)))
require.NoError(t, err)
require.Equal(t, []int64{job4.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, []int64{job4.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, job4.ID, res.Cursor.id)

jobs, err = client.JobList(ctx, NewJobListParams().State(rivertype.JobStateCompleted).After(JobListCursorFromJob(job5)))
res, err = client.JobList(ctx, NewJobListParams().States(rivertype.JobStateCompleted).After(JobListCursorFromJob(job5, JobListOrderByTime)))
require.NoError(t, err)
require.Equal(t, []int64{job6.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, []int64{job6.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, job6.ID, res.Cursor.id)

res, err = client.JobList(ctx, NewJobListParams().OrderBy(JobListOrderByScheduledAt, SortOrderAsc).After(JobListCursorFromJob(job4, JobListOrderByScheduledAt)))
require.NoError(t, err)
require.Equal(t, []int64{job1.ID, job3.ID, job2.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, JobListOrderByScheduledAt, res.Cursor.sortField)
require.Equal(t, job2.ID, res.Cursor.id)
})

t.Run("MetadataOnly", func(t *testing.T) {
Expand All @@ -1556,14 +1565,14 @@ func Test_Client_JobList(t *testing.T) {
job2 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Metadata: []byte(`{"baz": "value"}`)})
job3 := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{Metadata: []byte(`{"baz": "value"}`)})

jobs, err := client.JobList(ctx, NewJobListParams().State("").Metadata(`{"foo": "bar"}`))
res, err := client.JobList(ctx, NewJobListParams().Metadata(`{"foo": "bar"}`))
require.NoError(t, err)
require.Equal(t, []int64{job1.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, []int64{job1.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))

jobs, err = client.JobList(ctx, NewJobListParams().State("").Metadata(`{"baz": "value"}`).OrderBy(JobListOrderByTime, SortOrderDesc))
res, err = client.JobList(ctx, NewJobListParams().Metadata(`{"baz": "value"}`).OrderBy(JobListOrderByTime, SortOrderDesc))
require.NoError(t, err)
// Sort order was explicitly reversed:
require.Equal(t, []int64{job3.ID, job2.ID}, sliceutil.Map(jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
require.Equal(t, []int64{job3.ID, job2.ID}, sliceutil.Map(res.Jobs, func(job *rivertype.JobRow) int64 { return job.ID }))
})

t.Run("WithCancelledContext", func(t *testing.T) {
Expand All @@ -1574,9 +1583,9 @@ func Test_Client_JobList(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
cancel() // cancel immediately

jobs, err := client.JobList(ctx, NewJobListParams().State(JobStateRunning))
res, err := client.JobList(ctx, NewJobListParams().States(JobStateRunning))
require.ErrorIs(t, context.Canceled, err)
require.Empty(t, jobs)
require.Nil(t, res)
})
}

Expand Down
10 changes: 6 additions & 4 deletions internal/dblist/db_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"strings"

"github.com/lib/pq"

"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivertype"
)
Expand Down Expand Up @@ -42,7 +44,7 @@ type JobListParams struct {
OrderBy []JobListOrderBy
Priorities []int16
Queues []string
State rivertype.JobState
States []rivertype.JobState
}

func JobList(ctx context.Context, exec riverdriver.Executor, params *JobListParams) ([]*rivertype.JobRow, error) {
Expand Down Expand Up @@ -81,10 +83,10 @@ func JobList(ctx context.Context, exec riverdriver.Executor, params *JobListPara
namedArgs["queues"] = params.Queues
}

if params.State != "" {
if len(params.States) > 0 {
writeWhereOrAnd()
conditionsBuilder.WriteString("state = @state::river_job_state")
namedArgs["state"] = params.State
conditionsBuilder.WriteString("state = any(@states::river_job_state[])")
namedArgs["states"] = pq.Array(params.States)
}

if params.Conditions != "" {
Expand Down
Loading
Loading