1
- use crate :: streaming:: batching:: message_batch:: RetainedMessageBatch ;
1
+ use crate :: streaming:: batching:: message_batch:: { RetainedMessageBatch , RETAINED_BATCH_OVERHEAD } ;
2
2
use flume:: { unbounded, Receiver } ;
3
3
use iggy:: { error:: IggyError , utils:: duration:: IggyDuration } ;
4
4
use std:: {
5
5
io:: IoSlice ,
6
- sync:: { atomic:: AtomicU64 , Arc } ,
6
+ sync:: {
7
+ atomic:: { AtomicU64 , Ordering } ,
8
+ Arc ,
9
+ } ,
7
10
time:: Duration ,
8
11
} ;
9
- use tokio:: {
10
- fs:: File ,
11
- io:: { AsyncSeekExt , AsyncWriteExt } ,
12
- select,
13
- time:: sleep,
14
- } ;
12
+ use tokio:: { fs:: File , io:: AsyncWriteExt , select, time:: sleep} ;
15
13
use tracing:: { error, trace, warn} ;
16
14
17
15
#[ derive( Debug ) ]
@@ -35,12 +33,12 @@ impl PersisterTask {
35
33
file : File ,
36
34
file_path : String ,
37
35
fsync : bool ,
38
- file_size : Arc < AtomicU64 > ,
36
+ log_file_size : Arc < AtomicU64 > ,
39
37
max_retries : u32 ,
40
38
retry_delay : IggyDuration ,
41
39
) -> Self {
42
40
let ( sender, receiver) = unbounded ( ) ;
43
- let file_size_clone = file_size . clone ( ) ;
41
+ let log_file_size_clone = log_file_size . clone ( ) ;
44
42
let file_path_clone = file_path. clone ( ) ;
45
43
let handle = tokio:: spawn ( async move {
46
44
Self :: run (
@@ -50,7 +48,7 @@ impl PersisterTask {
50
48
fsync,
51
49
max_retries,
52
50
retry_delay,
53
- file_size_clone ,
51
+ log_file_size_clone ,
54
52
)
55
53
. await ;
56
54
} ) ;
@@ -171,12 +169,12 @@ impl PersisterTask {
171
169
fsync : bool ,
172
170
max_retries : u32 ,
173
171
retry_delay : IggyDuration ,
174
- file_size : Arc < AtomicU64 > ,
172
+ log_file_size : Arc < AtomicU64 > ,
175
173
) {
176
174
while let Ok ( request) = receiver. recv_async ( ) . await {
177
175
match request {
178
176
PersisterTaskCommand :: WriteRequest ( batch_to_write) => {
179
- if let Err ( e ) = Self :: write_with_retries (
177
+ match Self :: write_with_retries (
180
178
& mut file,
181
179
& file_path,
182
180
batch_to_write,
@@ -186,21 +184,14 @@ impl PersisterTask {
186
184
)
187
185
. await
188
186
{
189
- error ! (
187
+ Ok ( bytes_written) => {
188
+ log_file_size. fetch_add ( bytes_written, Ordering :: AcqRel ) ;
189
+ }
190
+ Err ( e) => {
191
+ error ! (
190
192
"Failed to persist data in LogPersisterTask for file {file_path}: {:?}" ,
191
193
e
192
- ) ;
193
- } else {
194
- match file. stream_position ( ) . await {
195
- Ok ( pos) => {
196
- file_size. store ( pos, std:: sync:: atomic:: Ordering :: Release ) ;
197
- }
198
- Err ( e) => {
199
- error ! (
200
- "Failed to get file stream position in LogPersisterTask for file {file_path}: {:?}" ,
201
- e
202
- ) ;
203
- }
194
+ )
204
195
}
205
196
}
206
197
}
@@ -227,18 +218,19 @@ impl PersisterTask {
227
218
fsync : bool ,
228
219
max_retries : u32 ,
229
220
retry_delay : IggyDuration ,
230
- ) -> Result < ( ) , IggyError > {
221
+ ) -> Result < u64 , IggyError > {
231
222
let header = batch_to_write. header_as_bytes ( ) ;
232
223
let batch_bytes = batch_to_write. bytes ;
233
224
let slices = [ IoSlice :: new ( & header) , IoSlice :: new ( & batch_bytes) ] ;
225
+ let bytes_written = RETAINED_BATCH_OVERHEAD + batch_bytes. len ( ) as u64 ;
234
226
235
227
let mut attempts = 0 ;
236
228
loop {
237
229
match file. write_vectored ( & slices) . await {
238
230
Ok ( _) => {
239
231
if fsync {
240
232
match file. sync_all ( ) . await {
241
- Ok ( _) => return Ok ( ( ) ) ,
233
+ Ok ( _) => return Ok ( bytes_written ) ,
242
234
Err ( e) => {
243
235
attempts += 1 ;
244
236
error ! (
@@ -248,7 +240,7 @@ impl PersisterTask {
248
240
}
249
241
}
250
242
} else {
251
- return Ok ( ( ) ) ;
243
+ return Ok ( bytes_written ) ;
252
244
}
253
245
}
254
246
Err ( e) => {
@@ -265,7 +257,7 @@ impl PersisterTask {
265
257
) ;
266
258
return Err ( IggyError :: CannotWriteToFile ) ;
267
259
}
268
- tokio :: time :: sleep ( retry_delay. get_duration ( ) ) . await ;
260
+ sleep ( retry_delay. get_duration ( ) ) . await ;
269
261
}
270
262
}
271
263
}
0 commit comments