use std::fmt; use std::str::FromStr; use std::sync::{Arc, Mutex, PoisonError}; use std::time::{Duration, Instant}; use relay_base_schema::metrics::MetricNamespace; use relay_base_schema::organization::OrganizationId; use relay_base_schema::project::{ProjectId, ProjectKey}; use smallvec::SmallVec; use crate::quota::{DataCategories, ItemScoping, Quota, QuotaScope, ReasonCode, Scoping}; use crate::REJECT_ALL_SECS; /// A monotonic expiration marker for `RateLimit`s. /// /// `RetryAfter` marks an instant at which a rate limit expires, which is indicated by `expired`. It /// can convert into the remaining time until expiration. #[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct RetryAfter { when: Instant, } impl RetryAfter { /// Creates a retry after instance. #[inline] pub fn from_secs(seconds: u64) -> Self { let now = Instant::now(); let when = now.checked_add(Duration::from_secs(seconds)).unwrap_or(now); Self { when } } /// Returns the remaining duration until the rate limit expires using the passed instant as the /// reference point. #[inline] pub fn remaining_at(self, at: Instant) -> Option<Duration> { if at >= self.when { None } else { Some(self.when - at) } } /// Returns the remaining duration until the rate limit expires. #[inline] pub fn remaining(self) -> Option<Duration> { self.remaining_at(Instant::now()) } /// Returns the remaining seconds until the rate limit expires using the passed instant as the /// reference point. /// /// This is a shortcut to `retry_after.remaining().as_secs()` with one exception: If the rate /// limit has expired, this function returns `0`. #[inline] pub fn remaining_seconds_at(self, at: Instant) -> u64 { match self.remaining_at(at) { // Compensate for the missing subsec part by adding 1s Some(duration) if duration.subsec_nanos() == 0 => duration.as_secs(), Some(duration) => duration.as_secs() + 1, None => 0, } } /// Returns the remaining seconds until the rate limit expires. #[inline] pub fn remaining_seconds(self) -> u64 { self.remaining_seconds_at(Instant::now()) } /// Returns whether this rate limit has expired at the passed instant. #[inline] pub fn expired_at(self, at: Instant) -> bool { self.remaining_at(at).is_none() } /// Returns whether this rate limit has expired. #[inline] pub fn expired(self) -> bool { self.remaining_at(Instant::now()).is_none() } } impl fmt::Debug for RetryAfter { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self.remaining_seconds() { 0 => write!(f, "RetryAfter(expired)"), remaining => write!(f, "RetryAfter({remaining}s)"), } } } #[cfg(test)] impl serde::Serialize for RetryAfter { fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where S: serde::Serializer, { use serde::ser::SerializeTupleStruct; let mut tup = serializer.serialize_tuple_struct("RetryAfter", 1)?; tup.serialize_field(&self.remaining_seconds())?; tup.end() } } #[derive(Debug)] /// Error parsing a `RetryAfter`. pub enum InvalidRetryAfter { /// The supplied delay in seconds was not valid. InvalidDelay(std::num::ParseFloatError), } impl FromStr for RetryAfter { type Err = InvalidRetryAfter; fn from_str(s: &str) -> Result<Self, Self::Err> { let float = s.parse::<f64>().map_err(InvalidRetryAfter::InvalidDelay)?; let seconds = float.max(0.0).ceil() as u64; Ok(RetryAfter::from_secs(seconds)) } } /// The scope that a rate limit applied to. /// /// As opposed to `QuotaScope`, which only declared the class of the scope, this also carries /// information about the scope instance. That is, the specific identifiers of the individual scopes /// that a rate limit applied to. #[derive(Clone, Debug, Eq, PartialEq)] #[cfg_attr(test, derive(serde::Serialize))] pub enum RateLimitScope { /// Global scope. Global, /// An organization with identifier. Organization(OrganizationId), /// A project with identifier. Project(ProjectId), /// A DSN public key. Key(ProjectKey), } impl RateLimitScope { /// Extracts a rate limiting scope from the given item scoping for a specific quota. pub fn for_quota(scoping: &Scoping, scope: QuotaScope) -> Self { match scope { QuotaScope::Global => Self::Global, QuotaScope::Organization => Self::Organization(scoping.organization_id), QuotaScope::Project => Self::Project(scoping.project_id), QuotaScope::Key => Self::Key(scoping.project_key), // For unknown scopes, assume the most specific scope: QuotaScope::Unknown => Self::Key(scoping.project_key), } } /// Returns the canonical name of this scope. pub fn name(&self) -> &'static str { match *self { Self::Global => QuotaScope::Global.name(), Self::Key(_) => QuotaScope::Key.name(), Self::Project(_) => QuotaScope::Project.name(), Self::Organization(_) => QuotaScope::Organization.name(), } } } /// A bounded rate limit. #[derive(Clone, Debug, PartialEq)] #[cfg_attr(test, derive(serde::Serialize))] pub struct RateLimit { /// A set of data categories that this quota applies to. If missing or empty, this rate limit /// applies to all data. pub categories: DataCategories, /// The scope of this rate limit. pub scope: RateLimitScope, /// A machine readable reason indicating which quota caused it. pub reason_code: Option<ReasonCode>, /// A marker when this rate limit expires. pub retry_after: RetryAfter, /// The metric namespace of this rate limit. /// /// Ignored on all data categories except for `MetricBucket`. If empty, this rate limit applies /// to metrics of all namespaces. pub namespaces: SmallVec<[MetricNamespace; 1]>, } impl RateLimit { /// Creates a new rate limit for the given `Quota`. pub fn from_quota(quota: &Quota, scoping: &Scoping, retry_after: RetryAfter) -> Self { Self { categories: quota.categories.clone(), scope: RateLimitScope::for_quota(scoping, quota.scope), reason_code: quota.reason_code.clone(), retry_after, namespaces: quota.namespace.into_iter().collect(), } } /// Checks whether the rate limit applies to the given item. pub fn matches(&self, scoping: ItemScoping<'_>) -> bool { self.matches_scope(scoping) && scoping.matches_categories(&self.categories) && scoping.matches_namespaces(&self.namespaces) } /// Returns `true` if the rate limiting scope matches the given item. fn matches_scope(&self, scoping: ItemScoping<'_>) -> bool { match self.scope { RateLimitScope::Global => true, RateLimitScope::Organization(org_id) => scoping.organization_id == org_id, RateLimitScope::Project(project_id) => scoping.project_id == project_id, RateLimitScope::Key(ref key) => scoping.project_key == *key, } } } /// A collection of scoped rate limits. /// /// This collection may be empty, indicated by `is_ok`. If this instance carries rate limits, they /// can be iterated over using `iter`. Additionally, rate limits can be checked for items by /// invoking `check` with the respective `ItemScoping`. #[derive(Clone, Debug, Default)] #[cfg_attr(test, derive(serde::Serialize))] pub struct RateLimits { limits: Vec<RateLimit>, } impl RateLimits { /// Creates an empty RateLimits instance. pub fn new() -> Self { Self::default() } /// Adds a limit to this collection. /// /// If a rate limit with an overlapping scope already exists, the `retry_after` count is merged /// with the existing limit. Otherwise, the new rate limit is added. pub fn add(&mut self, mut limit: RateLimit) { // Categories are logically a set, but not implemented as such. limit.categories.sort(); let limit_opt = self.limits.iter_mut().find(|l| { let RateLimit { categories, scope, reason_code: _, retry_after: _, namespaces: namespace, } = &limit; *categories == l.categories && *scope == l.scope && *namespace == l.namespaces }); match limit_opt { None => self.limits.push(limit), Some(existing) if existing.retry_after < limit.retry_after => *existing = limit, Some(_) => (), // keep existing, longer limit } } /// Merges all limits into this instance. /// /// This keeps all existing rate limits, adding new ones, and updating ones with a longer /// `retry_after` count. The resulting `RateLimits` contains the merged maximum. pub fn merge(&mut self, limits: Self) { for limit in limits { self.add(limit); } } /// Returns `true` if this instance contains no active limits. pub fn is_ok(&self) -> bool { !self.is_limited() } /// Returns `true` if this instance contains active rate limits. pub fn is_limited(&self) -> bool { let now = Instant::now(); self.iter().any(|limit| !limit.retry_after.expired_at(now)) } /// Removes expired rate limits from this instance. pub fn clean_expired(&mut self, now: Instant) { self.limits .retain(|limit| !limit.retry_after.expired_at(now)); } /// Checks whether any rate limits apply to the given scoping. /// /// If no limits match, then the returned `RateLimits` instance evaluates `is_ok`. Otherwise, it /// contains rate limits that match the given scoping. pub fn check(&self, scoping: ItemScoping<'_>) -> Self { self.check_with_quotas(&[], scoping) } /// Checks whether any rate limits apply to the given scoping. /// /// This is similar to `check`. Additionally, it checks for quotas with a static limit `0`, and /// rejects items even if there is no active rate limit in this instance. /// /// If no limits or quotas match, then the returned `RateLimits` instance evaluates `is_ok`. /// Otherwise, it contains rate limits that match the given scoping. pub fn check_with_quotas<'a>( &self, quotas: impl IntoIterator<Item = &'a Quota>, scoping: ItemScoping<'_>, ) -> Self { let mut applied_limits = Self::new(); for quota in quotas { if quota.limit == Some(0) && quota.matches(scoping) { let retry_after = RetryAfter::from_secs(REJECT_ALL_SECS); applied_limits.add(RateLimit::from_quota(quota, &scoping, retry_after)); } } for limit in &self.limits { if limit.matches(scoping) { applied_limits.add(limit.clone()); } } applied_limits } /// Returns an iterator over the rate limits. pub fn iter(&self) -> RateLimitsIter<'_> { RateLimitsIter { iter: self.limits.iter(), } } /// Returns the longest rate limit. /// /// If multiple rate limits have the same retry after count, any of the limits is returned. pub fn longest(&self) -> Option<&RateLimit> { self.iter().max_by_key(|limit| limit.retry_after) } /// Returns `true` if there are any limits contained. /// /// This is equivalent to checking whether [`Self::longest`] returns `Some` /// or [`Self::iter`] returns an iterator with at least one item. pub fn is_empty(&self) -> bool { self.limits.is_empty() } } /// Immutable rate limits iterator. /// /// This struct is created by the `iter` method on `RateLimits`. pub struct RateLimitsIter<'a> { iter: std::slice::Iter<'a, RateLimit>, } impl<'a> Iterator for RateLimitsIter<'a> { type Item = &'a RateLimit; fn next(&mut self) -> Option<Self::Item> { self.iter.next() } } impl IntoIterator for RateLimits { type IntoIter = RateLimitsIntoIter; type Item = RateLimit; fn into_iter(self) -> Self::IntoIter { RateLimitsIntoIter { iter: self.limits.into_iter(), } } } /// An iterator that moves out of `RateLimtis`. /// /// This struct is created by the `into_iter` method on `RateLimits`, provided by the `IntoIterator` /// trait. pub struct RateLimitsIntoIter { iter: std::vec::IntoIter<RateLimit>, } impl Iterator for RateLimitsIntoIter { type Item = RateLimit; fn next(&mut self) -> Option<Self::Item> { self.iter.next() } } impl<'a> IntoIterator for &'a RateLimits { type IntoIter = RateLimitsIter<'a>; type Item = &'a RateLimit; fn into_iter(self) -> Self::IntoIter { self.iter() } } /// Like [`RateLimits`], a collection of scoped rate limits but with all the checks /// necessary to cache the limits. /// /// The data structure makes sure no expired rate limits are enforced. #[derive(Debug, Default)] pub struct CachedRateLimits(Mutex<Arc<RateLimits>>); impl CachedRateLimits { /// Creates a new, empty instance without any rate limits enforced. pub fn new() -> Self { Self::default() } /// Adds a limit to this collection. /// /// See also: [`RateLimits::add`]. pub fn add(&self, limit: RateLimit) { let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner); let current = Arc::make_mut(&mut inner); current.add(limit); } /// Merges more rate limits into this instance. /// /// See also: [`RateLimits::merge`]. pub fn merge(&self, limits: RateLimits) { let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner); let current = Arc::make_mut(&mut inner); for limit in limits { current.add(limit) } } /// Returns a reference to the contained rate limits. /// /// This call guarantees that at the time of call no returned rate limit is expired. pub fn current_limits(&self) -> Arc<RateLimits> { let now = Instant::now(); let mut inner = self.0.lock().unwrap_or_else(PoisonError::into_inner); Arc::make_mut(&mut inner).clean_expired(now); Arc::clone(&inner) } } #[cfg(test)] mod tests { use smallvec::smallvec; use super::*; use crate::quota::DataCategory; use crate::MetricNamespaceScoping; #[test] fn test_parse_retry_after() { // positive float always rounds up to the next integer let retry_after = "17.1".parse::<RetryAfter>().expect("parse RetryAfter"); assert_eq!(retry_after.remaining_seconds(), 18); assert!(!retry_after.expired()); let retry_after = "17.7".parse::<RetryAfter>().expect("parse RetryAfter"); assert_eq!(retry_after.remaining_seconds(), 18); assert!(!retry_after.expired()); // positive int let retry_after = "17".parse::<RetryAfter>().expect("parse RetryAfter"); assert_eq!(retry_after.remaining_seconds(), 17); assert!(!retry_after.expired()); // negative numbers are treated as zero let retry_after = "-2".parse::<RetryAfter>().expect("parse RetryAfter"); assert_eq!(retry_after.remaining_seconds(), 0); assert!(retry_after.expired()); let retry_after = "-inf".parse::<RetryAfter>().expect("parse RetryAfter"); assert_eq!(retry_after.remaining_seconds(), 0); assert!(retry_after.expired()); // inf and NaN are valid input and treated as zero let retry_after = "inf".parse::<RetryAfter>().expect("parse RetryAfter"); assert_eq!(retry_after.remaining_seconds(), 0); assert!(retry_after.expired()); let retry_after = "NaN".parse::<RetryAfter>().expect("parse RetryAfter"); assert_eq!(retry_after.remaining_seconds(), 0); assert!(retry_after.expired()); // large inputs that would overflow are treated as zero let retry_after = "100000000000000000000" .parse::<RetryAfter>() .expect("parse RetryAfter"); assert_eq!(retry_after.remaining_seconds(), 0); assert!(retry_after.expired()); // invalid strings cause parse error "".parse::<RetryAfter>().expect_err("error RetryAfter"); "nope".parse::<RetryAfter>().expect_err("error RetryAfter"); " 2 ".parse::<RetryAfter>().expect_err("error RetryAfter"); "6 0".parse::<RetryAfter>().expect_err("error RetryAfter"); } #[test] fn test_rate_limit_matches_categories() { let rate_limit = RateLimit { categories: smallvec![DataCategory::Unknown, DataCategory::Error], scope: RateLimitScope::Organization(OrganizationId::new(42)), reason_code: None, retry_after: RetryAfter::from_secs(1), namespaces: smallvec![], }; assert!(rate_limit.matches(ItemScoping { category: DataCategory::Error, scoping: &Scoping { organization_id: OrganizationId::new(42), project_id: ProjectId::new(21), project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: None, }, namespace: MetricNamespaceScoping::None, })); assert!(!rate_limit.matches(ItemScoping { category: DataCategory::Transaction, scoping: &Scoping { organization_id: OrganizationId::new(42), project_id: ProjectId::new(21), project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: None, }, namespace: MetricNamespaceScoping::None, })); } #[test] fn test_rate_limit_matches_organization() { let rate_limit = RateLimit { categories: DataCategories::new(), scope: RateLimitScope::Organization(OrganizationId::new(42)), reason_code: None, retry_after: RetryAfter::from_secs(1), namespaces: smallvec![], }; assert!(rate_limit.matches(ItemScoping { category: DataCategory::Error, scoping: &Scoping { organization_id: OrganizationId::new(42), project_id: ProjectId::new(21), project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: None, }, namespace: MetricNamespaceScoping::None, })); assert!(!rate_limit.matches(ItemScoping { category: DataCategory::Error, scoping: &Scoping { organization_id: OrganizationId::new(0), project_id: ProjectId::new(21), project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: None, }, namespace: MetricNamespaceScoping::None, })); } #[test] fn test_rate_limit_matches_project() { let rate_limit = RateLimit { categories: DataCategories::new(), scope: RateLimitScope::Project(ProjectId::new(21)), reason_code: None, retry_after: RetryAfter::from_secs(1), namespaces: smallvec![], }; assert!(rate_limit.matches(ItemScoping { category: DataCategory::Error, scoping: &Scoping { organization_id: OrganizationId::new(42), project_id: ProjectId::new(21), project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: None, }, namespace: MetricNamespaceScoping::None, })); assert!(!rate_limit.matches(ItemScoping { category: DataCategory::Error, scoping: &Scoping { organization_id: OrganizationId::new(42), project_id: ProjectId::new(0), project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: None, }, namespace: MetricNamespaceScoping::None, })); } #[test] fn test_rate_limit_matches_namespaces() { let rate_limit = RateLimit { categories: smallvec![], scope: RateLimitScope::Organization(OrganizationId::new(42)), reason_code: None, retry_after: RetryAfter::from_secs(1), namespaces: smallvec![MetricNamespace::Custom], }; let scoping = Scoping { organization_id: OrganizationId::new(42), project_id: ProjectId::new(21), project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: None, }; assert!(rate_limit.matches(ItemScoping { category: DataCategory::MetricBucket, scoping: &scoping, namespace: MetricNamespaceScoping::Some(MetricNamespace::Custom), })); assert!(!rate_limit.matches(ItemScoping { category: DataCategory::MetricBucket, scoping: &scoping, namespace: MetricNamespaceScoping::Some(MetricNamespace::Spans), })); let general_rate_limit = RateLimit { categories: smallvec![], scope: RateLimitScope::Organization(OrganizationId::new(42)), reason_code: None, retry_after: RetryAfter::from_secs(1), namespaces: smallvec![], // all namespaces }; assert!(general_rate_limit.matches(ItemScoping { category: DataCategory::MetricBucket, scoping: &scoping, namespace: MetricNamespaceScoping::Some(MetricNamespace::Spans), })); assert!(general_rate_limit.matches(ItemScoping { category: DataCategory::MetricBucket, scoping: &scoping, namespace: MetricNamespaceScoping::None, })); } #[test] fn test_rate_limit_matches_key() { let rate_limit = RateLimit { categories: DataCategories::new(), scope: RateLimitScope::Key( ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), ), reason_code: None, retry_after: RetryAfter::from_secs(1), namespaces: smallvec![], }; assert!(rate_limit.matches(ItemScoping { category: DataCategory::Error, scoping: &Scoping { organization_id: OrganizationId::new(42), project_id: ProjectId::new(21), project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: None, }, namespace: MetricNamespaceScoping::None, })); assert!(!rate_limit.matches(ItemScoping { category: DataCategory::Error, scoping: &Scoping { organization_id: OrganizationId::new(0), project_id: ProjectId::new(21), project_key: ProjectKey::parse("deadbeefdeadbeefdeadbeefdeadbeef").unwrap(), key_id: None, }, namespace: MetricNamespaceScoping::None, })); } #[test] fn test_rate_limits_add_replacement() { let mut rate_limits = RateLimits::new(); rate_limits.add(RateLimit { categories: smallvec![DataCategory::Default, DataCategory::Error], scope: RateLimitScope::Organization(OrganizationId::new(42)), reason_code: Some(ReasonCode::new("first")), retry_after: RetryAfter::from_secs(1), namespaces: smallvec![], }); // longer rate limit shadows shorter one rate_limits.add(RateLimit { categories: smallvec![DataCategory::Error, DataCategory::Default], scope: RateLimitScope::Organization(OrganizationId::new(42)), reason_code: Some(ReasonCode::new("second")), retry_after: RetryAfter::from_secs(10), namespaces: smallvec![], }); insta::assert_ron_snapshot!(rate_limits, @r###" RateLimits( limits: [ RateLimit( categories: [ default, error, ], scope: Organization(OrganizationId(42)), reason_code: Some(ReasonCode("second")), retry_after: RetryAfter(10), namespaces: [], ), ], ) "###); } #[test] fn test_rate_limits_add_shadowing() { let mut rate_limits = RateLimits::new(); rate_limits.add(RateLimit { categories: smallvec![DataCategory::Default, DataCategory::Error], scope: RateLimitScope::Organization(OrganizationId::new(42)), reason_code: Some(ReasonCode::new("first")), retry_after: RetryAfter::from_secs(10), namespaces: smallvec![], }); // shorter rate limit is shadowed by existing one rate_limits.add(RateLimit { categories: smallvec![DataCategory::Error, DataCategory::Default], scope: RateLimitScope::Organization(OrganizationId::new(42)), reason_code: Some(ReasonCode::new("second")), retry_after: RetryAfter::from_secs(1), namespaces: smallvec![], }); insta::assert_ron_snapshot!(rate_limits, @r###" RateLimits( limits: [ RateLimit( categories: [ default, error, ], scope: Organization(OrganizationId(42)), reason_code: Some(ReasonCode("first")), retry_after: RetryAfter(10), namespaces: [], ), ], ) "###); } #[test] fn test_rate_limits_add_buckets() { let mut rate_limits = RateLimits::new(); rate_limits.add(RateLimit { categories: smallvec![DataCategory::Error], scope: RateLimitScope::Organization(OrganizationId::new(42)), reason_code: None, retry_after: RetryAfter::from_secs(1), namespaces: smallvec![], }); // Same scope but different categories rate_limits.add(RateLimit { categories: smallvec![DataCategory::Transaction], scope: RateLimitScope::Organization(OrganizationId::new(42)), reason_code: None, retry_after: RetryAfter::from_secs(1), namespaces: smallvec![], }); // Same categories but different scope rate_limits.add(RateLimit { categories: smallvec![DataCategory::Error], scope: RateLimitScope::Project(ProjectId::new(21)), reason_code: None, retry_after: RetryAfter::from_secs(1), namespaces: smallvec![], }); insta::assert_ron_snapshot!(rate_limits, @r###" RateLimits( limits: [ RateLimit( categories: [ error, ], scope: Organization(OrganizationId(42)), reason_code: None, retry_after: RetryAfter(1), namespaces: [], ), RateLimit( categories: [ transaction, ], scope: Organization(OrganizationId(42)), reason_code: None, retry_after: RetryAfter(1), namespaces: [], ), RateLimit( categories: [ error, ], scope: Project(ProjectId(21)), reason_code: None, retry_after: RetryAfter(1), namespaces: [], ), ], ) "###); } /// Regression test that ensures namespaces are correctly added to rate limits. #[test] fn test_rate_limits_add_namespaces() { let mut rate_limits = RateLimits::new(); rate_limits.add(RateLimit { categories: smallvec![DataCategory::MetricBucket], scope: RateLimitScope::Organization(OrganizationId::new(42)), reason_code: None, retry_after: RetryAfter::from_secs(1), namespaces: smallvec![MetricNamespace::Custom], }); // Same category but different namespaces rate_limits.add(RateLimit { categories: smallvec![DataCategory::MetricBucket], scope: RateLimitScope::Organization(OrganizationId::new(42)), reason_code: None, retry_after: RetryAfter::from_secs(1), namespaces: smallvec![MetricNamespace::Spans], }); insta::assert_ron_snapshot!(rate_limits, @r###" RateLimits( limits: [ RateLimit( categories: [ metric_bucket, ], scope: Organization(OrganizationId(42)), reason_code: None, retry_after: RetryAfter(1), namespaces: [ "custom", ], ), RateLimit( categories: [ metric_bucket, ], scope: Organization(OrganizationId(42)), reason_code: None, retry_after: RetryAfter(1), namespaces: [ "spans", ], ), ], ) "###); } #[test] fn test_rate_limits_longest() { let mut rate_limits = RateLimits::new(); rate_limits.add(RateLimit { categories: smallvec![DataCategory::Error], scope: RateLimitScope::Organization(OrganizationId::new(42)), reason_code: Some(ReasonCode::new("first")), retry_after: RetryAfter::from_secs(1), namespaces: smallvec![], }); // Distinct scope to prevent deduplication rate_limits.add(RateLimit { categories: smallvec![DataCategory::Transaction], scope: RateLimitScope::Organization(OrganizationId::new(42)), reason_code: Some(ReasonCode::new("second")), retry_after: RetryAfter::from_secs(10), namespaces: smallvec![], }); let rate_limit = rate_limits.longest().unwrap(); insta::assert_ron_snapshot!(rate_limit, @r###" RateLimit( categories: [ transaction, ], scope: Organization(OrganizationId(42)), reason_code: Some(ReasonCode("second")), retry_after: RetryAfter(10), namespaces: [], ) "###); } #[test] fn test_rate_limits_clean_expired() { let mut rate_limits = RateLimits::new(); // Active error limit rate_limits.add(RateLimit { categories: smallvec![DataCategory::Error], scope: RateLimitScope::Organization(OrganizationId::new(42)), reason_code: None, retry_after: RetryAfter::from_secs(1), namespaces: smallvec![], }); // Inactive error limit with distinct scope rate_limits.add(RateLimit { categories: smallvec![DataCategory::Error], scope: RateLimitScope::Project(ProjectId::new(21)), reason_code: None, retry_after: RetryAfter::from_secs(0), namespaces: smallvec![], }); // Sanity check before running `clean_expired` assert_eq!(rate_limits.iter().count(), 2); rate_limits.clean_expired(Instant::now()); // Check that the expired limit has been removed insta::assert_ron_snapshot!(rate_limits, @r###" RateLimits( limits: [ RateLimit( categories: [ error, ], scope: Organization(OrganizationId(42)), reason_code: None, retry_after: RetryAfter(1), namespaces: [], ), ], ) "###); } #[test] fn test_rate_limits_check() { let mut rate_limits = RateLimits::new(); // Active error limit rate_limits.add(RateLimit { categories: smallvec![DataCategory::Error], scope: RateLimitScope::Organization(OrganizationId::new(42)), reason_code: None, retry_after: RetryAfter::from_secs(1), namespaces: smallvec![], }); // Active transaction limit rate_limits.add(RateLimit { categories: smallvec![DataCategory::Transaction], scope: RateLimitScope::Organization(OrganizationId::new(42)), reason_code: None, retry_after: RetryAfter::from_secs(1), namespaces: smallvec![], }); let applied_limits = rate_limits.check(ItemScoping { category: DataCategory::Error, scoping: &Scoping { organization_id: OrganizationId::new(42), project_id: ProjectId::new(21), project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: None, }, namespace: MetricNamespaceScoping::None, }); // Check that the error limit is applied insta::assert_ron_snapshot!(applied_limits, @r###" RateLimits( limits: [ RateLimit( categories: [ error, ], scope: Organization(OrganizationId(42)), reason_code: None, retry_after: RetryAfter(1), namespaces: [], ), ], ) "###); } #[test] fn test_rate_limits_check_quotas() { let mut rate_limits = RateLimits::new(); // Active error limit rate_limits.add(RateLimit { categories: smallvec![DataCategory::Error], scope: RateLimitScope::Organization(OrganizationId::new(42)), reason_code: None, retry_after: RetryAfter::from_secs(1), namespaces: smallvec![], }); // Active transaction limit rate_limits.add(RateLimit { categories: smallvec![DataCategory::Transaction], scope: RateLimitScope::Organization(OrganizationId::new(42)), reason_code: None, retry_after: RetryAfter::from_secs(1), namespaces: smallvec![], }); let item_scoping = ItemScoping { category: DataCategory::Error, scoping: &Scoping { organization_id: OrganizationId::new(42), project_id: ProjectId::new(21), project_key: ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(), key_id: None, }, namespace: MetricNamespaceScoping::None, }; let quotas = &[Quota { id: None, categories: smallvec![DataCategory::Error], scope: QuotaScope::Organization, scope_id: Some("42".to_owned()), limit: Some(0), window: None, reason_code: Some(ReasonCode::new("zero")), namespace: None, }]; let applied_limits = rate_limits.check_with_quotas(quotas, item_scoping); insta::assert_ron_snapshot!(applied_limits, @r###" RateLimits( limits: [ RateLimit( categories: [ error, ], scope: Organization(OrganizationId(42)), reason_code: Some(ReasonCode("zero")), retry_after: RetryAfter(60), namespaces: [], ), ], ) "###); } #[test] fn test_rate_limits_merge() { let mut rate_limits1 = RateLimits::new(); let mut rate_limits2 = RateLimits::new(); rate_limits1.add(RateLimit { categories: smallvec![DataCategory::Error], scope: RateLimitScope::Organization(OrganizationId::new(42)), reason_code: Some(ReasonCode::new("first")), retry_after: RetryAfter::from_secs(1), namespaces: smallvec![], }); rate_limits1.add(RateLimit { categories: smallvec![DataCategory::TransactionIndexed], scope: RateLimitScope::Organization(OrganizationId::new(42)), reason_code: None, retry_after: RetryAfter::from_secs(1), namespaces: smallvec![], }); rate_limits2.add(RateLimit { categories: smallvec![DataCategory::Error], scope: RateLimitScope::Organization(OrganizationId::new(42)), reason_code: Some(ReasonCode::new("second")), retry_after: RetryAfter::from_secs(10), namespaces: smallvec![], }); rate_limits1.merge(rate_limits2); insta::assert_ron_snapshot!(rate_limits1, @r###" RateLimits( limits: [ RateLimit( categories: [ error, ], scope: Organization(OrganizationId(42)), reason_code: Some(ReasonCode("second")), retry_after: RetryAfter(10), namespaces: [], ), RateLimit( categories: [ transaction_indexed, ], scope: Organization(OrganizationId(42)), reason_code: None, retry_after: RetryAfter(1), namespaces: [], ), ], ) "###); } #[test] fn test_cached_rate_limits_expired() { let cached = CachedRateLimits::new(); // Active error limit cached.add(RateLimit { categories: smallvec![DataCategory::Error], scope: RateLimitScope::Organization(OrganizationId::new(42)), reason_code: None, retry_after: RetryAfter::from_secs(1), namespaces: smallvec![], }); // Inactive error limit with distinct scope cached.add(RateLimit { categories: smallvec![DataCategory::Error], scope: RateLimitScope::Project(ProjectId::new(21)), reason_code: None, retry_after: RetryAfter::from_secs(0), namespaces: smallvec![], }); let rate_limits = cached.current_limits(); insta::assert_ron_snapshot!(rate_limits, @r###" RateLimits( limits: [ RateLimit( categories: [ error, ], scope: Organization(OrganizationId(42)), reason_code: None, retry_after: RetryAfter(1), namespaces: [], ), ], ) "###); } }