Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a callback when an Ack message is received from the client #833

Merged
merged 3 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/MagicOnion.Server/Diagnostics/MagicOnionMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void StreamingHubMethodCompleted(in MetricsContext context, StreamingHubH
var tags = InitializeTagListForStreamingHub(handler.HubName);
tags.Add("rpc.method", handler.MethodInfo.Name);
tags.Add("magiconion.streaminghub.is_error", isErrorOrInterrupted ? BoxedTrue : BoxedFalse);
streamingHubMethodDuration.Record((long)StopwatchHelper.GetElapsedTime(startingTimestamp, endingTimestamp).TotalMilliseconds, tags);
streamingHubMethodDuration.Record((long)TimeProvider.System.GetElapsedTime(startingTimestamp, endingTimestamp).TotalMilliseconds, tags);
streamingHubMethodCompletedCounter.Add(1, tags);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ public interface IMagicOnionHeartbeatFeature
/// </summary>
CancellationToken TimeoutToken { get; }

/// <summary>
/// Sets the callback action to be performed when an Ack message is received from the client.
/// </summary>
/// <param name="callbackAction"></param>
void SetAckCallback(Action<TimeSpan>? callbackAction);

/// <summary>
/// Unregister the current StreamingHub connection from the HeartbeatManager.
/// </summary>
Expand All @@ -37,4 +43,6 @@ internal sealed class MagicOnionHeartbeatFeature(StreamingHubHeartbeatHandle han
public CancellationToken TimeoutToken => handle.TimeoutToken;

public void Unregister() => handle.Unregister();

public void SetAckCallback(Action<TimeSpan>? callbackAction) => handle.SetAckCallback(callbackAction);
}
19 changes: 11 additions & 8 deletions src/MagicOnion.Server/Hubs/StreamingHub.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
using System.Buffers;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Cysharp.Runtime.Multicast;
using Cysharp.Runtime.Multicast.Remoting;
using Grpc.Core;
using MagicOnion.Internal;
Expand All @@ -23,6 +21,7 @@ public abstract class StreamingHubBase<THubInterface, TReceiver> : ServiceBase<T
{
IRemoteClientResultPendingTaskRegistry remoteClientResultPendingTasks = default!;
StreamingHubHeartbeatHandle heartbeatHandle = default!;
TimeProvider timeProvider = default!;

protected static readonly Task<Nil> NilTask = Task.FromResult(Nil.Default);
protected static readonly ValueTask CompletedTask = new ValueTask();
Expand Down Expand Up @@ -88,10 +87,11 @@ internal async Task<DuplexStreamingResult<StreamingHubPayload, StreamingHubPaylo

var features = this.Context.CallContext.GetHttpContext().Features;
var magicOnionOptions = serviceProvider.GetRequiredService<IOptions<MagicOnionOptions>>().Value;
timeProvider = magicOnionOptions.TimeProvider ?? TimeProvider.System;

var remoteProxyFactory = serviceProvider.GetRequiredService<IRemoteProxyFactory>();
var remoteSerializer = serviceProvider.GetRequiredService<IRemoteSerializer>();
this.remoteClientResultPendingTasks = new RemoteClientResultPendingTaskRegistry(magicOnionOptions.ClientResultsDefaultTimeout, magicOnionOptions.TimeProvider ?? TimeProvider.System);
this.remoteClientResultPendingTasks = new RemoteClientResultPendingTaskRegistry(magicOnionOptions.ClientResultsDefaultTimeout, timeProvider);
this.Client = remoteProxyFactory.CreateDirect<TReceiver>(new MagicOnionRemoteReceiverWriter(StreamingServiceContext), remoteSerializer, remoteClientResultPendingTasks);

var handlerRepository = serviceProvider.GetRequiredService<StreamingHubHandlerRepository>();
Expand Down Expand Up @@ -134,10 +134,13 @@ internal async Task<DuplexStreamingResult<StreamingHubPayload, StreamingHubPaylo
{
Metrics.StreamingHubConnectionDecrement(Context.Metrics, Context.MethodHandler.ServiceName);

heartbeatHandle.Dispose();
StreamingServiceContext.CompleteStreamingHub();
heartbeatHandle.Unregister(); // NOTE: To be able to use CancellationToken within OnDisconnected event, separate the calls to Dispose and Unregister.

await OnDisconnected();

await this.Group.DisposeAsync();
heartbeatHandle.Dispose();
remoteClientResultPendingTasks.Dispose();
}

Expand Down Expand Up @@ -267,10 +270,10 @@ async ValueTask ProcessRequestAsync(UniqueHashDictionary<StreamingHubHandler> ha
hubInstance: this,
request: body,
messageId: messageId,
timestamp: DateTime.UtcNow
timestamp: timeProvider.GetUtcNow().UtcDateTime
);

var methodStartingTimestamp = Stopwatch.GetTimestamp();
var methodStartingTimestamp = timeProvider.GetTimestamp();
var isErrorOrInterrupted = false;
MagicOnionServerLog.BeginInvokeHubMethod(Context.MethodHandler.Logger, context, context.Request, handler.RequestType);
try
Expand All @@ -297,8 +300,8 @@ async ValueTask ProcessRequestAsync(UniqueHashDictionary<StreamingHubHandler> ha
}
finally
{
var methodEndingTimestamp = Stopwatch.GetTimestamp();
MagicOnionServerLog.EndInvokeHubMethod(Context.MethodHandler.Logger, context, context.ResponseSize, context.ResponseType, StopwatchHelper.GetElapsedTime(methodStartingTimestamp, methodEndingTimestamp).TotalMilliseconds, isErrorOrInterrupted);
var methodEndingTimestamp = timeProvider.GetTimestamp();
MagicOnionServerLog.EndInvokeHubMethod(Context.MethodHandler.Logger, context, context.ResponseSize, context.ResponseType, timeProvider.GetElapsedTime(methodStartingTimestamp, methodEndingTimestamp).TotalMilliseconds, isErrorOrInterrupted);
Metrics.StreamingHubMethodCompleted(Context.Metrics, handler, methodStartingTimestamp, methodEndingTimestamp, isErrorOrInterrupted);

StreamingHubContextPool.Shared.Return(context);
Expand Down
65 changes: 48 additions & 17 deletions src/MagicOnion.Server/Hubs/StreamingHubHeartbeatManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Diagnostics;
using MagicOnion.Internal;
using MagicOnion.Server.Diagnostics;
using MagicOnion.Server.Internal;
using Microsoft.Extensions.Logging;

namespace MagicOnion.Server.Hubs;
Expand All @@ -17,26 +18,27 @@ internal interface IStreamingHubHeartbeatManager : IDisposable

internal class StreamingHubHeartbeatHandle : IDisposable
{
readonly object gate = new();
readonly IStreamingHubHeartbeatManager manager;
readonly CancellationTokenSource timeoutToken;
readonly TimeSpan timeoutDuration;
bool disposed;
short waitingSequence;
bool unregistered;
short waitingSequence = -1;
bool timeoutTimerIsRunning;
DateTimeOffset lastSentAt;
DateTimeOffset lastReceivedAt;
long lastSentAtTimestamp;
Action<TimeSpan>? onAckCallback;

/// <summary>
/// Gets the last received time.
/// </summary>
public DateTimeOffset LastReceivedAt => lastReceivedAt;
public DateTimeOffset LastReceivedAt { get; private set; }

/// <summary>
/// Gets the latency between client and server. Returns <see cref="TimeSpan.Zero"/> if not sent or received.
/// </summary>
public TimeSpan Latency => (lastSentAt == default || lastReceivedAt == default)
? TimeSpan.Zero
: lastReceivedAt - lastSentAt;
public TimeSpan Latency { get; private set; }

public IStreamingServiceContext<StreamingHubPayload, StreamingHubPayload> ServiceContext { get; }
public CancellationToken TimeoutToken => timeoutToken.Token;
Expand All @@ -53,16 +55,21 @@ public StreamingHubHeartbeatHandle(IStreamingHubHeartbeatManager manager, IStrea
);
}

public void RestartTimeoutTimer(short sequence, DateTimeOffset sentAt)
public void RestartTimeoutTimer(short sequence, DateTimeOffset sentAt, long sentAtTimestamp)
{
if (disposed || timeoutDuration == Timeout.InfiniteTimeSpan) return;
waitingSequence = sequence;
lastSentAt = sentAt;

if (!timeoutTimerIsRunning)
lock (gate)
{
timeoutToken.CancelAfter(timeoutDuration);
timeoutTimerIsRunning = true;
waitingSequence = sequence;
lastSentAt = sentAt;
lastSentAtTimestamp = sentAtTimestamp;

if (!timeoutTimerIsRunning)
{
timeoutToken.CancelAfter(timeoutDuration);
timeoutTimerIsRunning = true;
}
}
}

Expand All @@ -71,23 +78,46 @@ public void Ack(short sequence)
if (disposed || timeoutDuration == Timeout.InfiniteTimeSpan) return;

if (waitingSequence != sequence) return;
lastReceivedAt = manager.TimeProvider.GetUtcNow();
timeoutToken.CancelAfter(Timeout.InfiniteTimeSpan);
timeoutTimerIsRunning = false;

lock (gate)
{
var receivedAtTimestamp = manager.TimeProvider.GetTimestamp();
var elapsed = manager.TimeProvider.GetElapsedTime(lastSentAtTimestamp, receivedAtTimestamp);

LastReceivedAt = lastSentAt.Add(elapsed);
Latency = elapsed;
timeoutToken.CancelAfter(Timeout.InfiniteTimeSpan);
timeoutTimerIsRunning = false;

onAckCallback?.Invoke(Latency);
}
}

public void SetAckCallback(Action<TimeSpan>? callbackAction)
{
this.onAckCallback = callbackAction;
}

public void Unregister()
{
if (unregistered) return;

manager.Unregister(ServiceContext);
timeoutToken.CancelAfter(Timeout.InfiniteTimeSpan);
timeoutTimerIsRunning = false;
unregistered = true;
}

public void Dispose()
{
if (disposed) return;

disposed = true;
manager.Unregister(ServiceContext);
onAckCallback = null;
if (!unregistered)
{
manager.Unregister(ServiceContext);
}
timeoutToken.Dispose();
}
}
Expand Down Expand Up @@ -172,6 +202,7 @@ async Task StartHeartbeatAsync(PeriodicTimer runningTimer, string method)
while (await runningTimer.WaitForNextTickAsync())
{
var now = TimeProvider.GetUtcNow();
var timestamp = TimeProvider.GetTimestamp();
StreamingHubMessageWriter.WriteServerHeartbeatMessageHeader(writer, sequence, now);
if (!(heartbeatMetadataProvider?.TryWriteMetadata(writer) ?? false))
{
Expand All @@ -183,7 +214,7 @@ async Task StartHeartbeatAsync(PeriodicTimer runningTimer, string method)
{
foreach (var (contextId, handle) in contexts)
{
handle.RestartTimeoutTimer(sequence, now);
handle.RestartTimeoutTimer(sequence, now, timestamp);
handle.ServiceContext.QueueResponseStreamWrite(StreamingHubPayloadPool.Shared.RentOrCreate(writer.WrittenSpan));
}
}
Expand Down
20 changes: 0 additions & 20 deletions src/MagicOnion.Server/Internal/StopwatchHelper.cs

This file was deleted.

Loading
Loading