Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enforce query response size limit after decompression in query-frontend #6607

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
add query response size limit after decompression in query-frontend
Signed-off-by: Ahmed Hassan <[email protected]>
afhassan committed Feb 25, 2025
commit 8166600248a0dafdf6d963207ad2879b9d166f6e
6 changes: 3 additions & 3 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
@@ -477,10 +477,10 @@ func (t *Cortex) initFlusher() (serv services.Service, err error) {
func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err error) {
queryAnalyzer := querysharding.NewQueryAnalyzer()
// PrometheusCodec is a codec to encode and decode Prometheus query range requests and responses.
prometheusCodec := queryrange.NewPrometheusCodec(false, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec)
prometheusCodec := queryrange.NewPrometheusCodec(false, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec, t.Overrides)
// ShardedPrometheusCodec is same as PrometheusCodec but to be used on the sharded queries (it sum up the stats)
shardedPrometheusCodec := queryrange.NewPrometheusCodec(true, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec)
instantQueryCodec := instantquery.NewInstantQueryCodec(t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec)
shardedPrometheusCodec := queryrange.NewPrometheusCodec(true, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec, t.Overrides)
instantQueryCodec := instantquery.NewInstantQueryCodec(t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec, t.Overrides)

queryRangeMiddlewares, cache, err := queryrange.Middlewares(
t.Cfg.QueryRange,
2 changes: 1 addition & 1 deletion pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
@@ -274,7 +274,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// If the response status code is not 2xx, try to get the
// error message from response body.
if resp.StatusCode/100 != 2 {
body, err2 := tripperware.BodyBuffer(resp, f.log)
body, err2 := tripperware.BodyBuffer(resp, 0, f.log)
if err2 == nil {
err = httpgrpc.Errorf(resp.StatusCode, string(body))
}
23 changes: 20 additions & 3 deletions pkg/querier/tripperware/instantquery/instant_query.go
Original file line number Diff line number Diff line change
@@ -21,8 +21,11 @@ import (

"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/querier/tripperware"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/limiter"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/cortexproject/cortex/pkg/util/validation"
)

var (
@@ -40,10 +43,11 @@ type instantQueryCodec struct {
tripperware.Codec
compression tripperware.Compression
defaultCodecType tripperware.CodecType
limits *validation.Overrides
now func() time.Time
}

func NewInstantQueryCodec(compressionStr string, defaultCodecTypeStr string) instantQueryCodec {
func NewInstantQueryCodec(compressionStr string, defaultCodecTypeStr string, limits *validation.Overrides) instantQueryCodec {
compression := tripperware.NonCompression // default
if compressionStr == string(tripperware.GzipCompression) {
compression = tripperware.GzipCompression
@@ -57,6 +61,7 @@ func NewInstantQueryCodec(compressionStr string, defaultCodecTypeStr string) ins
return instantQueryCodec{
compression: compression,
defaultCodecType: defaultCodecType,
limits: limits,
now: time.Now,
}
}
@@ -92,19 +97,31 @@ func (c instantQueryCodec) DecodeRequest(_ context.Context, r *http.Request, for
return &result, nil
}

func (instantQueryCodec) DecodeResponse(ctx context.Context, r *http.Response, _ tripperware.Request) (tripperware.Response, error) {
func (c instantQueryCodec) DecodeResponse(ctx context.Context, r *http.Response, _ tripperware.Request) (tripperware.Response, error) {
log, ctx := spanlogger.New(ctx, "DecodeQueryInstantResponse") //nolint:ineffassign,staticcheck
defer log.Finish()

if err := ctx.Err(); err != nil {
return nil, err
}

buf, err := tripperware.BodyBuffer(r, log)
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}

queryLimiter := limiter.NewQueryLimiter(c.limits.MaxFetchedSeriesPerQuery(userID), c.limits.MaxFetchedChunkBytesPerQuery(userID), c.limits.MaxChunksPerQuery(userID), c.limits.MaxFetchedDataBytesPerQuery(userID))

buf, err := tripperware.BodyBuffer(r, c.limits.MaxFetchedDataBytesPerQuery(userID), log)
if err != nil {
log.Error(err)
return nil, err
}

if dataBytesLimitErr := queryLimiter.AddDataBytes(len(buf)); dataBytesLimitErr != nil {
return nil, validation.LimitError(dataBytesLimitErr.Error())
}

if r.StatusCode/100 != 2 {
return nil, httpgrpc.Errorf(r.StatusCode, string(buf))
}
3 changes: 2 additions & 1 deletion pkg/querier/tripperware/instantquery/instant_query_test.go
Original file line number Diff line number Diff line change
@@ -23,9 +23,10 @@ import (

"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/querier/tripperware"
"github.com/cortexproject/cortex/pkg/util/validation"
)

var testInstantQueryCodec = NewInstantQueryCodec(string(tripperware.NonCompression), string(tripperware.ProtobufCodecType))
var testInstantQueryCodec = NewInstantQueryCodec(string(tripperware.NonCompression), string(tripperware.ProtobufCodecType), &validation.Overrides{})

var jsonHttpReq = &http.Request{
Header: map[string][]string{
3 changes: 2 additions & 1 deletion pkg/querier/tripperware/instantquery/shard_by_query_test.go
Original file line number Diff line number Diff line change
@@ -5,9 +5,10 @@ import (

"github.com/cortexproject/cortex/pkg/querier/tripperware"
"github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
"github.com/cortexproject/cortex/pkg/util/validation"
)

func Test_shardQuery(t *testing.T) {
t.Parallel()
tripperware.TestQueryShardQuery(t, testInstantQueryCodec, queryrange.NewPrometheusCodec(true, "", "protobuf"))
tripperware.TestQueryShardQuery(t, testInstantQueryCodec, queryrange.NewPrometheusCodec(true, "", "protobuf", &validation.Overrides{}))
}
12 changes: 10 additions & 2 deletions pkg/querier/tripperware/query.go
Original file line number Diff line number Diff line change
@@ -448,7 +448,7 @@ type Buffer interface {
Bytes() []byte
}

func BodyBuffer(res *http.Response, logger log.Logger) ([]byte, error) {
func BodyBuffer(res *http.Response, maxBytes int, logger log.Logger) ([]byte, error) {
var buf *bytes.Buffer

// Attempt to cast the response body to a Buffer and use it if possible.
@@ -473,10 +473,18 @@ func BodyBuffer(res *http.Response, logger log.Logger) ([]byte, error) {
return nil, err
}
defer runutil.CloseWithLogOnErr(logger, gReader, "close gzip reader")

if maxBytes > 0 {
gReaderLimited := io.LimitReader(gReader, int64(maxBytes)+1)
return io.ReadAll(gReaderLimited)
}
return io.ReadAll(gReader)

} else if strings.EqualFold(res.Header.Get("Content-Encoding"), "snappy") {
sReader := snappy.NewReader(buf)
if maxBytes > 0 {
sReaderLimited := io.LimitReader(sReader, int64(maxBytes)+1)
return io.ReadAll(sReaderLimited)
}
return io.ReadAll(sReader)
}

22 changes: 20 additions & 2 deletions pkg/querier/tripperware/queryrange/query_range.go
Original file line number Diff line number Diff line change
@@ -20,8 +20,12 @@ import (

"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/querier/tripperware"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"

"github.com/cortexproject/cortex/pkg/util/limiter"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/cortexproject/cortex/pkg/util/validation"
)

// StatusSuccess Prometheus success result.
@@ -61,9 +65,10 @@ type prometheusCodec struct {
sharded bool
compression tripperware.Compression
defaultCodecType tripperware.CodecType
limits *validation.Overrides
}

func NewPrometheusCodec(sharded bool, compressionStr string, defaultCodecTypeStr string) *prometheusCodec { //nolint:revive
func NewPrometheusCodec(sharded bool, compressionStr string, defaultCodecTypeStr string, limits *validation.Overrides) *prometheusCodec { //nolint:revive
compression := tripperware.NonCompression // default
if compressionStr == string(tripperware.GzipCompression) {
compression = tripperware.GzipCompression
@@ -78,6 +83,7 @@ func NewPrometheusCodec(sharded bool, compressionStr string, defaultCodecTypeStr
sharded: sharded,
compression: compression,
defaultCodecType: defaultCodecType,
limits: limits,
}
}

@@ -198,11 +204,23 @@ func (c prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _
return nil, err
}

buf, err := tripperware.BodyBuffer(r, log)
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}

queryLimiter := limiter.NewQueryLimiter(c.limits.MaxFetchedSeriesPerQuery(userID), c.limits.MaxFetchedChunkBytesPerQuery(userID), c.limits.MaxChunksPerQuery(userID), c.limits.MaxFetchedDataBytesPerQuery(userID))

buf, err := tripperware.BodyBuffer(r, c.limits.MaxFetchedDataBytesPerQuery(userID), log)
if err != nil {
log.Error(err)
return nil, err
}

if dataBytesLimitErr := queryLimiter.AddDataBytes(len(buf)); dataBytesLimitErr != nil {
return nil, validation.LimitError(dataBytesLimitErr.Error())
}

if r.StatusCode/100 != 2 {
return nil, httpgrpc.Errorf(r.StatusCode, string(buf))
}
Original file line number Diff line number Diff line change
@@ -17,11 +17,12 @@ import (
"github.com/weaveworks/common/user"

"github.com/cortexproject/cortex/pkg/querier/tripperware"
"github.com/cortexproject/cortex/pkg/util/validation"
)

var (
PrometheusCodec = NewPrometheusCodec(false, "", "protobuf")
ShardedPrometheusCodec = NewPrometheusCodec(false, "", "protobuf")
PrometheusCodec = NewPrometheusCodec(false, "", "protobuf", &validation.Overrides{})
ShardedPrometheusCodec = NewPrometheusCodec(false, "", "protobuf", &validation.Overrides{})
)

func TestRoundTrip(t *testing.T) {
Loading