[SC-89410] Capture non-fatal exceptions while writing checkpoints
We discovered that a checkpoint failure during a write command could surface an exception to the user. This is undesirable: from the user's perspective the write has already succeeded, even if the checkpoint failed. Capture non-fatal exception types (but don't try to capture fatal exceptions like JVM failures), and log when this occurs.
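
The non-fatal versus fatal line here is the one drawn by Scala's scala.util.control.NonFatal extractor, which the patch below matches on; a minimal sketch of how it classifies exceptions:

    import scala.util.control.NonFatal

    // NonFatal matches ordinary exceptions but deliberately excludes JVM-level
    // failures (VirtualMachineError such as OutOfMemoryError, ThreadDeath,
    // InterruptedException, LinkageError, ControlThrowable), so those propagate.
    assert(NonFatal(new java.io.IOException("disk full")))     // captured and logged
    assert(NonFatal(new IllegalStateException("bad state")))   // captured and logged
    assert(!NonFatal(new OutOfMemoryError("heap exhausted")))  // fatal: still thrown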

Updated the existing unit tests that captured exceptions, and added a new one covering different exception types (such as file-system exceptions).
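
A minimal sketch of such a test, using only the Scala standard library (FlakyCheckpointer is a hypothetical stand-in; the commit's actual test suites are not shown on this page): the checkpoint step is forced to fail with a file-system error, and the commit must still report success.

    import java.io.IOException
    import scala.util.control.NonFatal

    // Hypothetical stand-in for the commit-then-checkpoint write path.
    class FlakyCheckpointer(checkpointFailure: Option[Throwable]) {
      def commitAndCheckpoint(): Boolean = {
        val committed = true  // the commit itself has already succeeded
        try {
          checkpointFailure.foreach(e => throw e)  // simulate the checkpoint step
        } catch {
          case NonFatal(e) => println(s"checkpoint failed, suppressed: $e")
        }
        committed
      }
    }

    // A file-system error during checkpointing must not surface to the caller.
    assert(new FlakyCheckpointer(Some(new IOException("fs error"))).commitAndCheckpoint())
    assert(new FlakyCheckpointer(None).commitAndCheckpoint())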

GitOrigin-RevId: 4911f90d5d46bac44defe561bf32ce817f50656c
husseinnagr-db authored and tdas committed Dec 17, 2021
1 parent 2a367f4 commit 7e88e6b
Showing 3 changed files with 39 additions and 25 deletions.
core/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala (34 additions, 11 deletions)
@@ -41,6 +41,7 @@ import org.apache.spark.sql.functions.{col, struct, when}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.Utils

 /**
  * Records information about a checkpoint.
@@ -129,21 +130,43 @@ trait Checkpoints extends DeltaLogging {

   /**
    * Creates a checkpoint using snapshotToCheckpoint. By default it uses the current log version.
+   * Note that this function captures and logs all exceptions, since the checkpoint shouldn't fail
+   * the overall commit operation.
    */
   def checkpoint(snapshotToCheckpoint: Snapshot): Unit =
     recordDeltaOperation(this, "delta.checkpoint") {
-      if (snapshotToCheckpoint.version < 0) {
-        throw DeltaErrors.checkpointNonExistTable(dataPath)
+      try {
+        if (snapshotToCheckpoint.version < 0) {
+          throw DeltaErrors.checkpointNonExistTable(dataPath)
+        }
+        val checkpointMetaData = writeCheckpointFiles(snapshotToCheckpoint)
+        val json = JsonUtils.toJson(checkpointMetaData)
+        store.write(
+          LAST_CHECKPOINT,
+          Iterator(json),
+          overwrite = true,
+          newDeltaHadoopConf())
+
+        doLogCleanup()
+      } catch {
+        // Catch all non-fatal exceptions, since the checkpoint is written after the commit
+        // has completed. From the perspective of the user, the commit completed successfully.
+        // However, throw if this is in a testing environment - that way any breaking changes
+        // can be caught in unit tests.
+        case NonFatal(e) =>
+          recordDeltaEvent(
+            snapshotToCheckpoint.deltaLog,
+            "delta.checkpoint.sync.error",
+            data = Map(
+              "exception" -> e.getMessage(),
+              "stackTrace" -> e.getStackTrace()
+            )
+          )
+          logWarning(s"Error when writing checkpoint synchronously", e)
+          if (Utils.isTesting) {
+            throw e
+          }
       }
-      val checkpointMetaData = writeCheckpointFiles(snapshotToCheckpoint)
-      val json = JsonUtils.toJson(checkpointMetaData)
-      store.write(
-        LAST_CHECKPOINT,
-        Iterator(json),
-        overwrite = true,
-        newDeltaHadoopConf())
-
-      doLogCleanup()
     }

   protected def writeCheckpointFiles(snapshotToCheckpoint: Snapshot): CheckpointMetaData = {
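
The Utils.isTesting guard above keeps the new behavior strict under test: checkpoint errors are swallowed in production but rethrown in test runs. A minimal sketch of that pattern, assuming (as in mainline Spark) that Utils.isTesting checks the SPARK_TESTING environment variable or the spark.testing system property:

    import scala.util.control.NonFatal

    // Stand-in for org.apache.spark.util.Utils.isTesting; the env var / property
    // names below are an assumption based on mainline Spark.
    def isTesting: Boolean =
      sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")

    // Best-effort helper: log and continue in production, fail loudly under test.
    def bestEffort(name: String)(body: => Unit): Unit =
      try body catch {
        case NonFatal(e) =>
          System.err.println(s"$name failed (ignored outside tests): $e")
          if (isTesting) throw e  // surface regressions in unit tests
      }

    bestEffort("checkpoint") { throw new IllegalStateException("simulated failure") }
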
@@ -638,14 +638,9 @@ trait OptimisticTransactionImpl extends TransactionalWrite with SQLMetricsReport
   protected def postCommit(commitVersion: Long): Unit = {
     committed = true
     if (shouldCheckpoint(commitVersion)) {
-      try {
-        // We checkpoint the version to be committed to so that no two transactions will checkpoint
-        // the same version.
-        deltaLog.checkpoint(deltaLog.getSnapshotAt(commitVersion))
-      } catch {
-        case e: IllegalStateException =>
-          logWarning("Failed to checkpoint table state.", e)
-      }
+      // We checkpoint the version to be committed to so that no two transactions will checkpoint
+      // the same version.
+      deltaLog.checkpoint(deltaLog.getSnapshotAt(commitVersion))
     }
   }

@@ -29,6 +29,7 @@ import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf}
 import org.apache.spark.sql.delta.stats.FileSizeHistogram
 import org.apache.spark.sql.delta.util.DeltaFileOperations
 import org.apache.spark.sql.delta.util.FileNames.deltaFile
+
 import org.apache.hadoop.fs.Path

 import org.apache.spark.sql.{AnalysisException, SparkSession}

@@ -232,12 +233,7 @@ trait DeltaCommand extends DeltaLogging {

     logInfo(s"Committed delta #$attemptVersion to ${deltaLog.logPath}. Wrote $commitSize actions.")

-    try {
-      deltaLog.checkpoint(currentSnapshot)
-    } catch {
-      case e: IllegalStateException =>
-        logWarning("Failed to checkpoint table state.", e)
-    }
+    deltaLog.checkpoint(currentSnapshot)
   }

   /**
