
Recorded job output, allow metadata updates upon job completion #758

Merged
bgentry merged 1 commit into master from bg-meta-updates-upon-completion on Feb 20, 2025

Conversation

@bgentry (Contributor) commented Feb 15, 2025

This adds a feature we've had several requests for: the ability to record output from a job and stash it alongside the job itself, rather than needing to create a separate table for it (e.g. #500). This will be particularly useful for workflows where it's common to have a task that depends on something an earlier task did; in those cases there's not always an existing table where it's convenient to store that result (such as an upload URL) for the next task to make use of.

This PR implements this functionality via a river.RecordOutput helper. The helper takes an any as input and serializes it to JSON in the "output" metadata key on the job row. There are two possible mechanisms explored in this PR's commits as of now:

  1. Stash the pending metadata changes into an unexported field on rivertype.JobRow, along with helpers to set and retrieve these values (since they're in another package from the executor).
  2. Stash the pending metadata changes in the context so they can be retrieved following the job's execution (or in JobCompleteTx if used).

Each of these has tradeoffs: (1) adds extra methods to JobRow that aren't usable outside of an executing job, while (2) relies on stuffing data into the context (which, again, won't do anything useful outside an executing job). Interested to hear your thoughts on these, @brandur; I'll save mine for after you form your own opinions 😄 Easy to flip back to option (1) as I had in earlier commits, or stick with (2) if preferable.
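
A minimal usage sketch of the helper from inside a worker (the worker, args, and URL here are hypothetical; river.RecordOutput is the helper this PR adds):

import (
	"context"

	"github.com/riverqueue/river"
)

type UploadArgs struct{ FileID int64 }

func (UploadArgs) Kind() string { return "upload" }

type UploadWorker struct {
	river.WorkerDefaults[UploadArgs]
}

func (w *UploadWorker) Work(ctx context.Context, job *river.Job[UploadArgs]) error {
	// ... perform the upload ...
	uploadURL := "https://example.com/uploads/1" // hypothetical result

	// Serializes the value to JSON and stashes it under the "output"
	// metadata key on the job row as part of the job's completion.
	return river.RecordOutput(ctx, map[string]string{"upload_url": uploadURL})
}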

Additionally, this output feature is implemented using a lower-level feature that enables any metadata key & value to be "put" / overwritten as part of the job's completion. For now, I've leveraged this feature to replace the specialized snooze increment argument, as well as for recorded output. It could also potentially be used in middleware to, e.g., record job timing details into a metadata key, but without requiring additional SQL updates.
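
A rough illustration of that middleware timing idea, assuming river's worker middleware interface; putMetadata is hypothetical, standing in for whatever shape the PR's lower-level put mechanism ends up taking:

type TimingMiddleware struct {
	river.WorkerMiddlewareDefaults
}

func (m *TimingMiddleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(context.Context) error) error {
	start := time.Now()
	err := doInner(ctx)
	// Hypothetical call: merge a timing key into the job's metadata as part
	// of the job's completion query, with no additional SQL UPDATE required.
	putMetadata(ctx, "duration_ms", time.Since(start).Milliseconds())
	return err
}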

The completion queries were updated to facilitate this, particularly by making sure this metadata merge process still happens even if the job is somehow already completed. This would cover an edge case where a middleware puts a metadata key after the job has finished executing and completing itself via JobCompleteTx; information such as the timing example above would still need to be saved with the job post-execution, even though it'd happen via a separate SQL UPDATE.

@bgentry bgentry requested a review from brandur February 15, 2025 05:18
@bgentry bgentry force-pushed the bg-meta-updates-upon-completion branch 4 times, most recently from 0938475 to ccd9a2e, February 15, 2025 20:27
if !gjson.GetBytes(j.Metadata, MetadataKeyOutput).Exists() {
return nil
}
return []byte(gjson.GetBytes(j.Metadata, MetadataKeyOutput).Raw)
Contributor:

You're repeating the same work here twice by invoking into GetBytes().
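
One way to avoid the doubled call, per this comment (a sketch; the variable name is illustrative):

result := gjson.GetBytes(j.Metadata, MetadataKeyOutput)
if !result.Exists() {
	return nil
}
return []byte(result.Raw)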

@brandur (Contributor) commented Feb 17, 2025:

Not 100% sure about this, but figured I'd call it out.

IMO, GJSON's APIs are very wrong: it doesn't return an error (for convenience's sake), but there's no way it can guarantee that the bytes it's looking at are valid JSON. It optimizes for some user convenience at the cost of behaving degenerately under any edge condition encountered (not sure what it does, but it'd have to either (1) panic, or (2) return the wrong thing).

By using GJSON, we're bubbling up that API. I think in our case it's a little more defensible because we have a strong guarantee of well-formed JSON, since Postgres will guarantee it at the row level, but you can still imagine edge cases like someone manually initializing a JobRow with bad JSON by accident, invoking Output, and not being able to figure out why they're getting the totally wrong thing.

So not sure which way I'd go, but there's a possibility the return type of this function should fit the rest of what you see in Go and be ([]byte, error).

Larger convo, but IMO we should remove GJSON/SJSON from the project. They give you a little syntactic sugar, but at the cost of four more dependencies, poor hygiene around perf (e.g. the doubled-up work we see on this line), and poor hygiene around considering edges (i.e. degenerate cases and error conditions).

@bgentry (Contributor, author) replied:

It's easy enough to remove the usage here, though the options have tradeoffs:

  1. Use a struct and duplicate the output key name in a struct tag
  2. Decode and allocate memory for all the other keys and values I don't care about, only to throw them away
  3. Use lower level json.Decoder primitives to extract only what I need, with a lot more code than I have now.

Given your desire to nuke this dependency, I'll choose (1) for now. Maybe encoding/json/v2 will make this easy to do without these drawbacks.
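
A minimal sketch of option (1), assuming the metadata key is "output" (names here are illustrative); it also adopts the ([]byte, error) shape suggested above:

import "encoding/json"

type metadataWithOutput struct {
	// Duplicates the output key name in a struct tag, but avoids GJSON.
	Output json.RawMessage `json:"output"`
}

func jobOutput(metadata []byte) ([]byte, error) {
	var decoded metadataWithOutput
	if err := json.Unmarshal(metadata, &decoded); err != nil {
		return nil, err
	}
	// A missing key leaves Output nil, mirroring the earlier Exists() check.
	return decoded.Output, nil
}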

@brandur (Contributor) left a comment:

Awesome dude! Looking good to me.

"github.com/riverqueue/river/rivertype"
)

func RecordOutput(ctx context.Context, output any) error {
Contributor:

Let's get a docstring on this one. I'm thinking, generally:

  • What it is/what it does.
  • What "output" is and what it's used for.
  • Perhaps some edge ramifications, e.g. explaining that output gets stored in the job row, so the usual Postgres large-row problems may apply if the amount of output stored is overly large.
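
A sketch of what such a docstring might cover (wording illustrative, not the final doc):

// RecordOutput records output from a job's execution by serializing the
// given value to JSON and storing it under the "output" key in the job
// row's metadata as part of the job's completion. A later task in a
// workflow can then read the output off the job row without needing a
// separate table. Because output is stored on the job row itself, overly
// large values can run into the usual Postgres row-size and TOAST
// performance concerns.
func RecordOutput(ctx context.Context, output any) error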

Contributor:

Great comment. Thanks.

@bgentry bgentry force-pushed the bg-meta-updates-upon-completion branch from ccd9a2e to e062ff7, February 20, 2025 03:17
@bgentry bgentry marked this pull request as ready for review February 20, 2025 03:18
Leverage this to update the snooze count in metadata without requiring a sqlc param specifically for that purpose. This is more generalizable and can be used to solve other problems too.
@bgentry bgentry force-pushed the bg-meta-updates-upon-completion branch from e062ff7 to 51e31b4, February 20, 2025 03:25
@bgentry bgentry requested a review from brandur February 20, 2025 04:37
@brandur (Contributor) left a comment:

Good stuff man! Ty.

@bgentry bgentry merged commit 0f6987b into master Feb 20, 2025
10 checks passed
@bgentry bgentry deleted the bg-meta-updates-upon-completion branch February 20, 2025 15:51