Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sessions): Initial support for session data in envelopes #449

Merged
merged 17 commits into from
Feb 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions py/sentry_relay/envelope.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class Envelope(object):
def __init__(self, headers=None, items=None):
if headers is not None:
headers = dict(headers)
self.headers = headers
self.headers = headers or {}
if items is None:
items = []
else:
Expand All @@ -35,7 +35,7 @@ def __iter__(self):
return iter(self.items)

def serialize_into(self, f):
f.write(json.dumps(self.headers))
f.write(json.dumps(self.headers).encode("utf-8"))
f.write(b"\n")
for item in self.items:
item.serialize_into(f)
Expand Down Expand Up @@ -76,7 +76,7 @@ def get_bytes(self):
with open(self.path, "rb") as f:
self.bytes = f.read()
elif self.event is not None:
self.bytes = json.dumps(self.event)
self.bytes = json.dumps(self.event).encode("utf-8")
else:
self.bytes = b""
return self.bytes
Expand Down Expand Up @@ -155,7 +155,7 @@ def serialize_into(self, f):
headers = dict(self.headers)
length, writer = self.payload.prepare_serialize()
headers["length"] = length
f.write(json.dumps(headers))
f.write(json.dumps(headers).encode("utf-8"))
f.write(b"\n")
writer(f)
f.write(b"\n")
Expand Down
28 changes: 21 additions & 7 deletions relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,12 +398,15 @@ pub enum KafkaTopic {
Attachments,
/// Transaction events topic.
Transactions,
/// All outcomes are sent through this channel.
/// Shared outcomes topic for Relay and Sentry.
Outcomes,
/// Session health updates.
Sessions,
}

/// Configuration for topics.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct TopicNames {
/// Simple events topic name.
pub events: String,
Expand All @@ -413,6 +416,20 @@ pub struct TopicNames {
pub transactions: String,
/// Event outcomes topic name.
pub outcomes: String,
/// Session health topic name.
pub sessions: String,
}

impl Default for TopicNames {
fn default() -> Self {
Self {
events: "ingest-events".to_owned(),
attachments: "ingest-attachments".to_owned(),
transactions: "ingest-transactions".to_owned(),
outcomes: "outcomes".to_owned(),
sessions: "ingest-sessions".to_owned(),
}
}
}

/// A name value pair of Kafka config parameter.
Expand Down Expand Up @@ -461,6 +478,7 @@ pub struct Processing {
/// Kafka producer configurations.
pub kafka_config: Vec<KafkaConfigParam>,
/// Kafka topic names.
#[serde(default)]
pub topics: TopicNames,
/// Redis hosts to connect to for storing state for rate limits.
pub redis: Option<Redis>,
Expand All @@ -484,12 +502,7 @@ impl Default for Processing {
max_secs_in_future: 0,
max_secs_in_past: 0,
kafka_config: Vec::new(),
topics: TopicNames {
events: String::new(),
attachments: String::new(),
transactions: String::new(),
outcomes: String::new(),
},
topics: TopicNames::default(),
redis: Some(Redis::default()),
attachment_chunk_size: default_chunk_size(),
projectconfig_cache_prefix: default_projectconfig_cache_prefix(),
Expand Down Expand Up @@ -977,6 +990,7 @@ impl Config {
KafkaTopic::Events => topics.events.as_str(),
KafkaTopic::Transactions => topics.transactions.as_str(),
KafkaTopic::Outcomes => topics.outcomes.as_str(),
KafkaTopic::Sessions => topics.sessions.as_str(),
}
}

Expand Down
2 changes: 2 additions & 0 deletions relay-general/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod mechanism;
mod metrics;
mod request;
mod security_report;
mod session;
mod span;
mod stacktrace;
mod tags;
Expand Down Expand Up @@ -42,6 +43,7 @@ pub use self::mechanism::{CError, MachException, Mechanism, MechanismMeta, Posix
pub use self::metrics::Metrics;
pub use self::request::{Cookies, HeaderName, HeaderValue, Headers, Query, Request};
pub use self::security_report::{Csp, ExpectCt, ExpectStaple, Hpkp, SecurityReportType};
pub use self::session::{ParseSessionStatusError, SessionAttributes, SessionStatus, SessionUpdate};
pub use self::span::Span;
pub use self::stacktrace::{Frame, FrameData, FrameVars, RawStacktrace, Stacktrace};
pub use self::tags::{TagEntry, Tags};
Expand Down
255 changes: 255 additions & 0 deletions relay-general/src/protocol/session.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
use std::fmt;
use std::str::FromStr;
use std::time::SystemTime;

use chrono::{DateTime, Utc};
use failure::Fail;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

/// The type of session event we're dealing with.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum SessionStatus {
/// The session is healthy.
///
/// This does not necessarily indicate that the session is still active.
Ok,
/// The session terminated normally.
Exited,
/// The session resulted in an application crash.
Crashed,
/// The session an unexpected abrupt termination (not crashing).
Abnormal,
}

impl Default for SessionStatus {
fn default() -> Self {
Self::Ok
}
}

/// An error used when parsing `SessionStatus`.
#[derive(Debug, Fail)]
#[fail(display = "invalid session status")]
pub struct ParseSessionStatusError;

impl FromStr for SessionStatus {
type Err = ParseSessionStatusError;

fn from_str(string: &str) -> Result<Self, Self::Err> {
Ok(match string {
"ok" => SessionStatus::Ok,
"crashed" => SessionStatus::Crashed,
"abnormal" => SessionStatus::Abnormal,
"exited" => SessionStatus::Exited,
_ => return Err(ParseSessionStatusError),
})
}
}

impl fmt::Display for SessionStatus {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
SessionStatus::Ok => write!(f, "ok"),
SessionStatus::Crashed => write!(f, "crashed"),
SessionStatus::Abnormal => write!(f, "abnormal"),
SessionStatus::Exited => write!(f, "exited"),
}
}
}

