6
6
7
7
use std:: net:: IpAddr ;
8
8
use std:: sync:: Arc ;
9
- use std:: time:: Duration ;
10
9
use std:: time:: Instant ;
11
10
12
11
use actix:: prelude:: * ;
@@ -15,15 +14,14 @@ use chrono::SecondsFormat;
15
14
use futures:: future:: Future ;
16
15
use serde:: { Deserialize , Serialize } ;
17
16
18
- use relay_common:: ProjectId ;
17
+ use relay_common:: { LogError , ProjectId } ;
19
18
use relay_config:: Config ;
20
19
use relay_filter:: FilterStatKey ;
21
20
use relay_general:: protocol:: EventId ;
22
21
use relay_quotas:: { ReasonCode , Scoping } ;
23
22
24
23
use crate :: actors:: upstream:: SendQuery ;
25
24
use crate :: actors:: upstream:: { UpstreamQuery , UpstreamRelay } ;
26
- use crate :: utils:: run_later;
27
25
use crate :: ServerError ;
28
26
29
27
// Choose the outcome module implementation (either processing or non-processing).
@@ -321,20 +319,28 @@ impl Message for TrackRawOutcome {
321
319
/// Common implementation for both processing and non-processing
322
320
impl OutcomeProducer {
323
321
fn send_batch ( & mut self , context : & mut Context < Self > ) {
324
- log:: trace!( "Sending outcome batch" ) ;
322
+ log:: trace!( "sending outcome batch" ) ;
323
+
324
+ //the future should be either canceled (if we are called with a full batch)
325
+ // or already called (if we are called by a timeout)
326
+ self . send_outcomes_future = None ;
327
+
328
+ if self . unsent_outcomes . len ( ) == 0 {
329
+ log:: warn!( "unexpected send_batch scheduled with no outcomes to send." ) ;
330
+ return ;
331
+ }
332
+
325
333
let request = SendOutcomes {
326
334
outcomes : self . unsent_outcomes . drain ( ..) . collect ( ) ,
327
335
} ;
328
- self . send_scheduled = false ;
329
336
330
- //BUG (RaduW) the future is never resolved, there is no tracing error.
331
337
self . upstream
332
338
. send ( SendQuery ( request) )
333
339
. map ( |_| {
334
340
log:: trace!( "outcome batch sent." ) ;
335
341
} )
336
- . map_err ( |_ | {
337
- log:: warn !( "outcome batch sending failed!" ) ;
342
+ . map_err ( |error | {
343
+ log:: error !( "outcome batch sending failed with: {}" , LogError ( & error ) ) ;
338
344
} )
339
345
. into_actor ( self )
340
346
. spawn ( context) ;
@@ -348,14 +354,13 @@ impl OutcomeProducer {
348
354
log:: trace!( "Batching outcome" ) ;
349
355
self . unsent_outcomes . push ( message) ;
350
356
if self . unsent_outcomes . len ( ) >= self . config . max_outcome_batch_size ( ) {
357
+ if let Some ( send_outcomes_future) = self . send_outcomes_future {
358
+ context. cancel_future ( send_outcomes_future) ;
359
+ }
351
360
self . send_batch ( context)
352
- } else if !self . send_scheduled {
353
- self . send_scheduled = true ;
354
- run_later (
355
- Duration :: from_millis ( self . config . max_outcome_interval_millsec ( ) ) ,
356
- Self :: send_batch,
357
- )
358
- . spawn ( context)
361
+ } else if self . send_outcomes_future . is_none ( ) {
362
+ self . send_outcomes_future =
363
+ Some ( context. run_later ( self . config . max_outcome_interval ( ) , Self :: send_batch) ) ;
359
364
}
360
365
361
366
Ok ( ( ) )
@@ -407,7 +412,7 @@ mod processing {
407
412
producer : Option < ThreadedProducer > ,
408
413
pub ( super ) upstream : Addr < UpstreamRelay > ,
409
414
pub ( super ) unsent_outcomes : Vec < TrackRawOutcome > ,
410
- pub ( super ) send_scheduled : bool ,
415
+ pub ( super ) send_outcomes_future : Option < SpawnHandle > ,
411
416
}
412
417
413
418
impl OutcomeProducer {
@@ -433,7 +438,7 @@ mod processing {
433
438
producer : future_producer,
434
439
upstream,
435
440
unsent_outcomes : Vec :: new ( ) ,
436
- send_scheduled : false ,
441
+ send_outcomes_future : None ,
437
442
} )
438
443
}
439
444
@@ -524,7 +529,7 @@ mod non_processing {
524
529
pub ( super ) config : Arc < Config > ,
525
530
pub ( super ) upstream : Addr < UpstreamRelay > ,
526
531
pub ( super ) unsent_outcomes : Vec < TrackRawOutcome > ,
527
- pub ( super ) send_scheduled : bool ,
532
+ pub ( super ) send_outcomes_future : Option < SpawnHandle > ,
528
533
}
529
534
530
535
impl OutcomeProducer {
@@ -536,7 +541,7 @@ mod non_processing {
536
541
config,
537
542
upstream,
538
543
unsent_outcomes : Vec :: new ( ) ,
539
- send_scheduled : false ,
544
+ send_outcomes_future : None ,
540
545
} )
541
546
}
542
547
}
0 commit comments