feat(server): Enforce attachment size in rate limits #639

Merged · 2 commits · Jun 26, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -14,6 +14,7 @@
**Internal**:

- Restructure the envelope and event ingestion paths into a pipeline and apply rate limits to all envelopes. ([#635](https://github.com/getsentry/relay/pull/635), [#636](https://github.com/getsentry/relay/pull/636))
- Pass the combined size of all attachments in an envelope to the Redis rate limiter as quantity to enforce attachment quotas. ([#639](https://github.com/getsentry/relay/pull/639))

## 20.6.0

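The second changelog entry is the core of this PR: instead of charging each envelope as a single unit, the combined byte size of all attachments is passed to the Redis rate limiter as the quantity. A minimal Python sketch of that idea (hypothetical helper and item names, not Relay's actual Rust code):

```python
# Hypothetical illustration of the changelog entry above, not Relay's actual code:
# the quantity charged against the attachment quota is the sum of all attachment
# payload sizes in the envelope, not the number of items.
def attachment_quantity(envelope_items):
    return sum(
        len(payload)
        for item_type, payload in envelope_items
        if item_type == "attachment"
    )

items = [("attachment", b"blabla"), ("event", b"{}"), ("attachment", b"x" * 10)]
assert attachment_quantity(items) == 16  # 6 + 10 bytes counted against the quota
```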
67 changes: 43 additions & 24 deletions relay-quotas/src/is_rate_limited.lua
@@ -1,45 +1,64 @@
-- Check a collection of quota counters to identify if an item should be rate
-- limited. Values provided as ``KEYS`` specify the keys of the counters to
-- check and the keys of counters to subtract, and values provided as ``ARGV``
-- specify the maximum value (quota limit) and expiration time for each key.
-- limited. For each quota, repeat the same set of ``KEYS`` and ``ARGV``:
--
-- For example, to check a quota ``foo`` that has a corresponding refund/negative
-- counter "subtract_from_foo", a limit of 10 items and expires at the Unix timestamp
-- ``100``, as well as a quota ``bar`` that has a corresponding refund/negative
-- counter "subtract_from_bar" limit of 20 items and should expire at the Unix
-- timestamp ``100``, the ``KEYS`` and ``ARGV`` values would be as follows:
-- ``KEYS`` (2 per quota):
-- * [string] Key of the counter.
-- * [string] Key of the refund counter.
--
-- KEYS = {"foo", "subtract_from_foo", "bar", "subtract_from_bar"}
-- ARGV = {10, 100, 20, 100}
-- ``ARGV`` (3 per quota):
-- * [number] Quota limit. Can be ``-1`` for unlimited quotas.
-- * [number] Expiration time in seconds for the key.
-- * [number] Quantity to increment the quota by.
--
-- If all checks pass (the item is accepted), the counters for all quotas are
-- incremented. If any checks fail (the item is rejected), the counters for all
-- quotas are unaffected. The result is a Lua table/array (Redis multi bulk
-- reply) that specifies whether or not the item was *rejected* based on the
-- provided limit.
assert(#KEYS == #ARGV, "incorrect number of keys and arguments provided")
assert(#KEYS % 2 == 0, "there must be an even number of keys")
-- For example, to check the following two quotas each with a timeout of 10 minutes:
-- * Key ``foo``, refund key ``foo_refund``, limit ``10``; quantity ``5``
-- * Key ``bar``, refund key ``bar_refund``, limit ``20``; quantity ``1``
--
-- Send these values:
--
-- KEYS = {"foo", "foo_refund", "bar", "bar_refund"}
-- ARGV = {10, 600, 5, 20, 600, 1}
--
-- The script applies the following logic:
-- * If all checks pass, the item is accepted and the counters for all quotas
-- are incremented.
-- * If any check fails, the item is rejected and the counters for all quotas
-- remain unchanged.
--
-- The result is a Lua table/array (Redis multi bulk reply) that specifies
-- whether or not the item was *rejected* based on the provided limit.
assert(#KEYS % 2 == 0, "there must be 2 keys per quota")
assert(#ARGV % 3 == 0, "there must be 3 args per quota")
assert(#KEYS / 2 == #ARGV / 3, "incorrect number of keys and arguments provided")

local results = {}
local failed = false
for i=1, #KEYS, 2 do
local limit = tonumber(ARGV[i])
local num_quotas = #KEYS / 2
for i=0, num_quotas - 1 do
local k = i * 2 + 1
local v = i * 3 + 1

local limit = tonumber(ARGV[v])
local quantity = tonumber(ARGV[v+2])
local rejected = false
-- limit=-1 means "no limit"
if limit >= 0 then
rejected = (redis.call('GET', KEYS[i]) or 0) - (redis.call('GET', KEYS[i + 1]) or 0) + 1 > limit
rejected = (redis.call('GET', KEYS[k]) or 0) - (redis.call('GET', KEYS[k + 1]) or 0) + quantity > limit
end

if rejected then
failed = true
end
results[(i + 1) / 2] = rejected
results[i + 1] = rejected
end

if not failed then
for i=1, #KEYS, 2 do
redis.call('INCR', KEYS[i])
redis.call('EXPIREAT', KEYS[i], ARGV[i + 1])
for i=0, num_quotas - 1 do
local k = i * 2 + 1
local v = i * 3 + 1

redis.call('INCRBY', KEYS[k], ARGV[v + 2])
redis.call('EXPIREAT', KEYS[k], ARGV[v + 1])
end
end

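For reference, this is roughly how a client would invoke the rewritten script with the two-quota example from its header comment: two keys per quota (counter and refund counter, which is subtracted from the counter before comparing against the limit), and three arguments per quota (limit, expiry, quantity). A sketch using redis-py; the file path is an assumption, and the expiry is given as an absolute Unix timestamp because the script calls `EXPIREAT` (matching the `now + 60` values in the Rust tests further down):

```python
import time
import redis

r = redis.Redis()

# Assumes the full script file is available locally (the diff above is truncated).
with open("relay-quotas/src/is_rate_limited.lua") as f:
    is_rate_limited = r.register_script(f.read())

expiry = int(time.time()) + 600  # absolute Unix timestamp for EXPIREAT

# Quota "foo": limit 10, consume 5; quota "bar": limit 20, consume 1.
result = is_rate_limited(
    keys=["foo", "foo_refund", "bar", "bar_refund"],
    args=[10, expiry, 5, 20, expiry, 1],
)
# One entry per quota; a truthy entry means that quota rejected the item,
# in which case no counters were incremented.
print(result)
```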
87 changes: 73 additions & 14 deletions relay-quotas/src/redis.rs
@@ -171,6 +171,7 @@ impl RedisRateLimiter {
&self,
quotas: &[Quota],
item_scoping: ItemScoping<'_>,
quantity: usize,
) -> Result<RateLimits, RateLimitingError> {
let timestamp = UnixTimestamp::now();

@@ -197,6 +198,7 @@

invocation.arg(quota.limit());
invocation.arg(quota.expiry().as_secs());
invocation.arg(quantity);

tracked_quotas.push(quota);
} else {
@@ -296,7 +298,7 @@ mod tests {
};

let rate_limits: Vec<RateLimit> = RATE_LIMITER
.is_rate_limited(quotas, scoping)
.is_rate_limited(quotas, scoping, 1)
.expect("rate limiting failed")
.into_iter()
.collect();
@@ -336,7 +338,7 @@

for i in 0..10 {
let rate_limits: Vec<RateLimit> = RATE_LIMITER
.is_rate_limited(quotas, scoping)
.is_rate_limited(quotas, scoping, 1)
.expect("rate limiting failed")
.into_iter()
.collect();
@@ -370,7 +372,7 @@
};

let rate_limits: Vec<RateLimit> = RATE_LIMITER
.is_rate_limited(&[], scoping)
.is_rate_limited(&[], scoping, 1)
.expect("rate limiting failed")
.into_iter()
.collect();
@@ -413,7 +415,7 @@

for i in 0..1 {
let rate_limits: Vec<RateLimit> = RATE_LIMITER
.is_rate_limited(quotas, scoping)
.is_rate_limited(quotas, scoping, 1)
.expect("rate limiting failed")
.into_iter()
.collect();
@@ -434,6 +436,51 @@
}
}

#[test]
fn test_quota_with_quantity() {
let quotas = &[Quota {
id: Some(format!("test_quantity_quota_{:?}", SystemTime::now())),
categories: DataCategories::new(),
scope: QuotaScope::Organization,
scope_id: None,
limit: Some(500),
window: Some(60),
reason_code: Some(ReasonCode::new("get_lost")),
}];

let scoping = ItemScoping {
category: DataCategory::Error,
scoping: &Scoping {
organization_id: 42,
project_id: ProjectId::new(43),
public_key: "a94ae32be2584e0bbd7a4cbb95971fee".to_owned(),
key_id: Some(44),
},
};

for i in 0..10 {
let rate_limits: Vec<RateLimit> = RATE_LIMITER
.is_rate_limited(quotas, scoping, 100)
.expect("rate limiting failed")
.into_iter()
.collect();

if i >= 5 {
assert_eq!(
rate_limits,
vec![RateLimit {
categories: DataCategories::new(),
scope: RateLimitScope::Organization(42),
reason_code: Some(ReasonCode::new("get_lost")),
retry_after: rate_limits[0].retry_after,
}]
);
} else {
assert_eq!(rate_limits, vec![]);
}
}
}

#[test]
fn test_get_redis_key_scoped() {
let quota = Quota {
@@ -512,14 +559,16 @@

let mut invocation = script.prepare_invoke();
invocation
.key(&foo)
.key(&r_foo)
.key(&bar)
.key(&r_bar)
.arg(1)
.arg(now + 60)
.arg(2)
.arg(now + 120);
.key(&foo) // key
.key(&r_foo) // refund key
.key(&bar) // key
.key(&r_bar) // refund key
.arg(1) // limit
.arg(now + 60) // expiry
.arg(1) // quantity
.arg(2) // limit
.arg(now + 120) // expiry
.arg(1); // quantity

// The item should not be rate limited by either key.
assert_eq!(
@@ -560,7 +609,12 @@
let () = conn.set(&apple, 5).unwrap();

let mut invocation = script.prepare_invoke();
invocation.key(&orange).key(&baz).arg(1).arg(now + 60);
invocation
.key(&orange) // key
.key(&baz) // refund key
.arg(1) // limit
.arg(now + 60) // expiry
.arg(1); // quantity

// increment
assert_eq!(
@@ -575,7 +629,12 @@
);

let mut invocation = script.prepare_invoke();
invocation.key(&orange).key(&apple).arg(1).arg(now + 60);
invocation
.key(&orange) // key
.key(&apple) // refund key
.arg(1) // limit
.arg(now + 60) // expiry
.arg(1); // quantity

// test that refund key is used
assert_eq!(
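The new `test_quota_with_quantity` test above rests on simple arithmetic: the limit is 500 and every call consumes a quantity of 100, so the first five calls fit and everything from the sixth call on is rejected. A quick sketch of that bookkeeping (counters only move when the item is accepted):

```python
limit, quantity = 500, 100
used = 0
for i in range(10):
    rejected = used + quantity > limit
    if not rejected:
        used += quantity  # counters are only incremented when the item is accepted
    print(i, "rejected" if rejected else "accepted", "used =", used)
# Iterations 0-4 are accepted (used reaches 500); 5-9 are rejected,
# matching the `if i >= 5` branch in the Rust test.
```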
4 changes: 2 additions & 2 deletions relay-server/src/actors/events.rs
@@ -912,8 +912,8 @@ impl EventProcessor {

// When invoking the rate limiter, capture if the event item has been rate limited to also
// remove it from the processing state eventually.
let mut envelope_limiter = EnvelopeLimiter::new(|item_scope, _quantity| {
let limits = rate_limiter.is_rate_limited(quotas, item_scope)?;
let mut envelope_limiter = EnvelopeLimiter::new(|item_scope, quantity| {
let limits = rate_limiter.is_rate_limited(quotas, item_scope, quantity)?;
remove_event ^= Some(item_scope.category) == category && limits.is_limited();
Ok(limits)
});
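This hunk is where the computed quantity actually reaches Redis: the `EnvelopeLimiter` calls back with the per-category quantity, and the closure now forwards it to `is_rate_limited` instead of discarding it, while also recording whether the event's own category was limited so the event item can be dropped from the processing state. A rough Python analogue of that wiring (simplified, with hypothetical names; not Relay's actual implementation):

```python
def make_check(rate_limiter, quotas, event_category, state):
    # state["remove_event"] mirrors the `remove_event` flag in the Rust closure above:
    # it is set when the event's own category gets rate limited, so the event item
    # can later be removed from the processing state as well.
    def check(item_category, item_scoping, quantity):
        limits = rate_limiter(quotas, item_scoping, quantity)
        if limits and item_category == event_category:
            state["remove_event"] = True
        return limits

    return check


def fake_rate_limiter(quotas, item_scoping, quantity):
    # Stand-in for RedisRateLimiter::is_rate_limited: reject anything over 30 bytes.
    return ["attachment_quota"] if quantity > 30 else []


state = {"remove_event": False}
check = make_check(fake_rate_limiter, quotas=[], event_category="error", state=state)

assert check("attachment", {"organization_id": 42}, 100) == ["attachment_quota"]
assert state["remove_event"] is False  # only the event's own category flips the flag
assert check("error", {"organization_id": 42}, 1) == []
```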
9 changes: 6 additions & 3 deletions tests/integration/test_attachments.py
@@ -171,6 +171,7 @@ def test_attachments_quotas(
mini_sentry, relay_with_processing, attachments_consumer, outcomes_consumer,
):
event_id = "515539018c9b4260a6f999572f1661ee"
attachment_body = b"blabla"

relay = relay_with_processing()
relay.wait_relay_healthcheck()
@@ -181,17 +182,19 @@
"id": "test_rate_limiting_{}".format(uuid.uuid4().hex),
"categories": ["attachment"],
"window": 3600,
"limit": 5, # TODO: Test attachment size quota once implemented
"limit": 5 * len(attachment_body),
"reasonCode": "attachments_exceeded",
}
]

attachments_consumer = attachments_consumer()
outcomes_consumer = outcomes_consumer()
attachments = [("att_1", "foo.txt", b"blabla")]
attachments = [("att_1", "foo.txt", attachment_body)]

for i in range(5):
relay.send_attachments(42, event_id, [("att_1", "%s.txt" % i, b"")])
relay.send_attachments(42, event_id, [("att_1", "%s.txt" % i, attachment_body)])
chunk, _ = attachments_consumer.get_attachment_chunk()
assert chunk == attachment_body
attachment = attachments_consumer.get_individual_attachment()
assert attachment["attachment"]["name"] == "%s.txt" % i

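With size-based enforcement in place, the integration test now expresses its quota in bytes rather than attachment counts: the limit is `5 * len(b"blabla")`, i.e. 30 bytes, and each upload consumes 6 bytes. A quick check of that math (the rejection path itself is in the portion of the test truncated above):

```python
attachment_body = b"blabla"
limit = 5 * len(attachment_body)  # 30 bytes of attachment quota per window

used = 0
for i in range(6):
    accepted = used + len(attachment_body) <= limit
    if accepted:
        used += len(attachment_body)
    print(i, "accepted" if accepted else "rejected (attachments_exceeded)")
# Attachments 0-4 fit exactly (30 bytes used); a sixth would exceed the quota.
```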