@@ -21,6 +21,7 @@ use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64};
21
21
use std:: sync:: Arc ;
22
22
use std:: task:: { Context , Poll } ;
23
23
use std:: time:: Duration ;
24
+ use tokio:: time;
24
25
use tokio:: time:: sleep;
25
26
use tracing:: { error, info, trace, warn} ;
26
27
@@ -36,10 +37,18 @@ pub enum AutoCommit {
36
37
Disabled ,
37
38
/// The auto-commit is enabled and the offset is stored on the server after a certain interval.
38
39
Interval ( IggyDuration ) ,
39
- /// The auto-commit is enabled and the offset is stored on the server after a certain interval or depending on the mode.
40
+ /// The auto-commit is enabled and the offset is stored on the server after a certain interval or depending on the mode when consuming the messages .
40
41
IntervalOrWhen ( IggyDuration , AutoCommitWhen ) ,
41
- /// The auto-commit is enabled and the offset is stored on the server depending on the mode.
42
+ /// The auto-commit is enabled and the offset is stored on the server after a certain interval or depending on the mode after consuming the messages.
43
+ ///
44
+ /// **This will only work with the `IggyConsumerMessageExt` trait when using `consume_messages()`.**
45
+ IntervalOrAfter ( IggyDuration , AutoCommitAfter ) ,
46
+ /// The auto-commit is enabled and the offset is stored on the server depending on the mode when consuming the messages.
42
47
When ( AutoCommitWhen ) ,
48
+ /// The auto-commit is enabled and the offset is stored on the server depending on the mode after consuming the messages.
49
+ ///
50
+ /// **This will only work with the `IggyConsumerMessageExt` trait when using `consume_messages()`.**
51
+ After ( AutoCommitAfter ) ,
43
52
}
44
53
45
54
/// The auto-commit mode for storing the offset on the server.
@@ -55,6 +64,19 @@ pub enum AutoCommitWhen {
55
64
ConsumingEveryNthMessage ( u32 ) ,
56
65
}
57
66
67
+ /// The auto-commit mode for storing the offset on the server **after** receiving the messages.
68
+ ///
69
+ /// **This will only work with the `IggyConsumerMessageExt` trait when using `consume_messages()`.**
70
+ #[ derive( Debug , PartialEq , Copy , Clone ) ]
71
+ pub enum AutoCommitAfter {
72
+ /// The offset is stored on the server after all the messages are consumed.
73
+ ConsumingAllMessages ,
74
+ /// The offset is stored on the server after consuming each message.
75
+ ConsumingEachMessage ,
76
+ /// The offset is stored on the server after consuming every Nth message.
77
+ ConsumingEveryNthMessage ( u32 ) ,
78
+ }
79
+
58
80
unsafe impl Send for IggyConsumer { }
59
81
unsafe impl Sync for IggyConsumer { }
60
82
@@ -89,6 +111,8 @@ pub struct IggyConsumer {
89
111
last_polled_at : Arc < AtomicU64 > ,
90
112
current_partition_id : Arc < AtomicU32 > ,
91
113
reconnection_retry_interval : IggyDuration ,
114
+ init_retries : Option < u32 > ,
115
+ init_retry_interval : IggyDuration ,
92
116
allow_replay : bool ,
93
117
}
94
118
@@ -108,7 +132,9 @@ impl IggyConsumer {
108
132
auto_join_consumer_group : bool ,
109
133
create_consumer_group_if_not_exists : bool ,
110
134
encryptor : Option < Arc < EncryptorKind > > ,
111
- retry_interval : IggyDuration ,
135
+ reconnection_retry_interval : IggyDuration ,
136
+ init_retries : Option < u32 > ,
137
+ init_retry_interval : IggyDuration ,
112
138
allow_replay : bool ,
113
139
) -> Self {
114
140
let ( store_offset_sender, _) = flume:: unbounded ( ) ;
@@ -160,11 +186,17 @@ impl IggyConsumer {
160
186
} ,
161
187
last_polled_at : Arc :: new ( AtomicU64 :: new ( 0 ) ) ,
162
188
current_partition_id : Arc :: new ( AtomicU32 :: new ( 0 ) ) ,
163
- reconnection_retry_interval : retry_interval,
189
+ reconnection_retry_interval,
190
+ init_retries,
191
+ init_retry_interval,
164
192
allow_replay,
165
193
}
166
194
}
167
195
196
+ pub ( crate ) fn auto_commit ( & self ) -> AutoCommit {
197
+ self . auto_commit
198
+ }
199
+
168
200
/// Returns the name of the consumer.
169
201
pub fn name ( & self ) -> & str {
170
202
& self . consumer_name
@@ -230,19 +262,65 @@ impl IggyConsumer {
230
262
return Ok ( ( ) ) ;
231
263
}
232
264
265
+ let stream_id = self . stream_id . clone ( ) ;
266
+ let topic_id = self . topic_id . clone ( ) ;
267
+ let consumer_name = & self . consumer_name ;
268
+
269
+ info ! (
270
+ "Initializing consumer: {consumer_name} for stream: {stream_id}, topic: {topic_id}..."
271
+ ) ;
272
+
233
273
{
274
+ let mut retries = 0 ;
275
+ let init_retries = self . init_retries . unwrap_or_default ( ) ;
276
+ let interval = self . init_retry_interval ;
277
+
278
+ let mut timer = time:: interval ( interval. get_duration ( ) ) ;
279
+ timer. tick ( ) . await ;
280
+
234
281
let client = self . client . read ( ) . await ;
235
- if client. get_stream ( & self . stream_id ) . await ?. is_none ( ) {
282
+ let mut stream_exists = client. get_stream ( & stream_id) . await ?. is_some ( ) ;
283
+ let mut topic_exists = client. get_topic ( & stream_id, & topic_id) . await ?. is_some ( ) ;
284
+
285
+ loop {
286
+ if stream_exists && topic_exists {
287
+ info ! ( "Stream: {stream_id} and topic: {topic_id} were found. Initializing consumer..." , ) ;
288
+ break ;
289
+ }
290
+
291
+ if retries >= init_retries {
292
+ break ;
293
+ }
294
+
295
+ retries += 1 ;
296
+ if !stream_exists {
297
+ warn ! ( "Stream: {stream_id} does not exist. Retrying ({retries}/{init_retries}) in {interval}..." , ) ;
298
+ timer. tick ( ) . await ;
299
+ stream_exists = client. get_stream ( & stream_id) . await ?. is_some ( ) ;
300
+ }
301
+
302
+ if !stream_exists {
303
+ continue ;
304
+ }
305
+
306
+ topic_exists = client. get_topic ( & stream_id, & topic_id) . await ?. is_some ( ) ;
307
+ if topic_exists {
308
+ break ;
309
+ }
310
+
311
+ warn ! ( "Topic: {topic_id} does not exist in stream: {stream_id}. Retrying ({retries}/{init_retries}) in {interval}..." , ) ;
312
+ timer. tick ( ) . await ;
313
+ }
314
+
315
+ if !stream_exists {
316
+ error ! ( "Stream: {stream_id} was not found." ) ;
236
317
return Err ( IggyError :: StreamNameNotFound (
237
318
self . stream_id . get_string_value ( ) . unwrap_or_default ( ) ,
238
319
) ) ;
239
- }
320
+ } ;
240
321
241
- if client
242
- . get_topic ( & self . stream_id , & self . topic_id )
243
- . await ?
244
- . is_none ( )
245
- {
322
+ if !topic_exists {
323
+ error ! ( "Topic: {topic_id} was not found in stream: {stream_id}." ) ;
246
324
return Err ( IggyError :: TopicNameNotFound (
247
325
self . topic_id . get_string_value ( ) . unwrap_or_default ( ) ,
248
326
self . stream_id . get_string_value ( ) . unwrap_or_default ( ) ,
@@ -256,6 +334,7 @@ impl IggyConsumer {
256
334
match self . auto_commit {
257
335
AutoCommit :: Interval ( interval) => self . store_offsets_in_background ( interval) ,
258
336
AutoCommit :: IntervalOrWhen ( interval, _) => self . store_offsets_in_background ( interval) ,
337
+ AutoCommit :: IntervalOrAfter ( interval, _) => self . store_offsets_in_background ( interval) ,
259
338
_ => { }
260
339
}
261
340
@@ -285,6 +364,10 @@ impl IggyConsumer {
285
364
} ) ;
286
365
287
366
self . initialized = true ;
367
+ info ! (
368
+ "Consumer: {consumer_name} has been initialized for stream: {}, topic: {}." ,
369
+ self . stream_id, self . topic_id
370
+ ) ;
288
371
Ok ( ( ) )
289
372
}
290
373
@@ -359,7 +442,7 @@ impl IggyConsumer {
359
442
} ) ;
360
443
}
361
444
362
- fn send_store_offset ( & self , partition_id : u32 , offset : u64 ) {
445
+ pub ( crate ) fn send_store_offset ( & self , partition_id : u32 , offset : u64 ) {
363
446
if let Err ( error) = self . store_offset_sender . send ( ( partition_id, offset) ) {
364
447
error ! ( "Failed to send offset to store: {error}, please verify if `init()` on IggyConsumer object has been called." ) ;
365
448
}
@@ -868,6 +951,8 @@ pub struct IggyConsumerBuilder {
868
951
create_consumer_group_if_not_exists : bool ,
869
952
encryptor : Option < Arc < EncryptorKind > > ,
870
953
reconnection_retry_interval : IggyDuration ,
954
+ init_retries : Option < u32 > ,
955
+ init_retry_interval : IggyDuration ,
871
956
allow_replay : bool ,
872
957
}
873
958
@@ -901,6 +986,8 @@ impl IggyConsumerBuilder {
901
986
encryptor,
902
987
polling_interval,
903
988
reconnection_retry_interval : IggyDuration :: ONE_SECOND ,
989
+ init_retries : None ,
990
+ init_retry_interval : IggyDuration :: ONE_SECOND ,
904
991
allow_replay : false ,
905
992
}
906
993
}
@@ -941,6 +1028,13 @@ impl IggyConsumerBuilder {
941
1028
}
942
1029
}
943
1030
1031
+ pub fn commit_failed_messages ( self ) -> Self {
1032
+ Self {
1033
+ auto_commit : AutoCommit :: Disabled ,
1034
+ ..self
1035
+ }
1036
+ }
1037
+
944
1038
/// Automatically joins the consumer group if the consumer is a part of a consumer group.
945
1039
pub fn auto_join_consumer_group ( self ) -> Self {
946
1040
Self {
@@ -1013,6 +1107,17 @@ impl IggyConsumerBuilder {
1013
1107
}
1014
1108
}
1015
1109
1110
+ /// Sets the number of retries and the interval when initializing the consumer if the stream or topic is not found.
1111
+ /// Might be useful when the stream or topic is created dynamically by the producer.
1112
+ /// By default, the consumer will not retry.
1113
+ pub fn init_retries ( self , retries : u32 , interval : IggyDuration ) -> Self {
1114
+ Self {
1115
+ init_retries : Some ( retries) ,
1116
+ init_retry_interval : interval,
1117
+ ..self
1118
+ }
1119
+ }
1120
+
1016
1121
/// Allows replaying the messages, `false` by default.
1017
1122
pub fn allow_replay ( self ) -> Self {
1018
1123
Self {
@@ -1040,6 +1145,8 @@ impl IggyConsumerBuilder {
1040
1145
self . create_consumer_group_if_not_exists ,
1041
1146
self . encryptor ,
1042
1147
self . reconnection_retry_interval ,
1148
+ self . init_retries ,
1149
+ self . init_retry_interval ,
1043
1150
self . allow_replay ,
1044
1151
)
1045
1152
}
0 commit comments