fn is_empty_string(opt: &Option<String>) -> bool {
opt.as_ref().map_or(true, |s| s.is_empty())
}

/// Additional attributes for Sessions.
#[serde(default)]
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
pub struct SessionAttributes {
/// The operating system name, corresponding to the os context.
pub os: Option<String>,
/// The full operating system version string, corresponding to the os context.
pub os_version: Option<String>,
/// The device famility identifier, corresponding to the device context.
pub device_family: Option<String>,
/// The release version string.
pub release: Option<String>,
/// The environment identifier.
pub environment: Option<String>,
}

impl SessionAttributes {
fn is_empty(&self) -> bool {
is_empty_string(&self.os)
&& is_empty_string(&self.os_version)
&& is_empty_string(&self.device_family)
&& is_empty_string(&self.release)
&& is_empty_string(&self.environment)
}
}

fn default_sequence() -> u64 {
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
}

fn default_sample_rate() -> f32 {
1.0
}

#[allow(clippy::trivially_copy_pass_by_ref)]
fn is_default_sample_rate(rate: &f32) -> bool {
(*rate - default_sample_rate()) < std::f32::EPSILON
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct SessionUpdate {
/// Unique identifier of this update event.
#[serde(rename = "id", default = "Uuid::new_v4")]
pub update_id: Uuid,
/// The session identifier.
#[serde(rename = "sid")]
pub session_id: Uuid,
/// The distinct identifier.
#[serde(rename = "did", default)]
pub distinct_id: Uuid,
/// An optional logical clock.
#[serde(rename = "seq", default = "default_sequence")]
pub sequence: u64,
/// The timestamp of when the session change event was created.
pub timestamp: DateTime<Utc>,
/// The timestamp of when the session itself started.
pub started: DateTime<Utc>,
/// The sample rate.
#[serde(
default = "default_sample_rate",
skip_serializing_if = "is_default_sample_rate"
)]
pub sample_rate: f32,
/// An optional duration of the session so far.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub duration: Option<f64>,
/// The status of the session.
#[serde(default)]
pub status: SessionStatus,
/// The session event attributes.
#[serde(
rename = "attrs",
default,
skip_serializing_if = "SessionAttributes::is_empty"
)]
pub attributes: SessionAttributes,
}

