From b03f7fecc26edb72a40737f23d717973b31ee753 Mon Sep 17 00:00:00 2001
From: Joshua Ferge
Date: Mon, 24 Apr 2023 16:08:11 -0700
Subject: [PATCH] ref(replays): remove chunking logic for large recordings

---
 relay-server/src/actors/store.rs | 90 +-------------------------------
 1 file changed, 1 insertion(+), 89 deletions(-)

diff --git a/relay-server/src/actors/store.rs b/relay-server/src/actors/store.rs
index 838239cda24..41883d6bf47 100644
--- a/relay-server/src/actors/store.rs
+++ b/relay-server/src/actors/store.rs
@@ -660,10 +660,6 @@ impl StoreService {
         start_time: Instant,
         retention: u16,
     ) -> Result<(), StoreError> {
-        // Payloads must be chunked if they exceed a certain threshold. We do not chunk every
-        // message because we can achieve better parallelism when dealing with a single
-        // message.
-
         // 2000 bytes are reserved for the message metadata.
         let max_message_metadata_size = 2000;
 
@@ -693,90 +689,12 @@ impl StoreService {
                 event_type = "replay_recording_not_chunked"
             );
         } else {
-            // Produce chunks to the topic first. Ordering matters.
-            let replay_recording = self.produce_replay_recording_chunks(
-                event_id.ok_or(StoreError::NoEventId)?,
-                scoping.organization_id,
-                scoping.project_id,
-                item,
-            )?;
-
-            let message = KafkaMessage::ReplayRecording(ReplayRecordingKafkaMessage {
-                replay_id: event_id.ok_or(StoreError::NoEventId)?,
-                project_id: scoping.project_id,
-                key_id: scoping.key_id,
-                org_id: scoping.organization_id,
-                received: UnixTimestamp::from_instant(start_time).as_secs(),
-                retention_days: retention,
-                replay_recording,
-            });
-
-            self.produce(
-                KafkaTopic::ReplayRecordings,
-                scoping.organization_id,
-                message,
-            )?;
-
-            metric!(
-                counter(RelayCounters::ProcessingMessageProduced) += 1,
-                event_type = "replay_recording"
-            );
+            relay_log::warn!("replay_recording over maximum size.");
         };
 
         Ok(())
     }
 
-    fn produce_replay_recording_chunks(
-        &self,
-        replay_id: EventId,
-        organization_id: u64,
-        project_id: ProjectId,
-        item: &Item,
-    ) -> Result<ReplayRecordingChunkMeta, StoreError> {
-        let id = Uuid::new_v4().to_string();
-
-        let mut chunk_index = 0;
-        let mut offset = 0;
-        let payload = item.payload();
-        let size = item.len();
-
-        // This skips chunks for empty replay recordings. The consumer does not require chunks for
-        // empty replay recordings. `chunks` will be `0` in this case.
-        while offset < size {
-            // XXX: Max msesage size is 1MB. We reserve 2000 bytes for metadata and the rest is
-            // consumed by the blob.
-            let max_chunk_size = 1000 * 1000 - 2000;
-            let chunk_size = std::cmp::min(max_chunk_size, size - offset);
-
-            let replay_recording_chunk_message =
-                KafkaMessage::ReplayRecordingChunk(ReplayRecordingChunkKafkaMessage {
-                    payload: payload.slice(offset..offset + chunk_size),
-                    replay_id,
-                    project_id,
-                    id: id.clone(),
-                    chunk_index,
-                });
-
-            self.produce(
-                KafkaTopic::ReplayRecordings,
-                organization_id,
-                replay_recording_chunk_message,
-            )?;
-
-            offset += chunk_size;
-            chunk_index += 1;
-        }
-
-        // The chunk_index is incremented after every loop iteration. After we exit the loop, it
-        // is one larger than the last chunk, so it is equal to the number of chunks.
-
-        Ok(ReplayRecordingChunkMeta {
-            id,
-            chunks: chunk_index,
-            size: Some(size),
-        })
-    }
-
     fn produce_check_in(
         &self,
         organization_id: u64,
@@ -1071,8 +989,6 @@ enum KafkaMessage {
     Profile(ProfileKafkaMessage),
     ReplayEvent(ReplayEventKafkaMessage),
     ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage),
-    ReplayRecording(ReplayRecordingKafkaMessage),
-    ReplayRecordingChunk(ReplayRecordingChunkKafkaMessage),
     CheckIn(CheckInKafkaMessage),
 }
 
@@ -1087,8 +1003,6 @@ impl Message for KafkaMessage {
             KafkaMessage::Metric(_) => "metric",
             KafkaMessage::Profile(_) => "profile",
             KafkaMessage::ReplayEvent(_) => "replay_event",
-            KafkaMessage::ReplayRecording(_) => "replay_recording",
-            KafkaMessage::ReplayRecordingChunk(_) => "replay_recording_chunk",
             KafkaMessage::ReplayRecordingNotChunked(_) => "replay_recording_not_chunked",
             KafkaMessage::CheckIn(_) => "check_in",
         }
@@ -1105,8 +1019,6 @@ impl Message for KafkaMessage {
             Self::Metric(_message) => Uuid::nil(), // TODO(ja): Determine a partitioning key
             Self::Profile(_message) => Uuid::nil(),
             Self::ReplayEvent(message) => message.replay_id.0,
-            Self::ReplayRecording(message) => message.replay_id.0,
-            Self::ReplayRecordingChunk(message) => message.replay_id.0,
             Self::ReplayRecordingNotChunked(_message) => Uuid::nil(), // Ensure random partitioning.
             Self::CheckIn(_message) => Uuid::nil(),
         };
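
For readers who haven't seen the removed path: the deleted `produce_replay_recording_chunks` walks the payload in fixed-size steps, and the final `chunk_index` equals the number of produced chunks because it is incremented exactly once per chunk. Below is a minimal standalone sketch of that arithmetic in Rust; the `chunk_offsets` helper and the sizes in `main` are illustrative, not part of Relay.

    /// Splits `size` bytes into (start, end) ranges no larger than the Kafka
    /// message budget, mirroring the bookkeeping of the deleted loop.
    fn chunk_offsets(size: usize) -> Vec<(usize, usize)> {
        // Max message size is 1 MB; 2000 bytes are reserved for metadata.
        const MAX_CHUNK_SIZE: usize = 1000 * 1000 - 2000;
        let mut ranges = Vec::new();
        let mut offset = 0;
        while offset < size {
            let chunk_size = std::cmp::min(MAX_CHUNK_SIZE, size - offset);
            ranges.push((offset, offset + chunk_size));
            offset += chunk_size;
        }
        // An empty payload yields zero ranges, matching the deleted comment
        // about empty recordings having `chunks == 0`.
        ranges
    }

    fn main() {
        // A 2.5 MB recording splits into two full chunks plus a remainder.
        let ranges = chunk_offsets(2_500_000);
        assert_eq!(ranges.len(), 3);
        assert_eq!(ranges[0], (0, 998_000));
        assert_eq!(ranges[2], (1_996_000, 2_500_000));
    }

Each range maps directly onto `payload.slice(offset..offset + chunk_size)` in the deleted code, so the chunk count reported to the consumer is `ranges.len()`.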
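After this patch only the single-message path remains. The exact threshold check lives in context lines the diff does not show, so the sketch below is an assumption for illustration (`route_recording` and `max_message_size` are hypothetical names); what the diff does confirm is the 2000-byte metadata reservation and the new warn-and-drop else branch.

    /// Hypothetical sketch: routes a recording by size instead of chunking.
    fn route_recording(payload_len: usize, max_message_size: usize) -> &'static str {
        // Kept by the patch: 2000 bytes reserved for message metadata.
        let max_message_metadata_size = 2000;
        if payload_len <= max_message_size - max_message_metadata_size {
            // Fits in one Kafka message: produced as ReplayRecordingNotChunked.
            "replay_recording_not_chunked"
        } else {
            // The chunked fallback is gone; Relay now only logs
            // "replay_recording over maximum size." and the payload is dropped.
            "dropped"
        }
    }

    fn main() {
        assert_eq!(route_recording(500_000, 1_000_000), "replay_recording_not_chunked");
        assert_eq!(route_recording(2_000_000, 1_000_000), "dropped");
    }

A consequence visible in the last hunk: with the chunked variants removed, `ReplayEvent` is the only replay message still keyed on `replay_id`, while `ReplayRecordingNotChunked` keeps `Uuid::nil()` for random partitioning, which is safe because each recording is now a single self-contained message with no cross-message ordering to preserve.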