@@ -60,6 +60,7 @@ struct Producers {
     metrics: Producer,
     profiles: Producer,
     replay_recordings: Producer,
+    replay_events: Producer,
 }

 impl Producers {
@@ -78,6 +79,7 @@ impl Producers {
             KafkaTopic::Metrics => Some(&self.metrics),
             KafkaTopic::Profiles => Some(&self.profiles),
             KafkaTopic::ReplayRecordings => Some(&self.replay_recordings),
+            KafkaTopic::ReplayEvents => Some(&self.replay_events),
         }
     }
 }
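Every topic gets a dedicated long-lived producer handle, and `get_producer` is the single point that routes a `KafkaTopic` to one, so adding a topic means touching both the struct and this match. A minimal self-contained sketch of the pattern, with stand-in types rather than Relay's real ones:

    // Stand-ins for illustration; Relay's `Producer` wraps a real Kafka client.
    enum Topic {
        Events,
        ReplayEvents,
    }

    struct Handle; // placeholder for a Kafka producer

    struct Producers {
        events: Handle,
        replay_events: Handle,
    }

    impl Producers {
        // One match arm per topic; keeping the match exhaustive means a new
        // `Topic` variant without an arm fails to compile.
        fn get_producer(&self, topic: Topic) -> Option<&Handle> {
            match topic {
                Topic::Events => Some(&self.events),
                Topic::ReplayEvents => Some(&self.replay_events),
            }
        }
    }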
@@ -140,6 +142,11 @@ impl StoreForwarder {
                 &mut reused_producers,
                 KafkaTopic::ReplayRecordings,
             )?,
+            replay_events: make_producer(
+                &*config,
+                &mut reused_producers,
+                KafkaTopic::ReplayEvents,
+            )?,
         };

         Ok(Self { config, producers })
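The constructor wires the new topic up with the same `make_producer` call as its siblings. Judging from the `reused_producers` argument, producers look to be cached and shared between topics that resolve to the same Kafka cluster configuration; that is an inference from this diff, not something it shows. A sketch of such a reuse scheme under assumed types:

    use std::collections::BTreeMap;
    use std::sync::Arc;

    // Hypothetical stand-ins: key the cache by a named Kafka config so topics
    // pointing at the same cluster share one underlying client.
    type Producer = Arc<str>;

    fn make_producer(
        config_name: Option<String>,
        reused_producers: &mut BTreeMap<Option<String>, Producer>,
    ) -> Producer {
        reused_producers
            .entry(config_name.clone())
            .or_insert_with(|| Arc::from(format!("client for {config_name:?}")))
            .clone()
    }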
@@ -193,6 +200,7 @@ impl StoreForwarder {
                 id: id.clone(),
                 chunk_index,
             });
+
             self.produce(topic, attachment_message)?;
             offset += chunk_size;
             chunk_index += 1;
@@ -444,6 +452,27 @@ impl StoreForwarder {
         );
         Ok(())
     }
+    fn produce_replay_event(
+        &self,
+        event_id: EventId,
+        project_id: ProjectId,
+        start_time: Instant,
+        item: &Item,
+    ) -> Result<(), StoreError> {
+        let message = ReplayEventKafkaMessage {
+            event_id,
+            project_id,
+            start_time: UnixTimestamp::from_instant(start_time).as_secs(),
+            payload: item.payload(),
+        };
+        relay_log::trace!("Sending replay event to Kafka");
+        self.produce(KafkaTopic::ReplayEvents, KafkaMessage::ReplayEvent(message))?;
+        metric!(
+            counter(RelayCounters::ProcessingMessageProduced) += 1,
+            event_type = "replay_event"
+        );
+        Ok(())
+    }
 }

 /// StoreMessageForwarder is an async actor since the only thing it does is put the messages
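`produce_replay_event` follows the existing `produce_*` helpers: build the Kafka message, trace-log it, hand it to `produce`, and increment `ProcessingMessageProduced` tagged with `event_type = "replay_event"`. The one subtle line is the timestamp: `start_time` is a monotonic `Instant`, so `UnixTimestamp::from_instant(start_time).as_secs()` has to anchor it against the wall clock to yield epoch seconds. A self-contained approximation of that conversion (an assumption about what `UnixTimestamp` does, not its actual implementation):

    use std::time::{Instant, SystemTime, UNIX_EPOCH};

    // Approximate Instant -> epoch seconds: measure how long ago the instant
    // was and subtract that from the current wall-clock time.
    fn unix_secs_from_instant(start_time: Instant) -> u64 {
        let elapsed = start_time.elapsed();
        let since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock is set before 1970");
        since_epoch.saturating_sub(elapsed).as_secs()
    }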
@@ -531,6 +560,17 @@ struct EventKafkaMessage {
     /// Attachments that are potentially relevant for processing.
     attachments: Vec<ChunkedAttachment>,
 }
+#[derive(Clone, Debug, Serialize)]
+struct ReplayEventKafkaMessage {
+    /// Raw event payload.
+    payload: Bytes,
+    /// Time at which the event was received by Relay.
+    start_time: u64,
+    /// The event id.
+    event_id: EventId,
+    /// The project id for the current event.
+    project_id: ProjectId,
+}

 /// Container payload for chunks of attachments.
 #[derive(Debug, Serialize)]
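The field docs make the contract explicit: the consumer receives the raw replay-event payload untouched, plus the metadata needed to attribute it. Purely as an illustration, if this struct were rendered as JSON via its serde derive (the actual wire format is whatever `KafkaMessage::serialize` uses, which this diff does not show, and `Bytes` may not serialize as a plain string), a record would look roughly like the following, with all values as placeholders:

    {
      "payload": "<raw replay event bytes>",
      "start_time": 1656349823,
      "event_id": "515539018c9b4260a6f999572f1661ee",
      "project_id": 42
    }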
@@ -631,6 +671,7 @@ enum KafkaMessage {
     Metric(MetricKafkaMessage),
     Profile(ProfileKafkaMessage),
     ReplayRecording(AttachmentKafkaMessage),
+    ReplayEvent(ReplayEventKafkaMessage),
 }

 impl KafkaMessage {
@@ -644,6 +685,7 @@ impl KafkaMessage {
             KafkaMessage::Metric(_) => "metric",
             KafkaMessage::Profile(_) => "profile",
             KafkaMessage::ReplayRecording(_) => "replay_recording",
+            KafkaMessage::ReplayEvent(_) => "replay_event",
         }
     }

@@ -658,6 +700,7 @@ impl KafkaMessage {
             Self::Metric(_message) => Uuid::nil(), // TODO(ja): Determine a partitioning key
             Self::Profile(_message) => Uuid::nil(),
             Self::ReplayRecording(_message) => Uuid::nil(),
+            Self::ReplayEvent(_message) => Uuid::nil(),
         };

         if uuid.is_nil() {
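Replay events adopt the same placeholder partition key as the other non-event messages. From the surrounding code, `Uuid::nil()` appears to act as a sentinel for "no semantic key" (note the `if uuid.is_nil()` check after the match), which would let the producer fall back to its default partitioner instead of hashing an all-zero key and pinning every replay event to one partition. A small sketch of that convention, with an assumed return type:

    use uuid::Uuid;

    // Sentinel convention: a nil UUID means "no key", so return None and let
    // the Kafka client choose the partition itself.
    fn partition_key(uuid: Uuid) -> Option<[u8; 16]> {
        if uuid.is_nil() {
            None
        } else {
            Some(*uuid.as_bytes())
        }
    }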
@@ -790,6 +833,12 @@ impl Handler<StoreEnvelope> for StoreForwarder {
                     )?;
                     replay_recordings.push(replay_recording);
                 }
+                ItemType::ReplayEvent => self.produce_replay_event(
+                    event_id.ok_or(StoreError::NoEventId)?,
+                    scoping.project_id,
+                    start_time,
+                    item,
+                )?,
                 _ => {}
             }
         }
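Finally, the envelope handler routes `ItemType::ReplayEvent` items into the new helper. Note that `event_id.ok_or(StoreError::NoEventId)?` makes the event id mandatory: a replay-event item arriving in an envelope without one is rejected up front rather than produced without attribution. The same Option-to-Result conversion in isolation, with stand-in types:

    #[derive(Debug)]
    enum StoreError {
        NoEventId,
    }

    // `ok_or` turns the absence of an id into a typed error that `?` can
    // propagate out of the handler.
    fn require_event_id(event_id: Option<[u8; 16]>) -> Result<[u8; 16], StoreError> {
        event_id.ok_or(StoreError::NoEventId)
    }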