[Spark] Add managed-commit table feature and CommitStore interface #2627

Closed

Changes from 3 commits
22 changes: 22 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaConfig.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.delta.hooks.AutoCompactType
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.{DataSkippingReader, StatisticsCollection}
import org.apache.spark.sql.delta.util.JsonUtils

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.util.{DateTimeConstants, IntervalUtils}
@@ -723,6 +724,27 @@ trait DeltaConfigsBase extends DeltaLogging {
_ => true,
"needs to be a boolean."
)

val MANAGED_COMMIT_OWNER_NAME = buildConfig[Option[String]](
"managedCommits.commitOwner-dev",
null,
v => Option(v),
_ => true,
"""The managed commit provider name for this table. This is used to determine which
|implementation of CommitStore to use when committing to this table. This must be present
|for tables where managed-commits are enabled.
|""".stripMargin)

val MANAGED_COMMIT_OWNER_CONF = buildConfig[Map[String, String]](
"managedCommits.commitOwnerConf-dev",
null,
v => JsonUtils.fromJson[Map[String, String]](Option(v).getOrElse("{}")),
_ => true,
"""The managed commit provider name for this table. This is used to determine which
|implementation of CommitStore to use when committing to this table. This must be present
|for tables where managed-commits are enabled.
|""".stripMargin)

}

object DeltaConfigs extends DeltaConfigsBase
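
For reference, a minimal sketch of how these two table properties could be set on a table. This assumes `buildConfig` applies the usual "delta." key prefix; the "in-memory" owner name and the "endpoint" conf key below are hypothetical and not part of this PR:

// Hedged sketch: enable managed commits on an existing Delta table.
// Owner name and conf key are hypothetical examples.
spark.sql("""
  ALTER TABLE delta.`/tmp/target-table` SET TBLPROPERTIES (
    'delta.managedCommits.commitOwner-dev' = 'in-memory',
    'delta.managedCommits.commitOwnerConf-dev' = '{"endpoint": "https://commit-service"}'
  )
""")

Once the owner name is set, commits to the table are routed through the corresponding CommitStore rather than written directly via the LogStore.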
spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -605,7 +605,7 @@ object DeltaLog extends DeltaLogging {
private type DeltaLogCacheKey = (Path, Map[String, String])

/** The name of the subdirectory that holds Delta metadata files */
private val LOG_DIR_NAME = "_delta_log"
private[delta] val LOG_DIR_NAME = "_delta_log"

private[delta] def logPathFor(dataPath: String): Path = logPathFor(new Path(dataPath))
private[delta] def logPathFor(dataPath: Path): Path =
spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -1094,6 +1094,9 @@ trait OptimisticTransactionImpl extends TransactionalWrite
tags = if (tags.nonEmpty) Some(tags) else None,
txnId = Some(txnId))

// Validate that [[DeltaConfigs.MANAGED_COMMIT_OWNER_CONF]] is JSON parse-able.
DeltaConfigs.MANAGED_COMMIT_OWNER_CONF.fromMetaData(metadata)

val currentTransactionInfo = CurrentTransactionInfo(
txnId = txnId,
readPredicates = readPredicates.toSeq,
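
The added line is validation only: `fromMetaData` runs the JSON parsing function defined on MANAGED_COMMIT_OWNER_CONF, so a malformed property value fails here, while the commit is being prepared, instead of surfacing later. A small sketch of what the call yields (the "endpoint" key is a hypothetical example):

// Parses the stored JSON map; throws if the table property is not valid JSON.
val ownerConf: Map[String, String] =
  DeltaConfigs.MANAGED_COMMIT_OWNER_CONF.fromMetaData(metadata)
// e.g. Map("endpoint" -> "https://commit-service") for a well-formed value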
16 changes: 16 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
@@ -352,6 +352,8 @@ object TableFeature {
TestFeatureWithDependency,
TestFeatureWithTransitiveDependency,
TestWriterFeatureWithTransitiveDependency,
// managed-commits are under development and only available in testing.
ManagedCommitTableFeature,
// Row IDs are still under development and only available in testing.
RowTrackingFeature)
}
@@ -609,6 +611,20 @@ object V2CheckpointTableFeature
V2CheckpointPreDowngradeCommand(table)
}

/** Table feature to represent tables whose commits are managed by a separate commit store. */
object ManagedCommitTableFeature
extends ReaderWriterFeature(name = "managed-commit-dev")
with FeatureAutomaticallyEnabledByMetadata {

override def automaticallyUpdateProtocolOfExistingTables: Boolean = true

override def metadataRequiresFeatureToBeEnabled(
metadata: Metadata,
spark: SparkSession): Boolean = {
DeltaConfigs.MANAGED_COMMIT_OWNER_NAME.fromMetaData(metadata).nonEmpty
}
}

/**
* Features below are for testing only, and are being registered to the system only in the testing
* environment. See [[TableFeature.allSupportedFeaturesMap]] for the registration.
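
Because the feature is metadata-enabled, just setting the owner property is enough to trigger a protocol upgrade on the next commit. A sketch of the check, assuming the "delta." key prefix and an active SparkSession named `spark` (the "in-memory" owner continues the hypothetical example above):

// The feature is required exactly when a commit owner name is set.
val metadata = Metadata(configuration =
  Map("delta.managedCommits.commitOwner-dev" -> "in-memory"))
assert(ManagedCommitTableFeature.metadataRequiresFeatureToBeEnabled(metadata, spark))
// With automaticallyUpdateProtocolOfExistingTables = true, an existing table's
// protocol is upgraded to support managed-commit-dev automatically.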
spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/AbstractBatchBackfillingCommitStore.scala
@@ -0,0 +1,137 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.managedcommit

import org.apache.spark.sql.delta.{DeltaLog, SerializableFileStatus}
import org.apache.spark.sql.delta.storage.LogStore
import org.apache.spark.sql.delta.util.FileNames
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileAlreadyExistsException, FileStatus, FileSystem, Path}

import org.apache.spark.internal.Logging

/**
* An abstract [[CommitStore]] which triggers backfills every n commits.
* - Every commit version that satisfies `commitVersion % batchSize == 0` triggers a backfill.
*/
trait AbstractBatchBackfillingCommitStore extends CommitStore with Logging {

/**
* Size of the backfill batch: every commit version which satisfies
* `commitVersion % batchSize == 0` will trigger a backfill.
*/
val batchSize: Long

/**
* Commit a given `commitFile` to the table represented by the given `tablePath` at the
* given `commitVersion`.
*/
protected def commitImpl(
logStore: LogStore,
hadoopConf: Configuration,
tablePath: Path,
commitVersion: Long,
commitFile: FileStatus,
commitTimestamp: Long): CommitResponse

override def commit(
logStore: LogStore,
hadoopConf: Configuration,
tablePath: Path,
commitVersion: Long,
actions: Iterator[String],
updatedActions: UpdatedActions): CommitResponse = {

val fs = tablePath.getFileSystem(hadoopConf)
if (batchSize <= 1) {
// Backfill until `commitVersion - 1`
logInfo(s"Making sure commits are backfilled until $commitVersion version for" +
s"table ${tablePath.toString}")
backfillToVersion(fs, logStore, hadoopConf, tablePath)
}

// Write new commit file in _commits directory
val fileStatus = writeCommitFile(logStore, hadoopConf, tablePath, commitVersion, actions)

// Do the actual commit
val commitTimestamp = updatedActions.commitInfo.getTimestamp
var commitResponse =
commitImpl(logStore, hadoopConf, tablePath, commitVersion, fileStatus, commitTimestamp)

// Backfill if needed
if (commitVersion == 0 || batchSize <= 1) {
// Always backfill zeroth commit or when batch size is configured as 1
val backfilledCommit =
backfill(fs, logStore, hadoopConf, tablePath, commitVersion, fileStatus)
val newCommit = commitResponse.commit.copy(serializableFileStatus = backfilledCommit)
commitResponse = commitResponse.copy(commit = newCommit)
} else if (commitVersion % batchSize == 0) {
logInfo(s"Making sure commits are backfilled till $commitVersion version for" +
s"table ${tablePath.toString}")
backfillToVersion(fs, logStore, hadoopConf, tablePath)
}
commitResponse
}

protected def writeCommitFile(
logStore: LogStore,
hadoopConf: Configuration,
tablePath: Path,
commitVersion: Long,
actions: Iterator[String]): FileStatus = {
val commitPath = FileNames.uuidDeltaFile(logPath(tablePath), commitVersion)
logStore.write(commitPath, actions, overwrite = false, hadoopConf)
commitPath.getFileSystem(hadoopConf).getFileStatus(commitPath)
}

/** Backfills all un-backfilled commits currently tracked by this commit store. */
protected def backfillToVersion(
fs: FileSystem,
logStore: LogStore,
hadoopConf: Configuration,
tablePath: Path): Unit = {
getCommits(tablePath, startVersion = 0).foreach { commit =>
val fileStatus = commit.serializableFileStatus.toFileStatus
backfill(fs, logStore, hadoopConf, tablePath, commit.version, fileStatus)
}
}

/** Backfills a given `fileStatus` to `version`.json */
protected def backfill(
fs: FileSystem,
logStore: LogStore,
hadoopConf: Configuration,
tablePath: Path,
version: Long,
fileStatus: FileStatus): SerializableFileStatus = {
val targetFile = FileNames.deltaFile(logPath(tablePath), version)
logInfo(s"Backfilling commit ${fileStatus.getPath} to ${targetFile.toString}")
try {
logStore.write(
targetFile,
logStore.readAsIterator(fileStatus, hadoopConf),
overwrite = false,
hadoopConf)
} catch {
case _: FileAlreadyExistsException =>
logInfo(s"The backfilled file $targetFile already exists.")
}
SerializableFileStatus.fromStatus(fs.getFileStatus(targetFile))
}

protected def logPath(tablePath: Path): Path = new Path(tablePath, DeltaLog.LOG_DIR_NAME)
}
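
To make the backfill cadence concrete: with batchSize = 10, versions 1 through 9 exist only as UUID-named files under _delta_log/_commits, and committing version 10 backfills everything tracked so far into _delta_log (version 0 is always backfilled immediately). Below is a minimal in-memory subclass for illustration; InMemoryCommitStore is hypothetical and not part of this PR, and a real implementation would also prune entries once they are backfilled:

class InMemoryCommitStore(override val batchSize: Long)
    extends AbstractBatchBackfillingCommitStore {

  // Un-backfilled commits tracked by this store, keyed by version.
  private val commitsMap = scala.collection.mutable.SortedMap.empty[Long, Commit]

  override protected def commitImpl(
      logStore: LogStore,
      hadoopConf: Configuration,
      tablePath: Path,
      commitVersion: Long,
      commitFile: FileStatus,
      commitTimestamp: Long): CommitResponse = synchronized {
    val commit =
      Commit(commitVersion, SerializableFileStatus.fromStatus(commitFile), commitTimestamp)
    commitsMap(commitVersion) = commit
    CommitResponse(commit)
  }

  override def getCommits(
      tablePath: Path,
      startVersion: Long,
      endVersion: Option[Long]): Seq[Commit] = synchronized {
    commitsMap.values
      .filter(c => c.version >= startVersion && endVersion.forall(c.version <= _))
      .toSeq
  }
}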
spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitStore.scala
@@ -0,0 +1,153 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.managedcommit

import scala.collection.mutable

import org.apache.spark.sql.delta.{DeltaConfigs, ManagedCommitTableFeature, SerializableFileStatus, SnapshotDescriptor}
import org.apache.spark.sql.delta.actions.{Action, CommitInfo, Metadata, Protocol}
import org.apache.spark.sql.delta.storage.LogStore
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

import org.apache.spark.internal.Logging


/** Representation of a commit file */
case class Commit(
version: Long,
serializableFileStatus: SerializableFileStatus,
commitTimestamp: Long)


/**
* Exception raised by the [[CommitStore.commit]] method.
* | retryable | conflict | meaning |
* | no | no | something bad happened (e.g. auth failure) |
* | no | yes | permanent transaction conflict (e.g. multi-table commit failed) |
* | yes | no | transient error (e.g. network hiccup) |
* | yes | yes | physical conflict (allowed to rebase and retry) |
*/
class CommitFailedException(
val retryable: Boolean, val conflict: Boolean, message: String) extends Exception(message)

/** Response container for [[CommitStore.commit]] API */
case class CommitResponse(commit: Commit)

/** A container class to inform the CommitStore about any changes in Protocol/Metadata */
case class UpdatedActions(
commitInfo: CommitInfo,
newMetadata: Option[Metadata],
newProtocol: Option[Protocol])

/**
* CommitStore is responsible for managing commits for a managed-commit delta table.
* 1. It provides an API to commit a new version of the table. See [[CommitStore.commit]].
* 2. It makes sure that commits are backfilled if/when needed.
* 3. It also tracks and returns un-backfilled commits. See [[CommitStore.getCommits]].
*/
trait CommitStore {
/**
* API to commit the given set of `actions` to the table represented by the given `tablePath`
* at the given `commitVersion`.
* @return CommitResponse which contains the file status of the commit file. If the commit is
*         already backfilled, the fileStatus may be omitted from the response and the client
*         can fetch the file status itself.
*/
def commit(
logStore: LogStore,
hadoopConf: Configuration,
tablePath: Path,
commitVersion: Long,
actions: Iterator[String],
updatedActions: UpdatedActions): CommitResponse

/**
* API to get the un-backfilled commits for the table represented by the given `tablePath`.
* Commits older than `startVersion`, or newer than `endVersion` (if given), are ignored. The
* returned commits are contiguous and in ascending version order.
* Note that the first version returned by this API may not equal `startVersion`. This happens
* when the first few versions starting from `startVersion` have already been backfilled and the
* CommitStore may have stopped tracking them.
*
* @return a sequence of [[Commit]] which are tracked by [[CommitStore]].
*/
def getCommits(
tablePath: Path,
startVersion: Long,
endVersion: Option[Long] = None): Seq[Commit]
}

/** A builder interface for CommitStore */
trait CommitStoreBuilder {

/** Name of the commit-store */
def name: String

/** Returns a commit store based on the given conf */
def build(conf: Map[String, String]): CommitStore
}

/** Factory to get the correct CommitStore for a table */
object CommitStoreProvider {
// Mapping from commit-owner names to the corresponding [[CommitStoreBuilder]]s.
private val nameToBuilderMapping = mutable.Map.empty[String, CommitStoreBuilder]

/** Returns a [[CommitStore]] for the given `name` and `conf` */
def getCommitStore(name: String, conf: Map[String, String]): CommitStore = {
nameToBuilderMapping.get(name).map(_.build(conf)).getOrElse {
throw new IllegalArgumentException(s"Unknown commit store: $name")
}
}

/** Registers a new [[CommitStoreBuilder]] with the [[CommitStoreProvider]] */
def registerBuilder(commitStoreBuilder: CommitStoreBuilder): Unit = {
nameToBuilderMapping.get(commitStoreBuilder.name) match {
case Some(existingBuilder: CommitStoreBuilder) =>
throw new IllegalArgumentException(s"commit store: ${commitStoreBuilder.name} is already" +
s" registered with builder ${existingBuilder.getClass.getName}")
case None =>
nameToBuilderMapping.put(commitStoreBuilder.name, commitStoreBuilder)
}
}

/** Returns the [[CommitStore]] for the given snapshot, if managed commits are enabled on it. */
def getCommitStore(snapshotDescriptor: SnapshotDescriptor): Option[CommitStore] = {
DeltaConfigs.MANAGED_COMMIT_OWNER_NAME.fromMetaData(snapshotDescriptor.metadata) match {
case Some(commitOwnerStr) =>
assert(snapshotDescriptor.protocol.isFeatureSupported(ManagedCommitTableFeature))
val conf = DeltaConfigs.MANAGED_COMMIT_OWNER_CONF.fromMetaData(snapshotDescriptor.metadata)
Some(CommitStoreProvider.getCommitStore(commitOwnerStr, conf))
case None =>
assert(!snapshotDescriptor.protocol.isFeatureSupported(ManagedCommitTableFeature),
s"Snapshot version ${snapshotDescriptor.version} doesn't have ManagedCommitTableFeature" +
s" but it has commitStore defined. Snapshot: $snapshotDescriptor")
None
}
}

// Visible only for UTs
private[delta] def clearNonDefaultBuilders(): Unit = synchronized {
val initialCommitStoreBuilderNames = initialCommitStoreBuilders.map(_.name).toSet
val extraKeys = nameToBuilderMapping.keys.filterNot(initialCommitStoreBuilderNames.contains)
nameToBuilderMapping --= extraKeys
}

val initialCommitStoreBuilders = Seq[CommitStoreBuilder](
// Any new commit-store builder will be registered here.
)
initialCommitStoreBuilders.foreach(registerBuilder)
}
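
Continuing the hypothetical in-memory example above, a builder would plug into the provider like this (InMemoryCommitStoreBuilder is illustrative and not part of this PR):

case class InMemoryCommitStoreBuilder(batchSize: Long) extends CommitStoreBuilder {
  override def name: String = "in-memory"
  override def build(conf: Map[String, String]): CommitStore =
    new InMemoryCommitStore(batchSize)
}

// Register once per JVM; afterwards a table whose metadata names the
// "in-memory" owner resolves to this store via getCommitStore.
CommitStoreProvider.registerBuilder(InMemoryCommitStoreBuilder(batchSize = 10))
val store = CommitStoreProvider.getCommitStore("in-memory", conf = Map.empty)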