feat(replays): add replay events ItemType #1239

Merged · 36 commits · Jun 17, 2022

Commits
b910e80
feat(replays): add ItemType for recordings
JoshFerge Apr 22, 2022
fbb22ef
Merge branch 'master' into jferg/replay-recordings
JoshFerge Apr 27, 2022
d81ad3a
name change recording -> payload
JoshFerge Apr 27, 2022
e9972c5
feat(replays): add replay events itemtype
JoshFerge Apr 22, 2022
39cfd11
pr feedback
JoshFerge Apr 27, 2022
3d0d2ea
fix merge
JoshFerge Apr 27, 2022
7915d55
Update CHANGELOG.md
JoshFerge Apr 27, 2022
b58a4e3
drop events if ff not enabled
JoshFerge May 2, 2022
e9b9590
Merge branch 'master' into jferg/replay-recordings
JoshFerge May 2, 2022
1c78bf9
Merge branch 'jferg/replay-recordings' into jferg/replay-events-itemtype
JoshFerge May 2, 2022
4152676
add payload to ff condition
JoshFerge May 2, 2022
45d0531
fix itemtype reference
JoshFerge May 2, 2022
874051c
fix itemtype reference
JoshFerge May 3, 2022
dc27892
add replay items to from_str method
JoshFerge May 3, 2022
8167775
Merge branch 'master' into jferg/replay-recordings
JoshFerge May 4, 2022
6c78fb1
Merge branch 'jferg/replay-recordings' into jferg/replay-events-itemtype
JoshFerge May 4, 2022
6ff5c7b
Merge branch 'master' into jferg/replay-recordings
JoshFerge May 10, 2022
ed70621
fix merge conflicts
JoshFerge May 10, 2022
55248e8
remove ReplayEvent EventType
JoshFerge May 12, 2022
db13fef
update schema snapshot
JoshFerge May 25, 2022
39ee7e7
Merge branch 'master' into jferg/replay-recordings
JoshFerge Jun 1, 2022
0611228
rename payloads -> recordings
JoshFerge Jun 1, 2022
e62d36a
checkpoint: get envelope replay test working in principle
JoshFerge Jun 2, 2022
35a0755
small fixups
JoshFerge Jun 2, 2022
404371f
add second test for without processing
JoshFerge Jun 2, 2022
88b7cb7
separate out recording chunks
JoshFerge Jun 3, 2022
3e3882c
set defaults for recording attachment chunks
JoshFerge Jun 3, 2022
6c79661
merge recordings PR updates
JoshFerge Jun 6, 2022
e864666
event_id -> replay_id
JoshFerge Jun 7, 2022
a54ef48
Merge branch 'master' into jferg/replay-recordings
jan-auer Jun 9, 2022
6240aac
used ChunkedReplayRecording type
JoshFerge Jun 9, 2022
a8fd742
specify to use min latest relay version
JoshFerge Jun 9, 2022
ca921a0
merge upstream, pr feedback
JoshFerge Jun 9, 2022
6ebbd73
black
JoshFerge Jun 9, 2022
92b522d
remove errant test file
JoshFerge Jun 9, 2022
2f1f9db
fix merge conflicts
JoshFerge Jun 13, 2022
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -43,6 +43,8 @@
- Refactor aggregation error, recover from errors more gracefully. ([#1240](https://github.com/getsentry/relay/pull/1240))
- Remove/reject nul-bytes from metric strings. ([#1235](https://github.com/getsentry/relay/pull/1235))
- Remove the unused "internal" data category. ([#1245](https://github.com/getsentry/relay/pull/1245))
- Add ReplayEvent ItemType. ([#1239](https://github.com/getsentry/relay/pull/1239))
- Add ReplayRecording ItemType. ([#1236](https://github.com/getsentry/relay/pull/1236))
- Add the client and version as `sdk` tag to extracted session metrics in the format `name/version`. ([#1248](https://github.com/getsentry/relay/pull/1248))
- Expose `shutdown_timeout` in `OverridableConfig` ([#1247](https://github.com/getsentry/relay/pull/1247))
- Normalize all profiles and reject invalid ones. ([#1250](https://github.com/getsentry/relay/pull/1250))
4 changes: 4 additions & 0 deletions relay-common/src/constants.rs
@@ -109,6 +109,8 @@ pub enum DataCategory {
Session = 5,
/// A profile
Profile = 6,
/// Replay Events
ReplayEvent = 7,
Member:
Note: When moving this live, please ask someone in @getsentry/owners-ingest to issue a new library release of relay so we can add this new data category to Sentry and Snuba.

This will require regenerating the CABI headers once the name and value of the enum variant are final (see this PR):

  1. Run cargo install cbindgen on your machine
  2. Go to the relay-cabi subfolder
  3. Run make header
  4. Commit the result into this PR

Member Author:

will do 👍🏼

/// Any other data category not known by this Relay.
#[serde(other)]
Unknown = -1,
@@ -124,6 +126,7 @@ impl DataCategory {
"security" => Self::Security,
"attachment" => Self::Attachment,
"session" => Self::Session,
"replay_event" => Self::ReplayEvent,
"profile" => Self::Profile,
_ => Self::Unknown,
}
@@ -138,6 +141,7 @@ impl DataCategory {
Self::Security => "security",
Self::Attachment => "attachment",
Self::Session => "session",
Self::ReplayEvent => "replay_event",
Self::Profile => "profile",
Self::Unknown => "unknown",
}
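A quick sanity check on the new wire name: the category string must round-trip through both match arms above, since Sentry and Snuba will look the category up by that string. A minimal, self-contained sketch (not the PR's code; the enum is trimmed to the new variant, and the method names from_name/name are assumed from the visible match arms):

    #[derive(Debug, PartialEq)]
    enum DataCategory {
        ReplayEvent,
        Unknown,
    }

    impl DataCategory {
        // Mirrors the parsing arm added in the diff above.
        fn from_name(s: &str) -> Self {
            match s {
                "replay_event" => Self::ReplayEvent,
                _ => Self::Unknown,
            }
        }

        // Mirrors the naming arm added in the diff above.
        fn name(&self) -> &'static str {
            match self {
                Self::ReplayEvent => "replay_event",
                Self::Unknown => "unknown",
            }
        }
    }

    fn main() {
        assert_eq!(DataCategory::from_name("replay_event"), DataCategory::ReplayEvent);
        assert_eq!(DataCategory::from_name("replay_event").name(), "replay_event");
    }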
12 changes: 12 additions & 0 deletions relay-config/src/config.rs
@@ -779,6 +779,10 @@ pub enum KafkaTopic {
Metrics,
/// Profiles
Profiles,
/// ReplayEvents, breadcrumb + session updates for replays
ReplayEvents,
/// ReplayRecording, large blobs sent by the replay sdk
ReplayRecordings,
}

/// Configuration for topics.
@@ -801,6 +805,10 @@ pub struct TopicAssignments {
pub metrics: TopicAssignment,
/// Stacktrace topic name
pub profiles: TopicAssignment,
/// Replay Events topic name.
pub replay_events: TopicAssignment,
/// Recordings topic name.
pub replay_recordings: TopicAssignment,
}

impl TopicAssignments {
@@ -815,6 +823,8 @@ impl TopicAssignments {
KafkaTopic::Sessions => &self.sessions,
KafkaTopic::Metrics => &self.metrics,
KafkaTopic::Profiles => &self.profiles,
KafkaTopic::ReplayEvents => &self.replay_events,
KafkaTopic::ReplayRecordings => &self.replay_recordings,
}
}
}
@@ -830,6 +840,8 @@ impl Default for TopicAssignments {
sessions: "ingest-sessions".to_owned().into(),
metrics: "ingest-metrics".to_owned().into(),
profiles: "profiles".to_owned().into(),
replay_events: "ingest-replay-events".to_owned().into(),
replay_recordings: "ingest-replay-recordings".to_owned().into(),
}
}
}
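Taken together, the new enum variants, struct fields, accessor arm, and defaults mean replay traffic routes to two new topics out of the box. A self-contained sketch of that routing (simplified: TopicAssignment is reduced to a plain String, and the accessor name topic_name is assumed):

    enum KafkaTopic {
        ReplayEvents,
        ReplayRecordings,
    }

    struct TopicAssignments {
        replay_events: String,
        replay_recordings: String,
    }

    impl Default for TopicAssignments {
        fn default() -> Self {
            // Same defaults as the diff above.
            Self {
                replay_events: "ingest-replay-events".to_owned(),
                replay_recordings: "ingest-replay-recordings".to_owned(),
            }
        }
    }

    impl TopicAssignments {
        // Mirrors the two match arms added to the real accessor.
        fn topic_name(&self, topic: KafkaTopic) -> &str {
            match topic {
                KafkaTopic::ReplayEvents => &self.replay_events,
                KafkaTopic::ReplayRecordings => &self.replay_recordings,
            }
        }
    }

    fn main() {
        let topics = TopicAssignments::default();
        assert_eq!(topics.topic_name(KafkaTopic::ReplayEvents), "ingest-replay-events");
        assert_eq!(topics.topic_name(KafkaTopic::ReplayRecordings), "ingest-replay-recordings");
    }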
1 change: 1 addition & 0 deletions relay-quotas/src/quota.rs
@@ -103,6 +103,7 @@ impl CategoryUnit {
DataCategory::Default
| DataCategory::Error
| DataCategory::Transaction
| DataCategory::ReplayEvent
| DataCategory::Security
| DataCategory::Profile => Some(Self::Count),
DataCategory::Attachment => Some(Self::Bytes),
20 changes: 19 additions & 1 deletion relay-server/src/actors/envelopes.rs
@@ -1006,6 +1006,22 @@ impl EnvelopeProcessor {
});
}

/// Remove replay items if the feature flag is not enabled.
fn process_replays(&self, state: &mut ProcessEnvelopeState) {
let replays_enabled = state.project_state.has_feature(Feature::Replays);
state.envelope.retain_items(|item| {
match item.ty() {
ItemType::ReplayEvent | ItemType::ReplayRecording => {
if !replays_enabled {
return false;
}
true
}
_ => true, // Keep all other item types
}
});
}
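The closure reduces to a simple predicate: a replay item survives only when the project has the feature, and every other item type passes through untouched. A minimal sketch of that predicate (hypothetical free function, not PR code; ItemType is trimmed to three variants):

    #[derive(Debug)]
    enum ItemType {
        ReplayEvent,
        ReplayRecording,
        Event,
    }

    // Distills the retain_items closure above into a pure function.
    fn keep_item(ty: &ItemType, replays_enabled: bool) -> bool {
        match ty {
            ItemType::ReplayEvent | ItemType::ReplayRecording => replays_enabled,
            _ => true,
        }
    }

    fn main() {
        assert!(!keep_item(&ItemType::ReplayEvent, false)); // dropped without the flag
        assert!(keep_item(&ItemType::ReplayRecording, true)); // kept with the flag
        assert!(keep_item(&ItemType::Event, false)); // other items are never dropped here
    }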

/// Creates and initializes the processing state.
///
/// This applies defaults to the envelope and initializes empty rate limits.
@@ -1320,7 +1336,8 @@ impl EnvelopeProcessor {
ItemType::MetricBuckets => false,
ItemType::ClientReport => false,
ItemType::Profile => false,

ItemType::ReplayEvent => false,
ItemType::ReplayRecording => false,
// Without knowing more, `Unknown` items are allowed to be repeated
ItemType::Unknown(_) => false,
}
@@ -1834,6 +1851,7 @@ impl EnvelopeProcessor {
self.process_client_reports(state);
self.process_user_reports(state);
self.process_profiles(state);
self.process_replays(state);

if state.creates_event() {
if_processing!({
3 changes: 3 additions & 0 deletions relay-server/src/actors/project.rs
@@ -55,6 +55,9 @@ pub enum Feature {
#[serde(rename = "organizations:profiling")]
Profiling,

#[serde(rename = "organizations:session-replay")]
Replays,

/// Unused.
///
/// This used to control the initial experimental metrics extraction for sessions and has been
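The serde rename is what ties the new variant to the feature string Sentry sends in project configs. A hedged sketch of that mapping (assuming feature flags deserialize from plain strings, as the rename attribute suggests; the Unknown fallback is this sketch's addition, and serde plus serde_json are required):

    use serde::Deserialize;

    #[derive(Debug, Deserialize, PartialEq)]
    enum Feature {
        #[serde(rename = "organizations:session-replay")]
        Replays,
        #[serde(other)]
        Unknown,
    }

    fn main() {
        // The project-config string maps onto the new variant...
        let feature: Feature = serde_json::from_str("\"organizations:session-replay\"").unwrap();
        assert_eq!(feature, Feature::Replays);
        // ...and unknown flags fall through instead of failing deserialization.
        let other: Feature = serde_json::from_str("\"organizations:something-else\"").unwrap();
        assert_eq!(other, Feature::Unknown);
    }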
158 changes: 157 additions & 1 deletion relay-server/src/actors/store.rs
@@ -38,6 +38,8 @@ const MAX_EXPLODED_SESSIONS: usize = 100;
/// Fallback name used for attachment items without a `filename` header.
const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";

const REPLAY_RECORDINGS_ATTACHMENT_NAME: &str = "rr";

#[derive(Fail, Debug)]
pub enum StoreError {
#[fail(display = "failed to send kafka message")]
@@ -59,6 +61,8 @@ struct Producers {
sessions: Producer,
metrics: Producer,
profiles: Producer,
replay_events: Producer,
replay_recordings: Producer,
}

impl Producers {
@@ -76,6 +80,8 @@ impl Producers {
KafkaTopic::Sessions => Some(&self.sessions),
KafkaTopic::Metrics => Some(&self.metrics),
KafkaTopic::Profiles => Some(&self.profiles),
KafkaTopic::ReplayEvents => Some(&self.replay_events),
KafkaTopic::ReplayRecordings => Some(&self.replay_recordings),
}
}
}
@@ -133,6 +139,16 @@ impl StoreForwarder {
sessions: make_producer(&*config, &mut reused_producers, KafkaTopic::Sessions)?,
metrics: make_producer(&*config, &mut reused_producers, KafkaTopic::Metrics)?,
profiles: make_producer(&*config, &mut reused_producers, KafkaTopic::Profiles)?,
replay_recordings: make_producer(
&*config,
&mut reused_producers,
KafkaTopic::ReplayRecordings,
)?,
replay_events: make_producer(
&*config,
&mut reused_producers,
KafkaTopic::ReplayEvents,
)?,
};

Ok(Self { config, producers })
@@ -177,7 +193,6 @@ impl StoreForwarder {
while offset < size {
let max_chunk_size = self.config.attachment_chunk_size();
let chunk_size = std::cmp::min(max_chunk_size, size - offset);

let attachment_message = KafkaMessage::AttachmentChunk(AttachmentChunkKafkaMessage {
payload: payload.slice(offset, offset + chunk_size),
event_id,
@@ -439,6 +454,75 @@ impl StoreForwarder {
);
Ok(())
}
fn produce_replay_event(
&self,
replay_id: EventId,
project_id: ProjectId,
start_time: Instant,
item: &Item,
) -> Result<(), StoreError> {
let message = ReplayEventKafkaMessage {
replay_id,
project_id,
start_time: UnixTimestamp::from_instant(start_time).as_secs(),
payload: item.payload(),
};
relay_log::trace!("Sending replay event to Kafka");
self.produce(KafkaTopic::ReplayEvents, KafkaMessage::ReplayEvent(message))?;
metric!(
counter(RelayCounters::ProcessingMessageProduced) += 1,
event_type = "replay_event"
);
Ok(())
}

fn produce_replay_recording_chunks(
&self,
replay_id: EventId,
project_id: ProjectId,
item: &Item,
) -> Result<ChunkedAttachment, StoreError> {
let id = Uuid::new_v4().to_string();

let mut chunk_index = 0;
let mut offset = 0;
let payload = item.payload();
let size = item.len();

// This skips chunks for empty replay recordings. The consumer does not require chunks for
// empty replay recordings. `chunks` will be `0` in this case.
while offset < size {
let max_chunk_size = self.config.attachment_chunk_size();
let chunk_size = std::cmp::min(max_chunk_size, size - offset);

let replay_recording_chunk_message =
KafkaMessage::ReplayRecordingChunk(ReplayRecordingChunkKafkaMessage {
payload: payload.slice(offset, offset + chunk_size),
replay_id,
project_id,
id: id.clone(),
chunk_index,
});
self.produce(KafkaTopic::ReplayRecordings, replay_recording_chunk_message)?;
offset += chunk_size;
chunk_index += 1;
}

// The chunk_index is incremented after every loop iteration. After we exit the loop, it
// is one larger than the last chunk index, so it equals the number of chunks.

Ok(ChunkedAttachment {
id,
name: REPLAY_RECORDINGS_ATTACHMENT_NAME.to_owned(),
content_type: item
.content_type()
.map(|content_type| content_type.as_str().to_owned()),
attachment_type: item.attachment_type().unwrap_or_default(),
chunks: chunk_index,
size: Some(size),
rate_limited: Some(item.rate_limited()),
})
}
}
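The loop above works out to ceil(size / max_chunk_size) chunks, with the deliberate special case that an empty recording produces zero chunk messages. A standalone sketch of that arithmetic (the chunk size is assumed for illustration; the real value comes from config.attachment_chunk_size()):

    // Mirrors the chunking loop: empty payloads yield no chunks, everything
    // else yields ceil(size / max_chunk_size) chunks.
    fn chunk_count(size: usize, max_chunk_size: usize) -> usize {
        let mut offset = 0;
        let mut chunks = 0;
        while offset < size {
            offset += max_chunk_size.min(size - offset);
            chunks += 1;
        }
        chunks
    }

    fn main() {
        let max = 1_000_000; // assumed chunk size for illustration
        assert_eq!(chunk_count(0, max), 0); // empty recording: no chunk messages
        assert_eq!(chunk_count(max, max), 1); // exactly one full chunk
        assert_eq!(chunk_count(max + 1, max), 2); // spills into a second chunk
    }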

/// StoreMessageForwarder is an async actor since the only thing it does is put the messages
@@ -526,6 +610,17 @@ struct EventKafkaMessage {
/// Attachments that are potentially relevant for processing.
attachments: Vec<ChunkedAttachment>,
}
#[derive(Clone, Debug, Serialize)]
struct ReplayEventKafkaMessage {
/// Raw event payload.
payload: Bytes,
/// Time at which the event was received by Relay.
start_time: u64,
/// The event id.
replay_id: EventId,
/// The project id for the current event.
project_id: ProjectId,
}

/// Container payload for chunks of attachments.
#[derive(Debug, Serialize)]
@@ -558,6 +653,32 @@ struct AttachmentKafkaMessage {
attachment: ChunkedAttachment,
}

/// Container payload for chunks of a replay recording.
#[derive(Debug, Serialize)]
struct ReplayRecordingChunkKafkaMessage {
/// Chunk payload of the replay recording.
payload: Bytes,
/// The replay id.
replay_id: EventId,
/// The project id for the current replay.
project_id: ProjectId,
/// The recording ID within the replay.
///
/// The triple `(project_id, replay_id, id)` identifies a replay recording chunk uniquely.
id: String,
/// Sequence number of chunk. Starts at 0 and ends at `ReplayRecordingKafkaMessage.num_chunks - 1`.
chunk_index: usize,
}
#[derive(Debug, Serialize)]
struct ReplayRecordingKafkaMessage {
/// The replay id.
replay_id: EventId,
/// The project id for the current event.
project_id: ProjectId,
/// The recording attachment.
replay_recording: ChunkedAttachment,
}

/// User report for an event wrapped up in a message ready for consumption in Kafka.
///
/// Is always independent of an event and can be sent as part of any envelope.
@@ -626,6 +747,9 @@ enum KafkaMessage {
Session(SessionKafkaMessage),
Metric(MetricKafkaMessage),
Profile(ProfileKafkaMessage),
ReplayEvent(ReplayEventKafkaMessage),
ReplayRecording(ReplayRecordingKafkaMessage),
ReplayRecordingChunk(ReplayRecordingChunkKafkaMessage),
}

impl KafkaMessage {
@@ -638,6 +762,9 @@ impl KafkaMessage {
KafkaMessage::Session(_) => "session",
KafkaMessage::Metric(_) => "metric",
KafkaMessage::Profile(_) => "profile",
KafkaMessage::ReplayEvent(_) => "replay_event",
KafkaMessage::ReplayRecording(_) => "replay_recording",
KafkaMessage::ReplayRecordingChunk(_) => "replay_recording_chunk",
}
}

@@ -651,6 +778,9 @@ impl KafkaMessage {
Self::Session(_message) => Uuid::nil(), // Explicit random partitioning for sessions
Self::Metric(_message) => Uuid::nil(), // TODO(ja): Determine a partitioning key
Self::Profile(_message) => Uuid::nil(),
Self::ReplayEvent(_message) => Uuid::nil(),
Self::ReplayRecording(message) => message.replay_id.0,
Self::ReplayRecordingChunk(message) => message.replay_id.0,
};

if uuid.is_nil() {
@@ -766,6 +896,32 @@ impl Handler<StoreEnvelope> for StoreForwarder {
start_time,
item,
)?,
ItemType::ReplayRecording => {
let replay_recording = self.produce_replay_recording_chunks(
event_id.ok_or(StoreError::NoEventId)?,
scoping.project_id,
item,
)?;
relay_log::trace!("Sending individual replay_recordings of envelope to kafka");
let replay_recording_message =
KafkaMessage::ReplayRecording(ReplayRecordingKafkaMessage {
replay_id: event_id.ok_or(StoreError::NoEventId)?,
project_id: scoping.project_id,
replay_recording,
});

self.produce(KafkaTopic::ReplayRecordings, replay_recording_message)?;
metric!(
counter(RelayCounters::ProcessingMessageProduced) += 1,
event_type = "replay_recording"
);
}
ItemType::ReplayEvent => self.produce_replay_event(
event_id.ok_or(StoreError::NoEventId)?,
scoping.project_id,
start_time,
item,
)?,
_ => {}
}
}
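Note the ordering the recording branch establishes: every chunk message is produced first, and only then the single ReplayRecording wrapper whose chunks count tells the consumer how many pieces to reassemble. A small simulation of that protocol (hypothetical message types; the real messages also carry project_id, content type, and rate-limit flags):

    #[derive(Debug)]
    enum Msg {
        RecordingChunk { chunk_index: usize, bytes: usize },
        Recording { chunks: usize, size: usize },
    }

    fn produce_recording(size: usize, max_chunk: usize) -> Vec<Msg> {
        let mut out = Vec::new();
        let (mut offset, mut chunk_index) = (0, 0);
        while offset < size {
            let bytes = max_chunk.min(size - offset);
            out.push(Msg::RecordingChunk { chunk_index, bytes });
            offset += bytes;
            chunk_index += 1;
        }
        // The wrapper goes out last, once the chunk count is known.
        out.push(Msg::Recording { chunks: chunk_index, size });
        out
    }

    fn main() {
        for msg in produce_recording(2_500, 1_000) {
            println!("{msg:?}"); // three chunks, then the wrapper message
        }
    }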