1
1
//! Types for buffering envelopes.
2
2
3
+ use std:: error:: Error ;
3
4
use std:: sync:: atomic:: AtomicBool ;
4
5
use std:: sync:: atomic:: Ordering ;
5
6
use std:: sync:: Arc ;
6
7
use std:: time:: Duration ;
7
8
8
9
use relay_base_schema:: project:: ProjectKey ;
9
10
use relay_config:: Config ;
10
- use relay_system:: Request ;
11
11
use relay_system:: SendError ;
12
12
use relay_system:: { Addr , FromMessage , Interface , NoResponse , Receiver , Service } ;
13
+ use relay_system:: { Controller , Request , Shutdown } ;
13
14
use tokio:: sync:: watch;
15
+ use tokio:: time:: timeout;
14
16
15
17
use crate :: envelope:: Envelope ;
16
18
use crate :: services:: buffer:: envelope_buffer:: Peek ;
@@ -208,18 +210,18 @@ impl EnvelopeBufferService {
208
210
& mut self ,
209
211
buffer : & mut PolymorphicEnvelopeBuffer ,
210
212
) -> Result < ( ) , EnvelopeBufferError > {
211
- relay_log:: trace!( "EnvelopeBufferService peek " ) ;
213
+ relay_log:: trace!( "EnvelopeBufferService: peeking the buffer " ) ;
212
214
match buffer. peek ( ) . await ? {
213
215
Peek :: Empty => {
214
- relay_log:: trace!( "EnvelopeBufferService empty" ) ;
216
+ relay_log:: trace!( "EnvelopeBufferService: peek returned empty" ) ;
215
217
relay_statsd:: metric!(
216
218
counter( RelayCounters :: BufferTryPop ) += 1 ,
217
219
peek_result = "empty"
218
220
) ;
219
221
self . sleep = Duration :: MAX ; // wait for reset by `handle_message`.
220
222
}
221
223
Peek :: Ready ( _) => {
222
- relay_log:: trace!( "EnvelopeBufferService pop " ) ;
224
+ relay_log:: trace!( "EnvelopeBufferService: popping envelope " ) ;
223
225
relay_statsd:: metric!(
224
226
counter( RelayCounters :: BufferTryPop ) += 1 ,
225
227
peek_result = "ready"
@@ -234,7 +236,7 @@ impl EnvelopeBufferService {
234
236
self . sleep = Duration :: ZERO ; // try next pop immediately
235
237
}
236
238
Peek :: NotReady ( stack_key, envelope) => {
237
- relay_log:: trace!( "EnvelopeBufferService request update" ) ;
239
+ relay_log:: trace!( "EnvelopeBufferService: project(s) of envelope not ready, requesting project update" ) ;
238
240
relay_statsd:: metric!(
239
241
counter( RelayCounters :: BufferTryPop ) += 1 ,
240
242
peek_result = "not_ready"
@@ -268,23 +270,55 @@ impl EnvelopeBufferService {
268
270
// projects was already triggered (see XXX).
269
271
// For better separation of concerns, this prefetch should be triggered from here
270
272
// once buffer V1 has been removed.
271
- relay_log:: trace!( "EnvelopeBufferService push" ) ;
273
+ relay_log:: trace!( "EnvelopeBufferService: received push message " ) ;
272
274
self . push ( buffer, envelope) . await ;
273
275
}
274
276
EnvelopeBuffer :: NotReady ( project_key, envelope) => {
275
- relay_log:: trace!( "EnvelopeBufferService project not ready" ) ;
277
+ relay_log:: trace!(
278
+ "EnvelopeBufferService: received project not ready message for project key {}" ,
279
+ & project_key
280
+ ) ;
276
281
buffer. mark_ready ( & project_key, false ) ;
277
282
relay_statsd:: metric!( counter( RelayCounters :: BufferEnvelopesReturned ) += 1 ) ;
278
283
self . push ( buffer, envelope) . await ;
279
284
}
280
285
EnvelopeBuffer :: Ready ( project_key) => {
281
- relay_log:: trace!( "EnvelopeBufferService project ready {}" , & project_key) ;
286
+ relay_log:: trace!(
287
+ "EnvelopeBufferService: received project ready message for project key {}" ,
288
+ & project_key
289
+ ) ;
282
290
buffer. mark_ready ( & project_key, true ) ;
283
291
}
284
292
} ;
285
293
self . sleep = Duration :: ZERO ;
286
294
}
287
295
296
+ async fn handle_shutdown (
297
+ & mut self ,
298
+ buffer : & mut PolymorphicEnvelopeBuffer ,
299
+ message : Shutdown ,
300
+ ) -> bool {
301
+ // We gracefully shut down only if the shutdown has a timeout.
302
+ if let Some ( shutdown_timeout) = message. timeout {
303
+ relay_log:: trace!( "EnvelopeBufferService: shutting down gracefully" ) ;
304
+
305
+ let shutdown_result = timeout ( shutdown_timeout, buffer. shutdown ( ) ) . await ;
306
+ match shutdown_result {
307
+ Ok ( shutdown_result) => {
308
+ return shutdown_result;
309
+ }
310
+ Err ( error) => {
311
+ relay_log:: error!(
312
+ error = & error as & dyn Error ,
313
+ "the envelope buffer didn't shut down in time, some envelopes might be lost" ,
314
+ ) ;
315
+ }
316
+ }
317
+ }
318
+
319
+ false
320
+ }
321
+
288
322
async fn push ( & mut self , buffer : & mut PolymorphicEnvelopeBuffer , envelope : Box < Envelope > ) {
289
323
if let Err ( e) = buffer. push ( envelope) . await {
290
324
relay_log:: error!(
@@ -322,15 +356,17 @@ impl Service for EnvelopeBufferService {
322
356
} ;
323
357
buffer. initialize ( ) . await ;
324
358
325
- relay_log:: info!( "EnvelopeBufferService start" ) ;
359
+ let mut shutdown = Controller :: shutdown_handle ( ) ;
360
+
361
+ relay_log:: info!( "EnvelopeBufferService: starting" ) ;
326
362
let mut iteration = 0 ;
327
363
loop {
328
364
iteration += 1 ;
329
- relay_log:: trace!( "EnvelopeBufferService loop iteration {iteration}" ) ;
365
+ relay_log:: trace!( "EnvelopeBufferService: loop iteration {iteration}" ) ;
330
366
331
367
tokio:: select! {
332
368
// NOTE: we do not select a bias here.
333
- // On the one hand, we might want to prioritize dequeing over enqueing
369
+ // On the one hand, we might want to prioritize dequeuing over enqueuing
334
370
// so we do not exceed the buffer capacity by starving the dequeue.
335
371
// on the other hand, prioritizing old messages violates the LIFO design.
336
372
Ok ( ( ) ) = self . ready_to_pop( & mut buffer) => {
@@ -344,8 +380,15 @@ impl Service for EnvelopeBufferService {
344
380
Some ( message) = rx. recv( ) => {
345
381
self . handle_message( & mut buffer, message) . await ;
346
382
}
383
+ shutdown = shutdown. notified( ) => {
384
+ // In case the shutdown was handled, we break out of the loop signaling that
385
+ // there is no need to process anymore envelopes.
386
+ if self . handle_shutdown( & mut buffer, shutdown) . await {
387
+ break ;
388
+ }
389
+ }
347
390
_ = global_config_rx. changed( ) => {
348
- relay_log:: trace!( "EnvelopeBufferService received global config" ) ;
391
+ relay_log:: trace!( "EnvelopeBufferService: received global config" ) ;
349
392
self . sleep = Duration :: ZERO ; // Try to pop
350
393
}
351
394
else => break ,
@@ -354,7 +397,7 @@ impl Service for EnvelopeBufferService {
354
397
self . update_observable_state ( & mut buffer) ;
355
398
}
356
399
357
- relay_log:: info!( "EnvelopeBufferService stop " ) ;
400
+ relay_log:: info!( "EnvelopeBufferService: stopping " ) ;
358
401
} ) ;
359
402
}
360
403
}
0 commit comments