-
Notifications
You must be signed in to change notification settings - Fork 94
/
Copy pathmanaged_envelope.rs
381 lines (338 loc) · 13.6 KB
/
managed_envelope.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
//! Envelope context type and helpers to ensure outcomes.
use std::mem::size_of;
use std::time::Instant;
use chrono::{DateTime, Utc};
use relay_common::DataCategory;
use relay_quotas::Scoping;
use relay_system::Addr;
use crate::actors::outcome::{DiscardReason, Outcome, TrackOutcome};
use crate::actors::test_store::{Capture, TestStore};
use crate::envelope::{Envelope, Item};
use crate::extractors::RequestMeta;
use crate::statsd::{RelayCounters, RelayTimers};
use crate::utils::{EnvelopeSummary, SemaphorePermit};
/// Denotes the success of handling an envelope.
#[derive(Clone, Copy, Debug)]
enum Handling {
    /// The envelope was handled successfully.
    ///
    /// This can be the case even if the envelope was dropped. For example, if a rate limit is in
    /// effect or if the corresponding project is disabled.
    Success,
    /// Handling the envelope failed due to an error or bug.
    Failure,
}
impl Handling {
    /// Classifies an outcome: unexpected outcomes count as a handling failure, every other
    /// outcome (including expected drops) counts as success.
    fn from_outcome(outcome: &Outcome) -> Self {
        if outcome.is_unexpected() {
            Self::Failure
        } else {
            Self::Success
        }
    }

    /// Returns the tag value used when reporting this handling state in metrics.
    ///
    /// The returned string is a literal, so it is `'static` and not tied to the borrow of `self`.
    fn as_str(&self) -> &'static str {
        match self {
            Handling::Success => "success",
            Handling::Failure => "failure",
        }
    }
}
/// Represents the decision on whether or not to keep an envelope item.
#[derive(Debug)]
pub enum ItemAction {
    /// Keep the item.
    Keep,
    /// Drop the item and log an outcome for it.
    ///
    /// The outcome will only be logged if the item has a corresponding
    /// [`Item::outcome_category()`].
    Drop(Outcome),
    /// Drop the item without logging an outcome.
    DropSilently,
}
/// Bookkeeping state that a [`ManagedEnvelope`] carries alongside its envelope.
#[derive(Debug)]
struct EnvelopeContext {
    /// Summary of the envelope's items, used to compute the outcomes emitted on rejection.
    summary: EnvelopeSummary,
    /// Scoping under which outcomes for this envelope are reported.
    scoping: Scoping,
    /// Processing queue permit; `None` for standalone/untracked envelopes or once released.
    slot: Option<SemaphorePermit>,
    /// Set once the envelope has been accepted, rejected, or released; afterwards no further
    /// outcomes are recorded for it.
    done: bool,
}
/// Tracks the lifetime of an [`Envelope`] in Relay.
///
/// The managed envelope accompanies envelopes through the processing pipeline in Relay and ensures
/// that outcomes are recorded when the Envelope is dropped. Its lifetime ends in one of four ways:
///
/// - By calling [`accept`](Self::accept). Responsibility for the envelope has been transferred to
///   another service, and no further outcomes need to be recorded.
/// - By calling [`reject`](Self::reject). The entire envelope was dropped, and the outcome
///   specifies the reason.
/// - By calling [`into_envelope`](Self::into_envelope). The raw envelope is handed back to the
///   caller without recording any outcomes or envelope metrics.
/// - By dropping the managed envelope. This indicates an issue or a bug and raises the
///   `"internal"` outcome. There should be additional error handling to report an error to Sentry.
///
/// The managed envelope also holds a processing queue permit which is used for backpressure
/// management. It is released when the envelope is accepted, rejected, or converted with
/// `into_envelope`, and is otherwise reclaimed when the managed envelope is dropped.
#[derive(Debug)]
pub struct ManagedEnvelope {
    /// The envelope being tracked.
    envelope: Box<Envelope>,
    /// Summary, scoping, queue permit, and completion state for the envelope.
    context: EnvelopeContext,
    /// Destination for outcomes recorded by this managed envelope.
    outcome_aggregator: Addr<TrackOutcome>,
    /// Receives a capture message when the envelope is rejected.
    test_store: Addr<TestStore>,
}
impl ManagedEnvelope {
    /// Computes a managed envelope from the given envelope.
    ///
    /// The item summary and the partial scoping are derived from the envelope itself. `slot` is
    /// the optional processing queue permit held for the lifetime of the managed envelope.
    fn new_internal(
        envelope: Box<Envelope>,
        slot: Option<SemaphorePermit>,
        outcome_aggregator: Addr<TrackOutcome>,
        test_store: Addr<TestStore>,
    ) -> Self {
        let summary = EnvelopeSummary::compute(envelope.as_ref());
        let scoping = envelope.meta().get_partial_scoping();

        Self {
            envelope,
            context: EnvelopeContext {
                summary,
                scoping,
                slot,
                done: false,
            },
            outcome_aggregator,
            test_store,
        }
    }

