Recorded job output, allow metadata updates upon job completion #758
Conversation
Force-pushed from 0938475 to ccd9a2e
rivertype/river_type.go
Outdated
if !gjson.GetBytes(j.Metadata, MetadataKeyOutput).Exists() {
	return nil
}
return []byte(gjson.GetBytes(j.Metadata, MetadataKeyOutput).Raw)
You're repeating the same work here twice by invoking into GetBytes().
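A minimal sketch of the fix: bind the lookup to a variable so the metadata is only parsed once. To keep this runnable without the gjson dependency, `getBytes` and `result` below are simplified stand-ins mirroring the shape of `gjson.GetBytes` and `gjson.Result` as used in the snippet; they are not gjson's actual implementation.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// result is a simplified stand-in for gjson.Result: Raw holds the raw JSON
// of the matched value, Exists reports whether the key was present.
type result struct {
	Raw    string
	exists bool
}

func (r result) Exists() bool { return r.exists }

// getBytes is a stand-in for gjson.GetBytes: parse the document, look up a
// top-level key, and return its raw bytes.
func getBytes(data []byte, key string) result {
	var m map[string]json.RawMessage
	if err := json.Unmarshal(data, &m); err != nil {
		return result{}
	}
	raw, ok := m[key]
	if !ok {
		return result{}
	}
	return result{Raw: string(raw), exists: true}
}

// output performs the lookup once and reuses the bound result, rather than
// parsing the metadata a second time as in the snippet above.
func output(metadata []byte) []byte {
	res := getBytes(metadata, "output")
	if !res.Exists() {
		return nil
	}
	return []byte(res.Raw)
}

func main() {
	fmt.Printf("%s\n", output([]byte(`{"output":{"url":"https://example.com"}}`)))
}
```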
Not 100% sure about this, but figured I'd call it out.
IMO, GJSON's APIs are very wrong — it doesn't return an error for convenience's sake, but there's no way it can guarantee that the bytes it's looking at are valid JSON. It optimizes for some user convenience at the cost of behaving degenerately under any edge condition encountered (not sure what it does, but it'd have to be either (1) panic, or (2) return the wrong thing).
By using GJSON, we're bubbling up that API. I think in our case it's a little more defensible because we have a strong guarantee of well-formed JSON because Postgres will guarantee it at the row level, but you can still imagine edge cases like if someone manually initializes a JobRow with bad JSON by accident, invokes Output, and can't figure out why they're getting the totally wrong thing.
So not sure which way I'd go, but there's a possibility the return type of this function should fit the rest of what you see in Go and be a ([]byte, error).
Larger convo, but IMO we should remove GJSON/SJSON back out of the project. They give you a little syntactic sugar, but at the cost of four more dependencies, poor hygiene around perf (e.g. the doubled-up work we see on this line), and poor hygiene around considering edges (i.e. degenerate cases and error conditions).
It's easy enough to remove the usage here, though the options have tradeoffs:

1. Use a struct and duplicate the output key name in a struct tag.
2. Decode and allocate memory for all the other keys and values I don't care about, only to throw them away.
3. Use lower level json.Decoder primitives to extract only what I need, with a lot more code than I have now.

Given your desire to nuke this dependency I'll choose (1) for now. Maybe encoding/json/v2 will make this easy to do without these drawbacks.
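Option (1) can be sketched as follows with only the standard library. The "output" key name is duplicated in the struct tag, and since the struct declares no other fields, `encoding/json` skips the keys we don't care about rather than allocating for them. The `metadataWithOutput` type name here is hypothetical, not something from River's codebase.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// metadataWithOutput decodes only the "output" key from a job's metadata;
// all other keys are ignored by encoding/json because no field matches them.
type metadataWithOutput struct {
	Output json.RawMessage `json:"output"`
}

// output returns the raw JSON stored under the "output" metadata key, or nil
// if the key is absent or the metadata isn't valid JSON.
func output(metadata []byte) []byte {
	var m metadataWithOutput
	if err := json.Unmarshal(metadata, &m); err != nil {
		return nil
	}
	return []byte(m.Output)
}

func main() {
	fmt.Printf("%s\n", output([]byte(`{"output":[1,2],"other":true}`)))
}
```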
Awesome dude! Looking good to me.
	"github.com/riverqueue/river/rivertype"
)

func RecordOutput(ctx context.Context, output any) error {
Let's get a docstring on this one. I'm thinking generally:
- What it is/what it does.
- What "output" is and what it's used for.
- Perhaps some edge ramifications, i.e. explain that output gets stored in the job row, so there may be the usual Postgres problems if the amount of output stored is overly large.
Great comment. Thanks.
Force-pushed from ccd9a2e to e062ff7
Leverage this to update the snooze count in metadata without requiring a sqlc param specifically for that purpose. This is more generalizable and can be used to solve other problems too.
Force-pushed from e062ff7 to 51e31b4
Good stuff man! Ty.
This adds a feature we've had several requests for: the ability to record some output from a job that can be stashed alongside the job itself, rather than needing to create your own separate table for it. (Ex #500) This will be particularly useful for workflows, where it's common to have a task that depends on something an earlier task did, and in these cases there's not always another existing table where it's convenient to store that result (such as an upload URL) for the next task to make use of.

This PR implements this functionality via a river.RecordOutput helper. The helper takes an any as input and serializes it to JSON in the "output" metadata key on the job row. There are two possible mechanisms explored in this PR's commits as of now:

1. Fields on rivertype.JobRow, along with helpers to set and retrieve these values (since they're in another package from the executor).
2. Data stashed in the context (and made available to JobCompleteTx if used).

Each of these has tradeoffs, with (1) adding extra methods to JobRow that aren't usable outside of an executing job, and (2) relying on stuffing data in the context (which again won't do anything useful outside an executing job). Interested to hear your thoughts on these @brandur, I will save mine for after you form your own opinions 😄 Easy to flip it back to option (1) as I had in earlier commits, or stick with (2) if preferable.

Additionally, this output feature is implemented using a lower level feature that enables any metadata key & value to be "put" / overwritten as part of the job's completion. For now, I've leveraged this feature to replace the specialized snooze increment argument, as well as for recorded output. It could also potentially be used in middleware to e.g. record job timing details into a metadata key, but without requiring additional SQL updates.

The completion queries were updated to facilitate this, particularly by making sure this metadata merge process still happens even if the job is somehow already completed. This would cover an edge case where a middleware puts a metadata key after the job has finished executing and completing itself via JobCompleteTx; information such as the timing example above would still need to be saved with the job post-execution, even though it'd happen via a separate SQL UPDATE.
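The completion-time metadata merge described above can be sketched as follows. This mimics how Postgres's jsonb || concatenation behaves at the top level: keys being "put" overwrite any existing keys in the job row's metadata, while untouched keys survive. The function and variable names are illustrative, not River's actual internals.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// mergeMetadata merges completion-time metadata "puts" into a job row's
// existing metadata. Top-level keys in puts overwrite keys already present,
// mirroring the semantics of Postgres's jsonb || operator.
func mergeMetadata(metadata, puts []byte) ([]byte, error) {
	var base, overlay map[string]json.RawMessage
	if err := json.Unmarshal(metadata, &base); err != nil {
		return nil, err
	}
	if err := json.Unmarshal(puts, &overlay); err != nil {
		return nil, err
	}
	for key, value := range overlay {
		base[key] = value
	}
	return json.Marshal(base)
}

func main() {
	merged, err := mergeMetadata(
		[]byte(`{"snoozes":1}`),
		[]byte(`{"output":{"url":"https://example.com"},"snoozes":2}`),
	)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(merged))
}
```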