From 4a2d030d9755043d1d2926fecb6f14ac752915ff Mon Sep 17 00:00:00 2001 From: Armin Ronacher Date: Mon, 3 Feb 2020 15:42:15 +0100 Subject: [PATCH 01/13] feat(sessions): Initial support for session data in envelopes --- relay-general/src/protocol/mod.rs | 2 + relay-general/src/protocol/sessions.rs | 76 +++++++++++++++++++++ relay-server/src/actors/events.rs | 3 + relay-server/src/endpoints/store.rs | 1 + relay-server/src/envelope.rs | 48 +++++++++++-- relay-server/src/extractors/request_meta.rs | 30 +++++++- 6 files changed, 154 insertions(+), 6 deletions(-) create mode 100644 relay-general/src/protocol/sessions.rs diff --git a/relay-general/src/protocol/mod.rs b/relay-general/src/protocol/mod.rs index e97ce9dd60b..2425b4a5c0d 100644 --- a/relay-general/src/protocol/mod.rs +++ b/relay-general/src/protocol/mod.rs @@ -12,6 +12,7 @@ mod mechanism; mod metrics; mod request; mod security_report; +mod sessions; mod span; mod stacktrace; mod tags; @@ -42,6 +43,7 @@ pub use self::mechanism::{CError, MachException, Mechanism, MechanismMeta, Posix pub use self::metrics::Metrics; pub use self::request::{Cookies, HeaderName, HeaderValue, Headers, Query, Request}; pub use self::security_report::{Csp, ExpectCt, ExpectStaple, Hpkp, SecurityReportType}; +pub use self::sessions::{SessionEvent, SessionEventOperation}; pub use self::span::Span; pub use self::stacktrace::{Frame, FrameData, FrameVars, RawStacktrace, Stacktrace}; pub use self::tags::{TagEntry, Tags}; diff --git a/relay-general/src/protocol/sessions.rs b/relay-general/src/protocol/sessions.rs new file mode 100644 index 00000000000..1ab4142545a --- /dev/null +++ b/relay-general/src/protocol/sessions.rs @@ -0,0 +1,76 @@ +use std::fmt; +use std::str::FromStr; +use uuid::Uuid; + +use chrono::{DateTime, Utc}; +use failure::Fail; +use serde::{Deserialize, Serialize}; + +/// The type of session event we're dealing with. +#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Deserialize, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum SessionEventOperation { + Start, + #[serde(rename = "fg")] + Foreground, + #[serde(rename = "bg")] + Background, + Crash, + Close, +} + +/// An error used when parsing `SessionEventOperation`. +#[derive(Debug, Fail)] +#[fail(display = "invalid event type")] +pub struct ParseSessionEventTypeError; + +impl FromStr for SessionEventOperation { + type Err = ParseSessionEventTypeError; + + fn from_str(string: &str) -> Result { + Ok(match string { + "start" => SessionEventOperation::Start, + "fg" => SessionEventOperation::Foreground, + "bg" => SessionEventOperation::Background, + "crash" => SessionEventOperation::Crash, + "close" => SessionEventOperation::Close, + _ => return Err(ParseSessionEventTypeError), + }) + } +} + +impl fmt::Display for SessionEventOperation { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + SessionEventOperation::Start => write!(f, "start"), + SessionEventOperation::Foreground => write!(f, "fg"), + SessionEventOperation::Background => write!(f, "bg"), + SessionEventOperation::Crash => write!(f, "crash"), + SessionEventOperation::Close => write!(f, "close"), + } + } +} + +/// Event specific attributes. +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct SessionEventAttributes { + pub os: Option, + pub api_level: Option, + pub release: Option, + pub environment: Option, + pub device_family: Option, +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct SessionEvent { + /// Unique identifier of this event. + pub id: Uuid, + /// The timestamp of the session event. + #[serde(rename = "ts")] + pub timestamp: DateTime, + /// The session operation. + pub op: SessionEventOperation, + /// The session event attributes. + #[serde(rename = "attrs")] + pub attributes: SessionEventAttributes, +} diff --git a/relay-server/src/actors/events.rs b/relay-server/src/actors/events.rs index f5bdb340e81..bcfd3693a0b 100644 --- a/relay-server/src/actors/events.rs +++ b/relay-server/src/actors/events.rs @@ -525,6 +525,9 @@ impl EventProcessor { // These may be forwarded to upstream / store: ItemType::Attachment => false, ItemType::UserReport => false, + + // session data is never considered as part of deduplication + ItemType::Session => false, } } diff --git a/relay-server/src/endpoints/store.rs b/relay-server/src/endpoints/store.rs index c0d0aa62ad0..22a4a3ea18c 100644 --- a/relay-server/src/endpoints/store.rs +++ b/relay-server/src/endpoints/store.rs @@ -83,6 +83,7 @@ fn extract_envelope( #[derive(Serialize)] struct StoreResponse { + #[serde(skip_serializing_if = "Option::is_none")] id: Option, } diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index 7ac63e21438..a09273f452f 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -83,6 +83,8 @@ pub enum ItemType { UnrealReport, /// User feedback encoded as JSON. UserReport, + /// Session data. + Session, } impl fmt::Display for ItemType { @@ -94,6 +96,7 @@ impl fmt::Display for ItemType { Self::SecurityReport => write!(f, "security report"), Self::UnrealReport => write!(f, "unreal report"), Self::UserReport => write!(f, "user feedback"), + Self::Session => write!(f, "session"), } } } @@ -404,8 +407,9 @@ impl Item { // Form data items may contain partial event payloads, but those are only ever valid if they // occur together with an explicit event item, such as a minidump or apple crash report. For - // this reason, FormData alone does not constitute an event item. - ItemType::FormData | ItemType::UserReport => false, + // this reason, FormData alone does not constitute an event item. Same for user reports + // and sessions. + ItemType::FormData | ItemType::UserReport | ItemType::Session => false, } } @@ -413,10 +417,14 @@ impl Item { /// /// This is true for all items except session health events. pub fn requires_event(&self) -> bool { - // NOTE: Item types should be explicitly white listed here. match self.ty() { - // TODO: whitelist item types - _ => true, + ItemType::Event => true, + ItemType::Attachment => true, + ItemType::FormData => true, + ItemType::SecurityReport => true, + ItemType::UnrealReport => true, + ItemType::UserReport => true, + ItemType::Session => false, } } } @@ -981,4 +989,34 @@ mod tests { "###); } + + #[test] + fn test_deserialize_envelope_session_data() { + // With terminating newline + let bytes = Bytes::from(&b"\ + {\"sid\":\"9ec79c33ec9942ab8353589fcb2e04dc\",\"did\":\"319c9827ff324d07954331d1230e9ee3\",\"dsn\":\"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42\"}\n\ + {\"type\":\"session\",\"length\":83}\n\ + {\"op\":\"start\",\"ts\":1580740770.86862,\"attrs\":{\"os\":\"Android\",\"os_version\":\"8.0.0\"}}\n\ + "[..]); + + let envelope = Envelope::parse_bytes(bytes).unwrap(); + + assert_eq!(envelope.len(), 1); + let items: Vec<_> = envelope.items().collect(); + + assert_eq!(items[0].ty(), ItemType::Session); + assert_eq!(items[0].len(), 83); + assert_eq!( + items[0].payload(), + Bytes::from(&b"{\"op\":\"start\",\"ts\":1580740770.86862,\"attrs\":{\"os\":\"Android\",\"os_version\":\"8.0.0\"}}\n"[..]) + ); + assert_eq!(items[0].content_type(), None); + + let meta = envelope.meta(); + assert_eq!( + meta.session_id(), + Some(&"9ec79c33ec9942ab8353589fcb2e04dc".parse().unwrap()) + ); + assert_eq!(meta.distinct_id(), Some("319c9827ff324d07954331d1230e9ee3")); + } } diff --git a/relay-server/src/extractors/request_meta.rs b/relay-server/src/extractors/request_meta.rs index 52ad045fa21..8c7ff2005cd 100644 --- a/relay-server/src/extractors/request_meta.rs +++ b/relay-server/src/extractors/request_meta.rs @@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize}; use url::Url; use relay_common::{ - tryf, Auth, AuthParseError, Dsn, DsnParseError, ProjectId, ProjectIdParseError, + tryf, Auth, AuthParseError, Dsn, DsnParseError, ProjectId, ProjectIdParseError, Uuid, }; use crate::actors::project_keys::GetProjectId; @@ -89,6 +89,14 @@ pub struct RequestMeta { #[serde(default, skip_serializing_if = "Option::is_none")] user_agent: Option, + /// The session id. + #[serde(default, rename = "sid", skip_serializing_if = "Option::is_none")] + session_id: Option, + + /// The distinct id. + #[serde(default, rename = "did", skip_serializing_if = "Option::is_none")] + distinct_id: Option, + /// When the event has been sent, according to the SDK. #[serde(default, skip_serializing_if = "Option::is_none")] sent_at: Option>, @@ -105,6 +113,8 @@ impl RequestMeta { remote_addr: Some("192.168.0.1".parse().unwrap()), forwarded_for: String::new(), user_agent: Some("sentry/agent".to_string()), + session_id: None, + distinct_id: None, sent_at: None, } } @@ -128,6 +138,12 @@ impl RequestMeta { if let Some(user_agent) = other.user_agent { self.user_agent = Some(user_agent); } + if let Some(session_id) = other.session_id { + self.session_id = Some(session_id); + } + if let Some(distinct_id) = other.distinct_id { + self.distinct_id = Some(distinct_id); + } } /// Returns a reference to the DSN. @@ -198,6 +214,16 @@ impl RequestMeta { self.sent_at } + /// Returns the session id if this contains session data. + pub fn session_id(&self) -> Option<&Uuid> { + self.session_id.as_ref() + } + + /// Returns the "distinct id" if this contains session data. + pub fn distinct_id(&self) -> Option<&str> { + self.distinct_id.as_deref() + } + /// Formats the Sentry authentication header. /// /// This header must be included in store requests. @@ -325,6 +351,8 @@ fn extract_event_meta( remote_addr, forwarded_for, user_agent, + session_id: None, + distinct_id: None, sent_at: None, // We only support this via envelopes }) })) From 9bc30942507b71bef20b2fad72c8d74e7582c34c Mon Sep 17 00:00:00 2001 From: Armin Ronacher Date: Tue, 4 Feb 2020 10:57:11 +0100 Subject: [PATCH 02/13] ref: ProcessEvent -> ProcessEnvelope --- relay-server/src/actors/events.rs | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/relay-server/src/actors/events.rs b/relay-server/src/actors/events.rs index bcfd3693a0b..be3859e1d1a 100644 --- a/relay-server/src/actors/events.rs +++ b/relay-server/src/actors/events.rs @@ -531,7 +531,10 @@ impl EventProcessor { } } - fn process(&self, message: ProcessEvent) -> Result { + fn process( + &self, + message: ProcessEnvelope, + ) -> Result { let mut envelope = message.envelope; macro_rules! if_processing { @@ -617,7 +620,7 @@ impl EventProcessor { // envelope only contains attachments or user reports. We should not run filters or // apply rate limits. log::trace!("no event for envelope, skipping processing"); - return Ok(ProcessEventResponse { envelope }); + return Ok(ProcessEnvelopeResponse { envelope }); } if_processing! { @@ -653,7 +656,7 @@ impl EventProcessor { } envelope.add_item(event_item); - Ok(ProcessEventResponse { envelope }) + Ok(ProcessEnvelopeResponse { envelope }) } } @@ -661,25 +664,25 @@ impl Actor for EventProcessor { type Context = SyncContext; } -struct ProcessEvent { +struct ProcessEnvelope { pub envelope: Envelope, pub project_state: Arc, pub start_time: Instant, } #[cfg_attr(not(feature = "processing"), allow(dead_code))] -struct ProcessEventResponse { +struct ProcessEnvelopeResponse { envelope: Envelope, } -impl Message for ProcessEvent { - type Result = Result; +impl Message for ProcessEnvelope { + type Result = Result; } -impl Handler for EventProcessor { - type Result = Result; +impl Handler for EventProcessor { + type Result = Result; - fn handle(&mut self, message: ProcessEvent, _context: &mut Self::Context) -> Self::Result { + fn handle(&mut self, message: ProcessEnvelope, _context: &mut Self::Context) -> Self::Result { metric!(timer(RelayTimers::EventWaitTime) = message.start_time.elapsed()); metric!(timer(RelayTimers::EventProcessingTime), { self.process(message) @@ -906,7 +909,7 @@ impl Handler for EventManager { .and_then(clone!(org_id_for_err, |project_state| { *org_id_for_err.lock() = project_state.organization_id; processor - .send(ProcessEvent { + .send(ProcessEnvelope { envelope, project_state, start_time, From 492e7edb5f6db9681df2d1a466a07bce78165ce2 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Fri, 7 Feb 2020 16:07:44 +0100 Subject: [PATCH 03/13] ref(protocol): Rename session module for consistency --- relay-general/src/protocol/mod.rs | 4 ++-- relay-general/src/protocol/{sessions.rs => session.rs} | 0 2 files changed, 2 insertions(+), 2 deletions(-) rename relay-general/src/protocol/{sessions.rs => session.rs} (100%) diff --git a/relay-general/src/protocol/mod.rs b/relay-general/src/protocol/mod.rs index 15154520b0f..aec40992d7f 100644 --- a/relay-general/src/protocol/mod.rs +++ b/relay-general/src/protocol/mod.rs @@ -12,7 +12,7 @@ mod mechanism; mod metrics; mod request; mod security_report; -mod sessions; +mod session; mod span; mod stacktrace; mod tags; @@ -43,7 +43,7 @@ pub use self::mechanism::{CError, MachException, Mechanism, MechanismMeta, Posix pub use self::metrics::Metrics; pub use self::request::{Cookies, HeaderName, HeaderValue, Headers, Query, Request}; pub use self::security_report::{Csp, ExpectCt, ExpectStaple, Hpkp, SecurityReportType}; -pub use self::sessions::{ +pub use self::session::{ ParseSessionStatusError, SessionChangeEvent, SessionEventAttributes, SessionStatus, }; pub use self::span::Span; diff --git a/relay-general/src/protocol/sessions.rs b/relay-general/src/protocol/session.rs similarity index 100% rename from relay-general/src/protocol/sessions.rs rename to relay-general/src/protocol/session.rs From 5583a1f3d26f579df0108025b4cb8897c411fcec Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Fri, 7 Feb 2020 17:01:56 +0100 Subject: [PATCH 04/13] feat(protocol): Fix session protocol defaults and add tests --- relay-general/src/protocol/mod.rs | 4 +- relay-general/src/protocol/session.rs | 172 ++++++++++++++++++++++---- relay-general/src/store/schema.rs | 1 - 3 files changed, 152 insertions(+), 25 deletions(-) diff --git a/relay-general/src/protocol/mod.rs b/relay-general/src/protocol/mod.rs index aec40992d7f..741ca4e474e 100644 --- a/relay-general/src/protocol/mod.rs +++ b/relay-general/src/protocol/mod.rs @@ -43,9 +43,7 @@ pub use self::mechanism::{CError, MachException, Mechanism, MechanismMeta, Posix pub use self::metrics::Metrics; pub use self::request::{Cookies, HeaderName, HeaderValue, Headers, Query, Request}; pub use self::security_report::{Csp, ExpectCt, ExpectStaple, Hpkp, SecurityReportType}; -pub use self::session::{ - ParseSessionStatusError, SessionChangeEvent, SessionEventAttributes, SessionStatus, -}; +pub use self::session::{ParseSessionStatusError, SessionAttributes, SessionStatus, SessionUpdate}; pub use self::span::Span; pub use self::stacktrace::{Frame, FrameData, FrameVars, RawStacktrace, Stacktrace}; pub use self::tags::{TagEntry, Tags}; diff --git a/relay-general/src/protocol/session.rs b/relay-general/src/protocol/session.rs index 6d1878d2caf..987ca62fa19 100644 --- a/relay-general/src/protocol/session.rs +++ b/relay-general/src/protocol/session.rs @@ -10,15 +10,25 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Deserialize, Serialize)] #[serde(rename_all = "snake_case")] pub enum SessionStatus { - Ok, + /// The session is currently in progress. + Pending, + /// The session terminated normally. + Exited, + /// The session resulted in an application crash. Crashed, + /// The session an unexpected abrupt termination (not crashing). Abnormal, - Exited, +} + +impl Default for SessionStatus { + fn default() -> Self { + Self::Pending + } } /// An error used when parsing `SessionStatus`. #[derive(Debug, Fail)] -#[fail(display = "invalid event type")] +#[fail(display = "invalid session status")] pub struct ParseSessionStatusError; impl FromStr for SessionStatus { @@ -26,7 +36,7 @@ impl FromStr for SessionStatus { fn from_str(string: &str) -> Result { Ok(match string { - "ok" => SessionStatus::Ok, + "pending" => SessionStatus::Pending, "crashed" => SessionStatus::Crashed, "abnormal" => SessionStatus::Abnormal, "exited" => SessionStatus::Exited, @@ -38,7 +48,7 @@ impl FromStr for SessionStatus { impl fmt::Display for SessionStatus { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match *self { - SessionStatus::Ok => write!(f, "ok"), + SessionStatus::Pending => write!(f, "pending"), SessionStatus::Crashed => write!(f, "crashed"), SessionStatus::Abnormal => write!(f, "abnormal"), SessionStatus::Exited => write!(f, "exited"), @@ -46,46 +56,166 @@ impl fmt::Display for SessionStatus { } } -/// Event specific attributes. -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub struct SessionEventAttributes { +fn is_empty_string(opt: &Option) -> bool { + opt.as_ref().map_or(true, |s| s.is_empty()) +} + +/// Additional attributes for Sessions. +#[serde(default)] +#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] +pub struct SessionAttributes { + /// The operating system name, corresponding to the os context. pub os: Option, + /// The full operating system version string, corresponding to the os context. pub os_version: Option, + /// The device famility identifier, corresponding to the device context. pub device_family: Option, + /// The release version string. pub release: Option, + /// The environment identifier. pub environment: Option, } -fn one() -> f32 { +impl SessionAttributes { + fn is_empty(&self) -> bool { + is_empty_string(&self.os) + && is_empty_string(&self.os_version) + && is_empty_string(&self.device_family) + && is_empty_string(&self.release) + && is_empty_string(&self.environment) + } +} + +fn default_sample_rate() -> f32 { 1.0 } +#[allow(clippy::trivially_copy_pass_by_ref)] +fn is_default_sample_rate(rate: &f32) -> bool { + (*rate - default_sample_rate()) < std::f32::EPSILON +} + #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub struct SessionChangeEvent { - /// Unique identifier of this event. - pub id: Uuid, - /// The session identifier +pub struct SessionUpdate { + /// Unique identifier of this update event. + #[serde(rename = "id", default = "Uuid::new_v4")] + pub update_id: Uuid, + /// The session identifier. #[serde(rename = "sid")] pub session_id: Uuid, - /// The distinct ID - #[serde(rename = "did")] + /// The distinct identifier. + #[serde(rename = "did", default, skip_serializing_if = "Option::is_none")] pub distinct_id: Option, /// An optional logical clock. - pub seq: Option, + #[serde(rename = "seq", default, skip_serializing_if = "Option::is_none")] + pub sequence: Option, /// The timestamp of when the session change event was created. - #[serde(rename = "timestamp")] pub timestamp: DateTime, /// The timestamp of when the session itself started. - #[serde(rename = "timestamp")] pub started: DateTime, /// The sample rate. - #[serde(default = "one")] + #[serde( + default = "default_sample_rate", + skip_serializing_if = "is_default_sample_rate" + )] pub sample_rate: f32, /// An optional duration of the session so far. + #[serde(default, skip_serializing_if = "Option::is_none")] pub duration: Option, /// The status of the session. + #[serde(default)] pub status: SessionStatus, /// The session event attributes. - #[serde(rename = "attrs")] - pub attributes: SessionEventAttributes, + #[serde( + rename = "attrs", + default, + skip_serializing_if = "SessionAttributes::is_empty" + )] + pub attributes: SessionAttributes, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_session_default_values() { + let json = r#"{ + "sid": "8333339f-5675-4f89-a9a0-1c935255ab58", + "timestamp": "2020-02-07T15:17:00Z", + "started": "2020-02-07T14:16:00Z" +}"#; + + let output = r#"{ + "id": "94ecab99-184a-45ee-ac18-6ed2c2c2e9f2", + "sid": "8333339f-5675-4f89-a9a0-1c935255ab58", + "timestamp": "2020-02-07T15:17:00Z", + "started": "2020-02-07T14:16:00Z", + "status": "pending" +}"#; + + let update = SessionUpdate { + update_id: "94ecab99-184a-45ee-ac18-6ed2c2c2e9f2".parse().unwrap(), + session_id: "8333339f-5675-4f89-a9a0-1c935255ab58".parse().unwrap(), + distinct_id: None, + sequence: None, + timestamp: "2020-02-07T15:17:00Z".parse().unwrap(), + started: "2020-02-07T14:16:00Z".parse().unwrap(), + sample_rate: 1.0, + duration: None, + status: SessionStatus::Pending, + attributes: SessionAttributes::default(), + }; + + // Since the update_id is defaulted randomly, ensure that it matches. + let mut parsed = serde_json::from_str::(json).unwrap(); + parsed.update_id = update.update_id; + + assert_eq_dbg!(update, parsed); + assert_eq_str!(output, serde_json::to_string_pretty(&update).unwrap()); + } + + #[test] + fn test_session_roundtrip() { + let json = r#"{ + "id": "94ecab99-184a-45ee-ac18-6ed2c2c2e9f2", + "sid": "8333339f-5675-4f89-a9a0-1c935255ab58", + "did": "b3ef3211-58a4-4b36-a9a1-5a55df0d9aaf", + "seq": 42, + "timestamp": "2020-02-07T15:17:00Z", + "started": "2020-02-07T14:16:00Z", + "sample_rate": 2.0, + "duration": 1947.49, + "status": "exited", + "attrs": { + "os": "iOS", + "os_version": "13.3.1", + "device_family": "iPhone12,3", + "release": "sentry-test@1.0.0", + "environment": "production" + } +}"#; + + let update = SessionUpdate { + update_id: "94ecab99-184a-45ee-ac18-6ed2c2c2e9f2".parse().unwrap(), + session_id: "8333339f-5675-4f89-a9a0-1c935255ab58".parse().unwrap(), + distinct_id: Some("b3ef3211-58a4-4b36-a9a1-5a55df0d9aaf".parse().unwrap()), + sequence: Some(42), + timestamp: "2020-02-07T15:17:00Z".parse().unwrap(), + started: "2020-02-07T14:16:00Z".parse().unwrap(), + sample_rate: 2.0, + duration: Some(1947.49), + status: SessionStatus::Exited, + attributes: SessionAttributes { + os: Some("iOS".to_owned()), + os_version: Some("13.3.1".to_owned()), + device_family: Some("iPhone12,3".to_owned()), + release: Some("sentry-test@1.0.0".to_owned()), + environment: Some("production".to_owned()), + }, + }; + + assert_eq_dbg!(update, serde_json::from_str(json).unwrap()); + assert_eq_str!(json, serde_json::to_string_pretty(&update).unwrap()); + } } diff --git a/relay-general/src/store/schema.rs b/relay-general/src/store/schema.rs index c8447cb7dae..9869a58b17c 100644 --- a/relay-general/src/store/schema.rs +++ b/relay-general/src/store/schema.rs @@ -214,7 +214,6 @@ mod tests { #[test] fn test_mechanism_missing_attributes() { use crate::protocol::{CError, MachException, Mechanism, MechanismMeta, PosixSignal}; - use crate::types::ErrorKind; let mut mechanism = Annotated::new(Mechanism { ty: Annotated::new("mytype".to_string()), From 2158f8c0ffaa04fdf5f062adcaf840ed89ec3568 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Fri, 7 Feb 2020 17:06:09 +0100 Subject: [PATCH 05/13] ref(server): Minor cleanup in envelopes --- relay-server/src/envelope.rs | 33 ++++++--------------------------- 1 file changed, 6 insertions(+), 27 deletions(-) diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index 7e8dd6193a8..a30880b3d14 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -83,7 +83,7 @@ pub enum ItemType { UnrealReport, /// User feedback encoded as JSON. UserReport, - /// Session data. + /// Session update data. Session, } @@ -407,9 +407,11 @@ impl Item { // Form data items may contain partial event payloads, but those are only ever valid if they // occur together with an explicit event item, such as a minidump or apple crash report. For - // this reason, FormData alone does not constitute an event item. Same for user reports - // and sessions. - ItemType::FormData | ItemType::UserReport | ItemType::Session => false, + // this reason, FormData alone does not constitute an event item. + ItemType::FormData => false, + + // The remaining item types cannot carry event payloads. + ItemType::UserReport | ItemType::Session => false, } } @@ -989,27 +991,4 @@ mod tests { "###); } - - #[test] - fn test_deserialize_envelope_session_data() { - // With terminating newline - let bytes = Bytes::from(&b"\ - {\"dsn\":\"https://e12d836b15bb49d7bbf99e64295d995b:@sentry.io/42\"}\n\ - {\"type\":\"session\",\"length\":83}\n\ - {\"op\":\"start\",\"ts\":1580740770.86862,\"attrs\":{\"os\":\"Android\",\"os_version\":\"8.0.0\"}}\n\ - "[..]); - - let envelope = Envelope::parse_bytes(bytes).unwrap(); - - assert_eq!(envelope.len(), 1); - let items: Vec<_> = envelope.items().collect(); - - assert_eq!(items[0].ty(), ItemType::Session); - assert_eq!(items[0].len(), 83); - assert_eq!( - items[0].payload(), - Bytes::from(&b"{\"op\":\"start\",\"ts\":1580740770.86862,\"attrs\":{\"os\":\"Android\",\"os_version\":\"8.0.0\"}}\n"[..]) - ); - assert_eq!(items[0].content_type(), None); - } } From 2e57f87f99950f9e7cc4d0da132901810374efde Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Fri, 7 Feb 2020 17:31:04 +0100 Subject: [PATCH 06/13] fix(protocol): Revert back to SessionStatus::Ok --- relay-general/src/protocol/session.rs | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/relay-general/src/protocol/session.rs b/relay-general/src/protocol/session.rs index 987ca62fa19..ecf597cc6a9 100644 --- a/relay-general/src/protocol/session.rs +++ b/relay-general/src/protocol/session.rs @@ -10,8 +10,10 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Deserialize, Serialize)] #[serde(rename_all = "snake_case")] pub enum SessionStatus { - /// The session is currently in progress. - Pending, + /// The session is healthy. + /// + /// This does not necessarily indicate that the session is still active. + Ok, /// The session terminated normally. Exited, /// The session resulted in an application crash. @@ -22,7 +24,7 @@ pub enum SessionStatus { impl Default for SessionStatus { fn default() -> Self { - Self::Pending + Self::Ok } } @@ -36,7 +38,7 @@ impl FromStr for SessionStatus { fn from_str(string: &str) -> Result { Ok(match string { - "pending" => SessionStatus::Pending, + "ok" => SessionStatus::Ok, "crashed" => SessionStatus::Crashed, "abnormal" => SessionStatus::Abnormal, "exited" => SessionStatus::Exited, @@ -48,7 +50,7 @@ impl FromStr for SessionStatus { impl fmt::Display for SessionStatus { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match *self { - SessionStatus::Pending => write!(f, "pending"), + SessionStatus::Ok => write!(f, "ok"), SessionStatus::Crashed => write!(f, "crashed"), SessionStatus::Abnormal => write!(f, "abnormal"), SessionStatus::Exited => write!(f, "exited"), @@ -151,7 +153,7 @@ mod tests { "sid": "8333339f-5675-4f89-a9a0-1c935255ab58", "timestamp": "2020-02-07T15:17:00Z", "started": "2020-02-07T14:16:00Z", - "status": "pending" + "status": "ok" }"#; let update = SessionUpdate { @@ -163,7 +165,7 @@ mod tests { started: "2020-02-07T14:16:00Z".parse().unwrap(), sample_rate: 1.0, duration: None, - status: SessionStatus::Pending, + status: SessionStatus::Ok, attributes: SessionAttributes::default(), }; From 7ba8a82bce30ddbddb9703dc25ba4188895845c4 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Tue, 11 Feb 2020 10:45:40 +0100 Subject: [PATCH 07/13] feat(server): Add most basic session normalization --- relay-server/src/actors/events.rs | 23 ++++++++++++++++++++++- relay-server/src/envelope.rs | 11 +++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/relay-server/src/actors/events.rs b/relay-server/src/actors/events.rs index 085da42f7c5..94ff3bf876f 100644 --- a/relay-server/src/actors/events.rs +++ b/relay-server/src/actors/events.rs @@ -16,7 +16,7 @@ use relay_general::pii::PiiProcessor; use relay_general::processor::{process_value, ProcessingState}; use relay_general::protocol::{ Breadcrumb, Csp, Event, EventId, ExpectCt, ExpectStaple, Hpkp, LenientString, Metrics, - SecurityReportType, Values, + SecurityReportType, SessionUpdate, Values, }; use relay_general::types::{Annotated, Array, Object, ProcessingAction, Value}; @@ -547,6 +547,27 @@ impl EventProcessor { }; } + // TODO(ja): Clean this up by introducing separate envelope types. + envelope.retain_items(|item| { + if item.ty() != ItemType::Session { + return true; + } + + let result = serde_json::from_slice(&item.payload()) + .and_then(|session: SessionUpdate| serde_json::to_vec(&session)); + + match result { + Ok(json) => { + item.set_payload(ContentType::Json, json); + true + } + Err(err) => { + log::debug!("dropped invalid session: {}", err); + false + } + } + }); + // Unreal endpoint puts the whole request into an item. This is done to make the endpoint // fast. For envelopes containing an Unreal request, we will look into the unreal item and // expand it so it can be consumed like any other event (e.g. `__sentry-event`). External diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index a30880b3d14..e8695abf68e 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -589,6 +589,17 @@ impl Envelope { self.items.iter() } + /// Retains only the elements specified by the predicate. + /// + /// In other words, remove all elements e such that `f(&item)` returns `false`. This method + /// operates in place and preserves the order of the retained elements. + pub fn retain_items(&mut self, f: F) + where + F: FnMut(&mut Item) -> bool, + { + self.items.retain(f); + } + /// Returns the an option with a reference to the first item that matches /// the predicate, or None if the predicate is not matched by any item. pub fn get_item_by(&self, mut pred: F) -> Option<&Item> From b67007b4be0bc2553d496cfbb859a19f77276d98 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Tue, 11 Feb 2020 13:44:00 +0100 Subject: [PATCH 08/13] feat(server): Send session events to a Kafka topic --- relay-config/src/config.rs | 28 +++-- relay-general/src/protocol/session.rs | 50 +++++++-- relay-server/src/actors/events.rs | 45 +++----- relay-server/src/actors/store.rs | 153 ++++++++++++++++++++------ relay-server/src/envelope.rs | 11 -- 5 files changed, 200 insertions(+), 87 deletions(-) diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs index 4e2ffcb8b1e..ef546b40799 100644 --- a/relay-config/src/config.rs +++ b/relay-config/src/config.rs @@ -398,12 +398,15 @@ pub enum KafkaTopic { Attachments, /// Transaction events topic. Transactions, - /// All outcomes are sent through this channel. + /// Shared outcomes topic for Relay and Sentry. Outcomes, + /// Session health updates. + Sessions, } /// Configuration for topics. #[derive(Serialize, Deserialize, Debug)] +#[serde(default)] pub struct TopicNames { /// Simple events topic name. pub events: String, @@ -413,6 +416,20 @@ pub struct TopicNames { pub transactions: String, /// Event outcomes topic name. pub outcomes: String, + /// Session health topic name. + pub sessions: String, +} + +impl Default for TopicNames { + fn default() -> Self { + Self { + events: "ingest-events".to_owned(), + attachments: "ingest-attachments".to_owned(), + transactions: "ingest-transactions".to_owned(), + outcomes: "outcomes".to_owned(), + sessions: "ingest-sessions".to_owned(), + } + } } /// A name value pair of Kafka config parameter. @@ -461,6 +478,7 @@ pub struct Processing { /// Kafka producer configurations. pub kafka_config: Vec, /// Kafka topic names. + #[serde(default)] pub topics: TopicNames, /// Redis hosts to connect to for storing state for rate limits. pub redis: Option, @@ -484,12 +502,7 @@ impl Default for Processing { max_secs_in_future: 0, max_secs_in_past: 0, kafka_config: Vec::new(), - topics: TopicNames { - events: String::new(), - attachments: String::new(), - transactions: String::new(), - outcomes: String::new(), - }, + topics: TopicNames::default(), redis: Some(Redis::default()), attachment_chunk_size: default_chunk_size(), projectconfig_cache_prefix: default_projectconfig_cache_prefix(), @@ -977,6 +990,7 @@ impl Config { KafkaTopic::Events => topics.events.as_str(), KafkaTopic::Transactions => topics.transactions.as_str(), KafkaTopic::Outcomes => topics.outcomes.as_str(), + KafkaTopic::Sessions => topics.sessions.as_str(), } } diff --git a/relay-general/src/protocol/session.rs b/relay-general/src/protocol/session.rs index ecf597cc6a9..ff7a8d5f842 100644 --- a/relay-general/src/protocol/session.rs +++ b/relay-general/src/protocol/session.rs @@ -1,10 +1,11 @@ use std::fmt; use std::str::FromStr; -use uuid::Uuid; +use std::time::SystemTime; use chrono::{DateTime, Utc}; use failure::Fail; use serde::{Deserialize, Serialize}; +use uuid::Uuid; /// The type of session event we're dealing with. #[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Deserialize, Serialize)] @@ -88,6 +89,13 @@ impl SessionAttributes { } } +fn default_sequence() -> u64 { + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() +} + fn default_sample_rate() -> f32 { 1.0 } @@ -106,11 +114,11 @@ pub struct SessionUpdate { #[serde(rename = "sid")] pub session_id: Uuid, /// The distinct identifier. - #[serde(rename = "did", default, skip_serializing_if = "Option::is_none")] - pub distinct_id: Option, + #[serde(rename = "did", default)] + pub distinct_id: Uuid, /// An optional logical clock. - #[serde(rename = "seq", default, skip_serializing_if = "Option::is_none")] - pub sequence: Option, + #[serde(rename = "seq", default = "default_sequence")] + pub sequence: u64, /// The timestamp of when the session change event was created. pub timestamp: DateTime, /// The timestamp of when the session itself started. @@ -136,6 +144,24 @@ pub struct SessionUpdate { pub attributes: SessionAttributes, } +impl SessionUpdate { + /// Parses a session update from JSON. + pub fn parse(payload: &[u8]) -> Result { + let mut session = serde_json::from_slice::(payload)?; + + if session.distinct_id.is_nil() { + session.distinct_id = session.session_id; + } + + Ok(session) + } + + /// Serializes a session update back into JSON. + pub fn serialize(&self) -> Result, serde_json::Error> { + serde_json::to_vec(self) + } +} + #[cfg(test)] mod tests { use super::*; @@ -151,6 +177,8 @@ mod tests { let output = r#"{ "id": "94ecab99-184a-45ee-ac18-6ed2c2c2e9f2", "sid": "8333339f-5675-4f89-a9a0-1c935255ab58", + "did": "8333339f-5675-4f89-a9a0-1c935255ab58", + "seq": 4711, "timestamp": "2020-02-07T15:17:00Z", "started": "2020-02-07T14:16:00Z", "status": "ok" @@ -159,8 +187,8 @@ mod tests { let update = SessionUpdate { update_id: "94ecab99-184a-45ee-ac18-6ed2c2c2e9f2".parse().unwrap(), session_id: "8333339f-5675-4f89-a9a0-1c935255ab58".parse().unwrap(), - distinct_id: None, - sequence: None, + distinct_id: "8333339f-5675-4f89-a9a0-1c935255ab58".parse().unwrap(), + sequence: 4711, // this would be a timestamp instead timestamp: "2020-02-07T15:17:00Z".parse().unwrap(), started: "2020-02-07T14:16:00Z".parse().unwrap(), sample_rate: 1.0, @@ -173,6 +201,10 @@ mod tests { let mut parsed = serde_json::from_str::(json).unwrap(); parsed.update_id = update.update_id; + // Sequence is defaulted to the current timestamp. Override for snapshot. + assert_eq!(parsed.sequence, default_sequence()); + parsed.sequence = 4711; + assert_eq_dbg!(update, parsed); assert_eq_str!(output, serde_json::to_string_pretty(&update).unwrap()); } @@ -201,8 +233,8 @@ mod tests { let update = SessionUpdate { update_id: "94ecab99-184a-45ee-ac18-6ed2c2c2e9f2".parse().unwrap(), session_id: "8333339f-5675-4f89-a9a0-1c935255ab58".parse().unwrap(), - distinct_id: Some("b3ef3211-58a4-4b36-a9a1-5a55df0d9aaf".parse().unwrap()), - sequence: Some(42), + distinct_id: "b3ef3211-58a4-4b36-a9a1-5a55df0d9aaf".parse().unwrap(), + sequence: 42, timestamp: "2020-02-07T15:17:00Z".parse().unwrap(), started: "2020-02-07T14:16:00Z".parse().unwrap(), sample_rate: 2.0, diff --git a/relay-server/src/actors/events.rs b/relay-server/src/actors/events.rs index 94ff3bf876f..ebd3095eee8 100644 --- a/relay-server/src/actors/events.rs +++ b/relay-server/src/actors/events.rs @@ -1,5 +1,6 @@ use std::collections::BTreeMap; use std::rc::Rc; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::time::Instant; @@ -7,7 +8,7 @@ use actix::fut::result; use actix::prelude::*; use failure::Fail; use futures::prelude::*; -use parking_lot::{Mutex, RwLock}; +use parking_lot::RwLock; use serde_json::Value as SerdeValue; use relay_common::{clone, metric, LogError}; @@ -16,7 +17,7 @@ use relay_general::pii::PiiProcessor; use relay_general::processor::{process_value, ProcessingState}; use relay_general::protocol::{ Breadcrumb, Csp, Event, EventId, ExpectCt, ExpectStaple, Hpkp, LenientString, Metrics, - SecurityReportType, SessionUpdate, Values, + SecurityReportType, Values, }; use relay_general::types::{Annotated, Array, Object, ProcessingAction, Value}; @@ -547,27 +548,6 @@ impl EventProcessor { }; } - // TODO(ja): Clean this up by introducing separate envelope types. - envelope.retain_items(|item| { - if item.ty() != ItemType::Session { - return true; - } - - let result = serde_json::from_slice(&item.payload()) - .and_then(|session: SessionUpdate| serde_json::to_vec(&session)); - - match result { - Ok(json) => { - item.set_payload(ContentType::Json, json); - true - } - Err(err) => { - log::debug!("dropped invalid session: {}", err); - false - } - } - }); - // Unreal endpoint puts the whole request into an item. This is done to make the endpoint // fast. For envelopes containing an Unreal request, we will look into the unreal item and // expand it so it can be consumed like any other event (e.g. `__sentry-event`). External @@ -909,7 +889,7 @@ impl Handler for EventManager { // This is used to add the respective organization id to the outcome emitted in the error // case. The organization id can only be obtained via the project state, which has not been // loaded at this time. - let org_id_for_err = Rc::new(Mutex::new(None::)); + let organization_id = Rc::new(AtomicU64::new(0)); metric!(set(RelaySets::UniqueProjects) = project_id as i64); @@ -927,8 +907,11 @@ impl Handler for EventManager { .map_err(ProcessingError::ScheduleFailed) .and_then(|result| result.map_err(ProcessingError::ProjectFailed)) })) - .and_then(clone!(org_id_for_err, |project_state| { - *org_id_for_err.lock() = project_state.organization_id; + .and_then(clone!(organization_id, |project_state| { + if let Some(id) = project_state.organization_id { + organization_id.store(id, Ordering::Relaxed); + } + processor .send(ProcessEnvelope { envelope, @@ -938,7 +921,7 @@ impl Handler for EventManager { .map_err(ProcessingError::ScheduleFailed) .flatten() })) - .and_then(clone!(captured_events, |processed| { + .and_then(clone!(captured_events, organization_id, |processed| { let envelope = processed.envelope; #[cfg(feature = "processing")] @@ -949,6 +932,7 @@ impl Handler for EventManager { .send(StoreEvent { envelope, start_time, + organization_id: organization_id.load(Ordering::Relaxed), project_id, }) .map_err(ProcessingError::ScheduleFailed) @@ -1118,10 +1102,15 @@ impl Handler for EventManager { } if let Some(outcome) = outcome_params { + let org_id = match organization_id.load(Ordering::Relaxed) { + 0 => None, + id => Some(id), + }; + outcome_producer.do_send(TrackOutcome { timestamp: Instant::now(), project_id: project_id, - org_id: *(org_id_for_err.lock()), + org_id, key_id: None, outcome, event_id, diff --git a/relay-server/src/actors/store.rs b/relay-server/src/actors/store.rs index b3e764b9af1..bd943e8faa1 100644 --- a/relay-server/src/actors/store.rs +++ b/relay-server/src/actors/store.rs @@ -14,9 +14,9 @@ use serde::{ser::Error, Serialize}; use rmp_serde::encode::Error as RmpError; -use relay_common::{metric, ProjectId, Uuid}; +use relay_common::{metric, LogError, ProjectId, Uuid}; use relay_config::{Config, KafkaTopic}; -use relay_general::protocol::{EventId, EventType}; +use relay_general::protocol::{EventId, EventType, SessionStatus, SessionUpdate}; use crate::envelope::{AttachmentType, Envelope, Item, ItemType}; use crate::metrics::RelayCounters; @@ -30,7 +30,9 @@ pub enum StoreError { #[fail(display = "failed to send kafka message")] SendFailed(#[cause] KafkaError), #[fail(display = "failed to serialize kafka message")] - SerializeFailed(#[cause] RmpError), + InvalidMsgPack(#[cause] RmpError), + #[fail(display = "failed to serialize json message")] + InvalidJson(#[cause] serde_json::Error), #[fail(display = "failed to store event because event id was missing")] NoEventId, } @@ -58,20 +60,11 @@ impl StoreForwarder { }) } - fn produce( - &self, - topic: KafkaTopic, - event_id: Option, - message: KafkaMessage, - ) -> Result<(), StoreError> { - let serialized = rmp_serde::to_vec_named(&message).map_err(StoreError::SerializeFailed)?; - let mut record = BaseRecord::to(self.config.kafka_topic_name(topic)).payload(&serialized); - - // Use the event id as partition routing key. - if let Some(ref event_id) = event_id { - // TODO: consider routing for event-id less envelopes. - record = record.key(event_id.0.as_bytes().as_ref()); - }; + fn produce(&self, topic: KafkaTopic, message: KafkaMessage) -> Result<(), StoreError> { + let serialized = message.serialize()?; + let record = BaseRecord::to(self.config.kafka_topic_name(topic)) + .key(message.key()) + .payload(&serialized); match self.producer.send(record) { Ok(_) => Ok(()), @@ -106,7 +99,7 @@ impl StoreForwarder { chunk_index, }); - self.produce(KafkaTopic::Attachments, Some(event_id), attachment_message)?; + self.produce(KafkaTopic::Attachments, attachment_message)?; offset += chunk_size; chunk_index += 1; } @@ -132,17 +125,59 @@ impl StoreForwarder { start_time: Instant, item: &Item, ) -> Result<(), StoreError> { - self.produce( - KafkaTopic::Attachments, - Some(event_id), - KafkaMessage::UserReport(UserReportKafkaMessage { - project_id, - payload: item.payload(), - start_time: instant_to_unix_timestamp(start_time), - }), - )?; + let message = KafkaMessage::UserReport(UserReportKafkaMessage { + project_id, + event_id, + payload: item.payload(), + start_time: instant_to_unix_timestamp(start_time), + }); - Ok(()) + self.produce(KafkaTopic::Attachments, message) + } + + fn produce_session( + &self, + org_id: u64, + project_id: ProjectId, + item: &Item, + ) -> Result<(), StoreError> { + let session = match SessionUpdate::parse(&item.payload()) { + Ok(session) => session, + Err(error) => { + // Skip gracefully here to allow sending other messages. + log::error!("failed to store session: {}", LogError(&error)); + return Ok(()); + } + }; + + let status = match session.status { + SessionStatus::Ok => 0, + SessionStatus::Exited => 1, + SessionStatus::Crashed => 2, + SessionStatus::Abnormal => 3, + }; + + let message = KafkaMessage::Session(SessionKafkaMessage { + org_id, + project_id, + event_id: session.update_id, + session_id: session.session_id, + distinct_id: session.distinct_id, + seq: session.sequence, + timestamp: session.timestamp.to_rfc3339(), + started: session.started.to_rfc3339(), + sample_rate: session.sample_rate, + duration: session.duration.unwrap_or(0.0), + status, + os: session.attributes.os, + os_version: session.attributes.os_version, + device_family: session.attributes.device_family, + release: session.attributes.release, + environment: session.attributes.environment, + retention_days: (), + }); + + self.produce(KafkaTopic::Sessions, message) } } @@ -210,7 +245,7 @@ struct EventKafkaMessage { /// Time at which the event was received by Relay. start_time: u64, /// The event id. - event_id: Option, + event_id: EventId, /// The project id for the current event. project_id: ProjectId, /// The client ip address. @@ -259,16 +294,67 @@ struct UserReportKafkaMessage { project_id: ProjectId, start_time: u64, payload: Bytes, + + // Used for KafkaMessage::key + #[serde(skip)] + event_id: EventId, +} + +#[derive(Debug, Serialize)] +struct SessionKafkaMessage { + org_id: u64, + project_id: u64, + event_id: Uuid, + session_id: Uuid, + distinct_id: Uuid, + seq: u64, + timestamp: String, + started: String, + sample_rate: f32, + duration: f64, + status: u8, + os: Option, + os_version: Option, + device_family: Option, + release: Option, + environment: Option, + retention_days: (), // TODO: Project config } /// An enum over all possible ingest messages. #[derive(Debug, Serialize)] #[serde(tag = "type", rename_all = "snake_case")] +#[allow(clippy::large_enum_variant)] enum KafkaMessage { Event(EventKafkaMessage), Attachment(AttachmentKafkaMessage), AttachmentChunk(AttachmentChunkKafkaMessage), UserReport(UserReportKafkaMessage), + Session(SessionKafkaMessage), +} + +impl KafkaMessage { + /// Returns the partitioning key for this kafka message determining. + fn key(&self) -> &[u8] { + let event_id = match self { + Self::Event(message) => &message.event_id.0, + Self::Attachment(message) => &message.event_id.0, + Self::AttachmentChunk(message) => &message.event_id.0, + Self::UserReport(message) => &message.event_id.0, + Self::Session(message) => &message.event_id, + }; + + event_id.as_bytes() + } + + /// Serializes the message into its binary format. + fn serialize(&self) -> Result, StoreError> { + if let KafkaMessage::Session(ref message) = *self { + return serde_json::to_vec(&message).map_err(StoreError::InvalidJson); + } + + rmp_serde::to_vec_named(&self).map_err(StoreError::InvalidMsgPack) + } } /// Message sent to the StoreForwarder containing an event @@ -277,6 +363,7 @@ pub struct StoreEvent { pub envelope: Envelope, pub start_time: Instant, pub project_id: ProjectId, + pub organization_id: u64, } impl Message for StoreEvent { @@ -298,6 +385,7 @@ impl Handler for StoreForwarder { envelope, start_time, project_id, + organization_id, } = message; let event_id = envelope.event_id(); @@ -332,6 +420,7 @@ impl Handler for StoreForwarder { item, )? } + ItemType::Session => self.produce_session(organization_id, project_id, item)?, _ => {} } } @@ -341,13 +430,13 @@ impl Handler for StoreForwarder { let event_message = KafkaMessage::Event(EventKafkaMessage { payload: event_item.payload(), start_time: instant_to_unix_timestamp(start_time), - event_id, + event_id: event_id.ok_or(StoreError::NoEventId)?, project_id, remote_addr: envelope.meta().client_addr().map(|addr| addr.to_string()), attachments, }); - self.produce(topic, event_id, event_message)?; + self.produce(topic, event_message)?; metric!( counter(RelayCounters::ProcessingEventProduced) += 1, event_type = "event" @@ -361,7 +450,7 @@ impl Handler for StoreForwarder { attachment, }); - self.produce(topic, event_id, attachment_message)?; + self.produce(topic, attachment_message)?; metric!( counter(RelayCounters::ProcessingEventProduced) += 1, event_type = "attachment" diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs index e8695abf68e..a30880b3d14 100644 --- a/relay-server/src/envelope.rs +++ b/relay-server/src/envelope.rs @@ -589,17 +589,6 @@ impl Envelope { self.items.iter() } - /// Retains only the elements specified by the predicate. - /// - /// In other words, remove all elements e such that `f(&item)` returns `false`. This method - /// operates in place and preserves the order of the retained elements. - pub fn retain_items(&mut self, f: F) - where - F: FnMut(&mut Item) -> bool, - { - self.items.retain(f); - } - /// Returns the an option with a reference to the first item that matches /// the predicate, or None if the predicate is not matched by any item. pub fn get_item_by(&self, mut pred: F) -> Option<&Item> From ebeca343887dd6917691166d6b2ad86d4487c792 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Tue, 11 Feb 2020 13:52:00 +0100 Subject: [PATCH 09/13] ref(server): Rename StoreEvent to StoreEnvelope --- relay-server/src/actors/events.rs | 4 ++-- relay-server/src/actors/store.rs | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/relay-server/src/actors/events.rs b/relay-server/src/actors/events.rs index ebd3095eee8..2c9f7ed3198 100644 --- a/relay-server/src/actors/events.rs +++ b/relay-server/src/actors/events.rs @@ -35,7 +35,7 @@ use crate::utils::{self, FormDataIter, FutureExt, RedisPool}; #[cfg(feature = "processing")] use { - crate::actors::store::{StoreError, StoreEvent, StoreForwarder}, + crate::actors::store::{StoreEnvelope, StoreError, StoreForwarder}, crate::quotas::{QuotasError, RateLimiter}, crate::service::ServerErrorKind, failure::ResultExt, @@ -929,7 +929,7 @@ impl Handler for EventManager { if let Some(store_forwarder) = store_forwarder { log::trace!("sending envelope to kafka"); let future = store_forwarder - .send(StoreEvent { + .send(StoreEnvelope { envelope, start_time, organization_id: organization_id.load(Ordering::Relaxed), diff --git a/relay-server/src/actors/store.rs b/relay-server/src/actors/store.rs index bd943e8faa1..3d0046bca75 100644 --- a/relay-server/src/actors/store.rs +++ b/relay-server/src/actors/store.rs @@ -359,14 +359,14 @@ impl KafkaMessage { /// Message sent to the StoreForwarder containing an event #[derive(Clone, Debug)] -pub struct StoreEvent { +pub struct StoreEnvelope { pub envelope: Envelope, pub start_time: Instant, pub project_id: ProjectId, pub organization_id: u64, } -impl Message for StoreEvent { +impl Message for StoreEnvelope { type Result = Result<(), StoreError>; } @@ -377,11 +377,11 @@ fn is_slow_item(item: &Item) -> bool { item.ty() == ItemType::Attachment || item.ty() == ItemType::UserReport } -impl Handler for StoreForwarder { +impl Handler for StoreForwarder { type Result = Result<(), StoreError>; - fn handle(&mut self, message: StoreEvent, _ctx: &mut Self::Context) -> Self::Result { - let StoreEvent { + fn handle(&mut self, message: StoreEnvelope, _ctx: &mut Self::Context) -> Self::Result { + let StoreEnvelope { envelope, start_time, project_id, From 7dccd236e4e0970682f0d5a7ea92117aabfc8f87 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Tue, 11 Feb 2020 16:20:20 +0100 Subject: [PATCH 10/13] fix: Tests --- relay-general/src/protocol/session.rs | 4 ++-- relay-server/src/actors/events.rs | 3 +++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/relay-general/src/protocol/session.rs b/relay-general/src/protocol/session.rs index ff7a8d5f842..438c4b740d0 100644 --- a/relay-general/src/protocol/session.rs +++ b/relay-general/src/protocol/session.rs @@ -198,7 +198,7 @@ mod tests { }; // Since the update_id is defaulted randomly, ensure that it matches. - let mut parsed = serde_json::from_str::(json).unwrap(); + let mut parsed = SessionUpdate::parse(json.as_bytes()).unwrap(); parsed.update_id = update.update_id; // Sequence is defaulted to the current timestamp. Override for snapshot. @@ -249,7 +249,7 @@ mod tests { }, }; - assert_eq_dbg!(update, serde_json::from_str(json).unwrap()); + assert_eq_dbg!(update, SessionUpdate::parse(json.as_bytes()).unwrap()); assert_eq_str!(json, serde_json::to_string_pretty(&update).unwrap()); } } diff --git a/relay-server/src/actors/events.rs b/relay-server/src/actors/events.rs index 2c9f7ed3198..9db8861280a 100644 --- a/relay-server/src/actors/events.rs +++ b/relay-server/src/actors/events.rs @@ -924,6 +924,9 @@ impl Handler for EventManager { .and_then(clone!(captured_events, organization_id, |processed| { let envelope = processed.envelope; + // avoid warnings since this is only used in the + let _ = organization_id; + #[cfg(feature = "processing")] { if let Some(store_forwarder) = store_forwarder { From c431c1650b2013c44e8697bf38c6398bae2e9018 Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Tue, 11 Feb 2020 20:41:55 +0100 Subject: [PATCH 11/13] fix(py): Fix envelope serialization --- py/sentry_relay/envelope.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/py/sentry_relay/envelope.py b/py/sentry_relay/envelope.py index b6a9cd7ff30..bad81ecf829 100644 --- a/py/sentry_relay/envelope.py +++ b/py/sentry_relay/envelope.py @@ -15,7 +15,7 @@ class Envelope(object): def __init__(self, headers=None, items=None): if headers is not None: headers = dict(headers) - self.headers = headers + self.headers = headers or {} if items is None: items = [] else: @@ -35,7 +35,7 @@ def __iter__(self): return iter(self.items) def serialize_into(self, f): - f.write(json.dumps(self.headers)) + f.write(json.dumps(self.headers).encode("utf-8")) f.write(b"\n") for item in self.items: item.serialize_into(f) @@ -76,7 +76,7 @@ def get_bytes(self): with open(self.path, "rb") as f: self.bytes = f.read() elif self.event is not None: - self.bytes = json.dumps(self.event) + self.bytes = json.dumps(self.event).encode("utf-8") else: self.bytes = b"" return self.bytes @@ -155,7 +155,7 @@ def serialize_into(self, f): headers = dict(self.headers) length, writer = self.payload.prepare_serialize() headers["length"] = length - f.write(json.dumps(headers)) + f.write(json.dumps(headers).encode("utf-8")) f.write(b"\n") writer(f) f.write(b"\n") From ab251f2e35cd96d5f85ee65845f6aa0f1abca35b Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Tue, 11 Feb 2020 20:42:39 +0100 Subject: [PATCH 12/13] ref(tests): Move Envelope include to fixtures module --- tests/integration/fixtures/__init__.py | 14 ++++++++++++++ tests/integration/fixtures/mini_sentry.py | 16 +--------------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/tests/integration/fixtures/__init__.py b/tests/integration/fixtures/__init__.py index 8a8f15696b2..90dce0441a3 100644 --- a/tests/integration/fixtures/__init__.py +++ b/tests/integration/fixtures/__init__.py @@ -1,11 +1,25 @@ import datetime +import json import time +import os import requests session = requests.session() +# HACK: import the envelope module from librelay without requiring to build the cabi +with open( + os.path.abspath(os.path.dirname(__file__)) + "/../../../py/sentry_relay/envelope.py" +) as f: + envelope_namespace = {} + eval(compile(f.read(), "envelope.py", "exec"), envelope_namespace) + +Envelope = envelope_namespace["Envelope"] +Item = envelope_namespace["Item"] +PayloadRef = envelope_namespace["PayloadRef"] + + class SentryLike(object): _healthcheck_passed = False diff --git a/tests/integration/fixtures/mini_sentry.py b/tests/integration/fixtures/mini_sentry.py index 7e191f529ed..910219d5428 100644 --- a/tests/integration/fixtures/mini_sentry.py +++ b/tests/integration/fixtures/mini_sentry.py @@ -1,4 +1,3 @@ -import os import gzip import json import uuid @@ -11,20 +10,7 @@ from flask import Flask, request as flask_request, jsonify from pytest_localserver.http import WSGIServer -from . import SentryLike - - -# HACK: import the envelope module from librelay without requiring to build the cabi -with open( - os.path.abspath(os.path.dirname(__file__)) + "/../../../py/sentry_relay/envelope.py" -) as f: - envelope_namespace = {} - eval(compile(f.read(), "envelope.py", "exec"), envelope_namespace) - - -Envelope = envelope_namespace["Envelope"] -Item = envelope_namespace["Item"] -PayloadRef = envelope_namespace["PayloadRef"] +from . import SentryLike, Envelope, Item class Sentry(SentryLike): From cb38f9bcf1d9eff203bd24406e0030c846c9fe3b Mon Sep 17 00:00:00 2001 From: Jan Michael Auer Date: Tue, 11 Feb 2020 20:43:12 +0100 Subject: [PATCH 13/13] test(integration): Add a test for session ingestion --- tests/integration/conftest.py | 1 + tests/integration/fixtures/__init__.py | 42 ++++++++++++++------ tests/integration/fixtures/processing.py | 30 ++++++++++++--- tests/integration/test_session.py | 49 ++++++++++++++++++++++++ 4 files changed, 105 insertions(+), 17 deletions(-) create mode 100644 tests/integration/test_session.py diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index b57154eec13..9fe5372543a 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -20,6 +20,7 @@ outcomes_consumer, transactions_consumer, attachments_consumer, + sessions_consumer, ) # noqa diff --git a/tests/integration/fixtures/__init__.py b/tests/integration/fixtures/__init__.py index 90dce0441a3..f5bbbf8b305 100644 --- a/tests/integration/fixtures/__init__.py +++ b/tests/integration/fixtures/__init__.py @@ -29,6 +29,20 @@ class SentryLike(object): def url(self): return "http://{}:{}".format(*self.server_address) + @property + def dsn(self): + """DSN for which you will find the events in self.captured_events""" + # bogus, we never check the DSN + return "http://{}@{}:{}/42".format(self.dsn_public_key, *self.server_address) + + @property + def auth_header(self): + return ( + "Sentry sentry_version=5, sentry_timestamp=1535376240291, " + "sentry_client=raven-node/2.6.3, " + "sentry_key={}".format(self.dsn_public_key) + ) + def _wait(self, path): backoff = 0.1 while True: @@ -51,12 +65,6 @@ def wait_relay_healthcheck(self): def __repr__(self): return "<{}({})>".format(self.__class__.__name__, repr(self.upstream)) - @property - def dsn(self): - """DSN for which you will find the events in self.captured_events""" - # bogus, we never check the DSN - return "http://{}@{}:{}/42".format(self.dsn_public_key, *self.server_address) - def iter_public_keys(self): try: yield self.public_key @@ -137,11 +145,7 @@ def send_event(self, project_id, payload=None, headers=None, legacy=False): headers = { "Content-Type": "application/octet-stream", - "X-Sentry-Auth": ( - "Sentry sentry_version=5, sentry_timestamp=1535376240291, " - "sentry_client=raven-node/2.6.3, " - "sentry_key={}".format(self.dsn_public_key) - ), + "X-Sentry-Auth": self.auth_header, **(headers or {}), } @@ -153,6 +157,22 @@ def send_event(self, project_id, payload=None, headers=None, legacy=False): response = self.post(url, headers=headers, **kwargs) response.raise_for_status() + def send_envelope(self, project_id, envelope, headers=None): + url = "/api/%s/store/" % project_id + headers = { + "Content-Type": "application/x-sentry-envelope", + "X-Sentry-Auth": self.auth_header, + **(headers or {}), + } + + response = self.post(url, headers=headers, data=envelope.serialize()) + response.raise_for_status() + + def send_session(self, project_id, payload): + session_item = Item(json.dumps(payload), {"type": "session"}) + envelope = Envelope(headers={"dsn": self.dsn}, items=[session_item]) + self.send_envelope(project_id, envelope) + def send_security_report( self, project_id, content_type, payload, release, environment ): diff --git a/tests/integration/fixtures/processing.py b/tests/integration/fixtures/processing.py index a2a1780bf59..3a7694e5e8b 100644 --- a/tests/integration/fixtures/processing.py +++ b/tests/integration/fixtures/processing.py @@ -49,6 +49,7 @@ def inner(options=None): "attachments": get_topic_name("attachments"), "transactions": get_topic_name("transactions"), "outcomes": get_topic_name("outcomes"), + "sessions": get_topic_name("sessions"), } if not processing.get("redis"): @@ -196,18 +197,35 @@ def attachments_consumer(kafka_consumer): return lambda: AttachmentsConsumer(kafka_consumer("attachments")) +@pytest.fixture +def sessions_consumer(kafka_consumer): + return lambda: SessionsConsumer(kafka_consumer("sessions")) + + +class SessionsConsumer(ConsumerBase): + def __init__(self, consumer): + self.consumer = consumer + + def get_session(self): + message = self.poll() + assert message is not None + assert message.error() is None + + return json.loads(message.value()) + + class EventsConsumer(ConsumerBase): def __init__(self, consumer): self.consumer = consumer def get_event(self): - event = self.poll() - assert event is not None - assert event.error() is None + message = self.poll() + assert message is not None + assert message.error() is None - v = msgpack.unpackb(event.value(), raw=False, use_list=False) - assert v["type"] == "event" - return json.loads(v["payload"].decode("utf8")), v + event = msgpack.unpackb(message.value(), raw=False, use_list=False) + assert event["type"] == "event" + return json.loads(event["payload"].decode("utf8")), event def get_message(self): message = self.poll() diff --git a/tests/integration/test_session.py b/tests/integration/test_session.py new file mode 100644 index 00000000000..316f46cadae --- /dev/null +++ b/tests/integration/test_session.py @@ -0,0 +1,49 @@ +def test_session_with_processing(mini_sentry, relay_with_processing, sessions_consumer): + relay = relay_with_processing() + relay.wait_relay_healthcheck() + + sessions_consumer = sessions_consumer() + + project_config = mini_sentry.project_configs[42] = mini_sentry.full_project_config() + relay.send_session( + 42, + { + "id": "94ecab99-184a-45ee-ac18-6ed2c2c2e9f2", + "sid": "8333339f-5675-4f89-a9a0-1c935255ab58", + "did": "b3ef3211-58a4-4b36-a9a1-5a55df0d9aaf", + "seq": 42, + "timestamp": "2020-02-07T15:17:00Z", + "started": "2020-02-07T14:16:00Z", + "sample_rate": 2.0, + "duration": 1947.49, + "status": "exited", + "attrs": { + "os": "iOS", + "os_version": "13.3.1", + "device_family": "iPhone12,3", + "release": "sentry-test@1.0.0", + "environment": "production", + }, + }, + ) + + session = sessions_consumer.get_session() + assert session == { + "org_id": 1, + "project_id": 42, + "event_id": "94ecab99-184a-45ee-ac18-6ed2c2c2e9f2", + "session_id": "8333339f-5675-4f89-a9a0-1c935255ab58", + "distinct_id": "b3ef3211-58a4-4b36-a9a1-5a55df0d9aaf", + "seq": 42, + "timestamp": "2020-02-07T15:17:00+00:00", + "started": "2020-02-07T14:16:00+00:00", + "sample_rate": 2.0, + "duration": 1947.49, + "status": 1, + "os": "iOS", + "os_version": "13.3.1", + "device_family": "iPhone12,3", + "release": "sentry-test@1.0.0", + "environment": "production", + "retention_days": None, + }