    /// Creates a standalone envelope for testing purposes.
    ///
    /// As opposed to [`new`](Self::new), this does not require a queue permit. This makes it
    /// suitable for unit testing internals of the processing pipeline.
    #[cfg(test)]
    pub fn standalone(
        envelope: Box<Envelope>,
        outcome_aggregator: Addr<TrackOutcome>,
        test_store: Addr<TestStore>,
    ) -> Self {
        Self::new_internal(envelope, None, outcome_aggregator, test_store)
    }

    /// Creates an envelope for testing purposes that is already marked as done.
    ///
    /// Like [`standalone`](Self::standalone) it holds no queue permit, and additionally no
    /// outcomes are recorded when it is rejected or dropped.
    #[cfg(test)]
    pub fn untracked(
        envelope: Box<Envelope>,
        outcome_aggregator: Addr<TrackOutcome>,
        test_store: Addr<TestStore>,
    ) -> Self {
        let mut envelope = Self::new_internal(envelope, None, outcome_aggregator, test_store);
        envelope.context.done = true;
        envelope
    }

    /// Computes a managed envelope from the given envelope and binds it to the processing queue.
    ///
    /// To provide additional scoping, use [`ManagedEnvelope::scope`].
    pub fn new(
        envelope: Box<Envelope>,
        slot: SemaphorePermit,
        outcome_aggregator: Addr<TrackOutcome>,
        test_store: Addr<TestStore>,
    ) -> Self {
        Self::new_internal(envelope, Some(slot), outcome_aggregator, test_store)
    }

    /// Returns a reference to the contained [`Envelope`].
    pub fn envelope(&self) -> &Envelope {
        self.envelope.as_ref()
    }

    /// Returns a mutable reference to the contained [`Envelope`].
    pub fn envelope_mut(&mut self) -> &mut Envelope {
        self.envelope.as_mut()
    }

    /// Consumes itself returning the managed envelope.
    ///
    /// This also releases the slot with [`SemaphorePermit`] and sets the internal context
    /// to done so there is no rejection issued once the [`ManagedEnvelope`] is consumed.
    pub fn into_envelope(mut self) -> Box<Envelope> {
        self.context.slot.take();
        self.context.done = true;
        self.take_envelope()
    }

    /// Take the envelope out of the context and replace it with a dummy.
    ///
    /// Note that after taking out the envelope, the envelope summary is incorrect.
    pub(crate) fn take_envelope(&mut self) -> Box<Envelope> {
        Box::new(self.envelope.take_items())
    }

    /// Update the context with envelope information.
    ///
    /// This recomputes the item summary from the current contents of the envelope.
    pub fn update(&mut self) -> &mut Self {
        self.context.summary = EnvelopeSummary::compute(self.envelope());
        self
    }

