Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Compatibility] Added CLIENT UNBLOCK command #886

Merged
merged 19 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions libs/resources/RespCommandsDocs.json
Original file line number Diff line number Diff line change
Expand Up @@ -1132,6 +1132,43 @@
"Type": "String"
}
]
},
{
"Command": "CLIENT_UNBLOCK",
"Name": "CLIENT|UNBLOCK",
"Summary": "Unblocks a client blocked by a blocking command from a different connection.",
"Group": "Connection",
"Complexity": "O(log N) where N is the number of client connections",
"Arguments": [
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "CLIENT-ID",
"DisplayText": "client-id",
"Type": "Integer"
},
{
"TypeDiscriminator": "RespCommandContainerArgument",
"Name": "UNBLOCK-TYPE",
"Type": "OneOf",
"ArgumentFlags": "Optional",
"Arguments": [
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "TIMEOUT",
"DisplayText": "timeout",
"Type": "PureToken",
"Token": "TIMEOUT"
},
{
"TypeDiscriminator": "RespCommandBasicArgument",
"Name": "ERROR",
"DisplayText": "error",
"Type": "PureToken",
"Token": "ERROR"
}
]
}
]
}
]
},
Expand Down
7 changes: 7 additions & 0 deletions libs/resources/RespCommandsInfo.json
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,13 @@
"request_policy:all_nodes",
"response_policy:all_succeeded"
]
},
{
"Command": "CLIENT_UNBLOCK",
"Name": "CLIENT|UNBLOCK",
"Arity": -3,
"Flags": "Admin, Loading, NoScript, Stale",
"AclCategories": "Admin, Connection, Dangerous, Slow"
}
]
},
Expand Down
12 changes: 12 additions & 0 deletions libs/server/Objects/ItemBroker/CollectionItemBroker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ public class CollectionItemBroker : IDisposable
private bool disposed = false;
private bool isStarted = false;

/// <summary>
/// Tries to get the observer associated with the given session ID.
/// </summary>
/// <param name="sessionId">The ID of the session to retrieve the observer for.</param>
/// <param name="observer">When this method returns, contains the observer associated with the specified session ID, if the session ID is found; otherwise, null. This parameter is passed uninitialized.</param>
/// <returns>true if the observer is found; otherwise, false.</returns>
internal bool TryGetObserver(int sessionId, out CollectionItemObserver observer)
{
return SessionIdToObserver.TryGetValue(sessionId, out observer);
}

/// <summary>
/// Asynchronously wait for item from collection object
/// </summary>
Expand Down Expand Up @@ -125,6 +136,7 @@ private async Task<CollectionItemResult> GetCollectionItemAsync(CollectionItemOb
}
catch (OperationCanceledException)
{
// Session is disposed
}

SessionIdToObserver.TryRemove(observer.Session.ObjectStoreSessionID, out _);
Expand Down
25 changes: 25 additions & 0 deletions libs/server/Objects/ItemBroker/CollectionItemObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,31 @@ internal void HandleSetResult(CollectionItemResult result)
}
}

internal bool TryForceUnblock(bool throwError = false)
{
// If the result is already set or the observer session is disposed
// There is no need to set the result
if (Status != ObserverStatus.WaitingForResult)
return false;

ObserverStatusLock.EnterWriteLock();
try
{
if (Status != ObserverStatus.WaitingForResult)
return false;

// Set the result, update the status and release the semaphore
Result = throwError ? CollectionItemResult.ForceUnblocked : CollectionItemResult.Empty;
Status = ObserverStatus.ResultSet;
ResultFoundSemaphore.Release();
return true;
}
finally
{
ObserverStatusLock.ExitWriteLock();
}
}

/// <summary>
/// Safely set the status of the observer to reflect that its calling session was disposed
/// </summary>
Expand Down
15 changes: 15 additions & 0 deletions libs/server/Objects/ItemBroker/CollectionItemResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public CollectionItemResult(byte[] key, double[] scores, byte[][] items)
Items = items;
}

private CollectionItemResult(bool isForceUnblocked)
{
IsForceUnblocked = isForceUnblocked;
}

/// <summary>
/// True if item was found
/// </summary>
Expand Down Expand Up @@ -64,9 +69,19 @@ public CollectionItemResult(byte[] key, double[] scores, byte[][] items)
/// </summary>
internal double[] Scores { get; }

/// <summary>
/// Gets a value indicating whether the item retrieval was force unblocked.
/// </summary>
internal readonly bool IsForceUnblocked { get; }

/// <summary>
/// Instance of empty result
/// </summary>
internal static readonly CollectionItemResult Empty = new(null, item: null);

/// <summary>
/// Instance representing an Force Unblocked result.
/// </summary>
internal static readonly CollectionItemResult ForceUnblocked = new(true);
}
}
75 changes: 75 additions & 0 deletions libs/server/Resp/ClientCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -567,5 +567,80 @@ private bool NetworkCLIENTSETINFO()

return true;
}

