Skip to content

Commit 0cfe368

Browse files
authored
ref: Remove Compatibility mode for Session Aggregates (#913)
The compatibility mode was added in #815 because it was waiting on proper support in snuba. That downstream support was added recently in getsentry/snuba#1492, and compatibility mode was switched off in production via a configuration change. Also, the snuba change was included in the 21.1 CalVer release, so it is fine to remove compatibility mode and ship this feature in full fidelity in 21.2.
1 parent 2974b9c commit 0cfe368

File tree

5 files changed

+33
-228
lines changed

5 files changed

+33
-228
lines changed

CHANGELOG.md

+7-2
Original file line number · Diff line number · Diff line change
@@ -1,12 +1,17 @@
11
# Changelog
22

3+
## Unreleased
4+
5+
**Internal**:
6+
7+
- Compatibility mode for pre-aggregated sessions was removed. The feature is now enabled by default in full fidelity. ([#913](https://github.com/getsentry/relay/pull/913))
8+
39
## 21.1.0
410

511
**Features**:
612

713
- Support dynamic sampling for error events. ([#883](https://github.com/getsentry/relay/pull/883))
814

9-
1015
**Bug Fixes**:
1116

1217
- Make all fields but event-id optional to fix regressions in user feedback ingestion. ([#886](https://github.com/getsentry/relay/pull/886))
@@ -46,7 +51,7 @@
4651

4752
**Internal**:
4853

49-
- Add *experimental* support for picking up HTTP proxies from the regular environment variables. This feature needs to be enabled by setting `http: client: "reqwest"` in your `config.yml`. ([#839](https://github.com/getsentry/relay/pull/839))
54+
- Add _experimental_ support for picking up HTTP proxies from the regular environment variables. This feature needs to be enabled by setting `http: client: "reqwest"` in your `config.yml`. ([#839](https://github.com/getsentry/relay/pull/839))
5055
- Refactor transparent request forwarding for unknown endpoints. Requests are now entirely buffered in memory and occupy the same queues and actors as other requests. This should not cause issues but may change behavior under load. ([#839](https://github.com/getsentry/relay/pull/839))
5156
- Add reason codes to the `X-Sentry-Rate-Limits` header in store responses. This allows external Relays to emit outcomes with the proper reason codes. ([#850](https://github.com/getsentry/relay/pull/850))
5257
- Emit metrics for outcomes in external relays. ([#851](https://github.com/getsentry/relay/pull/851))

relay-config/src/config.rs

-13
Original file line number · Diff line number · Diff line change
@@ -691,18 +691,11 @@ fn default_max_rate_limit() -> Option<u32> {
691691
Some(300) // 5 minutes
692692
}
693693

694-
fn default_explode_session_aggregates() -> bool {
695-
true
696-
}
697-
698694
/// Controls Sentry-internal event processing.
699695
#[derive(Serialize, Deserialize, Debug)]
700696
pub struct Processing {
701697
/// True if the Relay should do processing. Defaults to `false`.
702698
pub enabled: bool,
703-
/// Indicates if session aggregates should be exploded into individual session updates.
704-
#[serde(default = "default_explode_session_aggregates")]
705-
pub explode_session_aggregates: bool,
706699
/// GeoIp DB file source.
707700
#[serde(default)]
708701
pub geoip_path: Option<PathBuf>,
@@ -739,7 +732,6 @@ impl Default for Processing {
739732
fn default() -> Self {
740733
Self {
741734
enabled: false,
742-
explode_session_aggregates: default_explode_session_aggregates(),
743735
geoip_path: None,
744736
max_secs_in_future: default_max_secs_in_future(),
745737
max_secs_in_past: default_max_secs_in_past(),
@@ -1385,11 +1377,6 @@ impl Config {
13851377
self.values.processing.enabled
13861378
}
13871379

1388-
/// Indicates if session aggregates should be exploded into individual session updates.
1389-
pub fn explode_session_aggregates(&self) -> bool {
1390-
self.values.processing.explode_session_aggregates
1391-
}
1392-
13931380
/// The path to the GeoIp database required for event processing.
13941381
pub fn geoip_path(&self) -> Option<&Path> {
13951382
self.values.processing.geoip_path.as_deref()

relay-general/src/protocol/session.rs

-144
Original file line number · Diff line number · Diff line change
@@ -166,55 +166,6 @@ impl SessionAggregates {
166166
pub fn serialize(&self) -> Result<Vec<u8>, serde_json::Error> {
167167
serde_json::to_vec(self)
168168
}
169-
170-
/// The total number of sessions in this aggregate.
171-
pub fn num_sessions(&self) -> u32 {
172-
self.aggregates
173-
.iter()
174-
.map(|i| i.exited + i.errored + i.abnormal + i.crashed)
175-
.sum()
176-
}
177-
178-
/// Creates individual session updates from the aggregates.
179-
pub fn into_updates_iter(self) -> impl Iterator<Item = SessionUpdate> {
180-
let attributes = self.attributes;
181-
let mut items = self.aggregates;
182-
let mut item_opt = items.pop();
183-
std::iter::from_fn(move || loop {
184-
let item = item_opt.as_mut()?;
185-
186-
let (status, errors) = if item.exited > 0 {
187-
item.exited -= 1;
188-
(SessionStatus::Exited, 0)
189-
} else if item.errored > 0 {
190-
item.errored -= 1;
191-
// when exploding, we create "legacy" session updates that have no `errored` state
192-
(SessionStatus::Exited, 1)
193-
} else if item.abnormal > 0 {
194-
item.abnormal -= 1;
195-
(SessionStatus::Abnormal, 1)
196-
} else if item.crashed > 0 {
197-
item.crashed -= 1;
198-
(SessionStatus::Crashed, 1)
199-
} else {
200-
item_opt = items.pop();
201-
continue;
202-
};
203-
let attributes = attributes.clone();
204-
return Some(SessionUpdate {
205-
session_id: Uuid::new_v4(),
206-
distinct_id: item.distinct_id.clone(),
207-
sequence: 0,
208-
init: true,
209-
timestamp: Utc::now(),
210-
started: item.started,
211-
duration: None,
212-
status,
213-
errors,
214-
attributes,
215-
});
216-
})
217-
}
218169
}
219170

220171
#[cfg(test)]
@@ -341,99 +292,4 @@ mod tests {
341292
let update = SessionUpdate::parse(json.as_bytes()).unwrap();
342293
assert_eq_dbg!(update.attributes.ip_address, Some(IpAddr::auto()));
343294
}
344-
345-
#[test]
346-
fn test_session_aggregates() {
347-
let json = r#"{
348-
"aggregates": [{
349-
"started": "2020-02-07T14:16:00Z",
350-
"exited": 2,
351-
"abnormal": 1
352-
},{
353-
"started": "2020-02-07T14:17:00Z",
354-
"did": "some-user",
355-
"errored": 1
356-
}],
357-
"attrs": {
358-
"release": "[email protected]",
359-
"environment": "production",
360-
"ip_address": "::1",
361-
"user_agent": "Firefox/72.0"
362-
}
363-
}"#;
364-
let aggregates = SessionAggregates::parse(json.as_bytes()).unwrap();
365-
let mut iter = aggregates.into_updates_iter();
366-
367-
let mut settings = insta::Settings::new();
368-
settings.add_redaction(".timestamp", "[TS]");
369-
settings.add_redaction(".sid", "[SID]");
370-
settings.bind(|| {
371-
insta::assert_yaml_snapshot!(iter.next().unwrap(), @r###"
372-
---
373-
sid: "[SID]"
374-
did: some-user
375-
seq: 0
376-
init: true
377-
timestamp: "[TS]"
378-
started: "2020-02-07T14:17:00Z"
379-
status: exited
380-
errors: 1
381-
attrs:
382-
383-
environment: production
384-
ip_address: "::1"
385-
user_agent: Firefox/72.0
386-
"###);
387-
insta::assert_yaml_snapshot!(iter.next().unwrap(), @r###"
388-
---
389-
sid: "[SID]"
390-
did: ~
391-
seq: 0
392-
init: true
393-
timestamp: "[TS]"
394-
started: "2020-02-07T14:16:00Z"
395-
status: exited
396-
errors: 0
397-
attrs:
398-
399-
environment: production
400-
ip_address: "::1"
401-
user_agent: Firefox/72.0
402-
"###);
403-
insta::assert_yaml_snapshot!(iter.next().unwrap(), @r###"
404-
---
405-
sid: "[SID]"
406-
did: ~
407-
seq: 0
408-
init: true
409-
timestamp: "[TS]"
410-
started: "2020-02-07T14:16:00Z"
411-
status: exited
412-
errors: 0
413-
attrs:
414-
415-
environment: production
416-
ip_address: "::1"
417-
user_agent: Firefox/72.0
418-
"###);
419-
insta::assert_yaml_snapshot!(iter.next().unwrap(), @r###"
420-
---
421-
sid: "[SID]"
422-
did: ~
423-
seq: 0
424-
init: true
425-
timestamp: "[TS]"
426-
started: "2020-02-07T14:16:00Z"
427-
status: abnormal
428-
errors: 1
429-
attrs:
430-
431-
environment: production
432-
ip_address: "::1"
433-
user_agent: Firefox/72.0
434-
"###);
435-
});
436-
437-
assert_eq!(iter.next(), None);
438-
}
439295
}

relay-server/src/actors/store.rs

+10-26
Original file line number · Diff line number · Diff line change
@@ -168,45 +168,29 @@ impl StoreForwarder {
168168
Ok(session) => session,
169169
Err(_) => return Ok(()),
170170
};
171+
171172
if session.status == SessionStatus::Errored {
172173
// Individual updates should never have the status `errored`
173174
session.status = SessionStatus::Exited;
174175
}
175-
self.produce_session_update(org_id, project_id, event_retention, client, session)?;
176+
self.produce_session_update(org_id, project_id, event_retention, client, session)
176177
}
177178
ItemType::Sessions => {
178179
let aggregates = match SessionAggregates::parse(&item.payload()) {
179180
Ok(aggregates) => aggregates,
180181
Err(_) => return Ok(()),
181182
};
182183

183-
if self.config.explode_session_aggregates() {
184-
if aggregates.num_sessions() as usize > MAX_EXPLODED_SESSIONS {
185-
relay_log::warn!("exploded session items from aggregate exceed threshold");
186-
}
187-
188-
for session in aggregates.into_updates_iter().take(MAX_EXPLODED_SESSIONS) {
189-
self.produce_session_update(
190-
org_id,
191-
project_id,
192-
event_retention,
193-
client,
194-
session,
195-
)?;
196-
}
197-
} else {
198-
self.produce_sessions_from_aggregate(
199-
org_id,
200-
project_id,
201-
event_retention,
202-
client,
203-
aggregates,
204-
)?
205-
}
184+
self.produce_sessions_from_aggregate(
185+
org_id,
186+
project_id,
187+
event_retention,
188+
client,
189+
aggregates,
190+
)
206191
}
207-
_ => {}
192+
_ => Ok(()),
208193
}
209-
Ok(())
210194
}
211195

212196
fn produce_sessions_from_aggregate(

tests/integration/test_session.py

+16-43
Original file line number · Diff line number · Diff line change
@@ -164,11 +164,12 @@ def test_session_with_processing_two_events(
164164

165165

166166
def test_session_aggregates(mini_sentry, relay_with_processing, sessions_consumer):
167-
relay = relay_with_processing({"processing": {"explode_session_aggregates": False}})
167+
relay = relay_with_processing()
168168
sessions_consumer = sessions_consumer()
169169

170170
timestamp = datetime.now(tz=timezone.utc)
171-
started = timestamp - timedelta(hours=1)
171+
started1 = timestamp - timedelta(hours=1)
172+
started2 = started1 - timedelta(hours=1)
172173

173174
project_id = 42
174175
mini_sentry.add_full_project_config(project_id)
@@ -177,11 +178,12 @@ def test_session_aggregates(mini_sentry, relay_with_processing, sessions_consume
177178
{
178179
"aggregates": [
179180
{
180-
"started": started.isoformat(),
181+
"started": started1.isoformat(),
181182
"did": "foobarbaz",
182183
"exited": 2,
183184
"errored": 3,
184-
}
185+
},
186+
{"started": started2.isoformat(), "abnormal": 1,},
185187
],
186188
"attrs": {"release": "[email protected]", "environment": "production",},
187189
},
@@ -196,7 +198,7 @@ def test_session_aggregates(mini_sentry, relay_with_processing, sessions_consume
196198
"distinct_id": "367e2499-2b45-586d-814f-778b60144e87",
197199
"quantity": 2,
198200
"seq": 0,
199-
"started": started.timestamp(),
201+
"started": started1.timestamp(),
200202
"duration": None,
201203
"status": "exited",
202204
"errors": 0,
@@ -215,7 +217,7 @@ def test_session_aggregates(mini_sentry, relay_with_processing, sessions_consume
215217
"distinct_id": "367e2499-2b45-586d-814f-778b60144e87",
216218
"quantity": 3,
217219
"seq": 0,
218-
"started": started.timestamp(),
220+
"started": started1.timestamp(),
219221
"duration": None,
220222
"status": "errored",
221223
"errors": 1,
@@ -225,54 +227,25 @@ def test_session_aggregates(mini_sentry, relay_with_processing, sessions_consume
225227
"sdk": "raven-node/2.6.3",
226228
}
227229

228-
229-
def test_session_aggregates_explode(
230-
mini_sentry, relay_with_processing, sessions_consumer
231-
):
232-
relay = relay_with_processing({"processing": {"explode_session_aggregates": True}})
233-
sessions_consumer = sessions_consumer()
234-
235-
timestamp = datetime.now(tz=timezone.utc)
236-
started = timestamp - timedelta(hours=1)
237-
238-
project_id = 42
239-
mini_sentry.add_full_project_config(project_id)
240-
relay.send_session_aggregates(
241-
project_id,
242-
{
243-
"aggregates": [
244-
{"started": started.isoformat(), "did": "foobarbaz", "exited": 2,}
245-
],
246-
"attrs": {"release": "[email protected]", "environment": "production",},
247-
},
248-
)
249-
250-
expected = {
230+
session = sessions_consumer.get_session()
231+
del session["received"]
232+
assert session == {
251233
"org_id": 1,
252234
"project_id": project_id,
253-
"distinct_id": "367e2499-2b45-586d-814f-778b60144e87",
235+
"session_id": "00000000-0000-0000-0000-000000000000",
236+
"distinct_id": "00000000-0000-0000-0000-000000000000",
254237
"quantity": 1,
255238
"seq": 0,
256-
"started": started.timestamp(),
239+
"started": started2.timestamp(),
257240
"duration": None,
258-
"status": "exited",
259-
"errors": 0,
241+
"status": "abnormal",
242+
"errors": 1,
260243
"release": "[email protected]",
261244
"environment": "production",
262245
"retention_days": 90,
263246
"sdk": "raven-node/2.6.3",
264247
}
265248

266-
session = sessions_consumer.get_session()
267-
del session["session_id"]
268-
del session["received"]
269-
assert session == expected
270-
271-
session = sessions_consumer.get_session()
272-
del session["session_id"]
273-
del session["received"]
274-
assert session == expected
275-
276249

277250
def test_session_with_custom_retention(
278251
mini_sentry, relay_with_processing, sessions_consumer

0 commit comments

Comments (0)