Skip to content

Commit

Permalink
Better thought out caching mechanism that actually works
Browse files Browse the repository at this point in the history
This one follows up #794 again. That change included a template cache,
with the idea being that we could reuse a rendered sqlc query when all
input values were expected to be stable. For example, when replacing
something like a schema name, we'd presumably always be replacing the
template value with the same schema over and over again, so it'd be
better to save on the work.

But that caching system hadn't adequately been thought through, and
wouldn't really work. I realized when I was putting #798 (explicit
schemas) together that if you injected two schema values from two
different clients then you'd end up using the same cache value, which is
wrong.

Here, we tool out the cache layer a little more so that it considers
input values and named args, which all must be consistent for a cached
value to be returned. We also add tests to make sure of it.

Building a cache key unfortunately has to rely on concatenating some
strings together and presorting map keys for stability, but even with
the extra work involved, a cache hit still comes out to be quite a bit
faster than a miss (~2.5x faster), so it seems worth doing:

    $ go test ./rivershared/sqlctemplate -bench=.
    goos: darwin
    goarch: arm64
    pkg: github.com/riverqueue/river/rivershared/sqlctemplate
    cpu: Apple M4
    BenchmarkReplacer/WithCache-10           1626988               735.7 ns/op
    BenchmarkReplacer/WithoutCache-10         695517              1710 ns/op
    PASS
    ok      github.com/riverqueue/river/rivershared/sqlctemplate    3.419s
  • Loading branch information
