1
1
import json
2
2
import logging
3
+ import typing
4
+ from datetime import datetime
3
5
from json import JSONDecodeError
4
6
7
+ import requests
5
8
from celery .utils .log import get_task_logger
6
9
from django .conf import settings
7
10
from django .db .models import Prefetch
28
31
logger .setLevel (logging .DEBUG )
29
32
30
33
34
# Retry budget for execute_webhook: used both as the task's max_retries
# (outside DEBUG) and as the upper bound on manual re-enqueues.
EXECUTE_WEBHOOK_RETRIES = 3
# these exceptions are fully out of our control (e.g. customer's network issues)
# let's manually retry them without raising an exception
EXECUTE_WEBHOOK_EXCEPTIONS_TO_MANUALLY_RETRY = (requests.exceptions.Timeout,)
31
38
TRIGGER_TYPE_TO_LABEL = {
32
39
Webhook .TRIGGER_ALERT_GROUP_CREATED : "alert group created" ,
33
40
Webhook .TRIGGER_ACKNOWLEDGE : "acknowledge" ,
40
47
}
41
48
42
49
50
class WebhookRequestStatus(typing.TypedDict):
    """Shape of the status dict built by make_request.

    make_request initialises a dict of this type and returns it as the
    second element of its result tuple.
    """

    # The Optional fields may be None; make_request starts url,
    # request_trigger and request_headers out as None.
    url: typing.Optional[str]
    request_trigger: typing.Optional[str]
    request_headers: typing.Optional[str]
    request_data: typing.Optional[str]
    # NOTE(review): presumably the HTTP status code and body of the
    # response -- confirm against make_request.
    status_code: typing.Optional[int]
    content: typing.Optional[str]
    # The two fields below are always required and non-optional.
    webhook: Webhook
    event_data: str
59
+
60
+
43
61
@shared_dedicated_queue_retry_task (
44
62
autoretry_for = (Exception ,), retry_backoff = True , max_retries = 1 if settings .DEBUG else None
45
63
)
@@ -52,15 +70,14 @@ def send_webhook_event(trigger_type, alert_group_id, organization_id=None, user_
52
70
).exclude (is_webhook_enabled = False )
53
71
54
72
for webhook in webhooks_qs :
55
- print (webhook .name )
56
73
execute_webhook .apply_async ((webhook .pk , alert_group_id , user_id , None ))
57
74
58
75
59
- def _isoformat_date (date_value ) :
76
+ def _isoformat_date (date_value : datetime ) -> typing . Optional [ str ] :
60
77
return date_value .isoformat () if date_value else None
61
78
62
79
63
- def _build_payload (webhook , alert_group , user ) :
80
+ def _build_payload (webhook : Webhook , alert_group : AlertGroup , user : User ) -> typing . Dict [ str , typing . Any ] :
64
81
trigger_type = webhook .trigger_type
65
82
event = {
66
83
"type" : TRIGGER_TYPE_TO_LABEL [trigger_type ],
@@ -96,7 +113,9 @@ def _build_payload(webhook, alert_group, user):
96
113
return data
97
114
98
115
99
- def mask_authorization_header (headers , header_keys_to_mask ):
116
+ def mask_authorization_header (
117
+ headers : typing .Dict [str , str ], header_keys_to_mask : typing .List [str ]
118
+ ) -> typing .Dict [str , str ]:
100
119
masked_headers = headers .copy ()
101
120
lower_keys = set (k .lower () for k in header_keys_to_mask )
102
121
for k in headers .keys ():
@@ -105,8 +124,10 @@ def mask_authorization_header(headers, header_keys_to_mask):
105
124
return masked_headers
106
125
107
126
108
- def make_request (webhook , alert_group , data ):
109
- status = {
127
+ def make_request (
128
+ webhook : Webhook , alert_group : AlertGroup , data : typing .Dict [str , typing .Any ]
129
+ ) -> typing .Tuple [bool , WebhookRequestStatus , typing .Optional [str ], typing .Optional [Exception ]]:
130
+ status : WebhookRequestStatus = {
110
131
"url" : None ,
111
132
"request_trigger" : None ,
112
133
"request_headers" : None ,
@@ -172,9 +193,9 @@ def make_request(webhook, alert_group, data):
172
193
173
194
174
195
@shared_dedicated_queue_retry_task (
175
- autoretry_for = (Exception ,), retry_backoff = True , max_retries = 1 if settings .DEBUG else 3
196
+ autoretry_for = (Exception ,), retry_backoff = True , max_retries = 1 if settings .DEBUG else EXECUTE_WEBHOOK_RETRIES
176
197
)
177
- def execute_webhook (webhook_pk , alert_group_id , user_id , escalation_policy_id ):
198
+ def execute_webhook (webhook_pk , alert_group_id , user_id , escalation_policy_id , manual_retry_num = 0 ):
178
199
from apps .webhooks .models import Webhook
179
200
180
201
try :
@@ -244,5 +265,21 @@ def execute_webhook(webhook_pk, alert_group_id, user_id, escalation_policy_id):
244
265
escalation_error_code = error_code ,
245
266
)
246
267
247
- if exception :
268
+ if isinstance (exception , EXECUTE_WEBHOOK_EXCEPTIONS_TO_MANUALLY_RETRY ):
269
+ msg_details = (
270
+ f"webhook={ webhook_pk } alert_group={ alert_group_id } user={ user_id } escalation_policy={ escalation_policy_id } "
271
+ )
272
+
273
+ if manual_retry_num < EXECUTE_WEBHOOK_RETRIES :
274
+ retry_num = manual_retry_num + 1
275
+ logger .warning (f"Manually retrying execute_webhook for { msg_details } manual_retry_num={ retry_num } " )
276
+ execute_webhook .apply_async (
277
+ (webhook_pk , alert_group_id , user_id , escalation_policy_id , retry_num ),
278
+ countdown = 10 ,
279
+ )
280
+ else :
281
+ # don't raise an exception if we've exhausted retries for
282
+ # exceptions within EXECUTE_WEBHOOK_EXCEPTIONS_TO_MANUALLY_RETRY, simply give up trying
283
+ logger .warning (f"Exhausted execute_webhook retries for { msg_details } " )
284
+ elif exception :
248
285
raise exception
0 commit comments