Skip to content

Commit e9e43ee

Browse files
badrishcTalZaccai
andauthored
Fix debug assert in streaming snapshot checkpoint (#1024)
* Fix debug assert in streaming snapshot checkpoint * more fix * updates * nit --------- Co-authored-by: Tal Zaccai <[email protected]>
1 parent c14e728 commit e9e43ee

File tree

1 file changed

+23
-12
lines changed

1 file changed

+23
-12
lines changed

libs/storage/Tsavorite/cs/src/core/Index/Synchronization/StreamingSnapshotTsavoriteKV.cs

+23-12
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22
// Licensed under the MIT license.
33

44
using System;
5-
using System.Diagnostics;
5+
using System.Threading;
6+
using Microsoft.Extensions.Logging;
67

78
namespace Tsavorite.core
89
{
@@ -65,8 +66,6 @@ internal void StreamingSnapshotScanPhase1()
6566
{
6667
try
6768
{
68-
Debug.Assert(systemState.Phase == Phase.PREP_STREAMING_SNAPSHOT_CHECKPOINT);
69-
7069
// Iterate all the read-only records in the store
7170
scannedUntilAddressCursor = Log.SafeReadOnlyAddress;
7271
var scanFunctions = new ScanPhase1Functions(streamingSnapshotIteratorFunctions, _hybridLogCheckpointToken, _hybridLogCheckpoint.info.version, _hybridLogCheckpoint.info.nextVersion);
@@ -75,9 +74,17 @@ internal void StreamingSnapshotScanPhase1()
7574
_ = s.ScanCursor(ref cursor, long.MaxValue, scanFunctions, scannedUntilAddressCursor);
7675
this.numberOfRecords = scanFunctions.numberOfRecords;
7776
}
77+
catch (Exception e)
78+
{
79+
logger?.LogError(e, "Exception in StreamingSnapshotScanPhase1");
80+
throw;
81+
}
7882
finally
7983
{
80-
Debug.Assert(systemState.Phase == Phase.PREP_STREAMING_SNAPSHOT_CHECKPOINT);
84+
// We started this task before entering PREP_STREAMING_SNAPSHOT_CHECKPOINT, so we
85+
// need to wait until the state machine is in PREP_STREAMING_SNAPSHOT_CHECKPOINT
86+
while (systemState.Phase != Phase.PREP_STREAMING_SNAPSHOT_CHECKPOINT)
87+
Thread.Yield();
8188
GlobalStateMachineStep(systemState);
8289
}
8390
}
@@ -120,14 +127,10 @@ internal void StreamingSnapshotScanPhase2(long untilAddress)
120127
{
121128
try
122129
{
123-
Debug.Assert(systemState.Phase == Phase.WAIT_FLUSH);
124-
125130
// Iterate all the (v) records in the store
126131
var scanFunctions = new ScanPhase2Functions(streamingSnapshotIteratorFunctions, this.numberOfRecords);
127132
using var s = NewSession<Empty, Empty, Empty, StreamingSnapshotSessionFunctions>(new());
128133

129-
// TODO: This requires ScanCursor to provide a consistent snapshot considering only records up to untilAddress
130-
// There is a bug in the current implementation of ScanCursor, where it does not provide such a consistent snapshot
131134
_ = s.ScanCursor(ref scannedUntilAddressCursor, long.MaxValue, scanFunctions, endAddress: untilAddress, maxAddress: untilAddress);
132135

133136
// Reset the cursor to 0
@@ -136,13 +139,21 @@ internal void StreamingSnapshotScanPhase2(long untilAddress)
136139

137140
// Reset the callback functions
138141
streamingSnapshotIteratorFunctions = null;
139-
140-
// Release the semaphore to allow the checkpoint waiting task to proceed
141-
_hybridLogCheckpoint.flushedSemaphore.Release();
142+
}
143+
catch (Exception e)
144+
{
145+
logger?.LogError(e, "Exception in StreamingSnapshotScanPhase2");
146+
throw;
142147
}
143148
finally
144149
{
145-
Debug.Assert(systemState.Phase == Phase.WAIT_FLUSH);
150+
// Release the semaphore to allow the checkpoint waiting task to proceed
151+
_hybridLogCheckpoint.flushedSemaphore.Release();
152+
153+
// We started this task before entering WAIT_FLUSH, so we
154+
// need to wait until the state machine is in WAIT_FLUSH
155+
while (systemState.Phase != Phase.WAIT_FLUSH)
156+
Thread.Yield();
146157
GlobalStateMachineStep(systemState);
147158
}
148159
}

0 commit comments

Comments
 (0)