/// <summary>
/// CLIENT UNBLOCK
/// </summary>
private bool NetworkCLIENTUNBLOCK()
{
if (parseState.Count is not (1 or 2))
{
return AbortWithWrongNumberOfArguments("client|unblock");
}

if (!parseState.TryGetLong(0, out var clientId))
{
return AbortWithErrorMessage(CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER);
}

var toThrowError = false;
if (parseState.Count == 2)
{
var option = parseState.GetArgSliceByRef(1);
if (option.Span.EqualsUpperCaseSpanIgnoringCase(CmdStrings.TIMEOUT))
{
toThrowError = false;
}
else if (option.Span.EqualsUpperCaseSpanIgnoringCase(CmdStrings.ERROR))
{
toThrowError = true;
}
else
{
return AbortWithErrorMessage(CmdStrings.RESP_ERR_INVALID_CLIENT_UNBLOCK_REASON);
}
}

if (Server is GarnetServerBase garnetServer)
{
var session = garnetServer.ActiveConsumers().OfType<RespServerSession>().FirstOrDefault(x => x.Id == clientId);

if (session is null)
{
while (!RespWriteUtils.TryWriteInt32(0, ref dcurr, dend))
SendAndReset();
return true;
}

if (session.storeWrapper?.itemBroker is not null)
{
var isBlocked = session.storeWrapper.itemBroker.TryGetObserver(session.ObjectStoreSessionID, out var observer);

if (!isBlocked)
{
while (!RespWriteUtils.TryWriteInt32(0, ref dcurr, dend))
SendAndReset();
return true;
}

var result = observer.TryForceUnblock(toThrowError);

while (!RespWriteUtils.TryWriteInt32(result ? 1 : 0, ref dcurr, dend))
SendAndReset();
}
else
{
while (!RespWriteUtils.TryWriteInt32(0, ref dcurr, dend))
SendAndReset();
}
}
else
{
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_ERR_UBLOCKING_CLINET, ref dcurr, dend))
SendAndReset();
}

return true;
}
}
}
6 changes: 6 additions & 0 deletions libs/server/Resp/CmdStrings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ static partial class CmdStrings
public static ReadOnlySpan<byte> GETIFNOTMATCH => "GETIFNOTMATCH"u8;
public static ReadOnlySpan<byte> SETIFMATCH => "SETIFMATCH"u8;
public static ReadOnlySpan<byte> FIELDS => "FIELDS"u8;
public static ReadOnlySpan<byte> TIMEOUT => "TIMEOUT"u8;
public static ReadOnlySpan<byte> ERROR => "ERROR"u8;

/// <summary>
/// Response strings
Expand Down Expand Up @@ -224,6 +226,7 @@ static partial class CmdStrings
public static ReadOnlySpan<byte> RESP_ERR_LIMIT_NOT_SUPPORTED => "ERR syntax error, LIMIT is only supported in combination with either BYSCORE or BYLEX"u8;
public static ReadOnlySpan<byte> RESP_ERR_NO_SCRIPT => "NOSCRIPT No matching script. Please use EVAL."u8;
public static ReadOnlySpan<byte> RESP_ERR_CANNOT_LIST_CLIENTS => "ERR Clients cannot be listed."u8;
public static ReadOnlySpan<byte> RESP_ERR_UBLOCKING_CLINET => "ERR Unable to unblock client because of error."u8;
public static ReadOnlySpan<byte> RESP_ERR_NO_SUCH_CLIENT => "ERR No such client"u8;
public static ReadOnlySpan<byte> RESP_ERR_INVALID_CLIENT_ID => "ERR Invalid client ID"u8;
public static ReadOnlySpan<byte> RESP_ERR_ACL_AUTH_DISABLED => "ERR ACL Authenticator is disabled."u8;
Expand All @@ -239,6 +242,8 @@ static partial class CmdStrings
public static ReadOnlySpan<byte> RESP_ERR_HCOLLECT_ALREADY_IN_PROGRESS => "ERR HCOLLECT scan already in progress"u8;
public static ReadOnlySpan<byte> RESP_INVALID_COMMAND_SPECIFIED => "Invalid command specified"u8;
public static ReadOnlySpan<byte> RESP_COMMAND_HAS_NO_KEY_ARGS => "The command has no key arguments"u8;
public static ReadOnlySpan<byte> RESP_ERR_INVALID_CLIENT_UNBLOCK_REASON => "ERR CLIENT UNBLOCK reason should be TIMEOUT or ERROR"u8;
public static ReadOnlySpan<byte> RESP_UNBLOCKED_CLIENT_VIA_CLIENT_UNBLOCK => "UNBLOCKED client unblocked via CLIENT UNBLOCK"u8;

