Skip to content

Commit 729c20f

Browse files
committed
Merge branch 'master' into feat/metrics-ingestion
* master: feat(stats): Add quantity to TrackOutcome and new attachment outcomes (#942)
2 parents 7168aea + dacdb00 commit 729c20f

File tree

10 files changed

+113
-57
lines changed

10 files changed

+113
-57
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
**Internal**:
66

7+
- Emit the `quantity` field for event outcomes. This field contains the total size in bytes for attachments, or the event count for all other categories. If a rejected envelope contains attachments, a separate attachment outcome is emitted in addition to the event outcome. ([#942](https://github.com/getsentry/relay/pull/942))
78
- Add experimental metrics ingestion without bucketing or pre-aggregation. ([#948](https://github.com/getsentry/relay/pull/948))
89

910
## 21.3.0

relay-server/src/actors/events.rs

+34-19
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@ use crate::envelope::{self, AttachmentType, ContentType, Envelope, Item, ItemTyp
3737
use crate::http::{HttpError, RequestBuilder};
3838
use crate::metrics::{RelayCounters, RelayHistograms, RelaySets, RelayTimers};
3939
use crate::service::ServerError;
40-
use crate::utils::{self, ChunkedFormDataAggregator, FormDataIter, FutureExt, SamplingResult};
40+
use crate::utils::{
41+
self, ChunkedFormDataAggregator, EnvelopeSummary, FormDataIter, FutureExt, SamplingResult,
42+
};
4143

4244
#[cfg(feature = "processing")]
4345
use {
@@ -1522,19 +1524,19 @@ impl Handler<HandleEnvelope> for EventManager {
15221524
start_time,
15231525
sampling_project,
15241526
} = message;
1525-
let event_category = envelope.get_event_category();
15261527

15271528
let event_id = envelope.event_id();
15281529
let remote_addr = envelope.meta().client_addr();
15291530

15301531
let scoping = Rc::new(RefCell::new(envelope.meta().get_partial_scoping()));
15311532
let is_received = Rc::new(AtomicBool::from(false));
1533+
let envelope_summary = Rc::new(RefCell::new(EnvelopeSummary::compute(&envelope)));
15321534

15331535
let future = project
15341536
.send(CheckEnvelope::fetched(envelope))
15351537
.map_err(ProcessingError::ScheduleFailed)
15361538
.and_then(|result| result.map_err(ProcessingError::ProjectFailed))
1537-
.and_then(clone!(scoping, |response| {
1539+
.and_then(clone!(scoping, envelope_summary, |response| {
15381540
// Use the project id from the loaded project state to account for redirects.
15391541
let project_id = response.scoping.project_id.value();
15401542
metric!(set(RelaySets::UniqueProjects) = project_id as i64);
@@ -1543,7 +1545,10 @@ impl Handler<HandleEnvelope> for EventManager {
15431545

15441546
let checked = response.result.map_err(ProcessingError::EventRejected)?;
15451547
match checked.envelope {
1546-
Some(envelope) => Ok(envelope),
1548+
Some(envelope) => {
1549+
envelope_summary.replace(EnvelopeSummary::compute(&envelope));
1550+
Ok(envelope)
1551+
}
15471552
None => Err(ProcessingError::RateLimited(checked.rate_limits)),
15481553
}
15491554
}))
@@ -1577,6 +1582,7 @@ impl Handler<HandleEnvelope> for EventManager {
15771582
})
15781583
.map_err(ProcessingError::ScheduleFailed)
15791584
.flatten()
1585+
// TODO: Update envelope_summary once the rate-limiting code emits its own outcomes.
15801586
})
15811587
.and_then(clone!(project, |processed| {
15821588
let rate_limits = processed.rate_limits;
@@ -1714,13 +1720,6 @@ impl Handler<HandleEnvelope> for EventManager {
17141720
}
17151721
}
17161722

1717-
// Envelopes not containing events (such as standalone attachment uploads or user
1718-
// reports) should never create outcomes.
1719-
let category = match event_category {
1720-
Some(event_category) => event_category,
1721-
None => return,
1722-
};
1723-
17241723
let outcome = error.to_outcome();
17251724
if let Some(Outcome::Invalid(DiscardReason::Internal)) = outcome {
17261725
// Errors are only logged for what we consider an internal discard reason. These
@@ -1738,15 +1737,31 @@ impl Handler<HandleEnvelope> for EventManager {
17381737
return;
17391738
}
17401739

1740+
let envelope_summary = envelope_summary.borrow();
17411741
if let Some(outcome) = outcome {
1742-
outcome_producer.do_send(TrackOutcome {
1743-
timestamp: Instant::now(),
1744-
scoping: *scoping.borrow(),
1745-
outcome,
1746-
event_id,
1747-
remote_addr,
1748-
category,
1749-
})
1742+
if let Some(category) = envelope_summary.event_category {
1743+
outcome_producer.do_send(TrackOutcome {
1744+
timestamp: Instant::now(),
1745+
scoping: *scoping.borrow(),
1746+
outcome: outcome.clone(),
1747+
event_id,
1748+
remote_addr,
1749+
category,
1750+
quantity: 1,
1751+
});
1752+
}
1753+
1754+
if envelope_summary.attachment_quantity > 0 {
1755+
outcome_producer.do_send(TrackOutcome {
1756+
timestamp: start_time,
1757+
scoping: *scoping.borrow(),
1758+
outcome,
1759+
event_id,
1760+
remote_addr,
1761+
category: DataCategory::Attachment,
1762+
quantity: envelope_summary.attachment_quantity,
1763+
});
1764+
}
17501765
}
17511766
})
17521767
.then(move |x, slf, _| {

relay-server/src/actors/outcome.rs

+6
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ pub struct TrackOutcome {
8484
pub remote_addr: Option<IpAddr>,
8585
/// The event's data category.
8686
pub category: DataCategory,
87+
/// The event count, or the total size in bytes for attachments.
88+
pub quantity: usize,
8789
}
8890

8991
impl Message for TrackOutcome {
@@ -316,6 +318,9 @@ pub struct TrackRawOutcome {
316318
/// The event's data category.
317319
#[serde(default, skip_serializing_if = "Option::is_none")]
318320
pub category: Option<u8>,
321+
/// The event count, or the total size in bytes for attachments.
322+
#[serde(default, skip_serializing_if = "Option::is_none")]
323+
pub quantity: Option<usize>,
319324
}
320325

321326
impl TrackRawOutcome {
@@ -351,6 +356,7 @@ impl TrackRawOutcome {
351356
remote_addr: msg.remote_addr.map(|addr| addr.to_string()),
352357
source,
353358
category: msg.category.value(),
359+
quantity: Some(msg.quantity),
354360
}
355361
}
356362
}

relay-server/src/endpoints/common.rs

+25-9
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use failure::Fail;
1111
use futures::prelude::*;
1212
use serde::Deserialize;
1313

14-
use relay_common::{clone, metric, tryf};
14+
use relay_common::{clone, metric, tryf, DataCategory};
1515
use relay_config::Config;
1616
use relay_general::protocol::{EventId, EventType};
1717
use relay_log::LogError;
@@ -26,7 +26,7 @@ use crate::envelope::{AttachmentType, Envelope, EnvelopeError, ItemType, Items};
2626
use crate::extractors::RequestMeta;
2727
use crate::metrics::RelayCounters;
2828
use crate::service::{ServiceApp, ServiceState};
29-
use crate::utils::{self, ApiErrorResponse, FormDataIter, MultipartError};
29+
use crate::utils::{self, ApiErrorResponse, EnvelopeSummary, FormDataIter, MultipartError};
3030

3131
#[derive(Fail, Debug)]
3232
pub enum BadStoreRequest {
@@ -411,19 +411,19 @@ where
411411

412412
let scoping = Rc::new(RefCell::new(meta.get_partial_scoping()));
413413
let event_id = Rc::new(RefCell::new(None));
414-
let event_category = Rc::new(RefCell::new(None));
414+
let envelope_summary = Rc::new(RefCell::new(EnvelopeSummary::empty()));
415415
let config = request.state().config();
416416
let processing_enabled = config.processing_enabled();
417417

418418
let future = project_manager
419419
.send(GetProject { public_key })
420420
.map_err(BadStoreRequest::ScheduleFailed)
421-
.and_then(clone!(event_id, event_category, scoping, |project| {
421+
.and_then(clone!(event_id, envelope_summary, scoping, |project| {
422422
extract_envelope(&request, meta)
423423
.into_future()
424-
.and_then(clone!(event_id, event_category, |envelope| {
424+
.and_then(clone!(event_id, envelope_summary, |envelope| {
425425
event_id.replace(envelope.event_id());
426-
event_category.replace(envelope.get_event_category());
426+
envelope_summary.replace(EnvelopeSummary::compute(&envelope));
427427

428428
if envelope.is_empty() {
429429
Err(BadStoreRequest::EmptyEnvelope)
@@ -447,6 +447,8 @@ where
447447
Some(envelope) => envelope,
448448
None => return Err(BadStoreRequest::RateLimited(checked.rate_limits)),
449449
};
450+
// TODO: Update envelope_summary from checked.envelope, once the rate-limiting
451+
// code in CheckEnvelope emits its own outcomes.
450452

451453
if check_envelope_size_limits(&config, &envelope) {
452454
Ok((envelope, checked.rate_limits))
@@ -531,15 +533,29 @@ where
531533
.or_else(move |error: BadStoreRequest| {
532534
metric!(counter(RelayCounters::EnvelopeRejected) += 1);
533535

534-
if let Some(category) = *event_category.borrow() {
535-
if let Some(outcome) = error.to_outcome() {
536+
let envelope_summary = envelope_summary.borrow();
537+
if let Some(outcome) = error.to_outcome() {
538+
if let Some(category) = envelope_summary.event_category {
536539
outcome_producer.do_send(TrackOutcome {
537540
timestamp: start_time,
538541
scoping: *scoping.borrow(),
539-
outcome,
542+
outcome: outcome.clone(),
540543
event_id: *event_id.borrow(),
541544
remote_addr,
542545
category,
546+
quantity: 1,
547+
});
548+
}
549+
550+
if envelope_summary.attachment_quantity > 0 {
551+
outcome_producer.do_send(TrackOutcome {
552+
timestamp: start_time,
553+
scoping: *scoping.borrow(),
554+
outcome,
555+
event_id: *event_id.borrow(),
556+
remote_addr,
557+
category: DataCategory::Attachment,
558+
quantity: envelope_summary.attachment_quantity,
543559
});
544560
}
545561
}

relay-server/src/envelope.rs

+1-9
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,13 @@ use failure::Fail;
4141
use serde::{de::DeserializeOwned, Deserialize, Serialize};
4242
use smallvec::SmallVec;
4343

44-
use relay_common::DataCategory;
4544
use relay_general::protocol::{EventId, EventType};
4645
use relay_general::types::Value;
4746
use relay_sampling::TraceContext;
4847

4948
use crate::constants::DEFAULT_EVENT_RETENTION;
5049
use crate::extractors::{PartialMeta, RequestMeta};
51-
use crate::utils::{infer_event_category, ErrorBoundary};
50+
use crate::utils::ErrorBoundary;
5251

5352
pub const CONTENT_TYPE: &str = "application/x-sentry-envelope";
5453

@@ -961,13 +960,6 @@ impl Envelope {
961960
.write_all(buf)
962961
.map_err(EnvelopeError::PayloadIoFailed)
963962
}
964-
965-
/// Return the data category type of the event item, if any, in this envelope.
966-
pub fn get_event_category(&self) -> Option<DataCategory> {
967-
self.items().find_map(infer_event_category)
968-
// There are some cases where multiple items may have different categories, but returning
969-
// the first is good enough for now.
970-
}
971963
}
972964

973965
#[cfg(test)]

relay-server/src/utils/rate_limits.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ pub fn parse_rate_limits(scoping: &Scoping, string: &str) -> RateLimits {
8585
/// to be set on the event item.
8686
/// - `Attachment`: If the attachment creates an event (e.g. for minidumps), the category is assumed
8787
/// to be `Error`.
88-
pub fn infer_event_category(item: &Item) -> Option<DataCategory> {
88+
fn infer_event_category(item: &Item) -> Option<DataCategory> {
8989
match item.ty() {
9090
ItemType::Event => Some(DataCategory::Error),
9191
ItemType::Transaction => Some(DataCategory::Transaction),

tests/integration/fixtures/processing.py

+34-15
Original file line numberDiff line numberDiff line change
@@ -207,23 +207,42 @@ def category_value(category):
207207

208208

209209
class OutcomesConsumer(ConsumerBase):
210-
def get_outcome(self):
211-
outcome = self.poll()
212-
assert outcome is not None
213-
assert outcome.error() is None
214-
return json.loads(outcome.value())
210+
def _poll_all(self):
211+
while True:
212+
outcome = self.poll()
213+
if outcome is None:
214+
return
215+
else:
216+
yield outcome
217+
218+
def get_outcomes(self):
219+
outcomes = list(self._poll_all())
220+
for outcome in outcomes:
221+
assert outcome.error() is None
222+
return [json.loads(outcome.value()) for outcome in outcomes]
215223

216-
def assert_rate_limited(self, reason, key_id=None, category=None):
217-
outcome = self.get_outcome()
218-
assert outcome["outcome"] == 2, outcome
219-
assert outcome["reason"] == reason
220-
if key_id is not None:
221-
assert outcome["key_id"] == key_id
222-
if category is not None:
223-
value = category_value(category)
224-
assert outcome["category"] == value, outcome["category"]
225-
else:
224+
def get_outcome(self):
225+
outcomes = self.get_outcomes()
226+
assert len(outcomes) > 0, "No outcomes were consumed"
227+
assert len(outcomes) == 1, "More than one outcome was consumed"
228+
return outcomes[0]
229+
230+
def assert_rate_limited(self, reason, key_id=None, categories=None):
231+
if categories is None:
232+
outcome = self.get_outcome()
226233
assert isinstance(outcome["category"], int)
234+
outcomes = [outcome]
235+
else:
236+
outcomes = self.get_outcomes()
237+
expected = set(category_value(category) for category in categories)
238+
actual = set(outcome["category"] for outcome in outcomes)
239+
assert actual == expected, (actual, expected)
240+
241+
for outcome in outcomes:
242+
assert outcome["outcome"] == 2, outcome
243+
assert outcome["reason"] == reason
244+
if key_id is not None:
245+
assert outcome["key_id"] == key_id
227246

228247
def assert_dropped_internal(self):
229248
outcome = self.get_outcome()

tests/integration/test_minidump.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -464,11 +464,15 @@ def test_minidump_ratelimit(
464464

465465
# First minidump returns 200 but is rate limited in processing
466466
relay.send_minidump(project_id=project_id, files=attachments)
467-
outcomes_consumer.assert_rate_limited("static_disabled_quota", category="error")
467+
outcomes_consumer.assert_rate_limited(
468+
"static_disabled_quota", categories=["error", "attachment"]
469+
)
468470

469471
# Minidumps never return rate limits
470472
relay.send_minidump(project_id=project_id, files=attachments)
471-
outcomes_consumer.assert_rate_limited("static_disabled_quota", category="error")
473+
outcomes_consumer.assert_rate_limited(
474+
"static_disabled_quota", categories=["error", "attachment"]
475+
)
472476

473477

474478
def test_crashpad_annotations(mini_sentry, relay_with_processing, attachments_consumer):

tests/integration/test_outcome.py

+3
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ def test_outcomes_non_processing(relay, mini_sentry, event_type):
104104
"event_id": event_id,
105105
"remote_addr": "127.0.0.1",
106106
"category": 2 if event_type == "transaction" else 1,
107+
"quantity": 1,
107108
}
108109
assert outcome == expected_outcome
109110

@@ -303,6 +304,7 @@ def test_outcome_forwarding(
303304
"event_id": event_id,
304305
"remote_addr": "127.0.0.1",
305306
"category": 2 if event_type == "transaction" else 1,
307+
"quantity": 1,
306308
}
307309
outcome.pop("timestamp")
308310

@@ -377,6 +379,7 @@ def test_outcomes_forwarding_rate_limited(
377379
"event_id": event_id,
378380
"source": "processing-layer",
379381
"category": 1,
382+
"quantity": 1,
380383
}
381384
assert outcome == expected_outcome
382385

tests/integration/test_store.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ def test_processing_quotas(
538538

539539
if outcomes_consumer is not None:
540540
outcomes_consumer.assert_rate_limited(
541-
"get_lost", key_id=key_id, category=category
541+
"get_lost", key_id=key_id, categories=[category]
542542
)
543543
else:
544544
# since we don't wait for the outcome, wait a little for the event to go through
@@ -558,7 +558,7 @@ def test_processing_quotas(
558558
assert rest == "%s:key:get_lost" % category
559559
if outcomes_consumer is not None:
560560
outcomes_consumer.assert_rate_limited(
561-
"get_lost", key_id=key_id, category=category
561+
"get_lost", key_id=key_id, categories=[category]
562562
)
563563

564564
for i in range(10):

0 commit comments

Comments
 (0)