brandur committed Mar 9, 2025
1 parent 6ce45ae commit 3617481
Show file tree
Hide file tree
Showing 2 changed files with 194 additions and 21 deletions.
92 changes: 71 additions & 21 deletions rivershared/sqlctemplate/sqlc_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type Replacement struct {
// be initialized with a constructor. This lets it default to a usable instance
// on drivers that may themselves not be initialized.
type Replacer struct {
cache map[string]string
cache map[replacerCacheKey]string
cacheMu sync.RWMutex
}

Expand Down Expand Up @@ -131,22 +131,25 @@ func (r *Replacer) RunSafely(ctx context.Context, sql string, args []any) (strin
return "", nil, errors.New("sqlctemplate found template(s) in SQL, but no context container; bug?")
}

r.cacheMu.RLock()
var (
cachedSQL string
cachedSQLOK bool
)
if r.cache != nil { // protect against map not initialized yet
cachedSQL, cachedSQLOK = r.cache[sql]
}
r.cacheMu.RUnlock()
cacheKey, cacheEligible := replacerCacheKeyFrom(sql, container)
if cacheEligible {
r.cacheMu.RLock()
var (
cachedSQL string
cachedSQLOK bool
)
if r.cache != nil { // protect against map not initialized yet
cachedSQL, cachedSQLOK = r.cache[cacheKey]
}
r.cacheMu.RUnlock()

// If all input templates were stable, the finished SQL will have been cached.
if cachedSQLOK {
if len(container.NamedArgs) > 0 {
args = append(args, maputil.Values(container.NamedArgs)...)
// If all input templates were stable, the finished SQL will have been cached.
if cachedSQLOK {
if len(container.NamedArgs) > 0 {
args = append(args, maputil.Values(container.NamedArgs)...)
}
return cachedSQL, args, nil
}
return cachedSQL, args, nil
}

var (
Expand Down Expand Up @@ -212,17 +215,15 @@ func (r *Replacer) RunSafely(ctx context.Context, sql string, args []any) (strin
}
}

for _, replacement := range container.Replacements {
if !replacement.Stable {
return updatedSQL, args, nil
}
if !cacheEligible {
return updatedSQL, args, nil
}

r.cacheMu.Lock()
if r.cache == nil {
r.cache = make(map[string]string)
r.cache = make(map[replacerCacheKey]string)
}
r.cache[sql] = updatedSQL
r.cache[cacheKey] = updatedSQL
r.cacheMu.Unlock()

return updatedSQL, args, nil
Expand Down Expand Up @@ -254,3 +255,52 @@ func WithReplacements(ctx context.Context, replacements map[string]Replacement,
Replacements: replacements,
})
}

// Comparable struct that's used as a key for template caching.
type replacerCacheKey struct {
namedArgs string // all arg names concatenated together
replacementValues string // all values concatenated together
sql string
}

// Builds a cache key for the given SQL and context container.
//
// A key is only built if the given SQL/templates are cacheable, which means all
// template values must be stable. The second return value is a boolean
// indicating whether a cache key was built or not. If false, the input is not
// eligible for caching, and no check against the cache should be made.
func replacerCacheKeyFrom(sql string, container *contextContainer) (replacerCacheKey, bool) {
// Only eligible for caching if all replacements are stable.
for _, replacement := range container.Replacements {
if !replacement.Stable {
return replacerCacheKey{}, false
}
}

var (
namedArgsBuilder strings.Builder

// Named args must be sorted for key stability because Go maps don't
// provide any ordering guarantees.
sortedNamedArgs = maputil.Keys(container.NamedArgs)
)
slices.Sort(sortedNamedArgs)
for _, namedArg := range sortedNamedArgs {
namedArgsBuilder.WriteString(namedArg)
}

var (
replacementValuesBuilder strings.Builder
sortedReplacements = maputil.Keys(container.Replacements)
)
slices.Sort(sortedReplacements)
for _, template := range sortedReplacements {
replacementValuesBuilder.WriteString(container.Replacements[template].Value)
}

return replacerCacheKey{
namedArgs: namedArgsBuilder.String(),
replacementValues: replacementValuesBuilder.String(),
sql: sql,
}, true
}
123 changes: 123 additions & 0 deletions rivershared/sqlctemplate/sqlc_template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,95 @@ func TestReplacer(t *testing.T) {
require.Empty(t, replacer.cache)
})

t.Run("CacheBasedOnInputValues", func(t *testing.T) {
t.Parallel()

replacer, _ := setup(t)

// SQL stays constant across all runs.
const sql = `
SELECT count(*)
FROM /* TEMPLATE: schema */river_job
WHERE kind = @kind
AND state = @state;
`

// Initially cached value
{
ctx := WithReplacements(ctx, map[string]Replacement{
"schema": {Stable: true, Value: "test_schema."},
}, nil)

_, _, err := replacer.RunSafely(ctx, sql, nil)
require.NoError(t, err)
}
require.Len(t, replacer.cache, 1)

// Same SQL, but new value.
{
ctx := WithReplacements(ctx, map[string]Replacement{
"schema": {Stable: true, Value: "other_schema."},
}, nil)

_, _, err := replacer.RunSafely(ctx, sql, nil)
require.NoError(t, err)
}
require.Len(t, replacer.cache, 2)

// Named arg added to the mix.
{
ctx := WithReplacements(ctx, map[string]Replacement{
"schema": {Stable: true, Value: "test_schema."},
}, map[string]any{
"kind": "kind_value",
})

_, _, err := replacer.RunSafely(ctx, sql, nil)
require.NoError(t, err)
}
require.Len(t, replacer.cache, 3)

// Different named arg value can still use the previous cached value.
{
ctx := WithReplacements(ctx, map[string]Replacement{
"schema": {Stable: true, Value: "test_schema."},
}, map[string]any{
"kind": "other_kind_value",
})

_, _, err := replacer.RunSafely(ctx, sql, nil)
require.NoError(t, err)
}
require.Len(t, replacer.cache, 3) // unchanged

// New named arg adds a new cache value.
{
ctx := WithReplacements(ctx, map[string]Replacement{
"schema": {Stable: true, Value: "test_schema."},
}, map[string]any{
"kind": "kind_value",
"state": "state_value",
})

_, _, err := replacer.RunSafely(ctx, sql, nil)
require.NoError(t, err)
}
require.Len(t, replacer.cache, 4)

// Different input SQL.
{
ctx := WithReplacements(ctx, map[string]Replacement{
"schema": {Stable: true, Value: "test_schema."},
}, nil)

_, _, err := replacer.RunSafely(ctx, `
SELECT /* TEMPLATE: schema */river_job;
`, nil)
require.NoError(t, err)
}
require.Len(t, replacer.cache, 5)
})

t.Run("NamedArgsNoInitialArgs", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -369,3 +458,37 @@ func TestReplacer(t *testing.T) {
wg.Wait()
})
}

func BenchmarkReplacer(b *testing.B) {
ctx := context.Background()

runReplacer := func(b *testing.B, replacer *Replacer, stable bool) {
b.Helper()

ctx := WithReplacements(ctx, map[string]Replacement{
"schema": {Stable: stable, Value: "test_schema."},
}, nil)

_, _, err := replacer.RunSafely(ctx, `
-- name: JobCountByState :one
SELECT count(*)
FROM /* TEMPLATE: schema */river_job
WHERE state = @state;
`, nil)
require.NoError(b, err)
}

b.Run("WithCache", func(b *testing.B) {
var replacer Replacer
for range b.N {
runReplacer(b, &replacer, true)
}
})

b.Run("WithoutCache", func(b *testing.B) {
var replacer Replacer
for range b.N {
runReplacer(b, &replacer, false)
}
})
}

0 comments on commit 3617481

Please sign in to comment.