Skip to content

Commit 2cb02a5

Browse files
committed
feat: Add Session Aggregates as new Item
The new sessions item has pre-aggregated counts for different session outcomes. It is configurable if the aggregation should be exploded into individual session updates, or rather sent as aggregates to the kafka topic.
1 parent cc63c7c commit 2cb02a5

File tree

11 files changed

+495
-18
lines changed

11 files changed

+495
-18
lines changed

CHANGELOG.md

+6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Changelog
22

3+
## Unreleased
4+
5+
**Features**:
6+
7+
- Relay is now able to ingest pre-aggregated sessions, which will make it possible to efficiently handle applications that produce thousands of sessions per second. ([#815](https://github.com/getsentry/relay/pull/815))
8+
39
## 20.11.1
410

511
- No documented changes.

relay-config/src/config.rs

+13
Original file line numberDiff line numberDiff line change
@@ -734,11 +734,18 @@ fn default_max_rate_limit() -> Option<u32> {
734734
Some(300) // 5 minutes
735735
}
736736

737+
fn default_explode_session_aggregates() -> bool {
738+
true
739+
}
740+
737741
/// Controls Sentry-internal event processing.
738742
#[derive(Serialize, Deserialize, Debug)]
739743
pub struct Processing {
740744
/// True if the Relay should do processing. Defaults to `false`.
741745
pub enabled: bool,
746+
/// Indicates if session aggregates should be exploded into individual session updates.
747+
#[serde(default = "default_explode_session_aggregates")]
748+
pub explode_session_aggregates: bool,
742749
/// GeoIp DB file source.
743750
#[serde(default)]
744751
pub geoip_path: Option<PathBuf>,
@@ -775,6 +782,7 @@ impl Default for Processing {
775782
fn default() -> Self {
776783
Self {
777784
enabled: false,
785+
explode_session_aggregates: default_explode_session_aggregates(),
778786
geoip_path: None,
779787
max_secs_in_future: 0,
780788
max_secs_in_past: 0,
@@ -1434,6 +1442,11 @@ impl Config {
14341442
self.values.processing.enabled
14351443
}
14361444

1445+
/// Indicates if session aggregates should be exploded into individual session updates.
1446+
pub fn explode_session_aggregates(&self) -> bool {
1447+
self.values.processing.explode_session_aggregates
1448+
}
1449+
14371450
/// The path to the GeoIp database required for event processing.
14381451
pub fn geoip_path(&self) -> Option<&Path> {
14391452
self.values.processing.geoip_path.as_deref()

relay-general/src/protocol/mod.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,9 @@ pub use self::request::{Cookies, HeaderName, HeaderValue, Headers, Query, Reques
5050
#[cfg(feature = "jsonschema")]
5151
pub use self::schema::event_json_schema;
5252
pub use self::security_report::{Csp, ExpectCt, ExpectStaple, Hpkp, SecurityReportType};
53-
pub use self::session::{ParseSessionStatusError, SessionAttributes, SessionStatus, SessionUpdate};
53+
pub use self::session::{
54+
ParseSessionStatusError, SessionAggregates, SessionAttributes, SessionStatus, SessionUpdate,
55+
};
5456
pub use self::span::Span;
5557
pub use self::stacktrace::{Frame, FrameData, FrameVars, RawStacktrace, Stacktrace};
5658
pub use self::tags::{TagEntry, Tags};

relay-general/src/protocol/session.rs

+199
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ pub enum SessionStatus {
2121
Crashed,
2222
/// The session had an unexpected abrupt termination (not crashing).
2323
Abnormal,
24+
/// The session exited cleanly but experienced some errors during its run.
25+
Errored,
2426
}
2527

2628
impl Default for SessionStatus {
@@ -39,6 +41,7 @@ derive_fromstr_and_display!(SessionStatus, ParseSessionStatusError, {
3941
SessionStatus::Crashed => "crashed",
4042
SessionStatus::Abnormal => "abnormal",
4143
SessionStatus::Exited => "exited",
44+
SessionStatus::Errored => "errored",
4245
});
4346

4447
/// Additional attributes for Sessions.
@@ -117,6 +120,103 @@ impl SessionUpdate {
117120
}
118121
}
119122

123+
#[allow(clippy::trivially_copy_pass_by_ref)]
124+
fn is_zero(val: &u32) -> bool {
125+
*val == 0
126+
}
127+
128+
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
129+
pub struct SessionAggregateItem {
130+
/// The timestamp of when the session itself started.
131+
pub started: DateTime<Utc>,
132+
/// The distinct identifier.
133+
#[serde(rename = "did", default, skip_serializing_if = "Option::is_none")]
134+
pub distinct_id: Option<String>,
135+
/// The number of exited sessions that ocurred.
136+
#[serde(default, skip_serializing_if = "is_zero")]
137+
pub exited: u32,
138+
/// The number of errored sessions that ocurred, not including the abnormal and crashed ones.
139+
#[serde(default, skip_serializing_if = "is_zero")]
140+
pub errored: u32,
141+
/// The number of abnormal sessions that ocurred.
142+
#[serde(default, skip_serializing_if = "is_zero")]
143+
pub abnormal: u32,
144+
/// The number of crashed sessions that ocurred.
145+
#[serde(default, skip_serializing_if = "is_zero")]
146+
pub crashed: u32,
147+
}
148+
149+
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
150+
pub struct SessionAggregates {
151+
/// A batch of sessions that were started.
152+
#[serde(default)]
153+
pub aggregates: Vec<SessionAggregateItem>,
154+
/// The shared session event attributes.
155+
#[serde(rename = "attrs")]
156+
pub attributes: SessionAttributes,
157+
}
158+
159+
impl SessionAggregates {
160+
/// Parses a session batch from JSON.
161+
pub fn parse(payload: &[u8]) -> Result<Self, serde_json::Error> {
162+
serde_json::from_slice(payload)
163+
}
164+
165+
/// Serializes a session batch back into JSON.
166+
pub fn serialize(&self) -> Result<Vec<u8>, serde_json::Error> {
167+
serde_json::to_vec(self)
168+
}
169+
170+
/// The total number of sessions in this aggregate.
171+
pub fn num_sessions(&self) -> u32 {
172+
self.aggregates
173+
.iter()
174+
.map(|i| i.exited + i.errored + i.abnormal + i.crashed)
175+
.sum()
176+
}
177+
178+
/// Creates individual session updates from the aggregates.
179+
pub fn into_updates_iter(self) -> impl Iterator<Item = SessionUpdate> {
180+
let attributes = self.attributes;
181+
let mut items = self.aggregates;
182+
let mut item_opt = items.pop();
183+
std::iter::from_fn(move || loop {
184+
let item = item_opt.as_mut()?;
185+
186+
let (status, errors) = if item.exited > 0 {
187+
item.exited -= 1;
188+
(SessionStatus::Exited, 0)
189+
} else if item.errored > 0 {
190+
item.errored -= 1;
191+
// when exploding, we create "legacy" session updates that have no `errored` state
192+
(SessionStatus::Exited, 1)
193+
} else if item.abnormal > 0 {
194+
item.abnormal -= 1;
195+
(SessionStatus::Abnormal, 1)
196+
} else if item.crashed > 0 {
197+
item.crashed -= 1;
198+
(SessionStatus::Crashed, 1)
199+
} else {
200+
item_opt = items.pop();
201+
continue;
202+
};
203+
let attributes = attributes.clone();
204+
return Some(SessionUpdate {
205+
session_id: Uuid::new_v4(),
206+
distinct_id: item.distinct_id.clone(),
207+
sequence: 0,
208+
init: true,
209+
timestamp: Utc::now(),
210+
started: item.started,
211+
duration: None,
212+
status,
213+
errors,
214+
attributes,
215+
});
216+
})
217+
}
218+
}
219+
120220
#[cfg(test)]
121221
mod tests {
122222
use super::*;
@@ -160,6 +260,8 @@ mod tests {
160260
environment: None,
161261
ip_address: None,
162262
user_agent: None,
263+
os: None,
264+
runtime: None,
163265
},
164266
};
165267

@@ -221,6 +323,8 @@ mod tests {
221323
environment: Some("production".to_owned()),
222324
ip_address: Some(IpAddr::parse("::1").unwrap()),
223325
user_agent: Some("Firefox/72.0".to_owned()),
326+
os: None,
327+
runtime: None,
224328
},
225329
};
226330

@@ -241,4 +345,99 @@ mod tests {
241345
let update = SessionUpdate::parse(json.as_bytes()).unwrap();
242346
assert_eq_dbg!(update.attributes.ip_address, Some(IpAddr::auto()));
243347
}
348+
349+
#[test]
350+
fn test_session_aggregates() {
351+
let json = r#"{
352+
"aggregates": [{
353+
"started": "2020-02-07T14:16:00Z",
354+
"exited": 2,
355+
"abnormal": 1
356+
},{
357+
"started": "2020-02-07T14:17:00Z",
358+
"did": "some-user",
359+
"errored": 1
360+
}],
361+
"attrs": {
362+
"release": "[email protected]",
363+
"environment": "production",
364+
"ip_address": "::1",
365+
"user_agent": "Firefox/72.0"
366+
}
367+
}"#;
368+
let aggregates = SessionAggregates::parse(json.as_bytes()).unwrap();
369+
let mut iter = aggregates.into_updates_iter();
370+
371+
let mut settings = insta::Settings::new();
372+
settings.add_redaction(".timestamp", "[TS]");
373+
settings.add_redaction(".sid", "[SID]");
374+
settings.bind(|| {
375+
insta::assert_yaml_snapshot!(iter.next().unwrap(), @r###"
376+
---
377+
sid: "[SID]"
378+
did: some-user
379+
seq: 0
380+
init: true
381+
timestamp: "[TS]"
382+
started: "2020-02-07T14:17:00Z"
383+
status: exited
384+
errors: 1
385+
attrs:
386+
387+
environment: production
388+
ip_address: "::1"
389+
user_agent: Firefox/72.0
390+
"###);
391+
insta::assert_yaml_snapshot!(iter.next().unwrap(), @r###"
392+
---
393+
sid: "[SID]"
394+
did: ~
395+
seq: 0
396+
init: true
397+
timestamp: "[TS]"
398+
started: "2020-02-07T14:16:00Z"
399+
status: exited
400+
errors: 0
401+
attrs:
402+
403+
environment: production
404+
ip_address: "::1"
405+
user_agent: Firefox/72.0
406+
"###);
407+
insta::assert_yaml_snapshot!(iter.next().unwrap(), @r###"
408+
---
409+
sid: "[SID]"
410+
did: ~
411+
seq: 0
412+
init: true
413+
timestamp: "[TS]"
414+
started: "2020-02-07T14:16:00Z"
415+
status: exited
416+
errors: 0
417+
attrs:
418+
419+
environment: production
420+
ip_address: "::1"
421+
user_agent: Firefox/72.0
422+
"###);
423+
insta::assert_yaml_snapshot!(iter.next().unwrap(), @r###"
424+
---
425+
sid: "[SID]"
426+
did: ~
427+
seq: 0
428+
init: true
429+
timestamp: "[TS]"
430+
started: "2020-02-07T14:16:00Z"
431+
status: abnormal
432+
errors: 1
433+
attrs:
434+
435+
environment: production
436+
ip_address: "::1"
437+
user_agent: Firefox/72.0
438+
"###);
439+
});
440+
441+
assert_eq!(iter.next(), None);
442+
}
244443
}

relay-server/src/actors/events.rs

+1
Original file line numberDiff line numberDiff line change
@@ -681,6 +681,7 @@ impl EventProcessor {
681681

682682
// session data is never considered as part of deduplication
683683
ItemType::Session => false,
684+
ItemType::Sessions => false,
684685
}
685686
}
686687

0 commit comments

Comments
 (0)