Skip to content

Commit 582bc95

Browse files
FileNotFoundException does not imply InitialSnapshot
This PR changes the Delta snapshot management and file listing code to return `Option`s, with `None` meaning the directory was empty or missing. Otherwise, the methods return `Some(logSegment)` -- possibly with an empty file list, if the search found no usable commit files. That way, we can reliably distinguish a truly empty/missing Delta table from one whose log files are corrupted or missing in a way that prevents snapshot construction. The former should produce an `InitialSnapshot`, while the latter should propagate an error.

Previously, Delta snapshot management code made the unsafe assumption that `FileNotFoundException` necessarily meant the directory was empty, and several code locations caught the exception in order to create an `InitialSnapshot` that designates an empty table. This led to an awkward and brittle design, where code had to either avoid throwing `FileNotFoundException` -- even if the problem was, in fact, a file not found -- or else catch and wrap the exception to ensure it propagated past the catch clauses that would wrongly create an `InitialSnapshot`.

Existing unit tests cover this code.

GitOrigin-RevId: 6d4330b43cdfa11f69f64ee3849eab9192c9b268
1 parent 01ce4f5 commit 582bc95

File tree

1 file changed

+69
-38
lines changed

1 file changed

+69
-38
lines changed

core/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala

+69-38
Original file line number | Diff line number | Diff line change
@@ -51,12 +51,12 @@ trait SnapshotManagement { self: DeltaLog =>
5151

5252
/**
5353
* Get the LogSegment that will help in computing the Snapshot of the table at DeltaLog
54-
* initialization.
54+
* initialization, or None if the directory was empty/missing.
5555
*
5656
* @param startingCheckpoint A checkpoint that we can start our listing from
5757
*/
5858
protected def getLogSegmentFrom(
59-
startingCheckpoint: Option[CheckpointMetaData]): LogSegment = {
59+
startingCheckpoint: Option[CheckpointMetaData]): Option[LogSegment] = {
6060
getLogSegmentForVersion(startingCheckpoint.map(_.version))
6161
}
6262

@@ -81,11 +81,22 @@ trait SnapshotManagement { self: DeltaLog =>
8181
*
8282
* @param startVersion the version to start. Inclusive.
8383
* @param versionToLoad the optional parameter to set the max version we should return. Inclusive.
84+
* @return Some array of files found (possibly empty, if no usable commit files are present), or
85+
* None if the listing returned no files at all.
8486
*/
8587
private final def listDeltaAndCheckpointFiles(
8688
startVersion: Long,
87-
versionToLoad: Option[Long]): Array[FileStatus] = {
88-
listFrom(startVersion)
89+
versionToLoad: Option[Long]): Option[Array[FileStatus]] = {
90+
// LIST the directory, starting from the provided lower bound (treat missing dir as empty).
91+
// NOTE: "empty/missing" is _NOT_ equivalent to "contains no useful commit files."
92+
val listing = try {
93+
listFrom(startVersion)
94+
} catch {
95+
case _: FileNotFoundException => Iterator.empty
96+
}
97+
if (listing.isEmpty) return None
98+
99+
val files = listing
89100
// Pick up all checkpoint and delta files
90101
.filter { file => isDeltaCommitOrCheckpointFile(file.getPath) }
91102
// Checkpoint files of 0 size are invalid but Spark will ignore them silently when reading
@@ -94,6 +105,9 @@ trait SnapshotManagement { self: DeltaLog =>
94105
// take files until the version we want to load
95106
.takeWhile(f => versionToLoad.forall(v => getFileVersion(f.getPath) <= v))
96107
.toArray
108+
109+
// NOTE: The file list may be empty if the listing returned no usable files.
110+
Some(files)
97111
}
98112

99113
/**
@@ -109,16 +123,32 @@ trait SnapshotManagement { self: DeltaLog =>
109123
* Delta streaming source. If not provided, we will try to load the latest
110124
* version of the table.
111125
* @return Some LogSegment to build a Snapshot if files do exist after the given
112-
* startCheckpoint. None, if there are no new files after `startCheckpoint`.
126+
* startCheckpoint. None, if the directory was missing or empty.
113127
*/
114128
protected def getLogSegmentForVersion(
115129
startCheckpoint: Option[Long],
116-
versionToLoad: Option[Long] = None): LogSegment = {
130+
versionToLoad: Option[Long] = None): Option[LogSegment] = {
117131
recordFrameProfile("Delta", "SnapshotManagement.getLogSegmentForVersion") {
118132
// List from the starting checkpoint. If a checkpoint doesn't exist, this will still return
119133
// deltaVersion=0.
120134
val newFiles = listDeltaAndCheckpointFiles(startCheckpoint.getOrElse(0L), versionToLoad)
135+
.getOrElse {
136+
// No files found even when listing from 0 => empty directory => table does not exist yet.
137+
if (startCheckpoint.isEmpty) return None
138+
139+
// [SC-95011] FIXME(ryan.johnson): We always write the commit and checkpoint files
140+
// before updating _last_checkpoint. If the listing came up empty, then we either
141+
// encountered a list-after-put inconsistency in the underlying log store, or somebody
142+
// corrupted the table by deleting files. Either way, we can't safely continue.
143+
//
144+
// For now, we preserve existing behavior by returning Array.empty, which will trigger a
145+
// recursive call to [[getLogSegmentForVersion]] below (same as before the refactor).
146+
Array.empty[FileStatus]
147+
}
148+
121149
if (newFiles.isEmpty && startCheckpoint.isEmpty) {
150+
// We can't construct a snapshot because the directory contained no usable commit
151+
// files... but we can't return None either, because it was not truly empty.
122152
throw DeltaErrors.emptyDirectoryException(logPath.toString)
123153
} else if (newFiles.isEmpty) {
124154
// The directory may be deleted and recreated and we may have stale state in our DeltaLog
@@ -135,19 +165,22 @@ trait SnapshotManagement { self: DeltaLog =>
135165
val newCheckpointVersion = newCheckpoint.map(_.version).getOrElse {
136166
// If we do not have any checkpoint, pass new checkpoint version as -1 so that first
137167
// delta version can be 0.
138-
if (startCheckpoint.isDefined) {
168+
startCheckpoint.foreach { startCheckpoint =>
139169
// `startCheckpoint` was given but no checkpoint found on delta log. This means that the
140170
// last checkpoint we thought should exist (the `_last_checkpoint` file) no longer exists.
141171
// Try to look up another valid checkpoint and create `LogSegment` from it.
172+
//
173+
// [SC-95011] FIXME(ryan.johnson): Something has gone very wrong if the checkpoint doesn't
174+
// exist at all. This code should only handle rejected incomplete checkpoints.
142175
recordDeltaEvent(this, "delta.checkpoint.error.partial")
143-
val alternativeLogSegment = getLogSegmentWithMaxExclusiveCheckpointVersion(
144-
snapshotVersion = versionToLoad.getOrElse(deltaVersion(deltas.last.getPath)),
145-
startCheckpoint.get)
146-
return alternativeLogSegment.getOrElse {
147-
throw DeltaErrors.missingPartFilesException(
148-
startCheckpoint.get, new FileNotFoundException(
149-
s"Checkpoint file to load version: ${startCheckpoint.get} is missing."))
150-
}
176+
val snapshotVersion = versionToLoad.getOrElse(deltaVersion(deltas.last.getPath))
177+
getLogSegmentWithMaxExclusiveCheckpointVersion(snapshotVersion, startCheckpoint)
178+
.foreach { alternativeLogSegment => return Some(alternativeLogSegment) }
179+
180+
// No alternative found, but the directory contains files so we cannot return None.
181+
throw DeltaErrors.missingPartFilesException(
182+
startCheckpoint, new FileNotFoundException(
183+
s"Checkpoint file to load version: $startCheckpoint is missing."))
151184
}
152185
-1L
153186
}
@@ -190,13 +223,13 @@ trait SnapshotManagement { self: DeltaLog =>
190223
}
191224
val lastCommitTimestamp = deltas.last.getModificationTime
192225

193-
LogSegment(
226+
Some(LogSegment(
194227
logPath,
195228
newVersion,
196229
deltasAfterCheckpoint,
197230
newCheckpointFiles,
198231
newCheckpoint.map(_.version),
199-
lastCommitTimestamp)
232+
lastCommitTimestamp))
200233
}
201234
}
202235

@@ -207,8 +240,7 @@ trait SnapshotManagement { self: DeltaLog =>
207240
*/
208241
protected def getSnapshotAtInit: Snapshot = {
209242
recordFrameProfile("Delta", "SnapshotManagement.getSnapshotAtInit") {
210-
try {
211-
val segment = getLogSegmentFrom(lastCheckpoint)
243+
getLogSegmentFrom(lastCheckpoint).map { segment =>
212244
val startCheckpoint = segment.checkpointVersionOpt
213245
.map(v => s" starting from checkpoint $v.").getOrElse(".")
214246
logInfo(s"Loading version ${segment.version}$startCheckpoint")
@@ -217,11 +249,9 @@ trait SnapshotManagement { self: DeltaLog =>
217249
lastUpdateTimestamp = clock.getTimeMillis()
218250
logInfo(s"Returning initial snapshot $snapshot")
219251
snapshot
220-
} catch {
221-
case e: FileNotFoundException =>
222-
logInfo(s"Creating initial snapshot without metadata, because the directory is empty")
223-
// The log directory may not exist
224-
new InitialSnapshot(logPath, this)
252+
}.getOrElse {
253+
logInfo(s"Creating initial snapshot without metadata, because the directory is empty")
254+
new InitialSnapshot(logPath, this)
225255
}
226256
}
227257
}
@@ -272,6 +302,7 @@ trait SnapshotManagement { self: DeltaLog =>
272302
val filesSinceCheckpointVersion = listDeltaAndCheckpointFiles(
273303
startVersion = cp.version,
274304
versionToLoad = Some(snapshotVersion))
305+
.getOrElse(Array.empty)
275306
val (checkpoints, deltas) =
276307
filesSinceCheckpointVersion.partition(f => isCheckpointFile(f.getPath))
277308
if (deltas.isEmpty) {
@@ -315,6 +346,7 @@ trait SnapshotManagement { self: DeltaLog =>
315346
case None =>
316347
val deltas =
317348
listDeltaAndCheckpointFiles(startVersion = 0, versionToLoad = Some(snapshotVersion))
349+
.getOrElse(Array.empty)
318350
.filter(file => isDeltaFile(file.getPath))
319351
val deltaVersions = deltas.map(f => deltaVersion(f.getPath))
320352
try {
@@ -438,8 +470,8 @@ trait SnapshotManagement { self: DeltaLog =>
438470
*/
439471
protected def updateInternal(isAsync: Boolean): Snapshot =
440472
recordDeltaOperation(this, "delta.log.update", Map(TAG_ASYNC -> isAsync.toString)) {
441-
try {
442-
val segment = getLogSegmentForVersion(currentSnapshot.logSegment.checkpointVersionOpt)
473+
val segmentOpt = getLogSegmentForVersion(currentSnapshot.logSegment.checkpointVersionOpt)
474+
val newSnapshot = segmentOpt.map { segment =>
443475
if (segment == currentSnapshot.logSegment) {
444476
// Exit early if there is no new file
445477
lastUpdateTimestamp = clock.getTimeMillis()
@@ -464,17 +496,13 @@ trait SnapshotManagement { self: DeltaLog =>
464496
"nextSnapshotMetadata" -> newSnapshot.metadata))
465497
}
466498

467-
replaceSnapshot(newSnapshot)
468499
logInfo(s"Updated snapshot to $newSnapshot")
469-
} catch {
470-
case e: FileNotFoundException =>
471-
if (Option(e.getMessage).exists(_.contains("reconstruct state at version"))) {
472-
throw e
473-
}
474-
val message = s"No delta log found for the Delta table at $logPath"
475-
logInfo(message)
476-
replaceSnapshot(new InitialSnapshot(logPath, this))
500+
newSnapshot
501+
}.getOrElse {
502+
logInfo(s"No delta log found for the Delta table at $logPath")
503+
new InitialSnapshot(logPath, this)
477504
}
505+
replaceSnapshot(newSnapshot)
478506
lastUpdateTimestamp = clock.getTimeMillis()
479507
currentSnapshot
480508
}
@@ -501,9 +529,12 @@ trait SnapshotManagement { self: DeltaLog =>
501529
// Do not use the hint if the version we're asking for is smaller than the last checkpoint hint
502530
val startingCheckpoint = lastCheckpointHint.collect { case ci if ci.version <= version => ci }
503531
.orElse(findLastCompleteCheckpoint(CheckpointInstance(version, None)))
504-
val segment = getLogSegmentForVersion(startingCheckpoint.map(_.version), Some(version))
505-
506-
createSnapshot(segment, minFileRetentionTimestamp)
532+
getLogSegmentForVersion(startingCheckpoint.map(_.version), Some(version)).map { segment =>
533+
createSnapshot(segment, minFileRetentionTimestamp)
534+
}.getOrElse {
535+
// We can't return InitialSnapshot because our caller asked for a specific snapshot version.
536+
throw DeltaErrors.emptyDirectoryException(logPath.toString)
537+
}
507538
}
508539
}
509540

0 commit comments

Comments
 (0)