Skip to content

Commit

Permalink
fix: make async webhooks fully async
Browse files Browse the repository at this point in the history
  • Loading branch information
zepatrik committed Feb 16, 2023
1 parent bb12fe7 commit 950a51d
Showing 1 changed file with 62 additions and 61 deletions.
123 changes: 62 additions & 61 deletions selfservice/hook/web_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,63 +266,74 @@ func (e *WebHook) ExecuteSettingsPrePersistHook(_ http.ResponseWriter, req *http
}

func (e *WebHook) execute(ctx context.Context, data *templateContext) error {
builder, err := request.NewBuilder(e.conf, e.deps)
if err != nil {
return err
}

req, err := builder.BuildRequest(ctx, data)
if errors.Is(err, request.ErrCancel) {
return nil
} else if err != nil {
return err
}

attrs := semconv.HTTPClientAttributesFromHTTPRequest(req.Request)
if data.Identity != nil {
attrs = append(attrs,
attribute.String("webhook.identity.id", data.Identity.ID.String()),
attribute.String("webhook.identity.nid", data.Identity.NID.String()),
)
}

var (
httpClient = e.deps.HTTPClient(ctx)
ignoreResponse = gjson.GetBytes(e.conf, "response.ignore").Bool()
canInterrupt = gjson.GetBytes(e.conf, "can_interrupt").Bool()
parseResponse = gjson.GetBytes(e.conf, "response.parse").Bool()
tracer = trace.SpanFromContext(ctx).TracerProvider().Tracer("kratos-webhooks")
spanOpts = []trace.SpanStartOption{trace.WithAttributes(attrs...)}
errChan = make(chan error, 1)
)

if ignoreResponse && (parseResponse || canInterrupt) {
return errors.WithStack(herodot.ErrInternalServerError.WithReasonf("A webhook is configured to ignore the response but also to parse the response. This is not possible."))
}

ctx, span := tracer.Start(ctx, "selfservice.webhook", spanOpts...)
e.deps.Logger().WithRequest(req.Request).Info("Dispatching webhook")

req = req.WithContext(ctx)
if ignoreResponse {
// This is one of the few places where spawning a context.Background() is ok. We need to do this
// because the function runs asynchronously and we don't want to cancel the request if the
// incoming request context is cancelled.
//
// The webhook will still cancel after 30 seconds as that is the configured timeout for the HTTP client.
req = req.WithContext(trace.ContextWithSpan(context.Background(), span))
}
makeRequest := func() (finalErr error) {
if ignoreResponse {
// This is one of the few places where spawning a context.Background() is ok. We need to do this
// because the function runs asynchronously and we don't want to cancel the request if the
// incoming request context is cancelled.
//
// The webhook will still cancel after 30 seconds as that is the configured timeout for the HTTP client.
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(context.Background(), httpClient.HTTPClient.Timeout)
defer cancel()
}
ctx, span := tracer.Start(ctx, "selfservice.webhook")
startTime := time.Now()

defer func() {
traceID, spanID := span.SpanContext().TraceID(), span.SpanContext().SpanID()
logger := e.deps.Logger().WithField("otel", map[string]string{
"trace_id": traceID.String(),
"span_id": spanID.String(),
})
if finalErr != nil {
logger.WithField("duration", time.Since(startTime)).WithError(finalErr).Warning("Webhook request failed but the error was ignored because the configuration indicated that the upstream response should be ignored.")
} else {
logger.WithField("duration", time.Since(startTime)).Info("Webhook request succeeded")
}
}()

builder, err := request.NewBuilder(e.conf, e.deps)
if err != nil {
return err
}

req, err := builder.BuildRequest(ctx, data)
if errors.Is(err, request.ErrCancel) {
return nil
} else if err != nil {
return err
}

span.SetAttributes(semconv.HTTPClientAttributesFromHTTPRequest(req.Request)...)
if data.Identity != nil {
span.SetAttributes(
attribute.String("webhook.identity.id", data.Identity.ID.String()),
attribute.String("webhook.identity.nid", data.Identity.NID.String()),
)
}

e.deps.Logger().WithRequest(req.Request).Info("Dispatching webhook")

req = req.WithContext(ctx)

startTime := time.Now()
go func() {
defer close(errChan)
defer span.End()

resp, err := httpClient.Do(req)
if err != nil {
span.SetStatus(codes.Error, err.Error())
errChan <- errors.WithStack(err)
return
return errors.WithStack(err)
}
defer resp.Body.Close()
span.SetAttributes(semconv.HTTPAttributesFromHTTPStatusCode(resp.StatusCode)...)
Expand All @@ -332,40 +343,30 @@ func (e *WebHook) execute(ctx context.Context, data *templateContext) error {
if canInterrupt || parseResponse {
if err := parseWebhookResponse(resp, data.Identity); err != nil {
span.SetStatus(codes.Error, err.Error())
errChan <- err
return err
}
}
errChan <- fmt.Errorf("webhook failed with status code %v", resp.StatusCode)
return
return fmt.Errorf("webhook failed with status code %v", resp.StatusCode)
}

if parseResponse {
if err := parseWebhookResponse(resp, data.Identity); err != nil {
span.SetStatus(codes.Error, err.Error())
errChan <- err
return err
}
}

errChan <- nil
}()

if ignoreResponse {
traceID, spanID := span.SpanContext().TraceID(), span.SpanContext().SpanID()
logger := e.deps.Logger().WithField("otel", map[string]string{
"trace_id": traceID.String(),
"span_id": spanID.String(),
})
go func() {
if err := <-errChan; err != nil {
logger.WithField("duration", time.Since(startTime)).WithError(err).Warning("Webhook request failed but the error was ignored because the configuration indicated that the upstream response should be ignored.")
} else {
logger.WithField("duration", time.Since(startTime)).Info("Webhook request succeeded")
}
}()
return nil
}

return <-errChan
if !ignoreResponse {
return makeRequest()
}
go func() {
// we cannot handle the error as we are running async, and it is logged anyway
_ = makeRequest()
}()
return nil
}

func parseWebhookResponse(resp *http.Response, id *identity.Identity) (err error) {
Expand Down

0 comments on commit 950a51d

Please sign in to comment.