Skip to content

Commit 5199485

Browse files
[Compatibility] Added BZMPOP, BZPOPMAX and BZPOPMIN commands (#884)
* Added BZMPOP, BZPOPMAX and BZPOPIN commands * Fixed code format * Review command fix * Reusing Items property --------- Co-authored-by: Tal Zaccai <[email protected]>
1 parent 7483efc commit 5199485

File tree

14 files changed

+707
-34
lines changed

14 files changed

+707
-34
lines changed

libs/resources/RespCommandsDocs.json

+104
Original file line numberDiff line numberDiff line change
@@ -773,6 +773,110 @@
773773
}
774774
]
775775
},
776+
{
777+
"Command": "BZMPOP",
778+
"Name": "BZMPOP",
779+
"Summary": "Removes and returns a member by score from one or more sorted sets. Blocks until a member is available otherwise. Deletes the sorted set if the last element was popped.",
780+
"Group": "SortedSet",
781+
"Complexity": "O(K) \u002B O(M*log(N)) where K is the number of provided keys, N being the number of elements in the sorted set, and M being the number of elements popped.",
782+
"Arguments": [
783+
{
784+
"TypeDiscriminator": "RespCommandBasicArgument",
785+
"Name": "TIMEOUT",
786+
"DisplayText": "timeout",
787+
"Type": "Double"
788+
},
789+
{
790+
"TypeDiscriminator": "RespCommandBasicArgument",
791+
"Name": "NUMKEYS",
792+
"DisplayText": "numkeys",
793+
"Type": "Integer"
794+
},
795+
{
796+
"TypeDiscriminator": "RespCommandKeyArgument",
797+
"Name": "KEY",
798+
"DisplayText": "key",
799+
"Type": "Key",
800+
"ArgumentFlags": "Multiple",
801+
"KeySpecIndex": 0
802+
},
803+
{
804+
"TypeDiscriminator": "RespCommandContainerArgument",
805+
"Name": "WHERE",
806+
"Type": "OneOf",
807+
"Arguments": [
808+
{
809+
"TypeDiscriminator": "RespCommandBasicArgument",
810+
"Name": "MIN",
811+
"DisplayText": "min",
812+
"Type": "PureToken",
813+
"Token": "MIN"
814+
},
815+
{
816+
"TypeDiscriminator": "RespCommandBasicArgument",
817+
"Name": "MAX",
818+
"DisplayText": "max",
819+
"Type": "PureToken",
820+
"Token": "MAX"
821+
}
822+
]
823+
},
824+
{
825+
"TypeDiscriminator": "RespCommandBasicArgument",
826+
"Name": "COUNT",
827+
"DisplayText": "count",
828+
"Type": "Integer",
829+
"Token": "COUNT",
830+
"ArgumentFlags": "Optional"
831+
}
832+
]
833+
},
834+
{
835+
"Command": "BZPOPMAX",
836+
"Name": "BZPOPMAX",
837+
"Summary": "Removes and returns the member with the highest score from one or more sorted sets. Blocks until a member available otherwise. Deletes the sorted set if the last element was popped.",
838+
"Group": "SortedSet",
839+
"Complexity": "O(log(N)) with N being the number of elements in the sorted set.",
840+
"Arguments": [
841+
{
842+
"TypeDiscriminator": "RespCommandKeyArgument",
843+
"Name": "KEY",
844+
"DisplayText": "key",
845+
"Type": "Key",
846+
"ArgumentFlags": "Multiple",
847+
"KeySpecIndex": 0
848+
},
849+
{
850+
"TypeDiscriminator": "RespCommandBasicArgument",
851+
"Name": "TIMEOUT",
852+
"DisplayText": "timeout",
853+
"Type": "Double"
854+
}
855+
]
856+
},
857+
{
858+
"Command": "BZPOPMIN",
859+
"Name": "BZPOPMIN",
860+
"Summary": "Removes and returns the member with the lowest score from one or more sorted sets. Blocks until a member is available otherwise. Deletes the sorted set if the last element was popped.",
861+
"Group": "SortedSet",
862+
"Complexity": "O(log(N)) with N being the number of elements in the sorted set.",
863+
"Arguments": [
864+
{
865+
"TypeDiscriminator": "RespCommandKeyArgument",
866+
"Name": "KEY",
867+
"DisplayText": "key",
868+
"Type": "Key",
869+
"ArgumentFlags": "Multiple",
870+
"KeySpecIndex": 0
871+
},
872+
{
873+
"TypeDiscriminator": "RespCommandBasicArgument",
874+
"Name": "TIMEOUT",
875+
"DisplayText": "timeout",
876+
"Type": "Double"
877+
}
878+
]
879+
},
776880
{
777881
"Command": "CLIENT",
778882
"Name": "CLIENT",

libs/resources/RespCommandsInfo.json

+72
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,78 @@
415415
}
416416
]
417417
},
418+
{
419+
"Command": "BZMPOP",
420+
"Name": "BZMPOP",
421+
"Arity": -5,
422+
"Flags": "Blocking, MovableKeys, Write",
423+
"AclCategories": "Blocking, SortedSet, Slow, Write",
424+
"KeySpecifications": [
425+
{
426+
"BeginSearch": {
427+
"TypeDiscriminator": "BeginSearchIndex",
428+
"Index": 2
429+
},
430+
"FindKeys": {
431+
"TypeDiscriminator": "FindKeysKeyNum",
432+
"KeyNumIdx": 0,
433+
"FirstKey": 1,
434+
"KeyStep": 1
435+
},
436+
"Flags": "RW, Access, Delete"
437+
}
438+
]
439+
},
440+
{
441+
"Command": "BZPOPMAX",
442+
"Name": "BZPOPMAX",
443+
"Arity": -3,
444+
"Flags": "Blocking, Fast, Write",
445+
"FirstKey": 1,
446+
"LastKey": -2,
447+
"Step": 1,
448+
"AclCategories": "Blocking, Fast, SortedSet, Write",
449+
"KeySpecifications": [
450+
{
451+
"BeginSearch": {
452+
"TypeDiscriminator": "BeginSearchIndex",
453+
"Index": 1
454+
},
455+
"FindKeys": {
456+
"TypeDiscriminator": "FindKeysRange",
457+
"LastKey": -2,
458+
"KeyStep": 1,
459+
"Limit": 0
460+
},
461+
"Flags": "RW, Access, Delete"
462+
}
463+
]
464+
},
465+
{
466+
"Command": "BZPOPMIN",
467+
"Name": "BZPOPMIN",
468+
"Arity": -3,
469+
"Flags": "Blocking, Fast, Write",
470+
"FirstKey": 1,
471+
"LastKey": -2,
472+
"Step": 1,
473+
"AclCategories": "Blocking, Fast, SortedSet, Write",
474+
"KeySpecifications": [
475+
{
476+
"BeginSearch": {
477+
"TypeDiscriminator": "BeginSearchIndex",
478+
"Index": 1
479+
},
480+
"FindKeys": {
481+
"TypeDiscriminator": "FindKeysRange",
482+
"LastKey": -2,
483+
"KeyStep": 1,
484+
"Limit": 0
485+
},
486+
"Flags": "RW, Access, Delete"
487+
}
488+
]
489+
},
418490
{
419491
"Command": "CLIENT",
420492
"Name": "CLIENT",

libs/server/Objects/ItemBroker/CollectionItemBroker.cs

+36-23
Original file line numberDiff line numberDiff line change
@@ -383,37 +383,47 @@ private static bool TryMoveNextListItem(ListObject srcListObj, ListObject dstLis
383383
}
384384

385385
/// <summary>
386-
/// Try to get next available item from sorted set object
386+
/// Try to get next available item from sorted set object based on command type
387+
/// BZPOPMIN and BZPOPMAX share same implementation since Dictionary.First() and Last()
388+
/// handle the ordering automatically based on sorted set scores
387389
/// </summary>
388-
/// <param name="sortedSetObj">Sorted set object</param>
389-
/// <param name="command">RESP command</param>
390-
/// <param name="nextItem">Item retrieved</param>
391-
/// <returns>True if found available item</returns>
392-
private static bool TryGetNextSetObject(SortedSetObject sortedSetObj, RespCommand command, out byte[] nextItem)
390+
private static unsafe bool TryGetNextSetObjects(byte[] key, SortedSetObject sortedSetObj, RespCommand command, ArgSlice[] cmdArgs, out CollectionItemResult result)
393391
{
394-
nextItem = default;
392+
result = default;
395393

396-
// If object has no items, return
397394
if (sortedSetObj.Dictionary.Count == 0) return false;
398395

399-
// Get the next object according to operation type
400396
switch (command)
401397
{
398+
case RespCommand.BZPOPMIN:
399+
case RespCommand.BZPOPMAX:
400+
var element = sortedSetObj.PopMinOrMax(command == RespCommand.BZPOPMAX);
401+
result = new CollectionItemResult(key, element.Score, element.Element);
402+
return true;
403+
404+
case RespCommand.BZMPOP:
405+
var lowScoresFirst = *(bool*)cmdArgs[0].ptr;
406+
var popCount = *(int*)cmdArgs[1].ptr;
407+
popCount = Math.Min(popCount, sortedSetObj.Dictionary.Count);
408+
409+
var scores = new double[popCount];
410+
var items = new byte[popCount][];
411+
412+
for (int i = 0; i < popCount; i++)
413+
{
414+
var popResult = sortedSetObj.PopMinOrMax(!lowScoresFirst);
415+
scores[i] = popResult.Score;
416+
items[i] = popResult.Element;
417+
}
418+
419+
result = new CollectionItemResult(key, scores, items);
420+
return true;
421+
402422
default:
403423
return false;
404424
}
405425
}
406426

407-
/// <summary>
408-
/// Try to get next available item from object
409-
/// </summary>
410-
/// <param name="key">Key of object</param>
411-
/// <param name="storageSession">Current storage session</param>
412-
/// <param name="command">RESP command</param>
413-
/// <param name="cmdArgs">Additional command arguments</param>
414-
/// <param name="currCount">Collection size</param>
415-
/// <param name="result">Result of command</param>
416-
/// <returns>True if found available item</returns>
417427
private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, RespCommand command, ArgSlice[] cmdArgs, out int currCount, out CollectionItemResult result)
418428
{
419429
currCount = default;
@@ -423,6 +433,7 @@ private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, Resp
423433
var objectType = command switch
424434
{
425435
RespCommand.BLPOP or RespCommand.BRPOP or RespCommand.BLMOVE or RespCommand.BLMPOP => GarnetObjectType.List,
436+
RespCommand.BZPOPMIN or RespCommand.BZPOPMAX or RespCommand.BZMPOP => GarnetObjectType.SortedSet,
426437
_ => throw new NotSupportedException()
427438
};
428439

@@ -524,11 +535,13 @@ private unsafe bool TryGetResult(byte[] key, StorageSession storageSession, Resp
524535
}
525536
case SortedSetObject setObj:
526537
currCount = setObj.Dictionary.Count;
527-
if (objectType != GarnetObjectType.SortedSet) return false;
538+
if (objectType != GarnetObjectType.SortedSet)
539+
return false;
540+
if (currCount == 0)
541+
return false;
542+
543+
return TryGetNextSetObjects(key, setObj, command, cmdArgs, out result);
528544

529-
var hasValue = TryGetNextSetObject(setObj, command, out var sortedSetNextItem);
530-
result = new CollectionItemResult(key, sortedSetNextItem);
531-
return hasValue;
532545
default:
533546
return false;
534547
}

