Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enforce query response size limit after decompression in query-frontend #6607

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
inject orgId to ctx for unit tests
Signed-off-by: Ahmed Hassan <[email protected]>
afhassan committed Feb 26, 2025
commit 0360296e2ac49fec6d47127513544e0853cfd37e
30 changes: 22 additions & 8 deletions pkg/querier/tripperware/instantquery/instant_query_test.go
Original file line number Diff line number Diff line change
@@ -26,7 +26,14 @@ import (
"github.com/cortexproject/cortex/pkg/util/validation"
)

var testInstantQueryCodec = NewInstantQueryCodec(string(tripperware.NonCompression), string(tripperware.ProtobufCodecType), &validation.Overrides{})
var userLimit = validation.Limits{
MaxFetchedSeriesPerQuery: 0,
MaxFetchedChunkBytesPerQuery: 0,
MaxChunksPerQuery: 0,
MaxFetchedDataBytesPerQuery: 0,
}
var overrides, _ = validation.NewOverrides(userLimit, nil)
var testInstantQueryCodec = NewInstantQueryCodec(string(tripperware.NonCompression), string(tripperware.ProtobufCodecType), overrides)

var jsonHttpReq = &http.Request{
Header: map[string][]string{
@@ -191,7 +198,9 @@ func TestCompressedResponse(t *testing.T) {
Header: h,
Body: io.NopCloser(responseBody),
}
resp, err := testInstantQueryCodec.DecodeResponse(context.Background(), response, nil)

ctx := user.InjectOrgID(context.Background(), "1")
resp, err := testInstantQueryCodec.DecodeResponse(ctx, response, nil)
require.Equal(t, tc.err, err)

if err == nil {
@@ -455,7 +464,8 @@ func TestResponse(t *testing.T) {
}
}

resp, err := testInstantQueryCodec.DecodeResponse(context.Background(), response, nil)
ctx := user.InjectOrgID(context.Background(), "1")
resp, err := testInstantQueryCodec.DecodeResponse(ctx, response, nil)
require.NoError(t, err)

// Reset response, as the above call will have consumed the body reader.
@@ -465,7 +475,7 @@ func TestResponse(t *testing.T) {
Body: io.NopCloser(bytes.NewBuffer([]byte(tc.jsonBody))),
ContentLength: int64(len(tc.jsonBody)),
}
resp2, err := testInstantQueryCodec.EncodeResponse(context.Background(), jsonHttpReq, resp)
resp2, err := testInstantQueryCodec.EncodeResponse(ctx, jsonHttpReq, resp)
require.NoError(t, err)
assert.Equal(t, response, resp2)
})
@@ -711,7 +721,7 @@ func TestMergeResponse(t *testing.T) {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
ctx, cancelCtx := context.WithCancel(context.Background())
ctx, cancelCtx := context.WithCancel(user.InjectOrgID(context.Background(), "1"))

var resps []tripperware.Response
for _, r := range tc.resps {
@@ -1724,7 +1734,7 @@ func TestMergeResponseProtobuf(t *testing.T) {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
ctx, cancelCtx := context.WithCancel(context.Background())
ctx, cancelCtx := context.WithCancel(user.InjectOrgID(context.Background(), "1"))

var resps []tripperware.Response
for _, r := range tc.resps {
@@ -1871,7 +1881,9 @@ func Benchmark_Decode(b *testing.B) {
StatusCode: 200,
Body: io.NopCloser(bytes.NewBuffer(body)),
}
_, err := testInstantQueryCodec.DecodeResponse(context.Background(), response, nil)

ctx := user.InjectOrgID(context.Background(), "1")
_, err := testInstantQueryCodec.DecodeResponse(ctx, response, nil)
require.NoError(b, err)
}
})
@@ -1934,7 +1946,9 @@ func Benchmark_Decode_Protobuf(b *testing.B) {
Header: http.Header{"Content-Type": []string{"application/x-protobuf"}},
Body: io.NopCloser(bytes.NewBuffer(body)),
}
_, err := testInstantQueryCodec.DecodeResponse(context.Background(), response, nil)

ctx := user.InjectOrgID(context.Background(), "1")
_, err := testInstantQueryCodec.DecodeResponse(ctx, response, nil)
require.NoError(b, err)
}
})
9 changes: 8 additions & 1 deletion pkg/querier/tripperware/instantquery/shard_by_query_test.go
Original file line number Diff line number Diff line change
@@ -10,5 +10,12 @@ import (

func Test_shardQuery(t *testing.T) {
t.Parallel()
tripperware.TestQueryShardQuery(t, testInstantQueryCodec, queryrange.NewPrometheusCodec(true, "", "protobuf", &validation.Overrides{}))
userLimit := validation.Limits{
MaxFetchedSeriesPerQuery: 0,
MaxFetchedChunkBytesPerQuery: 0,
MaxChunksPerQuery: 0,
MaxFetchedDataBytesPerQuery: 0,
}
overrides, _ := validation.NewOverrides(userLimit, nil)
tripperware.TestQueryShardQuery(t, testInstantQueryCodec, queryrange.NewPrometheusCodec(true, "", "protobuf", overrides))
}
Original file line number Diff line number Diff line change
@@ -21,8 +21,15 @@ import (
)

var (
PrometheusCodec = NewPrometheusCodec(false, "", "protobuf", &validation.Overrides{})
ShardedPrometheusCodec = NewPrometheusCodec(false, "", "protobuf", &validation.Overrides{})
userLimit = validation.Limits{
MaxFetchedSeriesPerQuery: 0,
MaxFetchedChunkBytesPerQuery: 0,
MaxChunksPerQuery: 0,
MaxFetchedDataBytesPerQuery: 0,
}
overrides, _ = validation.NewOverrides(userLimit, nil)
PrometheusCodec = NewPrometheusCodec(false, "", "protobuf", overrides)
ShardedPrometheusCodec = NewPrometheusCodec(false, "", "protobuf", overrides)
)

func TestRoundTrip(t *testing.T) {
11 changes: 7 additions & 4 deletions pkg/querier/tripperware/queryrange/query_range_test.go
Original file line number Diff line number Diff line change
@@ -269,7 +269,7 @@ func TestResponse(t *testing.T) {
t.Parallel()
protobuf, err := proto.Marshal(tc.promBody)
require.NoError(t, err)
ctx, cancelCtx := context.WithCancel(context.Background())
ctx, cancelCtx := context.WithCancel(user.InjectOrgID(context.Background(), "1"))

var response *http.Response
if tc.isProtobuf {
@@ -420,7 +420,8 @@ func TestResponseWithStats(t *testing.T) {
tc.promBody.Headers = respHeadersJson
}

resp, err := PrometheusCodec.DecodeResponse(context.Background(), response, nil)
ctx := user.InjectOrgID(context.Background(), "1")
resp, err := PrometheusCodec.DecodeResponse(ctx, response, nil)
require.NoError(t, err)
assert.Equal(t, tc.promBody, resp)

@@ -1183,7 +1184,7 @@ func TestMergeAPIResponses(t *testing.T) {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
ctx, cancelCtx := context.WithCancel(context.Background())
ctx, cancelCtx := context.WithCancel(user.InjectOrgID(context.Background(), "1"))
if tc.cancelCtx {
cancelCtx()
}
@@ -1286,7 +1287,9 @@ func TestCompressedResponse(t *testing.T) {
Header: h,
Body: io.NopCloser(responseBody),
}
resp, err := PrometheusCodec.DecodeResponse(context.Background(), response, nil)

ctx := user.InjectOrgID(context.Background(), "1")
resp, err := PrometheusCodec.DecodeResponse(ctx, response, nil)
require.Equal(t, tc.err, err)

if err == nil {
Loading