Skip to content


[Spark] Read support for log compactions
Browse files Browse the repository at this point in the history
This PR adds read support for log compactions described here: #2072

Closes #2073

GitOrigin-RevId: 6f4a09c3fa09c303cdeb747c382cedcfda5a2a4c
  • Loading branch information
prakharjain09 authored and vkorukanti committed Oct 2, 2023
1 parent 2d92266 commit 0e05caf
Show file tree
Hide file tree
Showing 4 changed files with 605 additions and 28 deletions.
156 changes: 130 additions & 26 deletions spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package
import java.util.Objects

import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

Expand All @@ -29,6 +30,7 @@ import
import com.fasterxml.jackson.annotation.JsonIgnore
import org.apache.hadoop.fs.{BlockLocation, FileStatus, LocatedFileStatus, Path}

Expand Down Expand Up @@ -105,19 +107,24 @@ trait SnapshotManagement { self: DeltaLog =>
* @return Some array of files found (possibly empty, if no usable commit files are present), or
* None if the listing returned no files at all.
protected final def listDeltaAndCheckpointFiles(
protected final def listDeltaCompactedDeltaAndCheckpointFiles(
startVersion: Long,
versionToLoad: Option[Long]): Option[Array[FileStatus]] =
versionToLoad: Option[Long],
includeMinorCompactions: Boolean): Option[Array[FileStatus]] =
recordDeltaOperation(self, "delta.deltaLog.listDeltaAndCheckpointFiles") {
listFromOrNone(startVersion).map { _
// Pick up all checkpoint and delta files
.filter { file => isDeltaCommitOrCheckpointFile(file.getPath) }
// Checkpoint files of 0 size are invalid but Spark will ignore them silently when reading
// such files, hence we drop them so that we never pick up such checkpoints.
.filterNot { file => isCheckpointFile(file) && file.getLen == 0 }
.collect {
case DeltaFile(f, fileVersion) =>
(f, fileVersion)
case CompactedDeltaFile(f, startVersion, endVersion)
if includeMinorCompactions && versionToLoad.forall(endVersion <= _) =>
(f, startVersion)
case CheckpointFile(f, fileVersion) if f.getLen > 0 =>
(f, fileVersion)
// take files until the version we want to load
.takeWhile(f => versionToLoad.forall(getFileVersion(f) <= _))
.takeWhile { case (_, fileVersion) => versionToLoad.forall(fileVersion <= _) }

Expand Down Expand Up @@ -146,12 +153,18 @@ trait SnapshotManagement { self: DeltaLog =>
// if that is -1, list from version 0L
val lastCheckpointVersion = getCheckpointVersion(lastCheckpointInfo, oldCheckpointProviderOpt)
val listingStartVersion = Math.max(0L, lastCheckpointVersion)
val newFiles = listDeltaAndCheckpointFiles(listingStartVersion, versionToLoad)
val includeMinorCompactions =
val newFiles = listDeltaCompactedDeltaAndCheckpointFiles(
startVersion = listingStartVersion,
versionToLoad = versionToLoad,
includeMinorCompactions = includeMinorCompactions)
oldCheckpointProviderOpt = oldCheckpointProviderOpt,
lastCheckpointInfo = lastCheckpointInfo)
lastCheckpointInfo = lastCheckpointInfo

Expand Down Expand Up @@ -185,6 +198,7 @@ trait SnapshotManagement { self: DeltaLog =>
selectedDeltas.headOption.foreach { headDelta =>
val headDeltaVersion = deltaVersion(headDelta)
val lastDeltaVersion = selectedDeltas.last match {
case CompactedDeltaFile(_, _, endV) => endV
case DeltaFile(_, v) => v

Expand All @@ -195,6 +209,7 @@ trait SnapshotManagement { self: DeltaLog =>
unsafeVolatileMetadata) // metadata is best-effort only
val deltaVersions = selectedDeltas.flatMap {
case CompactedDeltaFile(_, startV, endV) => (startV to endV)
case DeltaFile(_, v) => Seq(v)
verifyDeltaVersions(spark, deltaVersions, Some(checkpointVersion + 1), versionToLoad)
Expand All @@ -216,13 +231,13 @@ trait SnapshotManagement { self: DeltaLog =>
.getOrElse {
// No files found even when listing from 0 => empty directory => table does not exist yet.
if (lastCheckpointVersion < 0) return None
// [SC-95011] FIXME(ryan.johnson): We always write the commit and checkpoint files
// before updating _last_checkpoint. If the listing came up empty, then we either
// encountered a list-after-put inconsistency in the underlying log store, or somebody
// corrupted the table by deleting files. Either way, we can't safely continue.
// We always write the commit and checkpoint files before updating _last_checkpoint.
// If the listing came up empty, then we either encountered a list-after-put
// inconsistency in the underlying log store, or somebody corrupted the table by
// deleting files. Either way, we can't safely continue.
// For now, we preserve existing behavior by returning Array.empty, which will trigger a
// recursive call to [[getLogSegmentForVersion]] below (same as before the refactor).
// recursive call to [[getLogSegmentForVersion]] below.

Expand All @@ -235,7 +250,8 @@ trait SnapshotManagement { self: DeltaLog =>
// singleton, so try listing from the first version
return getLogSegmentForVersion(versionToLoad = versionToLoad)
val (checkpoints, deltas) = newFiles.partition(isCheckpointFile)
val (checkpoints, deltasAndCompactedDeltas) = newFiles.partition(isCheckpointFile)
val (deltas, compactedDeltas) = deltasAndCompactedDeltas.partition(isDeltaFile)
// Find the latest checkpoint in the listing that is not older than the versionToLoad
val checkpointFiles = => CheckpointInstance(f.getPath))
val newCheckpoint = getLatestCompleteCheckpointFromList(checkpointFiles, versionToLoad)
Expand All @@ -246,9 +262,8 @@ trait SnapshotManagement { self: DeltaLog =>
// `startCheckpoint` was given but no checkpoint found on delta log. This means that the
// last checkpoint we thought should exist (the `_last_checkpoint` file) no longer exists.
// Try to look up another valid checkpoint and create `LogSegment` from it.
// [SC-95011] FIXME(ryan.johnson): Something has gone very wrong if the checkpoint doesn't
// exist at all. This code should only handle rejected incomplete checkpoints.
// This case can arise if the user deleted the table (all commits and checkpoints) but
// left the _last_checkpoint intact.
recordDeltaEvent(this, "delta.checkpoint.error.partial")
val snapshotVersion = versionToLoad.getOrElse(deltaVersion(deltas.last))
getLogSegmentWithMaxExclusiveCheckpointVersion(snapshotVersion, lastCheckpointVersion)
Expand All @@ -268,6 +283,10 @@ trait SnapshotManagement { self: DeltaLog =>
deltaVersion(file) > newCheckpointVersion

// Here we validate that we are able to create a valid LogSegment by just using commit deltas
// and without considering minor-compacted deltas. We want to fail early if log is messed up
// i.e. some commit deltas are missing (although compacted-deltas are present).
validateDeltaVersions(deltasAfterCheckpoint, newCheckpointVersion, versionToLoad)

val newVersion =
Expand All @@ -288,17 +307,96 @@ trait SnapshotManagement { self: DeltaLog =>
val lastCommitTimestamp = deltas.last.getModificationTime

validateDeltaVersions(deltasAfterCheckpoint, newCheckpointVersion, versionToLoad)
val deltasAndCompactedDeltasForLogSegment = useCompactedDeltasForLogSegment(
latestCommitVersion = newVersion,
checkpointVersionToUse = newCheckpointVersion)

deltasAndCompactedDeltasForLogSegment, newCheckpointVersion, versionToLoad)


* @param deltasAndCompactedDeltas - all deltas or compacted deltas which could be used
* @param deltasAfterCheckpoint - deltas after the last checkpoint file
* @param latestCommitVersion - commit version for which we are trying to create Snapshot for
* @param checkpointVersionToUse - underlying checkpoint version to use in Snapshot, -1 if no
* checkpoint is used.
* @return Returns a list of deltas/compacted-deltas which can be used to construct the
* [[LogSegment]] instead of `deltasAfterCheckpoint`.
protected def useCompactedDeltasForLogSegment(
deltasAndCompactedDeltas: Seq[FileStatus],
deltasAfterCheckpoint: Array[FileStatus],
latestCommitVersion: Long,
checkpointVersionToUse: Long): Array[FileStatus] = {

val selectedDeltas = mutable.ArrayBuffer.empty[FileStatus]
var highestVersionSeen = checkpointVersionToUse
val commitRangeCovered = mutable.ArrayBuffer.empty[Long]
// track if there is at least 1 compacted delta in `deltasAndCompactedDeltas`
var hasCompactedDeltas = false
for (file <- deltasAndCompactedDeltas) {
val (startVersion, endVersion) = file match {
case CompactedDeltaFile(_, startVersion, endVersion) =>
hasCompactedDeltas = true
(startVersion, endVersion)
case DeltaFile(_, version) =>
(version, version)

// select the compacted delta if the startVersion doesn't straddle `highestVersionSeen` and
// the endVersion doesn't cross the latestCommitVersion.
if (highestVersionSeen < startVersion && endVersion <= latestCommitVersion) {
commitRangeCovered.appendAll(startVersion to endVersion)
selectedDeltas += file
highestVersionSeen = endVersion
// If there are no compacted deltas in the `deltasAndCompactedDeltas` list, return from this
// method.
if (!hasCompactedDeltas) return deltasAfterCheckpoint
// Validation-1: Commits represented by `compactedDeltasToUse` should be unique and there must
// not be any duplicates.
val coveredCommits = commitRangeCovered.toSet
val hasDuplicates = (commitRangeCovered.size != coveredCommits.size)

// Validation-2: All commits from (CheckpointVersion + 1) to latestCommitVersion should be
// either represented by compacted delta or by the delta.
val requiredCommits = (checkpointVersionToUse + 1) to latestCommitVersion
val missingCommits = requiredCommits.toSet -- coveredCommits
if (!hasDuplicates && missingCommits.isEmpty) return selectedDeltas.toArray

// If the above check failed, that means the compacted delta validation failed.
// Just record that event and return just the deltas (deltasAfterCheckpoint).
val eventData = Map(
"deltasAndCompactedDeltas" ->,
"deltasAfterCheckpoint" ->,
"latestCommitVersion" -> latestCommitVersion,
"checkpointVersionToUse" -> checkpointVersionToUse,
"hasDuplicates" -> hasDuplicates,
"missingCommits" -> missingCommits
deltaLog = this,
opType = "delta.getLogSegmentForVersion.compactedDeltaValidationFailed",
data = eventData)
if (Utils.isTesting) {
assert(false, s"Validation around Compacted deltas failed while creating Snapshot. " +

* Load the Snapshot for this Delta table at initialization. This method uses the `lastCheckpoint`
* file as a hint on where to start listing the transaction log directory. If the _delta_log
Expand Down Expand Up @@ -398,10 +496,11 @@ trait SnapshotManagement { self: DeltaLog =>
if (upperBoundVersion > 0) findLastCompleteCheckpointBefore(upperBoundVersion) else None
previousCp match {
case Some(cp) =>
val filesSinceCheckpointVersion = listDeltaAndCheckpointFiles(
val filesSinceCheckpointVersion = listDeltaCompactedDeltaAndCheckpointFiles(
startVersion = cp.version,
versionToLoad = Some(snapshotVersion))
versionToLoad = Some(snapshotVersion),
includeMinorCompactions = false
val (checkpoints, deltas) = filesSinceCheckpointVersion.partition(isCheckpointFile)
if (deltas.isEmpty) {
// We cannot find any delta files. Returns None as we cannot construct a `LogSegment` only
Expand Down Expand Up @@ -436,8 +535,13 @@ trait SnapshotManagement { self: DeltaLog =>
case None =>
val listFromResult =
startVersion = 0,
versionToLoad = Some(snapshotVersion),
includeMinorCompactions = false)
val (deltas, deltaVersions) =
listDeltaAndCheckpointFiles(startVersion = 0, versionToLoad = Some(snapshotVersion))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1302,6 +1302,13 @@ trait DeltaSQLConfBase {

.doc("If true, minor compacted delta log files will be used for creating Snapshots")

val ICEBERG_MAX_COMMITS_TO_CONVERT = buildConf("iceberg.maxPendingCommits")
|The maximum number of pending Delta commits to convert to Iceberg incrementally.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ import org.apache.hadoop.fs.{FileStatus, Path}
object FileNames {

val deltaFileRegex = raw"(\d+)\.json".r
val compactedDeltaFileRegex = raw"(\d+).(\d+).compacted.json".r
val checksumFileRegex = raw"(\d+)\.crc".r
val checkpointFileRegex = raw"(\d+)\.checkpoint((\.\d+\.\d+)?\.parquet|\.[^.]+\.(json|parquet))".r

val deltaFilePattern = deltaFileRegex.pattern
val compactedDeltaFilePattern = compactedDeltaFileRegex.pattern
val checksumFilePattern = checksumFileRegex.pattern
val checkpointFilePattern = checkpointFileRegex.pattern

Expand All @@ -40,6 +42,14 @@ object FileNames {
/** Returns the path to the checksum file for the given version. */
def checksumFile(path: Path, version: Long): Path = new Path(path, f"$version%020d.crc")

/** Returns the path to the compacted delta file for the given version range. */
def compactedDeltaFile(
path: Path,
fromVersion: Long,
toVersion: Long): Path = {
new Path(path, f"$fromVersion%020d.$toVersion%020d.compacted.json")

/** Returns the version for the given delta path. */
def deltaVersion(path: Path): Long = path.getName.split("\\.")(0).toLong
def deltaVersion(file: FileStatus): Long = deltaVersion(file.getPath)
Expand All @@ -48,6 +58,12 @@ object FileNames {
def checksumVersion(path: Path): Long = path.getName.stripSuffix(".crc").toLong
def checksumVersion(file: FileStatus): Long = checksumVersion(file.getPath)

def compactedDeltaVersions(path: Path): (Long, Long) = {
val parts = path.getName.split("\\.")
(parts(0).toLong, parts(1).toLong)
def compactedDeltaVersions(file: FileStatus): (Long, Long) = compactedDeltaVersions(file.getPath)

* Returns the prefix of all delta log files for the given version.
Expand Down Expand Up @@ -93,9 +109,23 @@ object FileNames {
def isChecksumFile(path: Path): Boolean = checksumFilePattern.matcher(path.getName).matches()
def isChecksumFile(file: FileStatus): Boolean = isChecksumFile(file.getPath)

def isCompactedDeltaFile(path: Path): Boolean =
def isCompactedDeltaFile(file: FileStatus): Boolean = isCompactedDeltaFile(file.getPath)

def checkpointVersion(path: Path): Long = path.getName.split("\\.")(0).toLong
def checkpointVersion(file: FileStatus): Long = checkpointVersion(file.getPath)

object CompactedDeltaFile {
def unapply(f: FileStatus): Option[(FileStatus, Long, Long)] =
unapply(f.getPath).map { case (_, startVersion, endVersion) => (f, startVersion, endVersion) }
def unapply(path: Path): Option[(Path, Long, Long)] = path.getName match {
case compactedDeltaFileRegex(lo, hi) => Some(path, lo.toLong, hi.toLong)
case _ => None

* Get the version of the checkpoint, checksum or delta file. Returns None if an unexpected
* file type is seen.
Expand All @@ -104,6 +134,7 @@ object FileNames {
case DeltaFile(_, version) => Some(version)
case ChecksumFile(_, version) => Some(version)
case CheckpointFile(_, version) => Some(version)
case CompactedDeltaFile(_, _, endVersion) => Some(endVersion)
case _ => None

Expand Down Expand Up @@ -145,10 +176,9 @@ object FileNames {

object FileType extends Enumeration {

/** File path for a new V2 Checkpoint Json file */
def newV2CheckpointJsonFile(path: Path, version: Long): Path =
new Path(path, f"$version%020d.checkpoint.${UUID.randomUUID.toString}.json")
Expand Down

0 comments on commit 0e05caf

Please sign in to comment.