@@ -51,12 +51,12 @@ trait SnapshotManagement { self: DeltaLog =>
51
51
52
52
/**
53
53
* Get the LogSegment that will help in computing the Snapshot of the table at DeltaLog
54
- * initialization.
54
+ * initialization, or None if the directory was empty/missing .
55
55
*
56
56
* @param startingCheckpoint A checkpoint that we can start our listing from
57
57
*/
58
58
protected def getLogSegmentFrom (
59
- startingCheckpoint : Option [CheckpointMetaData ]): LogSegment = {
59
+ startingCheckpoint : Option [CheckpointMetaData ]): Option [ LogSegment ] = {
60
60
getLogSegmentForVersion(startingCheckpoint.map(_.version))
61
61
}
62
62
@@ -81,11 +81,22 @@ trait SnapshotManagement { self: DeltaLog =>
81
81
*
82
82
* @param startVersion the version to start. Inclusive.
83
83
* @param versionToLoad the optional parameter to set the max version we should return. Inclusive.
84
+ * @return Some array of files found (possibly empty, if no usable commit files are present), or
85
+ * None if the listing returned no files at all.
84
86
*/
85
87
private final def listDeltaAndCheckpointFiles (
86
88
startVersion : Long ,
87
- versionToLoad : Option [Long ]): Array [FileStatus ] = {
88
- listFrom(startVersion)
89
+ versionToLoad : Option [Long ]): Option [Array [FileStatus ]] = {
90
+ // LIST the directory, starting from the provided lower bound (treat missing dir as empty).
91
+ // NOTE: "empty/missing" is _NOT_ equivalent to "contains no useful commit files."
92
+ val listing = try {
93
+ listFrom(startVersion)
94
+ } catch {
95
+ case _ : FileNotFoundException => Iterator .empty
96
+ }
97
+ if (listing.isEmpty) return None
98
+
99
+ val files = listing
89
100
// Pick up all checkpoint and delta files
90
101
.filter { file => isDeltaCommitOrCheckpointFile(file.getPath) }
91
102
// Checkpoint files of 0 size are invalid but Spark will ignore them silently when reading
@@ -94,6 +105,9 @@ trait SnapshotManagement { self: DeltaLog =>
94
105
// take files until the version we want to load
95
106
.takeWhile(f => versionToLoad.forall(v => getFileVersion(f.getPath) <= v))
96
107
.toArray
108
+
109
+ // NOTE: The file list may be empty if the listing returned no usable files.
110
+ Some (files)
97
111
}
98
112
99
113
/**
@@ -109,16 +123,32 @@ trait SnapshotManagement { self: DeltaLog =>
109
123
* Delta streaming source. If not provided, we will try to load the latest
110
124
* version of the table.
111
125
* @return Some LogSegment to build a Snapshot if files do exist after the given
112
- * startCheckpoint. None, if there are no new files after `startCheckpoint` .
126
+ * startCheckpoint. None, if the directory was missing or empty .
113
127
*/
114
128
protected def getLogSegmentForVersion (
115
129
startCheckpoint : Option [Long ],
116
- versionToLoad : Option [Long ] = None ): LogSegment = {
130
+ versionToLoad : Option [Long ] = None ): Option [ LogSegment ] = {
117
131
recordFrameProfile(" Delta" , " SnapshotManagement.getLogSegmentForVersion" ) {
118
132
// List from the starting checkpoint. If a checkpoint doesn't exist, this will still return
119
133
// deltaVersion=0.
120
134
val newFiles = listDeltaAndCheckpointFiles(startCheckpoint.getOrElse(0L ), versionToLoad)
135
+ .getOrElse {
136
+ // No files found even when listing from 0 => empty directory => table does not exist yet.
137
+ if (startCheckpoint.isEmpty) return None
138
+
139
+ // [SC-95011] FIXME(ryan.johnson): We always write the commit and checkpoint files
140
+ // before updating _last_checkpoint. If the listing came up empty, then we either
141
+ // encountered a list-after-put inconsistency in the underlying log store, or somebody
142
+ // corrupted the table by deleting files. Either way, we can't safely continue.
143
+ //
144
+ // For now, we preserve existing behavior by returning Array.empty, which will trigger a
145
+ // recursive call to [[getLogSegmentForVersion]] below (same as before the refactor).
146
+ Array .empty[FileStatus ]
147
+ }
148
+
121
149
if (newFiles.isEmpty && startCheckpoint.isEmpty) {
150
+ // We can't construct a snapshot because the directory contained no usable commit
151
+ // files... but we can't return None either, because it was not truly empty.
122
152
throw DeltaErrors .emptyDirectoryException(logPath.toString)
123
153
} else if (newFiles.isEmpty) {
124
154
// The directory may be deleted and recreated and we may have stale state in our DeltaLog
@@ -135,19 +165,22 @@ trait SnapshotManagement { self: DeltaLog =>
135
165
val newCheckpointVersion = newCheckpoint.map(_.version).getOrElse {
136
166
// If we do not have any checkpoint, pass new checkpoint version as -1 so that first
137
167
// delta version can be 0.
138
- if ( startCheckpoint.isDefined) {
168
+ startCheckpoint.foreach { startCheckpoint =>
139
169
// `startCheckpoint` was given but no checkpoint found on delta log. This means that the
140
170
// last checkpoint we thought should exist (the `_last_checkpoint` file) no longer exists.
141
171
// Try to look up another valid checkpoint and create `LogSegment` from it.
172
+ //
173
+ // [SC-95011] FIXME(ryan.johnson): Something has gone very wrong if the checkpoint doesn't
174
+ // exist at all. This code should only handle rejected incomplete checkpoints.
142
175
recordDeltaEvent(this , " delta.checkpoint.error.partial" )
143
- val alternativeLogSegment = getLogSegmentWithMaxExclusiveCheckpointVersion(
144
- snapshotVersion = versionToLoad.getOrElse(deltaVersion(deltas.last.getPath)),
145
- startCheckpoint.get)
146
- return alternativeLogSegment.getOrElse {
147
- throw DeltaErrors .missingPartFilesException(
148
- startCheckpoint.get, new FileNotFoundException (
149
- s " Checkpoint file to load version: ${ startCheckpoint.get} is missing. " ))
150
- }
176
+ val snapshotVersion = versionToLoad.getOrElse(deltaVersion(deltas.last.getPath))
177
+ getLogSegmentWithMaxExclusiveCheckpointVersion( snapshotVersion, startCheckpoint)
178
+ .foreach { alternativeLogSegment => return Some (alternativeLogSegment) }
179
+
180
+ // No alternative found, but the directory contains files so we cannot return None.
181
+ throw DeltaErrors .missingPartFilesException (
182
+ startCheckpoint, new FileNotFoundException (
183
+ s " Checkpoint file to load version: $startCheckpoint is missing. " ))
151
184
}
152
185
- 1L
153
186
}
@@ -190,13 +223,13 @@ trait SnapshotManagement { self: DeltaLog =>
190
223
}
191
224
val lastCommitTimestamp = deltas.last.getModificationTime
192
225
193
- LogSegment (
226
+ Some ( LogSegment (
194
227
logPath,
195
228
newVersion,
196
229
deltasAfterCheckpoint,
197
230
newCheckpointFiles,
198
231
newCheckpoint.map(_.version),
199
- lastCommitTimestamp)
232
+ lastCommitTimestamp))
200
233
}
201
234
}
202
235
@@ -207,8 +240,7 @@ trait SnapshotManagement { self: DeltaLog =>
207
240
*/
208
241
protected def getSnapshotAtInit : Snapshot = {
209
242
recordFrameProfile(" Delta" , " SnapshotManagement.getSnapshotAtInit" ) {
210
- try {
211
- val segment = getLogSegmentFrom(lastCheckpoint)
243
+ getLogSegmentFrom(lastCheckpoint).map { segment =>
212
244
val startCheckpoint = segment.checkpointVersionOpt
213
245
.map(v => s " starting from checkpoint $v. " ).getOrElse(" ." )
214
246
logInfo(s " Loading version ${segment.version}$startCheckpoint" )
@@ -217,11 +249,9 @@ trait SnapshotManagement { self: DeltaLog =>
217
249
lastUpdateTimestamp = clock.getTimeMillis()
218
250
logInfo(s " Returning initial snapshot $snapshot" )
219
251
snapshot
220
- } catch {
221
- case e : FileNotFoundException =>
222
- logInfo(s " Creating initial snapshot without metadata, because the directory is empty " )
223
- // The log directory may not exist
224
- new InitialSnapshot (logPath, this )
252
+ }.getOrElse {
253
+ logInfo(s " Creating initial snapshot without metadata, because the directory is empty " )
254
+ new InitialSnapshot (logPath, this )
225
255
}
226
256
}
227
257
}
@@ -272,6 +302,7 @@ trait SnapshotManagement { self: DeltaLog =>
272
302
val filesSinceCheckpointVersion = listDeltaAndCheckpointFiles(
273
303
startVersion = cp.version,
274
304
versionToLoad = Some (snapshotVersion))
305
+ .getOrElse(Array .empty)
275
306
val (checkpoints, deltas) =
276
307
filesSinceCheckpointVersion.partition(f => isCheckpointFile(f.getPath))
277
308
if (deltas.isEmpty) {
@@ -315,6 +346,7 @@ trait SnapshotManagement { self: DeltaLog =>
315
346
case None =>
316
347
val deltas =
317
348
listDeltaAndCheckpointFiles(startVersion = 0 , versionToLoad = Some (snapshotVersion))
349
+ .getOrElse(Array .empty)
318
350
.filter(file => isDeltaFile(file.getPath))
319
351
val deltaVersions = deltas.map(f => deltaVersion(f.getPath))
320
352
try {
@@ -438,8 +470,8 @@ trait SnapshotManagement { self: DeltaLog =>
438
470
*/
439
471
protected def updateInternal (isAsync : Boolean ): Snapshot =
440
472
recordDeltaOperation(this , " delta.log.update" , Map (TAG_ASYNC -> isAsync.toString)) {
441
- try {
442
- val segment = getLogSegmentForVersion(currentSnapshot.logSegment.checkpointVersionOpt)
473
+ val segmentOpt = getLogSegmentForVersion(currentSnapshot.logSegment.checkpointVersionOpt)
474
+ val newSnapshot = segmentOpt.map { segment =>
443
475
if (segment == currentSnapshot.logSegment) {
444
476
// Exit early if there is no new file
445
477
lastUpdateTimestamp = clock.getTimeMillis()
@@ -464,17 +496,13 @@ trait SnapshotManagement { self: DeltaLog =>
464
496
" nextSnapshotMetadata" -> newSnapshot.metadata))
465
497
}
466
498
467
- replaceSnapshot(newSnapshot)
468
499
logInfo(s " Updated snapshot to $newSnapshot" )
469
- } catch {
470
- case e : FileNotFoundException =>
471
- if (Option (e.getMessage).exists(_.contains(" reconstruct state at version" ))) {
472
- throw e
473
- }
474
- val message = s " No delta log found for the Delta table at $logPath"
475
- logInfo(message)
476
- replaceSnapshot(new InitialSnapshot (logPath, this ))
500
+ newSnapshot
501
+ }.getOrElse {
502
+ logInfo(s " No delta log found for the Delta table at $logPath" )
503
+ new InitialSnapshot (logPath, this )
477
504
}
505
+ replaceSnapshot(newSnapshot)
478
506
lastUpdateTimestamp = clock.getTimeMillis()
479
507
currentSnapshot
480
508
}
@@ -501,9 +529,12 @@ trait SnapshotManagement { self: DeltaLog =>
501
529
// Do not use the hint if the version we're asking for is smaller than the last checkpoint hint
502
530
val startingCheckpoint = lastCheckpointHint.collect { case ci if ci.version <= version => ci }
503
531
.orElse(findLastCompleteCheckpoint(CheckpointInstance (version, None )))
504
- val segment = getLogSegmentForVersion(startingCheckpoint.map(_.version), Some (version))
505
-
506
- createSnapshot(segment, minFileRetentionTimestamp)
532
+ getLogSegmentForVersion(startingCheckpoint.map(_.version), Some (version)).map { segment =>
533
+ createSnapshot(segment, minFileRetentionTimestamp)
534
+ }.getOrElse {
535
+ // We can't return InitialSnapshot because our caller asked for a specific snapshot version.
536
+ throw DeltaErrors .emptyDirectoryException(logPath.toString)
537
+ }
507
538
}
508
539
}
509
540
0 commit comments