Commit f5c35cd

Merge pull request delta-io#15 from delta-io/master
update fork
2 parents (5717ee3 + caa7a39), commit f5c35cd

14 files changed, +160 -53 lines
.github/workflows/test.yaml (-1)

@@ -28,7 +28,6 @@ jobs:
           # cache new stuff.
           key: delta-sbt-cache-spark3.2-scala${{ matrix.scala }}
       - name: Install Job dependencies
-        shell: bash -l {0}
        run: |
          sudo apt-get update
          sudo apt-get install -y make build-essential libssl-dev zlib1g-dev libbz2-dev libreadline-dev libsqlite3-dev wget curl llvm libncurses5-dev libncursesw5-dev xz-utils tk-dev libffi-dev liblzma-dev python-openssl git

PROTOCOL.md (+12 -10)

@@ -164,18 +164,20 @@ Subsequent `metaData` actions completely overwrite the current metadata of the table.
 
 There can be at most one metadata action in a given version of the table.
 
+Every metadata action **must** include required fields at a minimum.
+
 The schema of the `metaData` action is as follows:
 
-Field Name | Data Type | Description
--|-|-
-id|`GUID`|Unique identifier for this table
-name|`String`| User-provided identifier for this table
-description|`String`| User-provided description for this table
-format|[Format Struct](#Format-Specification)| Specification of the encoding for the files stored in the table
-schemaString|[Schema Struct](#Schema-Serialization-Format)| Schema of the table
-partitionColumns|`Array[String]`| An array containing the names of columns by which the data should be partitioned
-createdTime|`Option[Long]`| The time when this metadata action is created, in milliseconds since the Unix epoch
-configuration|`Map[String, String]`| A map containing configuration options for the metadata action
+Field Name | Data Type | Description | optional/required
+-|-|-|-
+id|`GUID`|Unique identifier for this table | required
+name|`String`| User-provided identifier for this table | optional
+description|`String`| User-provided description for this table | optional
+format|[Format Struct](#Format-Specification)| Specification of the encoding for the files stored in the table | required
+schemaString|[Schema Struct](#Schema-Serialization-Format)| Schema of the table | required
+partitionColumns|`Array[String]`| An array containing the names of columns by which the data should be partitioned | required
+createdTime|`Option[Long]`| The time when this metadata action is created, in milliseconds since the Unix epoch | optional
+configuration|`Map[String, String]`| A map containing configuration options for the metadata action | required
 
 #### Format Specification
 Field Name | Data Type | Description
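
To make the required/optional split concrete, here is a hedged sketch (not taken from the spec or from this commit) of a metaData action carrying only the required fields, written as a small self-contained Scala model. The field names follow the table above; the case-class defaults and the example schema string are illustrative assumptions, not the Delta classes themselves.

import java.util.UUID

// Illustrative stand-ins for the protocol structs described above (not the Delta classes).
case class Format(provider: String = "parquet", options: Map[String, String] = Map.empty)

case class Metadata(
    id: String,                                      // required
    name: Option[String] = None,                     // optional
    description: Option[String] = None,              // optional
    format: Format = Format(),                       // required
    schemaString: String,                            // required
    partitionColumns: Seq[String] = Nil,             // required (may be empty)
    createdTime: Option[Long] = None,                // optional
    configuration: Map[String, String] = Map.empty)  // required (may be empty)

// A minimal metaData payload: only the required fields carry meaningful values.
val minimalMetadata = Metadata(
  id = UUID.randomUUID().toString,
  schemaString = """{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}}]}""")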

core/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala (+34 -11)

@@ -41,6 +41,7 @@ import org.apache.spark.sql.functions.{col, struct, when}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.Utils
 
 /**
  * Records information about a checkpoint.
@@ -129,21 +130,43 @@ trait Checkpoints extends DeltaLogging {
 
   /**
    * Creates a checkpoint using snapshotToCheckpoint. By default it uses the current log version.
+   * Note that this function captures and logs all exceptions, since the checkpoint shouldn't fail
+   * the overall commit operation.
    */
   def checkpoint(snapshotToCheckpoint: Snapshot): Unit =
     recordDeltaOperation(this, "delta.checkpoint") {
-      if (snapshotToCheckpoint.version < 0) {
-        throw DeltaErrors.checkpointNonExistTable(dataPath)
+      try {
+        if (snapshotToCheckpoint.version < 0) {
+          throw DeltaErrors.checkpointNonExistTable(dataPath)
+        }
+        val checkpointMetaData = writeCheckpointFiles(snapshotToCheckpoint)
+        val json = JsonUtils.toJson(checkpointMetaData)
+        store.write(
+          LAST_CHECKPOINT,
+          Iterator(json),
+          overwrite = true,
+          newDeltaHadoopConf())
+
+        doLogCleanup()
+      } catch {
+        // Catch all non-fatal exceptions, since the checkpoint is written after the commit
+        // has completed. From the perspective of the user, the commit completed successfully.
+        // However, throw if this is in a testing environment - that way any breaking changes
+        // can be caught in unit tests.
+        case NonFatal(e) =>
+          recordDeltaEvent(
+            snapshotToCheckpoint.deltaLog,
+            "delta.checkpoint.sync.error",
+            data = Map(
+              "exception" -> e.getMessage(),
+              "stackTrace" -> e.getStackTrace()
+            )
+          )
+          logWarning(s"Error when writing checkpoint synchronously", e)
+          if (Utils.isTesting) {
+            throw e
+          }
       }
-      val checkpointMetaData = writeCheckpointFiles(snapshotToCheckpoint)
-      val json = JsonUtils.toJson(checkpointMetaData)
-      store.write(
-        LAST_CHECKPOINT,
-        Iterator(json),
-        overwrite = true,
-        newDeltaHadoopConf())
-
-      doLogCleanup()
     }
 
   protected def writeCheckpointFiles(snapshotToCheckpoint: Snapshot): CheckpointMetaData = {
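
The pattern introduced here (swallow non-fatal errors after a successful commit, but rethrow under test so regressions still fail CI) can be read in isolation. Below is a minimal, self-contained sketch of the same idea; `bestEffort` and the checkpoint placeholder are hypothetical names, and `isTesting` is a stand-in for what `org.apache.spark.util.Utils.isTesting` roughly checks.

import scala.util.control.NonFatal

object BestEffortExample extends App {
  // Stand-in for org.apache.spark.util.Utils.isTesting (assumption: it keys off the
  // SPARK_TESTING environment variable / the spark.testing system property).
  def isTesting: Boolean =
    sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")

  // Run a post-commit step on a best-effort basis: log and move on in production,
  // rethrow in tests so breaking changes are still caught by unit tests.
  def bestEffort(stepName: String)(body: => Unit): Unit = {
    try {
      body
    } catch {
      case NonFatal(e) =>
        Console.err.println(s"Best-effort step '$stepName' failed: ${e.getMessage}")
        if (isTesting) throw e
    }
  }

  // The checkpoint write happens after the commit has already succeeded,
  // so a failure here should not fail the user's write.
  bestEffort("checkpoint") {
    throw new IllegalStateException("simulated checkpoint failure") // logged, not rethrown
  }
}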

core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala (+32 -8)

@@ -975,29 +975,53 @@ object DeltaErrors
     new AnalysisException("Cannot describe the history of a view.")
   }
 
-  def copyIntoEncryptionOnlyS3(scheme: String): Throwable = {
+  def copyIntoEncryptionNotAllowedOn(scheme: String): Throwable = {
+    // TODO: add `wasbs` once supported
     new IllegalArgumentException(
-      s"Invalid scheme $scheme. COPY INTO source encryption is only supported for S3 paths.")
+      s"Invalid scheme $scheme. " +
+        s"COPY INTO source encryption currently only supports s3/s3n/s3a/abfss.")
   }
 
   def copyIntoEncryptionSseCRequired(): Throwable = {
     new IllegalArgumentException(
-      s"Invalid encryption type. COPY INTO source encryption must specify 'type' = 'SSE-C'.")
+      s"Invalid encryption type. COPY INTO source encryption must specify 'TYPE' = 'AWS_SSE_C'.")
   }
 
   def copyIntoEncryptionMasterKeyRequired(): Throwable = {
     new IllegalArgumentException(
-      s"Invalid encryption arguments. COPY INTO source encryption must specify a masterKey.")
+      s"Invalid encryption arguments. COPY INTO source encryption must specify a MASTER_KEY.")
   }
 
-  def copyIntoCredentialsOnlyS3(scheme: String): Throwable = {
+  def copyIntoCredentialsNotAllowedOn(scheme: String): Throwable = {
+    new IllegalArgumentException(
+      s"Invalid scheme $scheme. " +
+        s"COPY INTO source encryption currently only supports s3/s3n/s3a/wasbs/abfss.")
+  }
+
+  def copyIntoCredentialsAllRequiredForS3(cause: Throwable): Throwable = {
+    new IllegalArgumentException(
+      "COPY INTO credentials must include AWS_ACCESS_KEY, AWS_SECRET_KEY, and AWS_SESSION_TOKEN.",
+      cause)
+  }
+
+  def copyIntoEncryptionRequiredForAzure(key: String, value: Option[String] = None): Throwable = {
     new IllegalArgumentException(
-      s"Invalid scheme $scheme. COPY INTO source credentials are only supported for S3 paths.")
+      if (value.nonEmpty) {
+        s"Invalid encryption option $key. " +
+          s"COPY INTO source encryption must specify '$key' = '${value.get}'."
+      } else {
+        s"COPY INTO source encryption must specify '$key'."
+      }
+    )
   }
 
-  def copyIntoCredentialsAllRequired(cause: Throwable): Throwable = {
+  def copyIntoEncryptionNotSupportedForAzure: Throwable = {
     new IllegalArgumentException(
-      "COPY INTO credentials must include awsKeyId, awsSecretKey, and awsSessionToken.", cause)
+      "COPY INTO encryption only supports ADLS Gen2, or abfss:// file scheme")
+  }
+
+  def copyIntoCredentialsRequiredForAzure(key: String): Throwable = {
+    new IllegalArgumentException(s"COPY INTO source credentials must specify '$key'.")
   }
 
   def postCommitHookFailedException(
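
These helpers now cover Azure (abfss/wasbs) schemes as well as S3. The call sites are not part of this diff, so the following is only a hedged sketch of how a caller might map a source URI scheme onto the new error helpers; the object and method names here are assumptions, while the scheme lists mirror the error messages above.

import org.apache.spark.sql.delta.DeltaErrors

// Hypothetical validation helpers: reject schemes that the error messages above
// describe as unsupported for COPY INTO credentials / encryption.
object CopyIntoSchemeChecks {
  private val credentialSchemes = Set("s3", "s3n", "s3a", "wasbs", "abfss")
  private val encryptionSchemes = Set("s3", "s3n", "s3a", "abfss") // wasbs is a TODO per the diff

  def checkCredentialScheme(scheme: String): Unit =
    if (!credentialSchemes.contains(scheme)) {
      throw DeltaErrors.copyIntoCredentialsNotAllowedOn(scheme)
    }

  def checkEncryptionScheme(scheme: String): Unit =
    if (!encryptionSchemes.contains(scheme)) {
      throw DeltaErrors.copyIntoEncryptionNotAllowedOn(scheme)
    }
}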

core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala (+7 -10)

@@ -69,7 +69,8 @@ case class CommitStats(
   isolationLevel: String,
   fileSizeHistogram: Option[FileSizeHistogram] = None,
   addFilesHistogram: Option[FileSizeHistogram] = None,
-  removeFilesHistogram: Option[FileSizeHistogram] = None
+  removeFilesHistogram: Option[FileSizeHistogram] = None,
+  txnId: Option[String] = None
 )
 
 /**
@@ -638,14 +639,9 @@ trait OptimisticTransactionImpl extends TransactionalWrite with SQLMetricsReport
   protected def postCommit(commitVersion: Long): Unit = {
     committed = true
     if (shouldCheckpoint(commitVersion)) {
-      try {
-        // We checkpoint the version to be committed to so that no two transactions will checkpoint
-        // the same version.
-        deltaLog.checkpoint(deltaLog.getSnapshotAt(commitVersion))
-      } catch {
-        case e: IllegalStateException =>
-          logWarning("Failed to checkpoint table state.", e)
-      }
+      // We checkpoint the version to be committed to so that no two transactions will checkpoint
+      // the same version.
+      deltaLog.checkpoint(deltaLog.getSnapshotAt(commitVersion))
    }
  }
 
@@ -785,7 +781,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite with SQLMetricsReport
       newMetadata = newMetadata,
       numAbsolutePathsInAdd = numAbsolutePaths,
       numDistinctPartitionsInAdd = distinctPartitions.size,
-      isolationLevel = isolationLevel.toString)
+      isolationLevel = isolationLevel.toString,
+      txnId = Some(txnId))
     recordDeltaEvent(deltaLog, "delta.commit.stats", data = stats)
 
     attemptVersion

core/src/main/scala/org/apache/spark/sql/delta/PreprocessTableMerge.scala (+1)

@@ -177,6 +177,7 @@ case class PreprocessTableMerge(override val conf: SQLConf)
       }
     }
 
+
     val targetColNames = m.resolvedActions.map(_.targetColNameParts.head)
     if (targetColNames.distinct.size < targetColNames.size) {
       throw new AnalysisException(s"Duplicate column names in INSERT clause")

core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala (+14)

@@ -254,6 +254,9 @@ case class AddFile(
       .getOrElse(TimeUnit.MICROSECONDS.convert(modificationTime, TimeUnit.MILLISECONDS).toString)
       .toLong
 
+  @JsonIgnore
+  lazy val numAutoCompactions: Int = tag(AddFile.Tags.NUM_AUTO_COMPACTIONS).getOrElse("0").toInt
+
   def tag(tag: AddFile.Tags.KeyType): Option[String] =
     Option(tags).getOrElse(Map.empty).get(tag.name)
 
@@ -297,6 +300,17 @@ object AddFile {
 
     /** [[OPTIMIZE_TARGET_SIZE]]: target file size the file was optimized to. */
     object OPTIMIZE_TARGET_SIZE extends AddFile.Tags.KeyType("OPTIMIZE_TARGET_SIZE")
+
+    /**
+     * [[NUM_AUTO_COMPACTIONS]]: the number of times Auto Compaction has been applied to the
+     * contents of a file.
+     *
+     * Note: the externally visible tag value is 'NUM_AUTO_OPTIMIZES', since Compaction is one
+     * kind of Optimize command; the generic name hides that detail and can cover optimizations
+     * other than Compaction. 'NUM_AUTO_COMPACTIONS' is the internal key name for now, since
+     * only Auto Compaction currently uses it.
+     */
+    object NUM_AUTO_COMPACTIONS extends AddFile.Tags.KeyType("NUM_AUTO_OPTIMIZES")
   }
 
   /** Convert a [[Tags.KeyType]] to a string to be used in the AddMap.tags Map[String, String]. */
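
A hedged usage sketch of the new tag follows. `tag` and `numAutoCompactions` are exactly the members shown in the diff; the `AddFile` constructor arguments are written from memory of the case class and are illustrative only.

import org.apache.spark.sql.delta.actions.AddFile

// Build an AddFile carrying the external tag key ("NUM_AUTO_OPTIMIZES", per the
// NUM_AUTO_COMPACTIONS definition above). Constructor field names are assumed.
val file = AddFile(
  path = "part-00000-abc.snappy.parquet",
  partitionValues = Map.empty,
  size = 1024L,
  modificationTime = System.currentTimeMillis(),
  dataChange = true,
  tags = Map("NUM_AUTO_OPTIMIZES" -> "2"))

// Both accessors read the same tag; the lazy val falls back to 0 when the tag is absent.
assert(file.tag(AddFile.Tags.NUM_AUTO_COMPACTIONS) == Some("2"))
assert(file.numAutoCompactions == 2)
assert(file.copy(tags = null).numAutoCompactions == 0)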

core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala (+2)

@@ -18,6 +18,7 @@ package org.apache.spark.sql.delta.catalog
 
 import java.util
 import java.util.Locale
+
 // scalastyle:off import.ordering.noEmptyLine
 
 import scala.collection.JavaConverters._
@@ -43,6 +44,7 @@ import org.apache.spark.sql.connector.catalog.TableCapability._
 import org.apache.spark.sql.connector.catalog.TableChange._
 import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, IdentityTransform, Transform}
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, V1Write, WriteBuilder}
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
 import org.apache.spark.sql.internal.SQLConf

core/src/main/scala/org/apache/spark/sql/delta/commands/DeltaCommand.scala (+2 -6)

@@ -29,6 +29,7 @@ import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf}
 import org.apache.spark.sql.delta.stats.FileSizeHistogram
 import org.apache.spark.sql.delta.util.DeltaFileOperations
 import org.apache.spark.sql.delta.util.FileNames.deltaFile
+
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.{AnalysisException, SparkSession}
@@ -232,12 +233,7 @@ trait DeltaCommand extends DeltaLogging {
 
     logInfo(s"Committed delta #$attemptVersion to ${deltaLog.logPath}. Wrote $commitSize actions.")
 
-    try {
-      deltaLog.checkpoint(currentSnapshot)
-    } catch {
-      case e: IllegalStateException =>
-        logWarning("Failed to checkpoint table state.", e)
-    }
+    deltaLog.checkpoint(currentSnapshot)
   }
 
   /**

core/src/main/scala/org/apache/spark/sql/delta/commands/WriteIntoDelta.scala (+2 -1)

@@ -117,7 +117,8 @@ case class WriteIntoDelta(
     // change the actual behavior, but makes DESC TABLE to show varchar instead of char.
     val dataSchema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(
       replaceCharWithVarchar(CharVarcharUtils.getRawSchema(data.schema)).asInstanceOf[StructType])
-    updateMetadata(data.sparkSession, txn, schemaInCatalog.getOrElse(dataSchema),
+    var finalSchema = schemaInCatalog.getOrElse(dataSchema)
+    updateMetadata(data.sparkSession, txn, finalSchema,
       partitionColumns, configuration, isOverwriteOperation, rearrangeOnly)
 
     val replaceOnDataColsEnabled =

core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSink.scala (+2)

@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkContext
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric
@@ -64,6 +65,7 @@ class DeltaSink(
       throw DeltaErrors.streamWriteNullTypeException
     }
 
+
     // If the batch reads the same Delta table as this sink is going to write to, then this
     // write has dependencies. Then make sure that this commit set hasDependencies to true
     // by injecting a read on the whole table. This needs to be done explicitly because

core/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingTestUtils.scala (+45 -3)

@@ -19,6 +19,7 @@ package org.apache.spark.sql.delta
 import java.io.File
 
 import org.apache.spark.sql.delta.schema.SchemaUtils
+import io.delta.tables.{DeltaTable => OSSDeltaTable}
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkConf
@@ -341,23 +342,64 @@ trait DeltaColumnMappingTestUtilsBase extends SharedSparkSession {
     }
   }
 
+  /**
+   * Standard CONVERT TO DELTA
+   * @param tableOrPath String
+   */
+  protected def convertToDelta(tableOrPath: String): Unit = {
+    sql(s"CONVERT TO DELTA $tableOrPath")
+  }
+
 }
 
 trait DeltaColumnMappingTestUtils extends DeltaColumnMappingTestUtilsBase
 
-
 /**
  * Include this trait to enable Id column mapping mode for a suite
 */
-trait DeltaColumnMappingEnableIdMode extends SharedSparkSession {
+trait DeltaColumnMappingEnableIdMode extends SharedSparkSession
+  with DeltaColumnMappingTestUtils {
   protected override def sparkConf: SparkConf =
     super.sparkConf.set(DeltaConfigs.COLUMN_MAPPING_MODE.defaultTablePropertyKey, "id")
+
+  /**
+   * CONVERT TO DELTA blocked in id mode
+   */
+  protected override def convertToDelta(tableOrPath: String): Unit =
+    throw DeltaErrors.convertToDeltaWithColumnMappingNotSupported(
+      DeltaColumnMappingMode(columnMappingModeString)
+    )
 }
 
 /**
  * Include this trait to enable Name column mapping mode for a suite
 */
-trait DeltaColumnMappingEnableNameMode extends SharedSparkSession {
+trait DeltaColumnMappingEnableNameMode extends SharedSparkSession
+  with DeltaColumnMappingTestUtils {
+
   protected override def sparkConf: SparkConf =
     super.sparkConf.set(DeltaConfigs.COLUMN_MAPPING_MODE.defaultTablePropertyKey, "name")
+
+  /**
+   * CONVERT TO DELTA can be possible under name mode in tests
+   */
+  protected override def convertToDelta(tableOrPath: String): Unit = {
+    withColumnMappingConf("none") {
+      super.convertToDelta(tableOrPath)
+    }
+
+    val deltaPath = if (tableOrPath.contains("parquet") && tableOrPath.contains("`")) {
+      // parquet.`PATH`
+      s"""delta.${tableOrPath.split('.').last}"""
+    } else {
+      tableOrPath
+    }
+
+    sql(s"""ALTER TABLE $deltaPath SET TBLPROPERTIES (
+           |${DeltaConfigs.COLUMN_MAPPING_MODE.key} = 'name',
+           |${DeltaConfigs.MIN_READER_VERSION.key} = '2',
+           |${DeltaConfigs.MIN_WRITER_VERSION.key} = '5'
+           |)""".stripMargin)
+  }
+
 }
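
A hedged sketch of how a suite might pick up the new helper; the suite name and test body are illustrative, and the imports assume the usual test scaffolding in this package.

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.delta.DeltaColumnMappingEnableNameMode
import org.apache.spark.sql.test.SharedSparkSession

// Hypothetical suite: under name column mapping mode, convertToDelta issues CONVERT TO DELTA
// followed by ALTER TABLE ... SET TBLPROPERTIES (two commits); under
// DeltaColumnMappingEnableIdMode the same call would throw instead.
class ConvertWithColumnMappingSuite extends QueryTest
  with SharedSparkSession
  with DeltaColumnMappingEnableNameMode {

  test("convert a parquet directory under name mode") {
    withTempDir { dir =>
      val path = dir.getCanonicalPath
      spark.range(5).write.format("parquet").save(path)
      convertToDelta(s"parquet.`$path`")
      checkAnswer(spark.read.format("delta").load(path), spark.range(5).toDF())
    }
  }
}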

core/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala (+6 -2)

@@ -1392,9 +1392,13 @@ class DeltaSuite extends QueryTest
     withTempDir { tempDir =>
       val path = tempDir.getCanonicalPath + "/table"
       spark.range(10).write.format("parquet").save(path)
-      sql(s"CONVERT TO DELTA parquet.`$path`")
+      convertToDelta(s"parquet.`$path`")
 
-      assert(spark.conf.get(DeltaSQLConf.DELTA_LAST_COMMIT_VERSION_IN_SESSION) === Some(0))
+      // In column mapping (name mode), we perform convertToDelta with a CONVERT and an ALTER,
+      // so the version has been updated
+      val commitVersion = if (columnMappingEnabled) 1 else 0
+      assert(spark.conf.get(DeltaSQLConf.DELTA_LAST_COMMIT_VERSION_IN_SESSION) ===
+        Some(commitVersion))
     }
   }
 
version.sbt (+1 -1)

@@ -1 +1 @@
-ThisBuild / version := "1.1.0-SNAPSHOT"
+ThisBuild / version := "1.2.0-SNAPSHOT"
