-
Notifications
You must be signed in to change notification settings - Fork 94
/
Copy pathcommon.rs
587 lines (508 loc) · 21.2 KB
/
common.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
//! Common facilities for ingesting events through store-like endpoints.
use std::cell::RefCell;
use std::rc::Rc;
use actix::prelude::*;
use actix_web::http::{header, StatusCode};
use actix_web::middleware::cors::{Cors, CorsBuilder};
use actix_web::{HttpRequest, HttpResponse, ResponseError};
use failure::Fail;
use futures::prelude::*;
use serde::Deserialize;
use relay_common::{clone, metric, tryf, LogError};
use relay_general::protocol::{EventId, EventType};
use relay_quotas::RateLimits;
use crate::actors::events::{QueueEnvelope, QueueEnvelopeError};
use crate::actors::outcome::{DiscardReason, Outcome, TrackOutcome};
use crate::actors::project::CheckEnvelope;
use crate::actors::project_cache::{GetProject, ProjectError};
use crate::body::StorePayloadError;
use crate::envelope::{AttachmentType, Envelope, EnvelopeError, ItemType, Items};
use crate::extractors::RequestMeta;
use crate::metrics::RelayCounters;
use crate::service::{ServiceApp, ServiceState};
use crate::utils::{self, ApiErrorResponse, FormDataIter, MultipartError};
use relay_config::Config;
/// Errors that can occur while handling a store-like request.
///
/// The `display` text of each variant is what `ApiErrorResponse::from_fail` receives when the
/// error is rendered into an HTTP response by the `ResponseError` implementation.
#[derive(Fail, Debug)]
pub enum BadStoreRequest {
/// The protocol version of the request is greater than `relay_common::PROTOCOL_VERSION`.
#[fail(display = "unsupported protocol version ({})", _0)]
UnsupportedProtocolVersion(u16),
/// Sending a message to one of the actors failed with a mailbox error.
#[fail(display = "could not schedule event processing")]
ScheduleFailed(#[cause] MailboxError),
/// The project answered a `CheckEnvelope` message with an error.
#[fail(display = "failed to fetch project information")]
ProjectFailed(#[cause] ProjectError),
/// The request body was empty. Not constructed in this file.
#[fail(display = "empty request body")]
EmptyBody,
/// The payload could not be deserialized as JSON.
#[fail(display = "invalid JSON data")]
InvalidJson(#[cause] serde_json::Error),
/// The payload could not be deserialized as MessagePack.
#[fail(display = "invalid messagepack data")]
InvalidMsgpack(#[cause] rmp_serde::decode::Error),
/// Wraps an `EnvelopeError`. Not constructed in this file.
#[fail(display = "invalid event envelope")]
InvalidEnvelope(#[cause] EnvelopeError),
/// Wraps a `MultipartError`. Not constructed in this file.
#[fail(display = "invalid multipart data")]
InvalidMultipart(#[cause] MultipartError),
/// Not constructed in this file; tracked as `DiscardReason::InvalidMinidump`.
#[fail(display = "invalid minidump")]
InvalidMinidump,
/// Not constructed in this file; tracked as `DiscardReason::MissingMinidumpUpload`.
#[fail(display = "missing minidump")]
MissingMinidump,
/// Not constructed in this file; tracked as `DiscardReason::MissingMinidumpUpload`.
// NOTE(review): this shares its discard reason with `MissingMinidump` -- confirm that is intended.
#[fail(display = "invalid unreal crash report")]
InvalidUnrealReport,
/// The `sentry[event_id]` form data value could not be parsed as an event id.
#[fail(display = "invalid event id")]
InvalidEventId,
/// The event manager answered a `QueueEnvelope` message with an error.
#[fail(display = "failed to queue envelope")]
QueueFailed(#[cause] QueueEnvelopeError),
/// Reading the request body failed; `StorePayloadError::Overflow` is also used when an envelope
/// exceeds the configured size limits.
#[fail(display = "failed to read request body")]
PayloadError(#[cause] StorePayloadError),
/// The request was rejected because of the contained rate limits.
#[fail(display = "event rejected due to rate limit")]
RateLimited(RateLimits),
/// The project check rejected the envelope with the given discard reason.
#[fail(display = "event submission rejected with_reason: {:?}", _0)]
EventRejected(DiscardReason),
}
impl BadStoreRequest {
fn to_outcome(&self) -> Option<Outcome> {
Some(match self {
BadStoreRequest::UnsupportedProtocolVersion(_) => {
Outcome::Invalid(DiscardReason::AuthVersion)
}
BadStoreRequest::InvalidUnrealReport => {
Outcome::Invalid(DiscardReason::MissingMinidumpUpload)
}
BadStoreRequest::EmptyBody => Outcome::Invalid(DiscardReason::NoData),
BadStoreRequest::InvalidJson(_) => Outcome::Invalid(DiscardReason::InvalidJson),
BadStoreRequest::InvalidMsgpack(_) => Outcome::Invalid(DiscardReason::InvalidMsgpack),
BadStoreRequest::InvalidMultipart(_) => {
Outcome::Invalid(DiscardReason::InvalidMultipart)
}
BadStoreRequest::InvalidMinidump => Outcome::Invalid(DiscardReason::InvalidMinidump),
BadStoreRequest::MissingMinidump => {
Outcome::Invalid(DiscardReason::MissingMinidumpUpload)
}
BadStoreRequest::InvalidEnvelope(_) => Outcome::Invalid(DiscardReason::InvalidEnvelope),
BadStoreRequest::QueueFailed(event_error) => match event_error {
QueueEnvelopeError::TooManyEvents => Outcome::Invalid(DiscardReason::Internal),
},
BadStoreRequest::ProjectFailed(project_error) => match project_error {
ProjectError::FetchFailed => Outcome::Invalid(DiscardReason::ProjectState),
_ => Outcome::Invalid(DiscardReason::Internal),
},
BadStoreRequest::ScheduleFailed(_) => Outcome::Invalid(DiscardReason::Internal),
BadStoreRequest::EventRejected(reason) => Outcome::Invalid(*reason),
BadStoreRequest::PayloadError(payload_error) => {
Outcome::Invalid(payload_error.discard_reason())
}
BadStoreRequest::RateLimited(rate_limits) => {
return rate_limits
.longest_error()
.map(|r| Outcome::RateLimited(r.reason_code.clone()));
}
// should actually never create an outcome
BadStoreRequest::InvalidEventId => Outcome::Invalid(DiscardReason::Internal),
})
}
}
impl ResponseError for BadStoreRequest {
    /// Renders this error as an HTTP response with a JSON body describing the failure.
    ///
    /// The status code depends on the variant; see the comments on the individual arms.
    fn error_response(&self) -> HttpResponse {
        let body = ApiErrorResponse::from_fail(self);

        // Select the response builder for this error first; the body is attached once below.
        let mut builder = match self {
            BadStoreRequest::RateLimited(rate_limits) => {
                let retry_after_header = match rate_limits.longest() {
                    Some(limit) => limit.retry_after.remaining_seconds().to_string(),
                    None => String::new(),
                };
                let rate_limits_header = utils::format_rate_limits(rate_limits);

                // For rate limits, we return a special status code and indicate the client to hold
                // off until the rate limit period has expired. Currently, we only support the
                // delay-seconds variant of the Rate-Limit header.
                let mut rate_limited = HttpResponse::build(StatusCode::TOO_MANY_REQUESTS);
                rate_limited
                    .header(header::RETRY_AFTER, retry_after_header)
                    .header(utils::RATE_LIMITS_HEADER, rate_limits_header);
                rate_limited
            }
            BadStoreRequest::ProjectFailed(project_error) => match project_error {
                // This particular project is somehow broken. We could treat this as 503 but it's
                // more likely that the error is local to this project.
                ProjectError::FetchFailed => HttpResponse::InternalServerError(),
                ProjectError::ScheduleFailed(_) => HttpResponse::ServiceUnavailable(),
            },
            // These errors indicate that something's wrong with our actor system, most likely
            // mailbox congestion or a faulty shutdown. Indicate an unavailable service to the
            // client. It might retry event submission at a later time.
            BadStoreRequest::ScheduleFailed(_) | BadStoreRequest::QueueFailed(_) => {
                HttpResponse::ServiceUnavailable()
            }
            // The event has been discarded, which is generally indicated with a 403 error.
            // Originally, Sentry also used this status code for event filters, but these are
            // now executed asynchronously in `EventProcessor`.
            BadStoreRequest::EventRejected(_) => HttpResponse::Forbidden(),
            BadStoreRequest::PayloadError(StorePayloadError::Overflow) => {
                HttpResponse::PayloadTooLarge()
            }
            // In all other cases, we indicate a generic bad request to the client and render
            // the cause. This was likely the client's fault.
            _ => HttpResponse::BadRequest(),
        };

        builder.json(&body)
    }
}
/// The subset of an event payload that is needed to read its id and type.
#[derive(Debug, Deserialize, PartialEq)]
pub struct MinimalEvent {
/// The event id, read from the `event_id` key. `None` if the key is absent.
#[serde(default, rename = "event_id")]
pub id: Option<EventId>,
/// The event type, read from the `type` key. Uses the default `EventType` if the key is absent.
#[serde(default, rename = "type")]
pub ty: EventType,
}
/// Parses a minimal subset of the event payload.
///
/// This function validates that the provided payload is valid and returns an `Err` on parse errors.
pub fn minimal_event_from_json(data: &[u8]) -> Result<MinimalEvent, BadStoreRequest> {
serde_json::from_slice(data).map_err(BadStoreRequest::InvalidJson)
}
/// Reads the event id from a JSON payload.
///
/// Returns `Ok(None)` if the payload contains no event id. The payload is parsed with
/// `minimal_event_from_json`, so any error of that function is returned as `Err`.
pub fn event_id_from_json(data: &[u8]) -> Result<Option<EventId>, BadStoreRequest> {
    let event = minimal_event_from_json(data)?;
    Ok(event.id)
}
/// Extracts the event id from a MessagePack payload.
///
/// If the payload contains no event id, `Ok(None)` is returned. This function also validates that
/// the provided is valid and returns an `Err` on parse errors. If the event id itself is malformed,
/// an `Err` is returned.
pub fn event_id_from_msgpack(data: &[u8]) -> Result<Option<EventId>, BadStoreRequest> {
rmp_serde::from_slice(data)
.map(|MinimalEvent { id, .. }| id)
.map_err(BadStoreRequest::InvalidMsgpack)
}
/// Reads the event id from form data.
///
/// The first entry with the key `sentry` or `sentry[event_id]` decides the result:
///
/// - `sentry` is treated as a JSON payload and passed to `event_id_from_json`, including its
///   errors.
/// - `sentry[event_id]` is parsed as an event id; a value that does not parse results in
///   `BadStoreRequest::InvalidEventId`.
///
/// Returns `Ok(None)` if neither key is present.
pub fn event_id_from_formdata(data: &[u8]) -> Result<Option<EventId>, BadStoreRequest> {
    for entry in FormDataIter::new(data) {
        let key = entry.key();

        if key == "sentry" {
            return event_id_from_json(entry.value().as_bytes());
        }

        if key == "sentry[event_id]" {
            return match entry.value().parse::<EventId>() {
                Ok(event_id) => Ok(Some(event_id)),
                Err(_) => Err(BadStoreRequest::InvalidEventId),
            };
        }
    }

    Ok(None)
}
/// Reads the event id from a list of items.
///
/// Submitting multiple event payloads is undefined behavior. The event id is looked up in the
/// following order, and the first id found is returned:
///
/// 1. The `Event` item (JSON).
/// 2. The event payload attachment (MessagePack).
/// 3. The `sentry` JSON payload of the form data item.
/// 4. The `sentry[event_id]` key of the form data item.
///
/// Parse errors in the first two sources are returned as `Err`. Errors in the form data item are
/// ignored.
///
/// # Limitations
///
/// Extracting the event id from chunked formdata fields on the Minidump endpoint (`sentry__1`,
/// `sentry__2`, ...) is not supported. In this case, `None` is returned.
pub fn event_id_from_items(items: &Items) -> Result<Option<EventId>, BadStoreRequest> {
    let event_item = items.iter().find(|item| item.ty() == ItemType::Event);
    if let Some(item) = event_item {
        let event_id = event_id_from_json(&item.payload())?;
        if event_id.is_some() {
            return Ok(event_id);
        }
    }

    let payload_attachment = items
        .iter()
        .find(|item| item.attachment_type() == Some(AttachmentType::EventPayload));
    if let Some(item) = payload_attachment {
        let event_id = event_id_from_msgpack(&item.payload())?;
        if event_id.is_some() {
            return Ok(event_id);
        }
    }

    let form_data = items.iter().find(|item| item.ty() == ItemType::FormData);
    if let Some(item) = form_data {
        // Swallow all other errors here since it is quite common to receive invalid secondary
        // payloads. `EventProcessor` also retains events in such cases.
        match event_id_from_formdata(&item.payload()) {
            Ok(Some(event_id)) => return Ok(Some(event_id)),
            Ok(None) | Err(_) => (),
        }
    }

    Ok(None)
}
/// Creates a preconfigured CORS middleware builder for store requests.
///
/// To configure CORS, register endpoints using `resource()` and finalize by calling `register()`,
/// which returns an App. The builder allows the `POST` method and the default Sentry request
/// headers, exposes the Sentry response headers, and sets a max age of 3600.
pub fn cors(app: ServiceApp) -> CorsBuilder<ServiceState> {
    // Request headers that clients may send.
    let allowed_headers = vec![
        "x-sentry-auth",
        "x-requested-with",
        "x-forwarded-for",
        "origin",
        "referer",
        "accept",
        "content-type",
        "authentication",
        "authorization",
        "content-encoding",
        "transfer-encoding",
    ];

    // Response headers that clients may read.
    let exposed_headers = vec!["x-sentry-error", "x-sentry-rate-limits", "retry-after"];

    let mut builder = Cors::for_app(app);
    builder.allowed_methods(vec!["POST"]);
    builder.allowed_headers(allowed_headers);
    builder.expose_headers(exposed_headers);
    builder.max_age(3600);
    builder
}
/// Checks the items of this envelope against the configured size limits.
///
/// Returns `true` if the envelope adheres to all limits. Otherwise returns `false`, in which case
/// the envelope should be discarded and a `413 Payload Too Large` response should be given.
///
/// The following limits are checked:
///
/// - `max_event_size`: combined size of event, transaction, security and form data items
/// - `max_attachment_size`: size of each individual attachment or unreal report
/// - `max_attachments_size`: combined size of all attachments and unreal reports
/// - `max_session_count`: number of session items
fn check_envelope_size_limits(config: &Config, envelope: &Envelope) -> bool {
    let mut event_size = 0;
    let mut attachments_size = 0;
    let mut session_count = 0;

    for item in envelope.items() {
        match item.ty() {
            ItemType::Event
            | ItemType::Transaction
            | ItemType::Security
            | ItemType::RawSecurity
            | ItemType::FormData => event_size += item.len(),
            ItemType::Attachment | ItemType::UnrealReport => {
                let size = item.len();
                // A single oversized attachment rejects the envelope right away.
                if size > config.max_attachment_size() {
                    return false;
                }
                attachments_size += size;
            }
            ItemType::Session => session_count += 1,
            ItemType::UserReport => (),
        }
    }

    if event_size > config.max_event_size() {
        return false;
    }

    if attachments_size > config.max_attachments_size() {
        return false;
    }

    session_count <= config.max_session_count()
}
/// Handles a store-like ingestion request and resolves to the HTTP response for the client.
///
/// Requests may come directly from the store endpoint or from other endpoints that build an
/// envelope from their own payload (e.g. the security endpoint).
///
/// The request is processed as a chain of futures:
///
/// 1. The project for `meta.public_key()` is requested from the project cache.
/// 2. `extract_envelope` is called with the request and `meta` to obtain the envelope.
/// 3. The envelope is sent to the project in a `CheckEnvelope` message. A rejection becomes
///    `EventRejected`; a check result without an envelope becomes `RateLimited`.
/// 4. The remaining envelope is validated with `check_envelope_size_limits`.
/// 5. The envelope is sent to the event manager in a `QueueEnvelope` message.
/// 6. If the rate limits from step 3 are active, `RateLimited` is raised. Otherwise
///    `create_response` builds the response from the event id returned by the queue.
///
/// # Parameters
///
/// - `meta`: request metadata; supplies the start time, protocol version, public key, client
///   address and partial scoping.
/// - `is_event`: if `true`, a `TrackOutcome` message is sent to the outcome producer when the
///   chain fails and the error maps to an outcome.
/// - `request`: the HTTP request; gives access to the service state.
/// - `extract_envelope`: creates the envelope from the request and its metadata.
/// - `create_response`: creates the response from an optional event id.
/// - `emit_rate_limit`: if `false`, a `RateLimited` error is answered with `create_response` and
///   the event id of the envelope instead of the rate limit error response.
///
/// # Errors
///
/// Errors raised inside the chain are converted into responses by the final `or_else`, so that
/// part of the future always resolves with `Ok`. Responses with a server error status are logged.
pub fn handle_store_like_request<F, R, I>(
meta: RequestMeta,
is_event: bool,
request: HttpRequest<ServiceState>,
extract_envelope: F,
create_response: R,
emit_rate_limit: bool,
) -> ResponseFuture<HttpResponse, BadStoreRequest>
where
F: FnOnce(&HttpRequest<ServiceState>, RequestMeta) -> I + 'static,
I: IntoFuture<Item = Envelope, Error = BadStoreRequest> + 'static,
R: FnOnce(Option<EventId>) -> HttpResponse + Copy + 'static,
{
let start_time = meta.start_time();
// For now, we only handle <= v8 and drop everything else
let version = meta.version();
if version > relay_common::PROTOCOL_VERSION {
// TODO: Delegate to forward_upstream here
// NOTE(review): `tryf!` presumably returns early with a failed future, which would bypass the
// outcome tracking and response conversion in `or_else` below -- confirm against the macro.
tryf!(Err(BadStoreRequest::UnsupportedProtocolVersion(version)));
}
metric!(
counter(RelayCounters::EventProtocol) += 1,
version = &format!("{}", version)
);
let public_key = meta.public_key();
let event_manager = request.state().event_manager();
let project_manager = request.state().project_cache();
let outcome_producer = request.state().outcome_producer();
let remote_addr = meta.client_addr();
// Shared slots: the chain below writes the scoping and event id as they become known, and the
// error handler reads them when tracking an outcome or creating the fallback response.
let scoping = Rc::new(RefCell::new(meta.get_partial_scoping()));
let event_id = Rc::new(RefCell::new(None));
let config = request.state().config();
let future = project_manager
.send(GetProject { public_key })
.map_err(BadStoreRequest::ScheduleFailed)
.and_then(clone!(event_id, scoping, |project| {
extract_envelope(&request, meta)
.into_future()
.and_then(clone!(project, |envelope| {
// Record the event id before the envelope is moved into the message.
event_id.replace(envelope.event_id());
project
.send(CheckEnvelope::cached(envelope))
.map_err(BadStoreRequest::ScheduleFailed)
.and_then(|result| result.map_err(BadStoreRequest::ProjectFailed))
}))
.and_then(clone!(scoping, |response| {
// Replace the partial scoping with the one returned by the project check.
scoping.replace(response.scoping);
let checked = response.result.map_err(BadStoreRequest::EventRejected)?;
// Skip over queuing and issue a rate limit right away
let envelope = match checked.envelope {
Some(envelope) => envelope,
None => return Err(BadStoreRequest::RateLimited(checked.rate_limits)),
};
if check_envelope_size_limits(&config, &envelope) {
Ok((envelope, checked.rate_limits))
} else {
Err(BadStoreRequest::PayloadError(StorePayloadError::Overflow))
}
}))
.and_then(move |(envelope, rate_limits)| {
event_manager
.send(QueueEnvelope {
envelope,
project,
start_time,
})
.map_err(BadStoreRequest::ScheduleFailed)
.and_then(|result| result.map_err(BadStoreRequest::QueueFailed))
.map(move |event_id| (event_id, rate_limits))
})
.and_then(move |(event_id, rate_limits)| {
// The envelope has been queued at this point; active rate limits are still reported.
if rate_limits.is_limited() {
Err(BadStoreRequest::RateLimited(rate_limits))
} else {
Ok(create_response(event_id))
}
})
}))
// Every error of the chain is turned into a response here.
.or_else(move |error: BadStoreRequest| {
metric!(counter(RelayCounters::EnvelopeRejected) += 1);
if is_event {
if let Some(outcome) = error.to_outcome() {
outcome_producer.do_send(TrackOutcome {
timestamp: start_time,
scoping: *scoping.borrow(),
outcome,
event_id: *event_id.borrow(),
remote_addr,
});
}
}
// Callers that opted out of rate limit responses get the regular response instead.
if !emit_rate_limit && matches!(error, BadStoreRequest::RateLimited(_)) {
return Ok(create_response(*event_id.borrow()));
}
let response = error.error_response();
if response.status().is_server_error() {
log::error!("error handling request: {}", LogError(&error));
}
Ok(response)
});
Box::new(future)
}
/// Creates a `text/plain` response whose body is the given event id as a hyphenated UUID.
///
/// A missing id falls back to the default `EventId`. Debug builds assert that the resulting id
/// is not nil.
pub fn create_text_event_id_response(id: Option<EventId>) -> HttpResponse {
    // Event id is set statically in the ingest path.
    let event_id = id.unwrap_or_default();
    debug_assert!(!event_id.is_nil());

    // the minidump client expects the response to contain an event id as a hyphenated UUID
    // i.e. xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
    let body = event_id.0.to_hyphenated().to_string();

    HttpResponse::Ok().content_type("text/plain").body(body)
}
/// A helper for creating Actix routes that are resilient against double-slashes.
///
/// Write `normpath("api/store")` to create a route pattern that matches "/api/store/",
/// "/api//store", "/api//store////", etc.
///
/// Every non-empty path segment of `route` is prefixed with a `/` and a `multislashN` pattern
/// that absorbs additional slashes. Empty segments caused by leading, trailing or repeated
/// slashes in `route` itself are skipped, so `normpath("api//store")` yields the same pattern as
/// `normpath("api/store")`.
///
/// If `route` ends with a slash, the pattern requires at least one trailing slash; otherwise
/// trailing slashes are optional.
pub fn normpath(route: &str) -> String {
    let mut pattern = String::new();

    let segments = route.split('/').filter(|segment| !segment.is_empty());
    for (i, segment) in segments.enumerate() {
        // Apparently the leading slash needs to be explicit and cannot be part of a pattern
        pattern.push_str(&format!(
            "/{{multislash{i}:/*}}{segment}",
            i = i,
            segment = segment
        ));
    }

    if pattern.is_empty() {
        // A route without any segments keeps the explicit leading slash and its slash pattern.
        pattern.push_str("/{multislash0:/*}");
    }

    if route.ends_with('/') {
        pattern.push_str("{trailing_slash:/+}");
    } else {
        pattern.push_str("{trailing_slash:/*}");
    }

    pattern
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `json` into a `MinimalEvent`, panicking if the payload is rejected.
    fn parse_minimal(json: &str) -> MinimalEvent {
        minimal_event_from_json(json.as_bytes()).unwrap()
    }

    #[test]
    fn test_normpath() {
        // Pairs of (route, expected pattern).
        let cases = [
            (
                "/api/store/",
                "/{multislash0:/*}api/{multislash1:/*}store{trailing_slash:/+}",
            ),
            (
                "/api/store",
                "/{multislash0:/*}api/{multislash1:/*}store{trailing_slash:/*}",
            ),
        ];

        for (route, expected) in cases.iter() {
            assert_eq!(normpath(route), *expected);
        }
    }

    #[test]
    fn test_minimal_empty_event() {
        let expected = MinimalEvent {
            id: None,
            ty: EventType::Default,
        };

        assert_eq!(parse_minimal(r#"{}"#), expected);
    }

    #[test]
    fn test_minimal_event_id() {
        let expected = MinimalEvent {
            id: Some("037af9ac1b49494bacd7ec5114f801d9".parse().unwrap()),
            ty: EventType::Default,
        };

        let parsed = parse_minimal(r#"{"event_id": "037af9ac1b49494bacd7ec5114f801d9"}"#);
        assert_eq!(parsed, expected);
    }

    #[test]
    fn test_minimal_event_type() {
        let expected = MinimalEvent {
            id: None,
            ty: EventType::ExpectCT,
        };

        assert_eq!(parse_minimal(r#"{"type": "expectct"}"#), expected);
    }

    #[test]
    fn test_minimal_event_invalid_type() {
        // An unknown type is expected to parse as the default event type.
        let expected = MinimalEvent {
            id: None,
            ty: EventType::Default,
        };

        assert_eq!(parse_minimal(r#"{"type": "invalid"}"#), expected);
    }
}