    /// Retains or drops items based on the [`ItemAction`].
    ///
    /// Items for which `f` returns [`ItemAction::Drop`] are removed and, if they have an outcome
    /// category, an outcome is recorded for each of them. This method operates in place and
    /// preserves the order of the retained items.
    pub fn retain_items<F>(&mut self, mut f: F)
    where
        F: FnMut(&mut Item) -> ItemAction,
    {
        let mut outcomes = Vec::new();
        // Computed up front because the closure below mutably borrows the envelope.
        let use_indexed = self.use_index_category();
        self.envelope.retain_items(|item| match f(item) {
            ItemAction::Keep => true,
            ItemAction::Drop(outcome) => {
                if let Some(category) = item.outcome_category(use_indexed) {
                    outcomes.push((outcome, category, item.quantity()));
                }
                false
            }
            ItemAction::DropSilently => false,
        });

        for (outcome, category, quantity) in outcomes {
            self.track_outcome(outcome, category, quantity);
        }
        // TODO: once `update` is private, it should be called here.
    }

    /// Record that event metrics have been extracted.
    ///
    /// This is usually done automatically as part of `ManagedEnvelope::new` or `update`. However,
    /// if the context needs to be updated in-flight without recomputing the entire summary, this
    /// method can record that metric extraction for the event item has occurred.
    pub fn set_event_metrics_extracted(&mut self) -> &mut Self {
        self.context.summary.event_metrics_extracted = true;
        self
    }

    /// Re-scopes this context to the given scoping.
    pub fn scope(&mut self, scoping: Scoping) -> &mut Self {
        self.context.scoping = scoping;
        self
    }

    /// Records an outcome scoped to this envelope's context.
    ///
    /// This managed envelope should be updated using [`update`](Self::update) soon after this
    /// operation to ensure that subsequent outcomes are consistent.
    fn track_outcome(&self, outcome: Outcome, category: DataCategory, quantity: usize) {
        self.outcome_aggregator.send(TrackOutcome {
            timestamp: self.received_at(),
            scoping: self.context.scoping,
            outcome,
            event_id: self.envelope.event_id(),
            remote_addr: self.meta().remote_addr(),
            category,
            // Quantities are `usize`, but the protocol and data store can only do 32-bit.
            // Saturate instead of wrapping, so an oversized quantity is reported as the maximum
            // rather than as an arbitrary small number.
            quantity: u32::try_from(quantity).unwrap_or(u32::MAX),
        });
    }

    /// Accepts the envelope and drops the context.
    ///
    /// This should be called if the envelope has been accepted by the upstream, which means that
    /// the responsibility for logging outcomes has been moved. This function will not log any
    /// outcomes.
    pub fn accept(mut self) {
        if !self.context.done {
            self.finish(RelayCounters::EnvelopeAccepted, Handling::Success);
        }
    }

    /// Returns `true` if the indexed data category should be used for reporting.
    ///
    /// If metrics have been extracted from the event item, we use the indexed category
    /// (for example, [TransactionIndexed](`DataCategory::TransactionIndexed`)) for reporting
    /// rate limits and outcomes, because reporting of the main category
    /// (for example, [Transaction](`DataCategory::Transaction`) for processed transactions)
    /// will be handled by the metrics aggregator.
    fn use_index_category(&self) -> bool {
        self.context.summary.event_metrics_extracted
    }

    /// Returns the data category of the event item in the envelope.
    ///
    /// If metrics have been extracted from the event item, this will return the indexing category.
    /// Outcomes for metrics (the base data category) will be logged by the metrics aggregator.
    fn event_category(&self) -> Option<DataCategory> {
        let category = self.context.summary.event_category?;

        match category.index_category() {
            Some(index_category) if self.use_index_category() => Some(index_category),
            _ => Some(category),
        }
    }

