diff --git a/py/sentry_relay/envelope.py b/py/sentry_relay/envelope.py
index b6a9cd7ff30..bad81ecf829 100644
--- a/py/sentry_relay/envelope.py
+++ b/py/sentry_relay/envelope.py
@@ -15,7 +15,7 @@ class Envelope(object):
     def __init__(self, headers=None, items=None):
         if headers is not None:
             headers = dict(headers)
-        self.headers = headers
+        self.headers = headers or {}
         if items is None:
             items = []
         else:
@@ -35,7 +35,7 @@ def __iter__(self):
         return iter(self.items)
 
     def serialize_into(self, f):
-        f.write(json.dumps(self.headers))
+        f.write(json.dumps(self.headers).encode("utf-8"))
         f.write(b"\n")
         for item in self.items:
             item.serialize_into(f)
@@ -76,7 +76,7 @@ def get_bytes(self):
             with open(self.path, "rb") as f:
                 self.bytes = f.read()
         elif self.event is not None:
-            self.bytes = json.dumps(self.event)
+            self.bytes = json.dumps(self.event).encode("utf-8")
         else:
             self.bytes = b""
         return self.bytes
@@ -155,7 +155,7 @@ def serialize_into(self, f):
         headers = dict(self.headers)
         length, writer = self.payload.prepare_serialize()
         headers["length"] = length
-        f.write(json.dumps(headers))
+        f.write(json.dumps(headers).encode("utf-8"))
         f.write(b"\n")
         writer(f)
         f.write(b"\n")
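The envelope.py fixes above make serialization produce bytes on Python 3: json.dumps returns str, while serialize_into writes to a binary stream (note the b"\n" separators). A minimal sketch of the resulting round trip, assuming the sentry_relay package is importable and using the Item(payload, headers) call style from the test fixtures further down:

```python
import json

from sentry_relay.envelope import Envelope, Item

# Headers and item payloads are JSON-encoded and then UTF-8 encoded, so the
# whole envelope serializes to a single bytes object.
item = Item(json.dumps({"status": "ok"}), {"type": "session"})
envelope = Envelope(headers={"dsn": "http://key@localhost/42"}, items=[item])

data = envelope.serialize()
assert isinstance(data, bytes)  # mixing str into f.write(b"\n") raised before
```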
diff --git a/relay-config/src/config.rs b/relay-config/src/config.rs
index 4e2ffcb8b1e..ef546b40799 100644
--- a/relay-config/src/config.rs
+++ b/relay-config/src/config.rs
@@ -398,12 +398,15 @@ pub enum KafkaTopic {
     Attachments,
     /// Transaction events topic.
     Transactions,
-    /// All outcomes are sent through this channel.
+    /// Shared outcomes topic for Relay and Sentry.
     Outcomes,
+    /// Session health updates.
+    Sessions,
 }
 
 /// Configuration for topics.
 #[derive(Serialize, Deserialize, Debug)]
+#[serde(default)]
 pub struct TopicNames {
     /// Simple events topic name.
     pub events: String,
@@ -413,6 +416,20 @@
     pub transactions: String,
     /// Event outcomes topic name.
     pub outcomes: String,
+    /// Session health topic name.
+    pub sessions: String,
+}
+
+impl Default for TopicNames {
+    fn default() -> Self {
+        Self {
+            events: "ingest-events".to_owned(),
+            attachments: "ingest-attachments".to_owned(),
+            transactions: "ingest-transactions".to_owned(),
+            outcomes: "outcomes".to_owned(),
+            sessions: "ingest-sessions".to_owned(),
+        }
+    }
 }
 
 /// A name value pair of Kafka config parameter.
@@ -461,6 +478,7 @@ pub struct Processing {
     /// Kafka producer configurations.
     pub kafka_config: Vec<KafkaConfigParam>,
     /// Kafka topic names.
+    #[serde(default)]
     pub topics: TopicNames,
     /// Redis hosts to connect to for storing state for rate limits.
     pub redis: Option<Redis>,
@@ -484,12 +502,7 @@ impl Default for Processing {
             max_secs_in_future: 0,
             max_secs_in_past: 0,
             kafka_config: Vec::new(),
-            topics: TopicNames {
-                events: String::new(),
-                attachments: String::new(),
-                transactions: String::new(),
-                outcomes: String::new(),
-            },
+            topics: TopicNames::default(),
             redis: Some(Redis::default()),
             attachment_chunk_size: default_chunk_size(),
             projectconfig_cache_prefix: default_projectconfig_cache_prefix(),
@@ -977,6 +990,7 @@ impl Config {
             KafkaTopic::Events => topics.events.as_str(),
             KafkaTopic::Transactions => topics.transactions.as_str(),
             KafkaTopic::Outcomes => topics.outcomes.as_str(),
+            KafkaTopic::Sessions => topics.sessions.as_str(),
         }
     }
 
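Because TopicNames now implements Default and carries #[serde(default)] (both on the struct and on the topics field of Processing), topic names can be omitted entirely or overridden one at a time. A sketch of the options dict the integration fixtures pass to Relay, with illustrative values:

```python
# Any topic missing from "topics" falls back to TopicNames::default() on
# the Rust side; omitting "topics" entirely uses all five defaults.
options = {
    "processing": {
        "enabled": True,
        "kafka_config": [{"name": "bootstrap.servers", "value": "127.0.0.1:9092"}],
        "topics": {"sessions": "ingest-sessions-custom"},
    }
}
```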
diff --git a/relay-general/src/protocol/mod.rs b/relay-general/src/protocol/mod.rs
index e97ce9dd60b..741ca4e474e 100644
--- a/relay-general/src/protocol/mod.rs
+++ b/relay-general/src/protocol/mod.rs
@@ -12,6 +12,7 @@ mod mechanism;
 mod metrics;
 mod request;
 mod security_report;
+mod session;
 mod span;
 mod stacktrace;
 mod tags;
@@ -42,6 +43,7 @@ pub use self::mechanism::{CError, MachException, Mechanism, MechanismMeta, PosixSignal};
 pub use self::metrics::Metrics;
 pub use self::request::{Cookies, HeaderName, HeaderValue, Headers, Query, Request};
 pub use self::security_report::{Csp, ExpectCt, ExpectStaple, Hpkp, SecurityReportType};
+pub use self::session::{ParseSessionStatusError, SessionAttributes, SessionStatus, SessionUpdate};
 pub use self::span::Span;
 pub use self::stacktrace::{Frame, FrameData, FrameVars, RawStacktrace, Stacktrace};
 pub use self::tags::{TagEntry, Tags};
diff --git a/relay-general/src/protocol/session.rs b/relay-general/src/protocol/session.rs
new file mode 100644
index 00000000000..438c4b740d0
--- /dev/null
+++ b/relay-general/src/protocol/session.rs
@@ -0,0 +1,255 @@
+use std::fmt;
+use std::str::FromStr;
+use std::time::SystemTime;
+
+use chrono::{DateTime, Utc};
+use failure::Fail;
+use serde::{Deserialize, Serialize};
+use uuid::Uuid;
+
+/// The type of session event we're dealing with.
+#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Deserialize, Serialize)]
+#[serde(rename_all = "snake_case")]
+pub enum SessionStatus {
+    /// The session is healthy.
+    ///
+    /// This does not necessarily indicate that the session is still active.
+    Ok,
+    /// The session terminated normally.
+    Exited,
+    /// The session resulted in an application crash.
+    Crashed,
+    /// The session had an unexpected abrupt termination (not crashing).
+    Abnormal,
+}
+
+impl Default for SessionStatus {
+    fn default() -> Self {
+        Self::Ok
+    }
+}
+
+/// An error used when parsing `SessionStatus`.
+#[derive(Debug, Fail)]
+#[fail(display = "invalid session status")]
+pub struct ParseSessionStatusError;
+
+impl FromStr for SessionStatus {
+    type Err = ParseSessionStatusError;
+
+    fn from_str(string: &str) -> Result<Self, Self::Err> {
+        Ok(match string {
+            "ok" => SessionStatus::Ok,
+            "crashed" => SessionStatus::Crashed,
+            "abnormal" => SessionStatus::Abnormal,
+            "exited" => SessionStatus::Exited,
+            _ => return Err(ParseSessionStatusError),
+        })
+    }
+}
+
+impl fmt::Display for SessionStatus {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match *self {
+            SessionStatus::Ok => write!(f, "ok"),
+            SessionStatus::Crashed => write!(f, "crashed"),
+            SessionStatus::Abnormal => write!(f, "abnormal"),
+            SessionStatus::Exited => write!(f, "exited"),
+        }
+    }
+}
+
+fn is_empty_string(opt: &Option<String>) -> bool {
+    opt.as_ref().map_or(true, |s| s.is_empty())
+}
+
+/// Additional attributes for Sessions.
+#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
+#[serde(default)]
+pub struct SessionAttributes {
+    /// The operating system name, corresponding to the os context.
+    pub os: Option<String>,
+    /// The full operating system version string, corresponding to the os context.
+    pub os_version: Option<String>,
+    /// The device family identifier, corresponding to the device context.
+    pub device_family: Option<String>,
+    /// The release version string.
+    pub release: Option<String>,
+    /// The environment identifier.
+    pub environment: Option<String>,
+}
+
+impl SessionAttributes {
+    fn is_empty(&self) -> bool {
+        is_empty_string(&self.os)
+            && is_empty_string(&self.os_version)
+            && is_empty_string(&self.device_family)
+            && is_empty_string(&self.release)
+            && is_empty_string(&self.environment)
+    }
+}
+
+fn default_sequence() -> u64 {
+    SystemTime::now()
+        .duration_since(SystemTime::UNIX_EPOCH)
+        .unwrap_or_default()
+        .as_secs()
+}
+
+fn default_sample_rate() -> f32 {
+    1.0
+}
+
+#[allow(clippy::trivially_copy_pass_by_ref)]
+fn is_default_sample_rate(rate: &f32) -> bool {
+    (*rate - default_sample_rate()).abs() < std::f32::EPSILON
+}
+
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+pub struct SessionUpdate {
+    /// Unique identifier of this update event.
+    #[serde(rename = "id", default = "Uuid::new_v4")]
+    pub update_id: Uuid,
+    /// The session identifier.
+    #[serde(rename = "sid")]
+    pub session_id: Uuid,
+    /// The distinct identifier.
+    #[serde(rename = "did", default)]
+    pub distinct_id: Uuid,
+    /// An optional logical clock.
+    #[serde(rename = "seq", default = "default_sequence")]
+    pub sequence: u64,
+    /// The timestamp of when the session change event was created.
+    pub timestamp: DateTime<Utc>,
+    /// The timestamp of when the session itself started.
+    pub started: DateTime<Utc>,
+    /// The sample rate.
+    #[serde(
+        default = "default_sample_rate",
+        skip_serializing_if = "is_default_sample_rate"
+    )]
+    pub sample_rate: f32,
+    /// An optional duration of the session so far.
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub duration: Option<f64>,
+    /// The status of the session.
+    #[serde(default)]
+    pub status: SessionStatus,
+    /// The session event attributes.
+    #[serde(
+        rename = "attrs",
+        default,
+        skip_serializing_if = "SessionAttributes::is_empty"
+    )]
+    pub attributes: SessionAttributes,
+}
+
+impl SessionUpdate {
+    /// Parses a session update from JSON.
+    pub fn parse(payload: &[u8]) -> Result<Self, serde_json::Error> {
+        let mut session = serde_json::from_slice::<SessionUpdate>(payload)?;
+
+        if session.distinct_id.is_nil() {
+            session.distinct_id = session.session_id;
+        }
+
+        Ok(session)
+    }
+
+    /// Serializes a session update back into JSON.
+    pub fn serialize(&self) -> Result<Vec<u8>, serde_json::Error> {
+        serde_json::to_vec(self)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_session_default_values() {
+        let json = r#"{
+  "sid": "8333339f-5675-4f89-a9a0-1c935255ab58",
+  "timestamp": "2020-02-07T15:17:00Z",
+  "started": "2020-02-07T14:16:00Z"
+}"#;
+
+        let output = r#"{
+  "id": "94ecab99-184a-45ee-ac18-6ed2c2c2e9f2",
+  "sid": "8333339f-5675-4f89-a9a0-1c935255ab58",
+  "did": "8333339f-5675-4f89-a9a0-1c935255ab58",
+  "seq": 4711,
+  "timestamp": "2020-02-07T15:17:00Z",
+  "started": "2020-02-07T14:16:00Z",
+  "status": "ok"
+}"#;
+
+        let update = SessionUpdate {
+            update_id: "94ecab99-184a-45ee-ac18-6ed2c2c2e9f2".parse().unwrap(),
+            session_id: "8333339f-5675-4f89-a9a0-1c935255ab58".parse().unwrap(),
+            distinct_id: "8333339f-5675-4f89-a9a0-1c935255ab58".parse().unwrap(),
+            sequence: 4711, // this would be a timestamp instead
+            timestamp: "2020-02-07T15:17:00Z".parse().unwrap(),
+            started: "2020-02-07T14:16:00Z".parse().unwrap(),
+            sample_rate: 1.0,
+            duration: None,
+            status: SessionStatus::Ok,
+            attributes: SessionAttributes::default(),
+        };
+
+        // Since the update_id is defaulted randomly, ensure that it matches.
+        let mut parsed = SessionUpdate::parse(json.as_bytes()).unwrap();
+        parsed.update_id = update.update_id;
+
+        // Sequence is defaulted to the current timestamp. Override for snapshot.
+        assert_eq!(parsed.sequence, default_sequence());
+        parsed.sequence = 4711;
+
+        assert_eq_dbg!(update, parsed);
+        assert_eq_str!(output, serde_json::to_string_pretty(&update).unwrap());
+    }
+
+    #[test]
+    fn test_session_roundtrip() {
+        let json = r#"{
+  "id": "94ecab99-184a-45ee-ac18-6ed2c2c2e9f2",
+  "sid": "8333339f-5675-4f89-a9a0-1c935255ab58",
+  "did": "b3ef3211-58a4-4b36-a9a1-5a55df0d9aaf",
+  "seq": 42,
+  "timestamp": "2020-02-07T15:17:00Z",
+  "started": "2020-02-07T14:16:00Z",
+  "sample_rate": 2.0,
+  "duration": 1947.49,
+  "status": "exited",
+  "attrs": {
+    "os": "iOS",
+    "os_version": "13.3.1",
+    "device_family": "iPhone12,3",
+    "release": "sentry-test@1.0.0",
+    "environment": "production"
+  }
+}"#;
+
+        let update = SessionUpdate {
+            update_id: "94ecab99-184a-45ee-ac18-6ed2c2c2e9f2".parse().unwrap(),
+            session_id: "8333339f-5675-4f89-a9a0-1c935255ab58".parse().unwrap(),
+            distinct_id: "b3ef3211-58a4-4b36-a9a1-5a55df0d9aaf".parse().unwrap(),
+            sequence: 42,
+            timestamp: "2020-02-07T15:17:00Z".parse().unwrap(),
+            started: "2020-02-07T14:16:00Z".parse().unwrap(),
+            sample_rate: 2.0,
+            duration: Some(1947.49),
+            status: SessionStatus::Exited,
+            attributes: SessionAttributes {
+                os: Some("iOS".to_owned()),
+                os_version: Some("13.3.1".to_owned()),
+                device_family: Some("iPhone12,3".to_owned()),
+                release: Some("sentry-test@1.0.0".to_owned()),
+                environment: Some("production".to_owned()),
+            },
+        };
+
+        assert_eq_dbg!(update, SessionUpdate::parse(json.as_bytes()).unwrap());
+        assert_eq_str!(json, serde_json::to_string_pretty(&update).unwrap());
+    }
+}
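The serde defaults defined above keep the wire format terse. For reference, the smallest update SessionUpdate::parse accepts is the one from test_session_default_values: only sid, timestamp, and started are required (a sketch mirroring that JSON as a Python dict):

```python
minimal_session = {
    # "id" defaults to a random UUID, "did" to the value of "sid",
    # "seq" to the current Unix time, "status" to "ok", and
    # "sample_rate" to 1.0 on the Relay side.
    "sid": "8333339f-5675-4f89-a9a0-1c935255ab58",
    "timestamp": "2020-02-07T15:17:00Z",
    "started": "2020-02-07T14:16:00Z",
}
```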
diff --git a/relay-general/src/store/schema.rs b/relay-general/src/store/schema.rs
index c8447cb7dae..9869a58b17c 100644
--- a/relay-general/src/store/schema.rs
+++ b/relay-general/src/store/schema.rs
@@ -214,7 +214,6 @@ mod tests {
     #[test]
     fn test_mechanism_missing_attributes() {
         use crate::protocol::{CError, MachException, Mechanism, MechanismMeta, PosixSignal};
-        use crate::types::ErrorKind;
 
         let mut mechanism = Annotated::new(Mechanism {
             ty: Annotated::new("mytype".to_string()),
diff --git a/relay-server/src/actors/events.rs b/relay-server/src/actors/events.rs
index f87d3d83ee5..9db8861280a 100644
--- a/relay-server/src/actors/events.rs
+++ b/relay-server/src/actors/events.rs
@@ -1,5 +1,6 @@
 use std::collections::BTreeMap;
 use std::rc::Rc;
+use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
 use std::time::Instant;
 
@@ -7,7 +8,7 @@ use actix::fut::result;
 use actix::prelude::*;
 use failure::Fail;
 use futures::prelude::*;
-use parking_lot::{Mutex, RwLock};
+use parking_lot::RwLock;
 use serde_json::Value as SerdeValue;
 
 use relay_common::{clone, metric, LogError};
@@ -34,7 +35,7 @@ use crate::utils::{self, FormDataIter, FutureExt, RedisPool};
 
 #[cfg(feature = "processing")]
 use {
-    crate::actors::store::{StoreError, StoreEvent, StoreForwarder},
+    crate::actors::store::{StoreEnvelope, StoreError, StoreForwarder},
     crate::quotas::{QuotasError, RateLimiter},
     crate::service::ServerErrorKind,
     failure::ResultExt,
@@ -525,10 +526,16 @@
             // These may be forwarded to upstream / store:
             ItemType::Attachment => false,
             ItemType::UserReport => false,
+
+            // session data is never considered as part of deduplication
+            ItemType::Session => false,
         }
     }
 
-    fn process(&self, message: ProcessEvent) -> Result<ProcessEventResponse, ProcessingError> {
+    fn process(
+        &self,
+        message: ProcessEnvelope,
+    ) -> Result<ProcessEnvelopeResponse, ProcessingError> {
         let mut envelope = message.envelope;
 
         macro_rules! if_processing {
@@ -614,7 +621,7 @@
             // envelope only contains attachments or user reports. We should not run filters or
             // apply rate limits.
             log::trace!("no event for envelope, skipping processing");
-            return Ok(ProcessEventResponse { envelope });
+            return Ok(ProcessEnvelopeResponse { envelope });
         }
 
         if_processing! {
@@ -650,7 +657,7 @@
         }
 
         envelope.add_item(event_item);
-        Ok(ProcessEventResponse { envelope })
+        Ok(ProcessEnvelopeResponse { envelope })
     }
 }
 
@@ -658,25 +665,25 @@ impl Actor for EventProcessor {
     type Context = SyncContext<Self>;
 }
 
-struct ProcessEvent {
+struct ProcessEnvelope {
     pub envelope: Envelope,
     pub project_state: Arc<ProjectState>,
     pub start_time: Instant,
 }
 
 #[cfg_attr(not(feature = "processing"), allow(dead_code))]
-struct ProcessEventResponse {
+struct ProcessEnvelopeResponse {
     envelope: Envelope,
 }
 
-impl Message for ProcessEvent {
-    type Result = Result<ProcessEventResponse, ProcessingError>;
+impl Message for ProcessEnvelope {
+    type Result = Result<ProcessEnvelopeResponse, ProcessingError>;
 }
 
-impl Handler<ProcessEvent> for EventProcessor {
-    type Result = Result<ProcessEventResponse, ProcessingError>;
+impl Handler<ProcessEnvelope> for EventProcessor {
+    type Result = Result<ProcessEnvelopeResponse, ProcessingError>;
 
-    fn handle(&mut self, message: ProcessEvent, _context: &mut Self::Context) -> Self::Result {
+    fn handle(&mut self, message: ProcessEnvelope, _context: &mut Self::Context) -> Self::Result {
         metric!(timer(RelayTimers::EventWaitTime) = message.start_time.elapsed());
         metric!(timer(RelayTimers::EventProcessingTime), {
             self.process(message)
@@ -882,7 +889,7 @@ impl Handler<HandleEvent> for EventManager {
         // This is used to add the respective organization id to the outcome emitted in the error
         // case. The organization id can only be obtained via the project state, which has not been
         // loaded at this time.
-        let org_id_for_err = Rc::new(Mutex::new(None::<u64>));
+        let organization_id = Rc::new(AtomicU64::new(0));
 
         metric!(set(RelaySets::UniqueProjects) = project_id as i64);
@@ -900,10 +907,13 @@ impl Handler<HandleEvent> for EventManager {
                     .map_err(ProcessingError::ScheduleFailed)
                     .and_then(|result| result.map_err(ProcessingError::ProjectFailed))
             }))
-            .and_then(clone!(org_id_for_err, |project_state| {
-                *org_id_for_err.lock() = project_state.organization_id;
+            .and_then(clone!(organization_id, |project_state| {
+                if let Some(id) = project_state.organization_id {
+                    organization_id.store(id, Ordering::Relaxed);
+                }
+
                 processor
-                    .send(ProcessEvent {
+                    .send(ProcessEnvelope {
                         envelope,
                         project_state,
                         start_time,
@@ -911,17 +921,21 @@
                     .map_err(ProcessingError::ScheduleFailed)
                     .flatten()
             }))
-            .and_then(clone!(captured_events, |processed| {
+            .and_then(clone!(captured_events, organization_id, |processed| {
                 let envelope = processed.envelope;
 
+                // avoid warnings since this is only used in the processing feature
+                let _ = organization_id;
+
                 #[cfg(feature = "processing")]
                 {
                     if let Some(store_forwarder) = store_forwarder {
                         log::trace!("sending envelope to kafka");
                         let future = store_forwarder
-                            .send(StoreEvent {
+                            .send(StoreEnvelope {
                                 envelope,
                                 start_time,
+                                organization_id: organization_id.load(Ordering::Relaxed),
                                 project_id,
                             })
                             .map_err(ProcessingError::ScheduleFailed)
@@ -1091,10 +1105,15 @@ impl Handler<HandleEvent> for EventManager {
             }
 
             if let Some(outcome) = outcome_params {
+                let org_id = match organization_id.load(Ordering::Relaxed) {
+                    0 => None,
+                    id => Some(id),
+                };
+
                 outcome_producer.do_send(TrackOutcome {
                     timestamp: Instant::now(),
                     project_id: project_id,
-                    org_id: *(org_id_for_err.lock()),
+                    org_id,
                     key_id: None,
                     outcome,
                     event_id,
diff --git a/relay-server/src/actors/store.rs b/relay-server/src/actors/store.rs
index b3e764b9af1..3d0046bca75 100644
--- a/relay-server/src/actors/store.rs
+++ b/relay-server/src/actors/store.rs
@@ -14,9 +14,9 @@ use serde::{ser::Error, Serialize};
 
 use rmp_serde::encode::Error as RmpError;
 
-use relay_common::{metric, ProjectId, Uuid};
+use relay_common::{metric, LogError, ProjectId, Uuid};
 use relay_config::{Config, KafkaTopic};
-use relay_general::protocol::{EventId, EventType};
+use relay_general::protocol::{EventId, EventType, SessionStatus, SessionUpdate};
 
 use crate::envelope::{AttachmentType, Envelope, Item, ItemType};
 use crate::metrics::RelayCounters;
@@ -30,7 +30,9 @@ pub enum StoreError {
     #[fail(display = "failed to send kafka message")]
     SendFailed(#[cause] KafkaError),
     #[fail(display = "failed to serialize kafka message")]
-    SerializeFailed(#[cause] RmpError),
+    InvalidMsgPack(#[cause] RmpError),
+    #[fail(display = "failed to serialize json message")]
+    InvalidJson(#[cause] serde_json::Error),
     #[fail(display = "failed to store event because event id was missing")]
     NoEventId,
 }
@@ -58,20 +60,11 @@ impl StoreForwarder {
         })
     }
 
-    fn produce(
-        &self,
-        topic: KafkaTopic,
-        event_id: Option<EventId>,
-        message: KafkaMessage,
-    ) -> Result<(), StoreError> {
-        let serialized = rmp_serde::to_vec_named(&message).map_err(StoreError::SerializeFailed)?;
-        let mut record = BaseRecord::to(self.config.kafka_topic_name(topic)).payload(&serialized);
-
-        // Use the event id as partition routing key.
-        if let Some(ref event_id) = event_id {
-            // TODO: consider routing for event-id less envelopes.
-            record = record.key(event_id.0.as_bytes().as_ref());
-        };
+    fn produce(&self, topic: KafkaTopic, message: KafkaMessage) -> Result<(), StoreError> {
+        let serialized = message.serialize()?;
+        let record = BaseRecord::to(self.config.kafka_topic_name(topic))
+            .key(message.key())
+            .payload(&serialized);
 
         match self.producer.send(record) {
             Ok(_) => Ok(()),
@@ -106,7 +99,7 @@
                 chunk_index,
             });
 
-            self.produce(KafkaTopic::Attachments, Some(event_id), attachment_message)?;
+            self.produce(KafkaTopic::Attachments, attachment_message)?;
             offset += chunk_size;
             chunk_index += 1;
         }
@@ -132,17 +125,59 @@
         start_time: Instant,
         item: &Item,
     ) -> Result<(), StoreError> {
-        self.produce(
-            KafkaTopic::Attachments,
-            Some(event_id),
-            KafkaMessage::UserReport(UserReportKafkaMessage {
-                project_id,
-                payload: item.payload(),
-                start_time: instant_to_unix_timestamp(start_time),
-            }),
-        )?;
+        let message = KafkaMessage::UserReport(UserReportKafkaMessage {
+            project_id,
+            event_id,
+            payload: item.payload(),
+            start_time: instant_to_unix_timestamp(start_time),
+        });
 
-        Ok(())
+        self.produce(KafkaTopic::Attachments, message)
+    }
+
+    fn produce_session(
+        &self,
+        org_id: u64,
+        project_id: ProjectId,
+        item: &Item,
+    ) -> Result<(), StoreError> {
+        let session = match SessionUpdate::parse(&item.payload()) {
+            Ok(session) => session,
+            Err(error) => {
+                // Skip gracefully here to allow sending other messages.
+                log::error!("failed to store session: {}", LogError(&error));
+                return Ok(());
+            }
+        };
+
+        let status = match session.status {
+            SessionStatus::Ok => 0,
+            SessionStatus::Exited => 1,
+            SessionStatus::Crashed => 2,
+            SessionStatus::Abnormal => 3,
+        };
+
+        let message = KafkaMessage::Session(SessionKafkaMessage {
+            org_id,
+            project_id,
+            event_id: session.update_id,
+            session_id: session.session_id,
+            distinct_id: session.distinct_id,
+            seq: session.sequence,
+            timestamp: session.timestamp.to_rfc3339(),
+            started: session.started.to_rfc3339(),
+            sample_rate: session.sample_rate,
+            duration: session.duration.unwrap_or(0.0),
+            status,
+            os: session.attributes.os,
+            os_version: session.attributes.os_version,
+            device_family: session.attributes.device_family,
+            release: session.attributes.release,
+            environment: session.attributes.environment,
+            retention_days: (),
+        });
+
+        self.produce(KafkaTopic::Sessions, message)
     }
 }
 
@@ -210,7 +245,7 @@ struct EventKafkaMessage {
     /// Time at which the event was received by Relay.
     start_time: u64,
     /// The event id.
-    event_id: Option<EventId>,
+    event_id: EventId,
     /// The project id for the current event.
     project_id: ProjectId,
     /// The client ip address.
@@ -259,27 +294,79 @@ struct UserReportKafkaMessage {
     project_id: ProjectId,
     start_time: u64,
     payload: Bytes,
+
+    // Used for KafkaMessage::key
+    #[serde(skip)]
+    event_id: EventId,
+}
+
+#[derive(Debug, Serialize)]
+struct SessionKafkaMessage {
+    org_id: u64,
+    project_id: u64,
+    event_id: Uuid,
+    session_id: Uuid,
+    distinct_id: Uuid,
+    seq: u64,
+    timestamp: String,
+    started: String,
+    sample_rate: f32,
+    duration: f64,
+    status: u8,
+    os: Option<String>,
+    os_version: Option<String>,
+    device_family: Option<String>,
+    release: Option<String>,
+    environment: Option<String>,
+    retention_days: (), // TODO: Project config
 }
 
 /// An enum over all possible ingest messages.
 #[derive(Debug, Serialize)]
 #[serde(tag = "type", rename_all = "snake_case")]
+#[allow(clippy::large_enum_variant)]
 enum KafkaMessage {
     Event(EventKafkaMessage),
     Attachment(AttachmentKafkaMessage),
     AttachmentChunk(AttachmentChunkKafkaMessage),
     UserReport(UserReportKafkaMessage),
+    Session(SessionKafkaMessage),
+}
+
+impl KafkaMessage {
+    /// Returns the partitioning key for this kafka message.
+    fn key(&self) -> &[u8] {
+        let event_id = match self {
+            Self::Event(message) => &message.event_id.0,
+            Self::Attachment(message) => &message.event_id.0,
+            Self::AttachmentChunk(message) => &message.event_id.0,
+            Self::UserReport(message) => &message.event_id.0,
+            Self::Session(message) => &message.event_id,
+        };
+
+        event_id.as_bytes()
+    }
+
+    /// Serializes the message into its binary format.
+    fn serialize(&self) -> Result<Vec<u8>, StoreError> {
+        if let KafkaMessage::Session(ref message) = *self {
+            return serde_json::to_vec(&message).map_err(StoreError::InvalidJson);
+        }
+
+        rmp_serde::to_vec_named(&self).map_err(StoreError::InvalidMsgPack)
+    }
 }
 
 /// Message sent to the StoreForwarder containing an event
 #[derive(Clone, Debug)]
-pub struct StoreEvent {
+pub struct StoreEnvelope {
     pub envelope: Envelope,
     pub start_time: Instant,
     pub project_id: ProjectId,
+    pub organization_id: u64,
 }
 
-impl Message for StoreEvent {
+impl Message for StoreEnvelope {
     type Result = Result<(), StoreError>;
 }
 
@@ -290,14 +377,15 @@ fn is_slow_item(item: &Item) -> bool {
     item.ty() == ItemType::Attachment || item.ty() == ItemType::UserReport
 }
 
-impl Handler<StoreEvent> for StoreForwarder {
+impl Handler<StoreEnvelope> for StoreForwarder {
     type Result = Result<(), StoreError>;
 
-    fn handle(&mut self, message: StoreEvent, _ctx: &mut Self::Context) -> Self::Result {
-        let StoreEvent {
+    fn handle(&mut self, message: StoreEnvelope, _ctx: &mut Self::Context) -> Self::Result {
+        let StoreEnvelope {
             envelope,
             start_time,
             project_id,
+            organization_id,
         } = message;
 
         let event_id = envelope.event_id();
@@ -332,6 +420,7 @@
                     item,
                 )?
             }
+            ItemType::Session => self.produce_session(organization_id, project_id, item)?,
             _ => {}
         }
     }
@@ -341,13 +430,13 @@
         let event_message = KafkaMessage::Event(EventKafkaMessage {
             payload: event_item.payload(),
             start_time: instant_to_unix_timestamp(start_time),
-            event_id,
+            event_id: event_id.ok_or(StoreError::NoEventId)?,
             project_id,
             remote_addr: envelope.meta().client_addr().map(|addr| addr.to_string()),
             attachments,
         });
 
-        self.produce(topic, event_id, event_message)?;
+        self.produce(topic, event_message)?;
         metric!(
             counter(RelayCounters::ProcessingEventProduced) += 1,
             event_type = "event"
@@ -361,7 +450,7 @@
             attachment,
         });
 
-        self.produce(topic, event_id, attachment_message)?;
+        self.produce(topic, attachment_message)?;
         metric!(
             counter(RelayCounters::ProcessingEventProduced) += 1,
             event_type = "attachment"
diff --git a/relay-server/src/endpoints/store.rs b/relay-server/src/endpoints/store.rs
index 6dd46e70d8a..3e785e0ee6e 100644
--- a/relay-server/src/endpoints/store.rs
+++ b/relay-server/src/endpoints/store.rs
@@ -82,6 +82,7 @@ fn extract_envelope(
 
 #[derive(Serialize)]
 struct StoreResponse {
+    #[serde(skip_serializing_if = "Option::is_none")]
     id: Option<EventId>,
 }
 
diff --git a/relay-server/src/envelope.rs b/relay-server/src/envelope.rs
index 7ac63e21438..a30880b3d14 100644
--- a/relay-server/src/envelope.rs
+++ b/relay-server/src/envelope.rs
@@ -83,6 +83,8 @@ pub enum ItemType {
     UnrealReport,
     /// User feedback encoded as JSON.
     UserReport,
+    /// Session update data.
+    Session,
 }
 
 impl fmt::Display for ItemType {
@@ -94,6 +96,7 @@ impl fmt::Display for ItemType {
             Self::SecurityReport => write!(f, "security report"),
             Self::UnrealReport => write!(f, "unreal report"),
             Self::UserReport => write!(f, "user feedback"),
+            Self::Session => write!(f, "session"),
         }
     }
 }
@@ -405,7 +408,10 @@ impl Item {
         // Form data items may contain partial event payloads, but those are only ever valid if they
         // occur together with an explicit event item, such as a minidump or apple crash report. For
         // this reason, FormData alone does not constitute an event item.
-            ItemType::FormData | ItemType::UserReport => false,
+            ItemType::FormData => false,
+
+            // The remaining item types cannot carry event payloads.
+            ItemType::UserReport | ItemType::Session => false,
         }
     }
@@ -413,10 +419,14 @@
     ///
     /// This is true for all items except session health events.
     pub fn requires_event(&self) -> bool {
-        // NOTE: Item types should be explicitly white listed here.
         match self.ty() {
-            // TODO: whitelist item types
-            _ => true,
+            ItemType::Event => true,
+            ItemType::Attachment => true,
+            ItemType::FormData => true,
+            ItemType::SecurityReport => true,
+            ItemType::UnrealReport => true,
+            ItemType::UserReport => true,
+            ItemType::Session => false,
         }
     }
 }
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index b57154eec13..9fe5372543a 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -20,6 +20,7 @@
     outcomes_consumer,
     transactions_consumer,
     attachments_consumer,
+    sessions_consumer,
 )  # noqa
 
 
diff --git a/tests/integration/fixtures/__init__.py b/tests/integration/fixtures/__init__.py
index 8a8f15696b2..f5bbbf8b305 100644
--- a/tests/integration/fixtures/__init__.py
+++ b/tests/integration/fixtures/__init__.py
@@ -1,11 +1,25 @@
 import datetime
+import json
 import time
+import os
 
 import requests
 
 session = requests.session()
 
 
+# HACK: import the envelope module from librelay without requiring to build the cabi
+with open(
+    os.path.abspath(os.path.dirname(__file__)) + "/../../../py/sentry_relay/envelope.py"
+) as f:
+    envelope_namespace = {}
+    eval(compile(f.read(), "envelope.py", "exec"), envelope_namespace)
+
+Envelope = envelope_namespace["Envelope"]
+Item = envelope_namespace["Item"]
+PayloadRef = envelope_namespace["PayloadRef"]
+
+
 class SentryLike(object):
     _healthcheck_passed = False
@@ -15,6 +29,20 @@ class SentryLike(object):
     def url(self):
         return "http://{}:{}".format(*self.server_address)
 
+    @property
+    def dsn(self):
+        """DSN for which you will find the events in self.captured_events"""
+        # bogus, we never check the DSN
+        return "http://{}@{}:{}/42".format(self.dsn_public_key, *self.server_address)
+
+    @property
+    def auth_header(self):
+        return (
+            "Sentry sentry_version=5, sentry_timestamp=1535376240291, "
+            "sentry_client=raven-node/2.6.3, "
+            "sentry_key={}".format(self.dsn_public_key)
+        )
+
     def _wait(self, path):
         backoff = 0.1
         while True:
@@ -37,12 +65,6 @@ def wait_relay_healthcheck(self):
     def __repr__(self):
         return "<{}({})>".format(self.__class__.__name__, repr(self.upstream))
 
-    @property
-    def dsn(self):
-        """DSN for which you will find the events in self.captured_events"""
-        # bogus, we never check the DSN
-        return "http://{}@{}:{}/42".format(self.dsn_public_key, *self.server_address)
-
     def iter_public_keys(self):
         try:
             yield self.public_key
@@ -123,11 +145,7 @@ def send_event(self, project_id, payload=None, headers=None, legacy=False):
         headers = {
             "Content-Type": "application/octet-stream",
"application/octet-stream", - "X-Sentry-Auth": ( - "Sentry sentry_version=5, sentry_timestamp=1535376240291, " - "sentry_client=raven-node/2.6.3, " - "sentry_key={}".format(self.dsn_public_key) - ), + "X-Sentry-Auth": self.auth_header, **(headers or {}), } @@ -139,6 +157,22 @@ def send_event(self, project_id, payload=None, headers=None, legacy=False): response = self.post(url, headers=headers, **kwargs) response.raise_for_status() + def send_envelope(self, project_id, envelope, headers=None): + url = "/api/%s/store/" % project_id + headers = { + "Content-Type": "application/x-sentry-envelope", + "X-Sentry-Auth": self.auth_header, + **(headers or {}), + } + + response = self.post(url, headers=headers, data=envelope.serialize()) + response.raise_for_status() + + def send_session(self, project_id, payload): + session_item = Item(json.dumps(payload), {"type": "session"}) + envelope = Envelope(headers={"dsn": self.dsn}, items=[session_item]) + self.send_envelope(project_id, envelope) + def send_security_report( self, project_id, content_type, payload, release, environment ): diff --git a/tests/integration/fixtures/mini_sentry.py b/tests/integration/fixtures/mini_sentry.py index 7e191f529ed..910219d5428 100644 --- a/tests/integration/fixtures/mini_sentry.py +++ b/tests/integration/fixtures/mini_sentry.py @@ -1,4 +1,3 @@ -import os import gzip import json import uuid @@ -11,20 +10,7 @@ from flask import Flask, request as flask_request, jsonify from pytest_localserver.http import WSGIServer -from . import SentryLike - - -# HACK: import the envelope module from librelay without requiring to build the cabi -with open( - os.path.abspath(os.path.dirname(__file__)) + "/../../../py/sentry_relay/envelope.py" -) as f: - envelope_namespace = {} - eval(compile(f.read(), "envelope.py", "exec"), envelope_namespace) - - -Envelope = envelope_namespace["Envelope"] -Item = envelope_namespace["Item"] -PayloadRef = envelope_namespace["PayloadRef"] +from . 
diff --git a/tests/integration/test_session.py b/tests/integration/test_session.py
new file mode 100644
index 00000000000..316f46cadae
--- /dev/null
+++ b/tests/integration/test_session.py
@@ -0,0 +1,49 @@
+def test_session_with_processing(mini_sentry, relay_with_processing, sessions_consumer):
+    relay = relay_with_processing()
+    relay.wait_relay_healthcheck()
+
+    sessions_consumer = sessions_consumer()
+
+    mini_sentry.project_configs[42] = mini_sentry.full_project_config()
+    relay.send_session(
+        42,
+        {
+            "id": "94ecab99-184a-45ee-ac18-6ed2c2c2e9f2",
+            "sid": "8333339f-5675-4f89-a9a0-1c935255ab58",
+            "did": "b3ef3211-58a4-4b36-a9a1-5a55df0d9aaf",
+            "seq": 42,
+            "timestamp": "2020-02-07T15:17:00Z",
+            "started": "2020-02-07T14:16:00Z",
+            "sample_rate": 2.0,
+            "duration": 1947.49,
+            "status": "exited",
+            "attrs": {
+                "os": "iOS",
+                "os_version": "13.3.1",
+                "device_family": "iPhone12,3",
+                "release": "sentry-test@1.0.0",
+                "environment": "production",
+            },
+        },
+    )
+
+    session = sessions_consumer.get_session()
+    assert session == {
+        "org_id": 1,
+        "project_id": 42,
+        "event_id": "94ecab99-184a-45ee-ac18-6ed2c2c2e9f2",
+        "session_id": "8333339f-5675-4f89-a9a0-1c935255ab58",
+        "distinct_id": "b3ef3211-58a4-4b36-a9a1-5a55df0d9aaf",
+        "seq": 42,
+        "timestamp": "2020-02-07T15:17:00+00:00",
+        "started": "2020-02-07T14:16:00+00:00",
+        "sample_rate": 2.0,
+        "duration": 1947.49,
+        "status": 1,
+        "os": "iOS",
+        "os_version": "13.3.1",
+        "device_family": "iPhone12,3",
+        "release": "sentry-test@1.0.0",
+        "environment": "production",
+        "retention_days": None,
+    }