impl SessionUpdate {
/// Parses a session update from JSON.
pub fn parse(payload: &[u8]) -> Result<Self, serde_json::Error> {
let mut session = serde_json::from_slice::<Self>(payload)?;

if session.distinct_id.is_nil() {
session.distinct_id = session.session_id;
}

Ok(session)
}

/// Serializes a session update back into JSON.
pub fn serialize(&self) -> Result<Vec<u8>, serde_json::Error> {
serde_json::to_vec(self)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_session_default_values() {
let json = r#"{
"sid": "8333339f-5675-4f89-a9a0-1c935255ab58",
"timestamp": "2020-02-07T15:17:00Z",
"started": "2020-02-07T14:16:00Z"
}"#;

let output = r#"{
"id": "94ecab99-184a-45ee-ac18-6ed2c2c2e9f2",
"sid": "8333339f-5675-4f89-a9a0-1c935255ab58",
"did": "8333339f-5675-4f89-a9a0-1c935255ab58",
"seq": 4711,
"timestamp": "2020-02-07T15:17:00Z",
"started": "2020-02-07T14:16:00Z",
"status": "ok"
}"#;

let update = SessionUpdate {
update_id: "94ecab99-184a-45ee-ac18-6ed2c2c2e9f2".parse().unwrap(),
session_id: "8333339f-5675-4f89-a9a0-1c935255ab58".parse().unwrap(),
distinct_id: "8333339f-5675-4f89-a9a0-1c935255ab58".parse().unwrap(),
sequence: 4711, // this would be a timestamp instead
timestamp: "2020-02-07T15:17:00Z".parse().unwrap(),
started: "2020-02-07T14:16:00Z".parse().unwrap(),
sample_rate: 1.0,
duration: None,
status: SessionStatus::Ok,
attributes: SessionAttributes::default(),
};

// Since the update_id is defaulted randomly, ensure that it matches.
let mut parsed = SessionUpdate::parse(json.as_bytes()).unwrap();
parsed.update_id = update.update_id;

// Sequence is defaulted to the current timestamp. Override for snapshot.
assert_eq!(parsed.sequence, default_sequence());
parsed.sequence = 4711;

assert_eq_dbg!(update, parsed);
assert_eq_str!(output, serde_json::to_string_pretty(&update).unwrap());
}

#[test]
fn test_session_roundtrip() {
let json = r#"{
"id": "94ecab99-184a-45ee-ac18-6ed2c2c2e9f2",
"sid": "8333339f-5675-4f89-a9a0-1c935255ab58",
"did": "b3ef3211-58a4-4b36-a9a1-5a55df0d9aaf",
"seq": 42,
"timestamp": "2020-02-07T15:17:00Z",
"started": "2020-02-07T14:16:00Z",
"sample_rate": 2.0,
"duration": 1947.49,
"status": "exited",
"attrs": {
"os": "iOS",
"os_version": "13.3.1",
"device_family": "iPhone12,3",
"release": "[email protected]",
"environment": "production"
}
}"#;

let update = SessionUpdate {
update_id: "94ecab99-184a-45ee-ac18-6ed2c2c2e9f2".parse().unwrap(),
session_id: "8333339f-5675-4f89-a9a0-1c935255ab58".parse().unwrap(),
distinct_id: "b3ef3211-58a4-4b36-a9a1-5a55df0d9aaf".parse().unwrap(),
sequence: 42,
timestamp: "2020-02-07T15:17:00Z".parse().unwrap(),
started: "2020-02-07T14:16:00Z".parse().unwrap(),
sample_rate: 2.0,
duration: Some(1947.49),
status: SessionStatus::Exited,
attributes: SessionAttributes {
os: Some("iOS".to_owned()),
os_version: Some("13.3.1".to_owned()),
device_family: Some("iPhone12,3".to_owned()),
release: Some("[email protected]".to_owned()),
environment: Some("production".to_owned()),
},
};

assert_eq_dbg!(update, SessionUpdate::parse(json.as_bytes()).unwrap());
assert_eq_str!(json, serde_json::to_string_pretty(&update).unwrap());
}
}
1 change: 0 additions & 1 deletion relay-general/src/store/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,6 @@ mod tests {
#[test]
fn test_mechanism_missing_attributes() {
use crate::protocol::{CError, MachException, Mechanism, MechanismMeta, PosixSignal};
use crate::types::ErrorKind;

let mut mechanism = Annotated::new(Mechanism {
ty: Annotated::new("mytype".to_string()),
Expand Down
Loading