libs/server/Objects/ItemBroker/CollectionItemResult.cs

+24
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,20 @@ public CollectionItemResult(byte[] key, byte[][] items)
2020
Items = items;
2121
}
2222

23+
public CollectionItemResult(byte[] key, double score, byte[] item)
24+
{
25+
Key = key;
26+
Score = score;
27+
Item = item;
28+
}
29+
30+
public CollectionItemResult(byte[] key, double[] scores, byte[][] items)
31+
{
32+
Key = key;
33+
Scores = scores;
34+
Items = items;
35+
}
36+
2337
/// <summary>
2438
/// True if item was found
2539
/// </summary>
@@ -35,11 +49,21 @@ public CollectionItemResult(byte[] key, byte[][] items)
3549
/// </summary>
3650
internal byte[] Item { get; }
3751

52+
/// <summary>
53+
/// Score associated with the item retrieved from the collection
54+
/// </summary>
55+
internal double Score { get; }
56+
3857
/// <summary>
3958
/// Item retrieved from collection
4059
/// </summary>
4160
internal byte[][] Items { get; }
4261

62+
/// <summary>
63+
/// Scores associated with the items retrieved from the collection
64+
/// </summary>
65+
internal double[] Scores { get; }
66+
4367
/// <summary>
4468
/// Instance of empty result
4569
/// </summary>

libs/server/Objects/SortedSet/SortedSetObjectImpl.cs

+18
Original file line numberDiff line numberDiff line change
@@ -886,6 +886,24 @@ private void SortedSetRank(ref ObjectInput input, ref SpanByteAndMemory output,
886886
}
887887
}
888888

889+
/// <summary>
890+
/// Removes and returns the element with the highest or lowest score from the sorted set.
891+
/// </summary>
892+
/// <param name="popMaxScoreElement">If true, pops the element with the highest score; otherwise, pops the element with the lowest score.</param>
893+
/// <returns>A tuple containing the score and the element as a byte array.</returns>
894+
public (double Score, byte[] Element) PopMinOrMax(bool popMaxScoreElement = false)
895+
{
896+
if (sortedSet.Count == 0)
897+
return default;
898+
899+
var element = popMaxScoreElement ? sortedSet.Max : sortedSet.Min;
900+
sortedSet.Remove(element);
901+
sortedSetDict.Remove(element.Element);
902+
this.UpdateSize(element.Element, false);
903+
904+
return element;
905+
}
906+
889907
/// <summary>
890908
/// Removes and returns up to COUNT members with the low or high score
891909
/// </summary>

0 commit comments

Comments
 (0)