Avoid duplicated Cached promises when invalidating expanded postings cache #6452

Open · wants to merge 1 commit into master
Avoid duplicated Cached promises when invalidating expanded postings cache

Signed-off-by: alanprot <[email protected]>
alanprot committed Dec 21, 2024
commit 5724c70d00a2092976dd806fb7dbddb5ea02ac5f
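
Before this change the per-metric seed was embedded in the cache key itself, so bumping the seed left the old promise stranded under the stale key while a second promise was created under the new one. This commit instead stores the seed inside the cached promise and compares it on lookup, replacing mismatched entries in place and counting them as "invalidated" evictions. A minimal, self-contained sketch of that lookup path, assuming a simplified cache shape (a plain sync.Map of promises keyed only by block and matchers; the names below are illustrative, not the actual Cortex types):

```go
package main

import (
	"fmt"
	"sync"
)

// promise is a simplified stand-in for cacheEntryPromise: it carries the seed
// it was created with, the fetched value, and a done channel for waiters.
type promise struct {
	seed int
	v    string
	done chan struct{}
}

type cache struct {
	values sync.Map // key -> *promise
}

// getPromiseForKey returns the cached promise for k unless its seed no longer
// matches the caller's seed, in which case the entry is refetched and swapped
// in place instead of being stored under a new, duplicated key.
func (c *cache) getPromiseForKey(seed int, k string, fetch func() string) (*promise, bool) {
	r := &promise{seed: seed, done: make(chan struct{})}
	defer close(r.done)

	loaded, ok := c.values.LoadOrStore(k, r)
	if !ok {
		// We stored our own promise; fetch the value for it.
		r.v = fetch()
		return r, false
	}

	cached := loaded.(*promise)
	<-cached.done

	if cached.seed != seed {
		// Seed mismatch: the key was invalidated. Refetch and try to swap in a
		// fresh entry; even if another goroutine wins the race, return our own
		// freshly fetched value so we never serve a potentially stale one.
		r.v = fetch()
		c.values.CompareAndSwap(k, cached, r)
		return r, false
	}
	return cached, true
}

func main() {
	c := &cache{}

	p, hit := c.getPromiseForKey(1, "block|name=value", func() string { return "postings-v1" })
	fmt.Println(p.v, hit) // postings-v1 false

	// Same seed: served from the cache.
	p, hit = c.getPromiseForKey(1, "block|name=value", func() string { return "postings-v1" })
	fmt.Println(p.v, hit) // postings-v1 true

	// Bumped seed (e.g. after a write to the head block): the entry is
	// invalidated and refetched in place, with no duplicated promise per seed.
	p, hit = c.getPromiseForKey(2, "block|name=value", func() string { return "postings-v2" })
	fmt.Println(p.v, hit) // postings-v2 false
}
```
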
65 changes: 44 additions & 21 deletions pkg/storage/tsdb/expanded_postings_cache.go
@@ -5,7 +5,6 @@ import (
"context"
"flag"
"slices"
"strconv"
"strings"
"sync"
"time"
@@ -166,7 +165,7 @@ func (c *blocksPostingsForMatchersCache) PostingsForMatchers(ctx context.Context
}

func (c *blocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsdb.IndexReader, ms ...*labels.Matcher) func(context.Context) (index.Postings, error) {
var seed string
var seed int
cache := c.blocksCache

// If is a head block, lets add the seed on the cache key so we can
@@ -208,8 +207,8 @@ func (c *blocksPostingsForMatchersCache) fetchPostings(blockID ulid.ULID, ix tsd
return nil, 0, err
}

key := cacheKey(seed, blockID, ms...)
promise, loaded := cache.getPromiseForKey(key, fetch)
key := cacheKey(blockID, ms...)
promise, loaded := cache.getPromiseForKey(seed, key, fetch)
if loaded {
c.metrics.CacheHits.WithLabelValues(cache.name).Inc()
}
@@ -231,11 +230,11 @@ func (c *blocksPostingsForMatchersCache) result(ce *cacheEntryPromise[[]storage.
}
}

func (c *blocksPostingsForMatchersCache) getSeedForMetricName(metricName string) string {
func (c *blocksPostingsForMatchersCache) getSeedForMetricName(metricName string) int {
return c.seedByHash.getSeed(c.userId, metricName)
}

func cacheKey(seed string, blockID ulid.ULID, ms ...*labels.Matcher) string {
func cacheKey(blockID ulid.ULID, ms ...*labels.Matcher) string {
slices.SortFunc(ms, func(i, j *labels.Matcher) int {
if i.Type != j.Type {
return int(i.Type - j.Type)
@@ -254,14 +253,12 @@ func cacheKey(seed string, blockID ulid.ULID, ms ...*labels.Matcher) string {
sepLen = 1
)

size := len(seed) + len(blockID.String()) + 2*sepLen
size := len(blockID.String()) + sepLen
for _, m := range ms {
size += len(m.Name) + len(m.Value) + typeLen + sepLen
}
sb := strings.Builder{}
sb.Grow(size)
sb.WriteString(seed)
sb.WriteByte('|')
sb.WriteString(blockID.String())
sb.WriteByte('|')
for _, m := range ms {
@@ -300,13 +297,13 @@ func newSeedByHash(size int) *seedByHash {
}
}

func (s *seedByHash) getSeed(userId string, v string) string {
func (s *seedByHash) getSeed(userId string, v string) int {
h := memHashString(userId, v)
i := h % uint64(len(s.seedByHash))
l := i % uint64(len(s.strippedLock))
s.strippedLock[l].RLock()
defer s.strippedLock[l].RUnlock()
return strconv.Itoa(s.seedByHash[i])
return s.seedByHash[i]
}

func (s *seedByHash) incrementSeed(userId string, v string) {
@@ -360,9 +357,10 @@ func (c *fifoCache[V]) expire() {
}
}

func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)) (*cacheEntryPromise[V], bool) {
func (c *fifoCache[V]) getPromiseForKey(seed int, k string, fetch func() (V, int64, error)) (*cacheEntryPromise[V], bool) {
r := &cacheEntryPromise[V]{
done: make(chan struct{}),
seed: seed,
}
defer close(r.done)

@@ -385,15 +383,39 @@ func (c *fifoCache[V]) getPromiseForKey(k string, fetch func() (V, int64, error)
// If the promise is already in the cache, lets wait it to fetch the data.
<-loaded.(*cacheEntryPromise[V]).done

// If is cached but is expired, lets try to replace the cache value.
if loaded.(*cacheEntryPromise[V]).isExpired(c.cfg.Ttl, c.timeNow()) && c.cachedValues.CompareAndSwap(k, loaded, r) {
c.metrics.CacheEvicts.WithLabelValues(c.name, "expired").Inc()
r.v, r.sizeBytes, r.err = fetch()
r.sizeBytes += int64(len(k))
c.updateSize(loaded.(*cacheEntryPromise[V]).sizeBytes, r.sizeBytes)
loaded = r
r.ts = c.timeNow()
ok = false
var reason string
invalidated, expired := false, false

switch {
// If the seed of the cached promise does not match the incoming seed, the cache key was invalidated
case loaded.(*cacheEntryPromise[V]).seed != seed:
invalidated = true
reason = "invalidated"
case loaded.(*cacheEntryPromise[V]).isExpired(c.cfg.Ttl, c.timeNow()):
expired = true
reason = "expired"
}

if invalidated || expired {
c.metrics.CacheEvicts.WithLabelValues(c.name, reason).Inc()

// If the cached entry is invalidated or expired, let's try to replace its value
if c.cachedValues.CompareAndSwap(k, loaded, r) {
r.v, r.sizeBytes, r.err = fetch()
r.sizeBytes += int64(len(k))
c.updateSize(loaded.(*cacheEntryPromise[V]).sizeBytes, r.sizeBytes)
loaded = r
r.ts = c.timeNow()
ok = false
} else if invalidated {
// If we cannot perform the swap, it indicates that another goroutine is attempting to set the cache key concurrently.
// In this scenario, fetch the key if it was invalidated, as we cannot be certain whether the other goroutine holds
// the most up-to-date value. Loading from the cache in this state may result in returning a stale value.
r.v, r.sizeBytes, r.err = fetch()
r.sizeBytes += int64(len(k))
r.ts = c.timeNow()
loaded = r
}
}
}

@@ -459,6 +481,7 @@ func (c *fifoCache[V]) updateSize(oldSize, newSizeBytes int64) {
type cacheEntryPromise[V any] struct {
ts time.Time
sizeBytes int64
seed int

done chan struct{}
v V
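
With the seed comparison in place, the eviction counter distinguishes three reasons: "invalidated" (seed mismatch), "expired" (TTL), and "full" (evicted to make room), as asserted in the test changes below. An illustrative sketch of a counter with that shape, reusing the metric name and help text from the test expectations; the registration wiring here is an assumption, not the exact Cortex setup:

```go
package main

import (
	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewPedanticRegistry()

	// Counter labelled by cache name and eviction reason, mirroring the
	// cortex_ingester_expanded_postings_cache_evicts series asserted in the test.
	evicts := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "cortex_ingester_expanded_postings_cache_evicts",
		Help: "Total number of evictions in the cache, excluding items that got evicted due to TTL.",
	}, []string{"cache", "reason"})
	reg.MustRegister(evicts)

	// The three reasons emitted after this change.
	evicts.WithLabelValues("test", "invalidated").Inc()
	evicts.WithLabelValues("test", "expired").Inc()
	evicts.WithLabelValues("test", "full").Inc()
}
```
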
36 changes: 22 additions & 14 deletions pkg/storage/tsdb/expanded_postings_cache_test.go
@@ -18,7 +18,6 @@ import (

func TestCacheKey(t *testing.T) {
blockID := ulid.MustNew(1, nil)
seed := "seed123"
matchers := []*labels.Matcher{
{
Type: labels.MatchEqual,
@@ -41,8 +40,8 @@ func TestCacheKey(t *testing.T) {
Value: "value_4",
},
}
r := cacheKey(seed, blockID, matchers...)
require.Equal(t, "seed123|00000000010000000000000000|name_1=value_1|name_2!=value_2|name_3=~value_4|name_5!~value_4|", r)
r := cacheKey(blockID, matchers...)
require.Equal(t, "00000000010000000000000000|name_1=value_1|name_2!=value_2|name_3=~value_4|name_5!~value_4|", r)
}

func Test_ShouldFetchPromiseOnlyOnce(t *testing.T) {
@@ -67,7 +66,7 @@ func Test_ShouldFetchPromiseOnlyOnce(t *testing.T) {
for i := 0; i < 100; i++ {
go func() {
defer wg.Done()
cache.getPromiseForKey("key1", fetchFunc)
cache.getPromiseForKey(1, "key1", fetchFunc)
}()
}

@@ -82,7 +81,7 @@ func TestFifoCacheDisabled(t *testing.T) {
m := NewPostingCacheMetrics(prometheus.NewPedanticRegistry())
timeNow := time.Now
cache := newFifoCache[int](cfg, "test", m, timeNow)
old, loaded := cache.getPromiseForKey("key1", func() (int, int64, error) {
old, loaded := cache.getPromiseForKey(1, "key1", func() (int, int64, error) {
return 1, 0, nil
})
require.False(t, loaded)
@@ -91,7 +90,6 @@ func TestFifoCacheDisabled(t *testing.T) {
}

func TestFifoCacheExpire(t *testing.T) {

keySize := 20
numberOfKeys := 100

@@ -128,17 +126,24 @@ func TestFifoCacheExpire(t *testing.T) {

for i := 0; i < numberOfKeys; i++ {
key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize)
p, loaded := cache.getPromiseForKey(key, func() (int, int64, error) {
p, loaded := cache.getPromiseForKey(1, key, func() (int, int64, error) {
return 1, 8, nil
})
require.False(t, loaded)
require.Equal(t, 1, p.v)
require.True(t, cache.contains(key))
p, loaded = cache.getPromiseForKey(key, func() (int, int64, error) {
return 1, 0, nil
p, loaded = cache.getPromiseForKey(1, key, func() (int, int64, error) {
return 1, 8, nil
})
require.True(t, loaded)
require.Equal(t, 1, p.v)

// Change the seed and make sure the key is reloaded
p, loaded = cache.getPromiseForKey(2, key, func() (int, int64, error) {
return 2, 8, nil
})
require.False(t, loaded)
require.Equal(t, 2, p.v)
}

totalCacheSize := 0
@@ -156,8 +161,9 @@ func TestFifoCacheExpire(t *testing.T) {
err := testutil.GatherAndCompare(r, bytes.NewBufferString(fmt.Sprintf(`
# HELP cortex_ingester_expanded_postings_cache_evicts Total number of evictions in the cache, excluding items that got evicted due to TTL.
# TYPE cortex_ingester_expanded_postings_cache_evicts counter
cortex_ingester_expanded_postings_cache_evicts{cache="test",reason="invalidated"} %v
cortex_ingester_expanded_postings_cache_evicts{cache="test",reason="full"} %v
`, numberOfKeys-c.expectedFinalItems)), "cortex_ingester_expanded_postings_cache_evicts")
`, numberOfKeys, numberOfKeys-c.expectedFinalItems)), "cortex_ingester_expanded_postings_cache_evicts")
require.NoError(t, err)

}
@@ -170,7 +176,7 @@ func TestFifoCacheExpire(t *testing.T) {
for i := 0; i < numberOfKeys; i++ {
key := RepeatStringIfNeeded(fmt.Sprintf("key%d", i), keySize)
originalSize := cache.cachedBytes
p, loaded := cache.getPromiseForKey(key, func() (int, int64, error) {
p, loaded := cache.getPromiseForKey(2, key, func() (int, int64, error) {
return 2, 18, nil
})
require.False(t, loaded)
@@ -183,15 +189,16 @@ func TestFifoCacheExpire(t *testing.T) {
err := testutil.GatherAndCompare(r, bytes.NewBufferString(fmt.Sprintf(`
# HELP cortex_ingester_expanded_postings_cache_evicts Total number of evictions in the cache, excluding items that got evicted due to TTL.
# TYPE cortex_ingester_expanded_postings_cache_evicts counter
cortex_ingester_expanded_postings_cache_evicts{cache="test",reason="invalidated"} %v
cortex_ingester_expanded_postings_cache_evicts{cache="test",reason="expired"} %v
`, numberOfKeys)), "cortex_ingester_expanded_postings_cache_evicts")
`, numberOfKeys, numberOfKeys)), "cortex_ingester_expanded_postings_cache_evicts")
require.NoError(t, err)

cache.timeNow = func() time.Time {
return timeNow().Add(5 * c.cfg.Ttl)
}

cache.getPromiseForKey("newKwy", func() (int, int64, error) {
cache.getPromiseForKey(1, "newKwy", func() (int, int64, error) {
return 2, 18, nil
})

@@ -200,7 +207,8 @@ func TestFifoCacheExpire(t *testing.T) {
# HELP cortex_ingester_expanded_postings_cache_evicts Total number of evictions in the cache, excluding items that got evicted due to TTL.
# TYPE cortex_ingester_expanded_postings_cache_evicts counter
cortex_ingester_expanded_postings_cache_evicts{cache="test",reason="expired"} %v
`, numberOfKeys*2)), "cortex_ingester_expanded_postings_cache_evicts")
cortex_ingester_expanded_postings_cache_evicts{cache="test",reason="invalidated"} %v
`, numberOfKeys*2, numberOfKeys)), "cortex_ingester_expanded_postings_cache_evicts")
require.NoError(t, err)
}
})