    /// Records rejection outcomes for all items stored in this context.
    ///
    /// This does not send outcomes for empty envelopes or request-only contexts. Calling this on
    /// an envelope that is already done has no effect.
    pub fn reject(&mut self, outcome: Outcome) {
        if self.context.done {
            return;
        }

        // Errors are only logged for what we consider failed request handling. In other cases, we
        // "expect" errors and log them as debug level.
        let handling = Handling::from_outcome(&outcome);
        match handling {
            Handling::Success => relay_log::debug!("dropped envelope: {}", outcome),
            Handling::Failure => relay_log::error!("dropped envelope: {}", outcome),
        }

        // TODO: This could be optimized with Capture::should_capture
        self.test_store
            .send(Capture::rejected(self.envelope.event_id(), &outcome));

        if let Some(category) = self.event_category() {
            self.track_outcome(outcome.clone(), category, 1);
        }

        if self.context.summary.attachment_quantity > 0 {
            self.track_outcome(
                outcome.clone(),
                DataCategory::Attachment,
                self.context.summary.attachment_quantity,
            );
        }

        if self.context.summary.profile_quantity > 0 {
            // Same indexed-vs-base decision as for the event category.
            let category = if self.use_index_category() {
                DataCategory::ProfileIndexed
            } else {
                DataCategory::Profile
            };
            self.track_outcome(outcome, category, self.context.summary.profile_quantity);
        }

        self.finish(RelayCounters::EnvelopeRejected, handling);
    }

    /// Returns scoping stored in this context.
    pub fn scoping(&self) -> Scoping {
        self.context.scoping
    }

    /// Returns the request metadata of the contained envelope.
    pub fn meta(&self) -> &RequestMeta {
        self.envelope().meta()
    }

    /// Returns estimated size of this envelope.
    ///
    /// This is just an estimated size, which in reality can be somewhat bigger, depending on the
    /// list of additional attributes allocated on all of the inner types.
    ///
    /// NOTE: Current implementation counts in only the size of the items payload and stack
    /// allocated parts of [`Envelope`] and [`ManagedEnvelope`]. All the heap allocated fields
    /// within early mentioned types are skipped.
    pub fn estimated_size(&self) -> usize {
        const ROUND_TO: usize = 1000;

        let size = self.context.summary.payload_size + size_of::<Self>() + size_of::<Envelope>();

        // Always round it up to the next 1KB (1000 bytes). Integer arithmetic keeps the result
        // exact for every size, unlike a round trip through `f64`.
        match size % ROUND_TO {
            0 => size,
            remainder => size - remainder + ROUND_TO,
        }
    }

    /// Returns the instant at which the envelope was received at this Relay.
    ///
    /// This is the monotonic time equivalent to [`received_at`](Self::received_at).
    pub fn start_time(&self) -> Instant {
        self.meta().start_time()
    }

    /// Returns the time at which the envelope was received at this Relay.
    ///
    /// This is the date time equivalent to [`start_time`](Self::start_time).
    pub fn received_at(&self) -> DateTime<Utc> {
        relay_common::instant_to_date_time(self.start_time())
    }

    /// Resets inner state to ensure there's no more logging.
    ///
    /// Releases the queue permit, emits the given counter tagged with the handling state plus
    /// the total envelope time, and marks the context as done.
    fn finish(&mut self, counter: RelayCounters, handling: Handling) {
        self.context.slot.take();

        relay_statsd::metric!(counter(counter) += 1, handling = handling.as_str());
        relay_statsd::metric!(timer(RelayTimers::EnvelopeTotalTime) = self.start_time().elapsed());

        self.context.done = true;
    }
}
impl Drop for ManagedEnvelope {
    // Dropping without an explicit accept/reject is treated as an internal failure. `reject`
    // returns early when the context is already marked as done, so envelopes that were
    // accepted, rejected, or converted via `into_envelope` record nothing here.
    fn drop(&mut self) {
        let internal = Outcome::Invalid(DiscardReason::Internal);
        self.reject(internal);
    }
}