ref(replays): remove chunking logic for large recordings #2063

Merged · 3 commits · Jun 28, 2023
relay-server/src/actors/store.rs (90 changes: 1 addition, 89 deletions)
@@ -664,10 +664,6 @@ impl StoreService {
start_time: Instant,
retention: u16,
) -> Result<(), StoreError> {
-// Payloads must be chunked if they exceed a certain threshold. We do not chunk every
-// message because we can achieve better parallelism when dealing with a single
-// message.
-
// 2000 bytes are reserved for the message metadata.
let max_message_metadata_size = 2000;

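Note on the size budget here: the threshold that decides between the chunked and not-chunked paths follows from the two constants in this diff (the 1 MB Kafka message cap and the 2000 bytes reserved for metadata). A minimal standalone sketch of that arithmetic; the names are illustrative, not Relay's:

```rust
// Size budget from the diff: Kafka messages are capped at 1 MB and
// 2000 bytes are reserved for message metadata.
const MAX_MESSAGE_SIZE: usize = 1000 * 1000;
const MAX_MESSAGE_METADATA_SIZE: usize = 2000;

// Illustrative helper (not in Relay): payload bytes that fit in a
// single, unchunked message.
fn max_payload_size() -> usize {
    MAX_MESSAGE_SIZE - MAX_MESSAGE_METADATA_SIZE
}

fn main() {
    // A recording fits unchunked only below this cutoff.
    assert_eq!(max_payload_size(), 998_000);
}
```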
@@ -697,90 +693,12 @@ impl StoreService {
event_type = "replay_recording_not_chunked"
);
} else {
-// Produce chunks to the topic first. Ordering matters.
-let replay_recording = self.produce_replay_recording_chunks(
-event_id.ok_or(StoreError::NoEventId)?,
-scoping.organization_id,
-scoping.project_id,
-item,
-)?;
-
-let message = KafkaMessage::ReplayRecording(ReplayRecordingKafkaMessage {
-replay_id: event_id.ok_or(StoreError::NoEventId)?,
-project_id: scoping.project_id,
-key_id: scoping.key_id,
-org_id: scoping.organization_id,
-received: UnixTimestamp::from_instant(start_time).as_secs(),
-retention_days: retention,
-replay_recording,
-});
-
-self.produce(
-KafkaTopic::ReplayRecordings,
-scoping.organization_id,
-message,
-)?;
-
-metric!(
-counter(RelayCounters::ProcessingMessageProduced) += 1,
-event_type = "replay_recording"
-);
+relay_log::warn!("replay_recording over maximum size.");
};

Ok(())
}

-fn produce_replay_recording_chunks(
-&self,
-replay_id: EventId,
-organization_id: u64,
-project_id: ProjectId,
-item: &Item,
-) -> Result<ReplayRecordingChunkMeta, StoreError> {
-let id = Uuid::new_v4().to_string();
-
-let mut chunk_index = 0;
-let mut offset = 0;
-let payload = item.payload();
-let size = item.len();
-
-// This skips chunks for empty replay recordings. The consumer does not require chunks for
-// empty replay recordings. `chunks` will be `0` in this case.
-while offset < size {
-// XXX: Max message size is 1MB. We reserve 2000 bytes for metadata and the rest is
-// consumed by the blob.
-let max_chunk_size = 1000 * 1000 - 2000;
-let chunk_size = std::cmp::min(max_chunk_size, size - offset);
-
-let replay_recording_chunk_message =
-KafkaMessage::ReplayRecordingChunk(ReplayRecordingChunkKafkaMessage {
-payload: payload.slice(offset..offset + chunk_size),
-replay_id,
-project_id,
-id: id.clone(),
-chunk_index,
-});
-
-self.produce(
-KafkaTopic::ReplayRecordings,
-organization_id,
-replay_recording_chunk_message,
-)?;
-
-offset += chunk_size;
-chunk_index += 1;
-}
-
-// The chunk_index is incremented after every loop iteration. After we exit the loop, it
-// is one larger than the last chunk, so it is equal to the number of chunks.
-
-Ok(ReplayRecordingChunkMeta {
-id,
-chunks: chunk_index,
-size: Some(size),
-})
-}
-
fn produce_check_in(
&self,
organization_id: u64,
@@ -1079,8 +997,6 @@ enum KafkaMessage {
Profile(ProfileKafkaMessage),
ReplayEvent(ReplayEventKafkaMessage),
ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage),
-ReplayRecording(ReplayRecordingKafkaMessage),
-ReplayRecordingChunk(ReplayRecordingChunkKafkaMessage),
CheckIn(CheckInKafkaMessage),
}

@@ -1095,8 +1011,6 @@ impl Message for KafkaMessage {
KafkaMessage::Metric(_) => "metric",
KafkaMessage::Profile(_) => "profile",
KafkaMessage::ReplayEvent(_) => "replay_event",
-KafkaMessage::ReplayRecording(_) => "replay_recording",
-KafkaMessage::ReplayRecordingChunk(_) => "replay_recording_chunk",
KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
KafkaMessage::CheckIn(_) => "check_in",
}
@@ -1113,8 +1027,6 @@ impl Message for KafkaMessage {
Self::Metric(_message) => Uuid::nil(), // TODO(ja): Determine a partitioning key
Self::Profile(_message) => Uuid::nil(),
Self::ReplayEvent(message) => message.replay_id.0,
-Self::ReplayRecording(message) => message.replay_id.0,
-Self::ReplayRecordingChunk(message) => message.replay_id.0,
Self::ReplayRecordingNotChunked(_message) => Uuid::nil(), // Ensure random partitioning.
Self::CheckIn(_message) => Uuid::nil(),
};
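One consequence worth noting on the partition keys: the removed chunk messages were keyed by `replay_id`, presumably so that every chunk of one recording landed on the same partition in order, while `Uuid::nil()` is used where random partitioning is acceptable (per the comment retained above). A small sketch of that distinction, assuming nil-key-means-unkeyed semantics rather than Relay's exact producer behavior:

```rust
use uuid::Uuid; // uuid crate, v4 feature assumed

// Illustrative (not Relay's producer API): a message with a non-nil key
// is routed to a fixed partition, which is what kept a recording's
// chunks in order; a nil key falls back to random partitioning.
fn is_keyed(partition_key: Uuid) -> bool {
    !partition_key.is_nil()
}

fn main() {
    assert!(is_keyed(Uuid::new_v4())); // chunked path: ordered per replay_id
    assert!(!is_keyed(Uuid::nil())); // not-chunked path: random partitioning
}
```

With chunking gone, no replay message needs per-recording ordering anymore, which is why the remaining not-chunked variant can keep the random-partitioning key.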