/// <summary>
/// Response string templates
Expand Down Expand Up @@ -335,6 +340,7 @@ static partial class CmdStrings
public static ReadOnlySpan<byte> KILL => "KILL"u8;
public static ReadOnlySpan<byte> GETNAME => "GETNAME"u8;
public static ReadOnlySpan<byte> SETINFO => "SETINFO"u8;
public static ReadOnlySpan<byte> UNBLOCK => "UNBLOCK"u8;
public static ReadOnlySpan<byte> USER => "USER"u8;
public static ReadOnlySpan<byte> ADDR => "ADDR"u8;
public static ReadOnlySpan<byte> LADDR => "LADDR"u8;
Expand Down
13 changes: 13 additions & 0 deletions libs/server/Resp/Objects/ListCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,13 @@ private bool ListBlockingPop(RespCommand command)

var result = storeWrapper.itemBroker.GetCollectionItemAsync(command, keysBytes, this, timeout).Result;

if (result.IsForceUnblocked)
{
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_UNBLOCKED_CLIENT_VIA_CLIENT_UNBLOCK, ref dcurr, dend))
SendAndReset();
return true;
}

if (!result.Found)
{
while (!RespWriteUtils.TryWriteNullArray(ref dcurr, dend))
Expand Down Expand Up @@ -979,6 +986,12 @@ private unsafe bool ListBlockingPopMultiple()

var result = storeWrapper.itemBroker.GetCollectionItemAsync(RespCommand.BLMPOP, keysBytes, this, timeout, cmdArgs).Result;

if (result.IsForceUnblocked)
{
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_UNBLOCKED_CLIENT_VIA_CLIENT_UNBLOCK, ref dcurr, dend))
SendAndReset();
}

if (!result.Found)
{
while (!RespWriteUtils.TryWriteNull(ref dcurr, dend))
Expand Down
6 changes: 6 additions & 0 deletions libs/server/Resp/Parser/RespCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ public enum RespCommand : ushort
CLIENT_GETNAME,
CLIENT_SETNAME,
CLIENT_SETINFO,
CLIENT_UNBLOCK,

MONITOR,
MODULE,
Expand Down Expand Up @@ -391,6 +392,7 @@ public static class RespCommandExtensions
RespCommand.CLIENT_GETNAME,
RespCommand.CLIENT_SETNAME,
RespCommand.CLIENT_SETINFO,
RespCommand.CLIENT_UNBLOCK,
// Command
RespCommand.COMMAND,
RespCommand.COMMAND_COUNT,
Expand Down Expand Up @@ -1763,6 +1765,10 @@ private RespCommand SlowParseCommand(ref int count, ref ReadOnlySpan<byte> speci
{
return RespCommand.CLIENT_SETINFO;
}
else if (subCommand.SequenceEqual(CmdStrings.UNBLOCK))
{
return RespCommand.CLIENT_UNBLOCK;
}
}
}
else if (command.SequenceEqual(CmdStrings.AUTH))
Expand Down
1 change: 1 addition & 0 deletions libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,7 @@ private bool ProcessOtherCommands<TGarnetApi>(RespCommand command, ref TGarnetAp
RespCommand.CLIENT_GETNAME => NetworkCLIENTGETNAME(),
RespCommand.CLIENT_SETNAME => NetworkCLIENTSETNAME(),
RespCommand.CLIENT_SETINFO => NetworkCLIENTSETINFO(),
RespCommand.CLIENT_UNBLOCK => NetworkCLIENTUNBLOCK(),
RespCommand.COMMAND => NetworkCOMMAND(),
RespCommand.COMMAND_COUNT => NetworkCOMMAND_COUNT(),
RespCommand.COMMAND_DOCS => NetworkCOMMAND_DOCS(),
Expand Down
1 change: 1 addition & 0 deletions playground/CommandInfoUpdater/SupportedCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class SupportedCommand
new("CLIENT|GETNAME", RespCommand.CLIENT_GETNAME),
new("CLIENT|SETNAME", RespCommand.CLIENT_SETNAME),
new("CLIENT|SETINFO", RespCommand.CLIENT_SETINFO),
new("CLIENT|UNBLOCK", RespCommand.CLIENT_UNBLOCK),
]),
new("CLUSTER", RespCommand.CLUSTER,
[
Expand Down
15 changes: 15 additions & 0 deletions test/Garnet.test/Resp/ACL/RespCommandTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,21 @@ static async Task DoClientSetInfoAsync(GarnetClient client)
}
}

[Test]
public async Task ClientUnblockACLsAsync()
{
await CheckCommandsAsync(
"CLIENT UNBLOCK",
[DoClientUnblockAsync]
);

static async Task DoClientUnblockAsync(GarnetClient client)
{
var count = await client.ExecuteForLongResultAsync("CLIENT", ["UNBLOCK", "123"]);
ClassicAssert.AreEqual(0, count);
}
}

[Test]
public async Task ClusterAddSlotsACLsAsync()
{
Expand Down
1 change: 1 addition & 0 deletions test/Garnet.test/RespCommandTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,7 @@ public void AofIndependentCommandsTest()
RespCommand.CLIENT_GETNAME,
RespCommand.CLIENT_SETNAME,
RespCommand.CLIENT_SETINFO,
RespCommand.CLIENT_UNBLOCK,
// Command
RespCommand.COMMAND,
RespCommand.COMMAND_COUNT,
Expand Down
Loading
Loading