Skip to content

Commit 864b5e3

Browse files
[Kernel] Throw InvalidTableException when we encounter a known invalid table state (#3288)
#### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [ ] Spark - [ ] Standalone - [ ] Flink - [X] Kernel - [ ] Other (fill in here) ## Description During log reconstruction we throw a bunch of various errors when we encounter a table state that we consider invalid. This PR updates some of those errors to be `InvalidTableException`. This doesn't necessarily cover all possible exceptions due to an invalid table, just some of the known and clear cases during log reconstruction. ## How was this patch tested? Updates existing tests. ## Does this PR introduce _any_ user-facing changes? Yes, instead of internal `RuntimeExceptions` or `IllegalStateExceptions`, etc a `InvalidTableException` will be thrown when certain invalid table states are encountered. --------- Co-authored-by: Venki Korukanti <[email protected]>
1 parent 956b950 commit 864b5e3

File tree

4 files changed

+118
-81
lines changed

4 files changed

+118
-81
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright (2024) The Delta Lake Project Authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.delta.kernel.exceptions;
17+
18+
/**
19+
* Thrown when an invalid table is encountered; the table's log and/or checkpoint files are
20+
* in an invalid state.
21+
*/
22+
public class InvalidTableException extends KernelException {
23+
24+
private static final String message = "Invalid table found at %s: %s";
25+
26+
public InvalidTableException(String tablePath, String reason) {
27+
super(String.format(message, tablePath, reason));
28+
}
29+
}

kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java

+26-14
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.delta.kernel.*;
3232
import io.delta.kernel.engine.Engine;
3333
import io.delta.kernel.exceptions.CheckpointAlreadyExistsException;
34+
import io.delta.kernel.exceptions.InvalidTableException;
3435
import io.delta.kernel.exceptions.TableNotFoundException;
3536
import io.delta.kernel.utils.CloseableIterator;
3637
import io.delta.kernel.utils.FileStatus;
@@ -79,15 +80,19 @@ public SnapshotManager(Path logPath, Path tablePath) {
7980
public static void verifyDeltaVersions(
8081
List<Long> versions,
8182
Optional<Long> expectedStartVersion,
82-
Optional<Long> expectedEndVersion) {
83+
Optional<Long> expectedEndVersion,
84+
Path tablePath) {
8385
if (!versions.isEmpty()) {
8486
List<Long> contVersions = LongStream
8587
.rangeClosed(versions.get(0), versions.get(versions.size() -1))
8688
.boxed()
8789
.collect(Collectors.toList());
8890
if (!contVersions.equals(versions)) {
89-
throw new IllegalStateException(
90-
format("Versions (%s) are not continuous", versions));
91+
throw new InvalidTableException(
92+
tablePath.toString(),
93+
String.format(
94+
"Missing delta files: versions are not continuous: (%s)", versions)
95+
);
9196
}
9297
}
9398
expectedStartVersion.ifPresent(v -> {
@@ -628,11 +633,13 @@ protected Optional<LogSegment> getLogSegmentForVersion(
628633
final long newVersion = deltaVersionsAfterCheckpoint.isEmpty() ?
629634
newCheckpointOpt.get().version : deltaVersionsAfterCheckpoint.getLast();
630635

631-
// In the case where `deltasAfterCheckpoint` is empty, `deltas` should still not be empty,
632-
// they may just be before the checkpoint version unless we have a bug in log cleanup.
633-
if (deltas.isEmpty()) {
634-
throw new IllegalStateException(
635-
format("Could not find any delta files for version %s", newVersion)
636+
// There should be a delta file present for the newVersion that we are loading
637+
// (Even if `deltasAfterCheckpoint` is empty, `deltas` should not be)
638+
if (deltas.isEmpty() ||
639+
FileNames.deltaVersion(deltas.get(deltas.size() - 1).getPath()) < newVersion) {
640+
throw new InvalidTableException(
641+
tablePath.toString(),
642+
String.format("Missing delta file for version %s", newVersion)
636643
);
637644
}
638645

@@ -642,19 +649,24 @@ protected Optional<LogSegment> getLogSegmentForVersion(
642649

643650
// We may just be getting a checkpoint file after the filtering
644651
if (!deltaVersionsAfterCheckpoint.isEmpty()) {
652+
// If we have deltas after the checkpoint, the first file should be 1 greater than our
653+
// last checkpoint version. If no checkpoint is present, this means the first delta file
654+
// should be version 0.
645655
if (deltaVersionsAfterCheckpoint.getFirst() != newCheckpointVersion + 1) {
646-
throw new RuntimeException(
647-
format(
648-
"Log file not found.\nExpected: %s\nFound: %s",
649-
FileNames.deltaFile(logPath, newCheckpointVersion + 1),
650-
FileNames.deltaFile(logPath, deltaVersionsAfterCheckpoint.get(0))
656+
throw new InvalidTableException(
657+
tablePath.toString(),
658+
String.format(
659+
"Unable to reconstruct table state: missing log file for version %s",
660+
newCheckpointVersion + 1
651661
)
652662
);
653663
}
654664
verifyDeltaVersions(
655665
deltaVersionsAfterCheckpoint,
656666
Optional.of(newCheckpointVersion + 1),
657-
versionToLoadOpt);
667+
versionToLoadOpt,
668+
tablePath
669+
);
658670
}
659671

660672
final long lastCommitTimestamp = deltas.get(deltas.size() - 1).getModificationTime();

kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala

+60-64
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@ import java.util.{Arrays, Collections, Optional}
2020
import scala.collection.JavaConverters._
2121
import scala.reflect.ClassTag
2222

23-
import io.delta.kernel.data.{ColumnVector, ColumnarBatch}
23+
import io.delta.kernel.data.{ColumnarBatch, ColumnVector}
24+
import io.delta.kernel.exceptions.InvalidTableException
2425
import io.delta.kernel.expressions.Predicate
2526
import io.delta.kernel.internal.checkpoints.{CheckpointInstance, SidecarFile}
27+
import io.delta.kernel.internal.fs.Path
2628
import io.delta.kernel.internal.snapshot.{LogSegment, SnapshotManager}
2729
import io.delta.kernel.internal.util.{FileNames, Utils}
2830
import io.delta.kernel.test.{BaseMockJsonHandler, BaseMockParquetHandler, MockFileSystemClientUtils, VectorTestUtils}
@@ -37,70 +39,82 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils {
3739
SnapshotManager.verifyDeltaVersions(
3840
Collections.emptyList(),
3941
Optional.empty(),
40-
Optional.empty())
42+
Optional.empty(),
43+
new Path("/path/to/table"))
4144
// contiguous versions
4245
SnapshotManager.verifyDeltaVersions(
4346
Arrays.asList(1, 2, 3),
4447
Optional.empty(),
45-
Optional.empty())
48+
Optional.empty(),
49+
new Path("/path/to/table"))
4650
// contiguous versions with correct `expectedStartVersion` and `expectedStartVersion`
4751
SnapshotManager.verifyDeltaVersions(
4852
Arrays.asList(1, 2, 3),
4953
Optional.empty(),
50-
Optional.of(3))
54+
Optional.of(3),
55+
new Path("/path/to/table"))
5156
SnapshotManager.verifyDeltaVersions(
5257
Arrays.asList(1, 2, 3),
5358
Optional.of(1),
54-
Optional.empty())
59+
Optional.empty(),
60+
new Path("/path/to/table"))
5561
SnapshotManager.verifyDeltaVersions(
5662
Arrays.asList(1, 2, 3),
5763
Optional.of(1),
58-
Optional.of(3))
64+
Optional.of(3),
65+
new Path("/path/to/table"))
5966
// `expectedStartVersion` or `expectedEndVersion` doesn't match
6067
intercept[IllegalArgumentException] {
6168
SnapshotManager.verifyDeltaVersions(
6269
Arrays.asList(1, 2),
6370
Optional.of(0),
64-
Optional.empty())
71+
Optional.empty(),
72+
new Path("/path/to/table"))
6573
}
6674
intercept[IllegalArgumentException] {
6775
SnapshotManager.verifyDeltaVersions(
6876
Arrays.asList(1, 2),
6977
Optional.empty(),
70-
Optional.of(3))
78+
Optional.of(3),
79+
new Path("/path/to/table"))
7180
}
7281
intercept[IllegalArgumentException] {
7382
SnapshotManager.verifyDeltaVersions(
7483
Collections.emptyList(),
7584
Optional.of(0),
76-
Optional.empty())
85+
Optional.empty(),
86+
new Path("/path/to/table"))
7787
}
7888
intercept[IllegalArgumentException] {
7989
SnapshotManager.verifyDeltaVersions(
8090
Collections.emptyList(),
8191
Optional.empty(),
82-
Optional.of(3))
92+
Optional.of(3),
93+
new Path("/path/to/table"))
8394
}
8495
// non contiguous versions
85-
intercept[IllegalStateException] {
96+
intercept[InvalidTableException] {
8697
SnapshotManager.verifyDeltaVersions(
8798
Arrays.asList(1, 3),
8899
Optional.empty(),
89-
Optional.empty())
100+
Optional.empty(),
101+
new Path("/path/to/table"))
90102
}
91103
// duplicates in versions
92-
intercept[IllegalStateException] {
104+
intercept[InvalidTableException] {
93105
SnapshotManager.verifyDeltaVersions(
94106
Arrays.asList(1, 2, 2, 3),
95107
Optional.empty(),
96-
Optional.empty())
108+
Optional.empty(),
109+
new Path("/path/to/table"))
97110
}
98111
// unsorted versions
99-
intercept[IllegalStateException] {
112+
intercept[InvalidTableException] {
100113
SnapshotManager.verifyDeltaVersions(
101114
Arrays.asList(3, 2, 1),
102115
Optional.empty(),
103-
Optional.empty())
116+
Optional.empty(),
117+
new Path("/path/to/table"))
104118
}
105119
}
106120

@@ -631,11 +645,11 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils {
631645
Seq(Optional.empty(), Optional.of(10L)): Seq[Optional[java.lang.Long]]) {
632646
for (startCheckpoint <-
633647
Seq(Optional.empty(), Optional.of(10L)): Seq[Optional[java.lang.Long]]) {
634-
testExpectedError[IllegalStateException](
648+
testExpectedError[InvalidTableException](
635649
files = singularCheckpointFileStatuses(Seq(10L)),
636650
startCheckpoint = startCheckpoint,
637651
versionToLoad = versionToLoad,
638-
expectedErrorMessageContains = "Could not find any delta files for version 10"
652+
expectedErrorMessageContains = "Missing delta file for version 10"
639653
)
640654
}
641655
}
@@ -645,60 +659,47 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils {
645659
// checkpoint(10), 010.json, 011.json, 013.json
646660
val fileList = deltaFileStatuses(Seq(10L, 11L)) ++ deltaFileStatuses(Seq(13L)) ++
647661
singularCheckpointFileStatuses(Seq(10L))
648-
testExpectedError[RuntimeException](
662+
testExpectedError[InvalidTableException](
649663
fileList,
650-
expectedErrorMessageContains = "Versions ([11, 13]) are not continuous"
664+
expectedErrorMessageContains = "versions are not continuous: ([11, 13])"
651665
)
652-
testExpectedError[RuntimeException](
666+
testExpectedError[InvalidTableException](
653667
fileList,
654668
startCheckpoint = Optional.of(10),
655-
expectedErrorMessageContains = "Versions ([11, 13]) are not continuous"
669+
expectedErrorMessageContains = "versions are not continuous: ([11, 13])"
656670
)
657-
testExpectedError[RuntimeException](
671+
testExpectedError[InvalidTableException](
658672
fileList,
659673
versionToLoad = Optional.of(13),
660-
expectedErrorMessageContains = "Versions ([11, 13]) are not continuous"
674+
expectedErrorMessageContains = "versions are not continuous: ([11, 13])"
661675
)
662676
}
663677

664-
// TODO address the inconsistent behaviors and throw better error messages for corrupt listings?
665-
// (delta-io/delta#2283)
666678
test("getLogSegmentForVersion: corrupt listing 000.json...009.json + checkpoint(10)") {
667679
val fileList = deltaFileStatuses((0L until 10L)) ++ singularCheckpointFileStatuses(Seq(10L))
668680

669681
/* ---------- version to load is 15 (greater than latest checkpoint/delta file) ---------- */
670-
testExpectedError[RuntimeException](
682+
testExpectedError[InvalidTableException](
671683
fileList,
672684
versionToLoad = Optional.of(15),
673-
expectedErrorMessageContains = "Could not find any delta files for version 10"
685+
expectedErrorMessageContains = "Missing delta file for version 10"
674686
)
675-
testExpectedError[IllegalStateException](
687+
testExpectedError[InvalidTableException](
676688
fileList,
677689
startCheckpoint = Optional.of(10),
678690
versionToLoad = Optional.of(15),
679-
expectedErrorMessageContains = "Could not find any delta files for version 10"
691+
expectedErrorMessageContains = "Missing delta file for version 10"
680692
)
681693

682694
/* ---------- versionToLoad is latest (10) ---------- */
683-
// (?) fails when startCheckpoint is provided, passes when it's not
684-
testExpectedError[IllegalStateException](
695+
testExpectedError[InvalidTableException](
685696
fileList,
686697
startCheckpoint = Optional.of(10),
687-
expectedErrorMessageContains = "Could not find any delta files for version 10"
688-
)
689-
val logSegment = snapshotManager.getLogSegmentForVersion(
690-
createMockFSListFromEngine(fileList),
691-
Optional.empty(),
692-
Optional.empty()
698+
expectedErrorMessageContains = "Missing delta file for version 10"
693699
)
694-
assert(logSegment.isPresent())
695-
checkLogSegment(
696-
logSegment.get(),
697-
10,
698-
Seq.empty,
699-
singularCheckpointFileStatuses(Seq(10L)),
700-
Some(10),
701-
90 // is the last available delta file
700+
testExpectedError[InvalidTableException](
701+
fileList,
702+
expectedErrorMessageContains = "Missing delta file for version 10"
702703
)
703704
}
704705

@@ -718,41 +719,36 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils {
718719
)
719720
/* ---------- versionToLoad is 10 ---------- */
720721
// (?) throws an error
721-
testExpectedError[IllegalStateException](
722+
testExpectedError[InvalidTableException](
722723
fileList,
723724
versionToLoad = Optional.of(10),
724-
expectedErrorMessageContains = "Could not find any delta files for version 10"
725+
expectedErrorMessageContains = "Missing delta file for version 10"
725726
)
726-
testExpectedError[IllegalStateException](
727+
testExpectedError[InvalidTableException](
727728
fileList,
728729
startCheckpoint = Optional.of(10),
729730
versionToLoad = Optional.of(10),
730-
expectedErrorMessageContains = "Could not find any delta files for version 10"
731+
expectedErrorMessageContains = "Missing delta file for version 10"
731732
)
732733
}
733734

734735
test("getLogSegmentForVersion: corrupted log missing json files / no way to construct history") {
735-
def expectedErrorMessage(v: Int): String = {
736-
s"""|Log file not found.
737-
|Expected: ${FileNames.deltaFile(logPath, 0)}
738-
|Found: ${FileNames.deltaFile(logPath, v)}""".stripMargin
739-
}
740-
testExpectedError[RuntimeException](
736+
testExpectedError[InvalidTableException](
741737
deltaFileStatuses(1L until 10L),
742-
expectedErrorMessageContains = expectedErrorMessage(1)
738+
expectedErrorMessageContains = "missing log file for version 0"
743739
)
744-
testExpectedError[RuntimeException](
740+
testExpectedError[InvalidTableException](
745741
deltaFileStatuses(15L until 25L) ++ singularCheckpointFileStatuses(Seq(20L)),
746742
versionToLoad = Optional.of(17),
747-
expectedErrorMessageContains = expectedErrorMessage(15)
743+
expectedErrorMessageContains = "missing log file for version 0"
748744
)
749-
testExpectedError[RuntimeException](
745+
testExpectedError[InvalidTableException](
750746
deltaFileStatuses(15L until 25L) ++ singularCheckpointFileStatuses(Seq(20L)),
751747
startCheckpoint = Optional.of(20),
752748
versionToLoad = Optional.of(17),
753-
expectedErrorMessageContains = expectedErrorMessage(15)
749+
expectedErrorMessageContains = "missing log file for version 0"
754750
)
755-
testExpectedError[RuntimeException](
751+
testExpectedError[InvalidTableException](
756752
deltaFileStatuses((0L until 5L) ++ (6L until 9L)),
757753
expectedErrorMessageContains = "are not continuous"
758754
)
@@ -761,11 +757,11 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils {
761757
.map(p => FileStatus.of(p.toString, 10, 10))
762758
.take(4)
763759
val deltas = deltaFileStatuses(10L to 13L)
764-
testExpectedError[RuntimeException](
760+
testExpectedError[InvalidTableException](
765761
corruptedCheckpointStatuses ++ deltas,
766762
Optional.empty(),
767763
Optional.empty(),
768-
expectedErrorMessageContains = expectedErrorMessage(10)
764+
expectedErrorMessageContains = "missing log file for version 0"
769765
)
770766
}
771767

0 commit comments

Comments
 (0)