Commit 7e88e6b

husseinnagr-dbtdas authored and committed
[SC-89410] Capture non-fatal exceptions while writing checkpoints
We discovered that we might throw an exception to the user when a checkpoint fails during a write command. This is undesirable: from the user's perspective the write has succeeded, even if the checkpoint failed. Capture non-fatal exception types (but don't try to capture fatal exceptions like JVM failures), and log when this occurs.

Updated existing unit tests that captured exceptions, and added a new one to test for different types of exceptions (e.g. filesystem exceptions).

GitOrigin-RevId: 4911f90d5d46bac44defe561bf32ce817f50656c
1 parent 2a367f4 commit 7e88e6b
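
For reference, the new catch clause relies on Scala's standard scala.util.control.NonFatal extractor: it matches ordinary exceptions (e.g. a java.io.IOException from a failed filesystem write) but deliberately does not match fatal JVM errors such as OutOfMemoryError or LinkageError, which keep propagating. A minimal standalone sketch of the pattern this commit applies (the writeCheckpoint stand-in is hypothetical, not the real Delta code):

import scala.util.control.NonFatal

object NonFatalSketch {
  // Hypothetical stand-in for the real checkpoint write.
  def writeCheckpoint(): Unit = throw new java.io.IOException("filesystem failure")

  def main(args: Array[String]): Unit = {
    try {
      writeCheckpoint()
    } catch {
      // NonFatal matches IOException, RuntimeException, etc., but not fatal
      // errors like OutOfMemoryError, which still propagate.
      case NonFatal(e) =>
        println(s"checkpoint failed, but the commit still succeeds: ${e.getMessage}")
    }
  }
}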

File tree: 3 files changed, +39 −25 lines

core/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala

+34-11
@@ -41,6 +41,7 @@ import org.apache.spark.sql.functions.{col, struct, when}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.Utils
 
 /**
  * Records information about a checkpoint.
@@ -129,21 +130,43 @@ trait Checkpoints extends DeltaLogging {
 
   /**
    * Creates a checkpoint using snapshotToCheckpoint. By default it uses the current log version.
+   * Note that this function captures and logs all exceptions, since the checkpoint shouldn't fail
+   * the overall commit operation.
    */
   def checkpoint(snapshotToCheckpoint: Snapshot): Unit =
     recordDeltaOperation(this, "delta.checkpoint") {
-      if (snapshotToCheckpoint.version < 0) {
-        throw DeltaErrors.checkpointNonExistTable(dataPath)
+      try {
+        if (snapshotToCheckpoint.version < 0) {
+          throw DeltaErrors.checkpointNonExistTable(dataPath)
+        }
+        val checkpointMetaData = writeCheckpointFiles(snapshotToCheckpoint)
+        val json = JsonUtils.toJson(checkpointMetaData)
+        store.write(
+          LAST_CHECKPOINT,
+          Iterator(json),
+          overwrite = true,
+          newDeltaHadoopConf())
+
+        doLogCleanup()
+      } catch {
+        // Catch all non-fatal exceptions, since the checkpoint is written after the commit
+        // has completed. From the perspective of the user, the commit completed successfully.
+        // However, throw if this is in a testing environment - that way any breaking changes
+        // can be caught in unit tests.
+        case NonFatal(e) =>
+          recordDeltaEvent(
+            snapshotToCheckpoint.deltaLog,
+            "delta.checkpoint.sync.error",
+            data = Map(
+              "exception" -> e.getMessage(),
+              "stackTrace" -> e.getStackTrace()
+            )
+          )
+          logWarning(s"Error when writing checkpoint synchronously", e)
+          if (Utils.isTesting) {
+            throw e
+          }
       }
-      val checkpointMetaData = writeCheckpointFiles(snapshotToCheckpoint)
-      val json = JsonUtils.toJson(checkpointMetaData)
-      store.write(
-        LAST_CHECKPOINT,
-        Iterator(json),
-        overwrite = true,
-        newDeltaHadoopConf())
-
-      doLogCleanup()
     }
 
   protected def writeCheckpointFiles(snapshotToCheckpoint: Snapshot): CheckpointMetaData = {
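
The rethrow-under-test guard above uses Spark's org.apache.spark.util.Utils.isTesting, which keeps the swallow-and-log behavior production-only so unit tests still fail loudly on checkpoint regressions. As a rough sketch (an assumption about Spark's check, which keys off Spark's own test markers):

// Approximation of org.apache.spark.util.Utils.isTesting; assumption: Spark
// treats the SPARK_TESTING env var or the spark.testing system property as
// the signal that it is running under its own test harness.
def isTestingSketch: Boolean =
  sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")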

core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala

+3-8
@@ -638,14 +638,9 @@ trait OptimisticTransactionImpl extends TransactionalWrite with SQLMetricsReport
   protected def postCommit(commitVersion: Long): Unit = {
     committed = true
     if (shouldCheckpoint(commitVersion)) {
-      try {
-        // We checkpoint the version to be committed to so that no two transactions will checkpoint
-        // the same version.
-        deltaLog.checkpoint(deltaLog.getSnapshotAt(commitVersion))
-      } catch {
-        case e: IllegalStateException =>
-          logWarning("Failed to checkpoint table state.", e)
-      }
+      // We checkpoint the version to be committed to so that no two transactions will checkpoint
+      // the same version.
+      deltaLog.checkpoint(deltaLog.getSnapshotAt(commitVersion))
     }
   }
 
core/src/main/scala/org/apache/spark/sql/delta/commands/DeltaCommand.scala

+2-6
@@ -29,6 +29,7 @@ import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf}
 import org.apache.spark.sql.delta.stats.FileSizeHistogram
 import org.apache.spark.sql.delta.util.DeltaFileOperations
 import org.apache.spark.sql.delta.util.FileNames.deltaFile
+
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.{AnalysisException, SparkSession}
@@ -232,12 +233,7 @@ trait DeltaCommand extends DeltaLogging {
 
     logInfo(s"Committed delta #$attemptVersion to ${deltaLog.logPath}. Wrote $commitSize actions.")
 
-    try {
-      deltaLog.checkpoint(currentSnapshot)
-    } catch {
-      case e: IllegalStateException =>
-        logWarning("Failed to checkpoint table state.", e)
-    }
+    deltaLog.checkpoint(currentSnapshot)
   }
 
   /**
