
Commit 4a401ea

zsxwing authored and vkorukanti committed
Call DeltaLog.update in Delta streaming source to ensure we use latest table schema
When a `DeltaLog` instance is cached, `DeltaSource` gets the cached `DeltaLog` from `DeltaLog.forTable` but never calls `DeltaLog.update`. As a result, if nothing else on the same cluster touches that `DeltaLog`, a streaming query running on the cluster keeps using a stale `Snapshot` from the cache.

This breaks one use case: when a streaming query detects a schema change in a Delta table, it fails, and restarting it on the same cluster should let it recover and continue running, just as it would when started on a different cluster. Because of the bug above, the restarted query cannot see the latest schema (`DeltaSource.schema` reads the schema from the stale `Snapshot`) and fails again during restart.

This PR adds the missing `update` calls to make sure `DeltaDataSource.sourceSchema` and `DeltaSource.schema` always get the latest table schema.

Tested with a newly added unit test.

GitOrigin-RevId: b5488671ceaf942e48c4cbdb068b305fdc582d46
1 parent: 155be0d
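To make the failure mode concrete, here is a minimal sketch of the caching behavior described above (hypothetical driver-side code, not the Delta internals; it assumes an active `spark: SparkSession` and a Delta table at `path` that another cluster has just modified):

import org.apache.spark.sql.delta.DeltaLog

val log = DeltaLog.forTable(spark, path)  // may return a cached instance
val stale = log.unsafeVolatileSnapshot    // snapshot as of whenever it was cached
// Nothing on this cluster replays the log, so `stale` can lag behind the
// table's latest commit. An explicit update reads the new commits:
val fresh = log.update()
assert(fresh.version >= stale.version)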

File tree: 3 files changed, +64 −15 lines

core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala (+5 −6)

@@ -88,9 +88,8 @@ class DeltaDataSource
       throw DeltaErrors.timeTravelNotSupportedException
     }

-    val deltaLog = DeltaLog.forTable(sqlContext.sparkSession, path)
-    val schemaToUse = ColumnWithDefaultExprUtils.removeDefaultExpressions(
-      deltaLog.unsafeVolatileSnapshot.schema)
+    val snapshot = DeltaLog.forTableWithSnapshot(sqlContext.sparkSession, path)._2
+    val schemaToUse = ColumnWithDefaultExprUtils.removeDefaultExpressions(snapshot.schema)
     if (schemaToUse.isEmpty) {
       throw DeltaErrors.schemaNotSetException
     }

@@ -114,12 +113,12 @@ class DeltaDataSource
     val path = parameters.getOrElse("path", {
       throw DeltaErrors.pathNotSpecifiedException
     })
-    val deltaLog = DeltaLog.forTable(sqlContext.sparkSession, path)
-    if (deltaLog.unsafeVolatileSnapshot.schema.isEmpty) {
+    val (deltaLog, snapshot) = DeltaLog.forTableWithSnapshot(sqlContext.sparkSession, path)
+    if (snapshot.schema.isEmpty) {
       throw DeltaErrors.schemaNotSetException
     }
     val options = new DeltaOptions(parameters, sqlContext.sparkSession.sessionState.conf)
-    DeltaSource(sqlContext.sparkSession, deltaLog, options)
+    DeltaSource(sqlContext.sparkSession, deltaLog, options, snapshot)
   }

   override def createSink(
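Read against the removed lines, the change swaps the stale-prone pair `forTable` + `unsafeVolatileSnapshot` for `forTableWithSnapshot`. A hedged before/after sketch, assuming (as the commit's intent and call sites suggest) that the returned snapshot reflects the latest commit:

import org.apache.spark.sql.delta.DeltaLog

// Before: the cached log's volatile snapshot may predate the latest commit.
val deltaLog = DeltaLog.forTable(spark, path)
val maybeStaleSchema = deltaLog.unsafeVolatileSnapshot.schema

// After: the log and an up-to-date snapshot are resolved together.
val (log, snapshot) = DeltaLog.forTableWithSnapshot(spark, path)
val freshSchema = snapshot.schema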

core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala (+1 −9)

@@ -107,15 +107,6 @@ trait DeltaSourceBase extends Source
   with SupportsTriggerAvailableNow
   with DeltaLogging { self: DeltaSource =>

-  /**
-   * Pin down the snapshot during initialization of DeltaSource so we could consistently use this
-   * same snapshot across the lifespan of this Delta Source.
-   *
-   * Visible for testing.
-   */
-  // TODO: Should this be pinned to the latest snapshot via deltaLog.update()?
-  protected[delta] val snapshotAtSourceInit: Snapshot = deltaLog.unsafeVolatileSnapshot
-
   /**
    * Flag that allows user to force enable unsafe streaming read on Delta table with
    * column mapping enabled AND drop/rename actions.

@@ -474,6 +465,7 @@ case class DeltaSource(
     spark: SparkSession,
     deltaLog: DeltaLog,
     options: DeltaOptions,
+    snapshotAtSourceInit: Snapshot,
     filters: Seq[Expression] = Nil)
   extends DeltaSourceBase
   with DeltaSourceCDCSupport {
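This change also turns `snapshotAtSourceInit` from a value read lazily off the cached log into a constructor parameter, so the caller decides which snapshot gets pinned for the source's lifetime. A simplified sketch of that injection pattern (hypothetical class name, not Delta's actual hierarchy):

import org.apache.spark.sql.delta.{DeltaLog, Snapshot}
import org.apache.spark.sql.types.StructType

// The snapshot is pinned by whoever constructs the source (here,
// DeltaDataSource.createSource above), not fetched inside the source.
case class PinnedSnapshotSource(
    deltaLog: DeltaLog,
    snapshotAtSourceInit: Snapshot) {
  // Every schema lookup over the source's lifetime uses the pinned snapshot.
  def schema: StructType = snapshotAtSourceInit.schema
}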

core/src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala (+58)

@@ -1710,6 +1710,8 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase
       addData()

       val srcLog = DeltaLog.forTable(spark, srcData)
+      // Create a checkpoint so that we can create a snapshot without json files before version 3
+      srcLog.checkpoint()
       // Delete the first file
       assert(new File(FileNames.deltaFile(srcLog.logPath, 1).toUri).delete())

@@ -1743,6 +1745,8 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase
       addData()

       val srcLog = DeltaLog.forTable(spark, srcData)
+      // Create a checkpoint so that we can create a snapshot without json files before version 3
+      srcLog.checkpoint()
       // Delete the second file
       assert(new File(FileNames.deltaFile(srcLog.logPath, 2).toUri).delete())

@@ -1777,6 +1781,8 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase
       addData()

       val srcLog = DeltaLog.forTable(spark, srcData)
+      // Create a checkpoint so that we can create a snapshot without json files before version 3
+      srcLog.checkpoint()
       // Delete the first file
       assert(new File(FileNames.deltaFile(srcLog.logPath, 1).toUri).delete())

@@ -1811,6 +1817,8 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase
       addData()

       val srcLog = DeltaLog.forTable(spark, srcData)
+      // Create a checkpoint so that we can create a snapshot without json files before version 3
+      srcLog.checkpoint()
       // Delete the second file
       assert(new File(FileNames.deltaFile(srcLog.logPath, 2).toUri).delete())

@@ -2028,6 +2036,56 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase
         Row(0, 0) :: Nil)
     }
   }
+
+  test("restarting a query should pick up latest table schema and recover") {
+    withTempDir { inputDir =>
+      withTempDir { checkpointDir =>
+        spark.range(10)
+          .write
+          .format("delta")
+          .mode("append")
+          .save(inputDir.getCanonicalPath)
+
+        // Store a `DeltaLog` instance outside the cache.
+        val deltaLog = DeltaLog.forTable(spark, inputDir.getCanonicalPath)
+        DeltaLog.clearCache()
+
+        def startQuery(): StreamingQuery = {
+          spark.readStream.format("delta")
+            .load(inputDir.getCanonicalPath)
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .format("noop")
+            .start()
+        }
+
+        var q = startQuery()
+        try {
+          q.processAllAvailable()
+
+          // Change the table schema using the non-cached `DeltaLog` to mimic the case that the
+          // table schema change happens on a different cluster
+          val txn = deltaLog.startTransaction()
+          val oldSchema = deltaLog.snapshot.metadata.schema
+          val newSchema = StructType(Seq(oldSchema(0).copy(nullable = false)))
+          val newMetadata = deltaLog.snapshot.metadata.copy(schemaString = newSchema.json)
+          txn.commit(Seq(newMetadata), DeltaOperations.ManualUpdate)
+
+          // The streaming query should fail when detecting a schema change
+          val e = intercept[StreamingQueryException] {
+            q.processAllAvailable()
+          }
+          assert(e.getMessage.contains("Detected schema change"))
+
+          // Restarting the query should recover from the schema change error
+          q = startQuery()
+          q.processAllAvailable()
+        } finally {
+          q.stop()
+        }
+      }
+    }
+  }
 }

 /**
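For an end-to-end feel of the behavior the new test locks in, the same sequence can be driven by hand (a sketch with placeholder paths; `noop` is Spark's built-in sink that discards rows):

// Assumes an active `spark: SparkSession` and a Delta table at `tablePath`.
val tablePath = "/tmp/delta/events"
val checkpoint = "/tmp/delta/_checkpoint"

def start() = spark.readStream.format("delta").load(tablePath)
  .writeStream
  .option("checkpointLocation", checkpoint)
  .format("noop")
  .start()

val q = start()
// ... a schema change is committed to the table, e.g. from another cluster ...
// q.processAllAvailable() now fails with "Detected schema change". Before this
// commit a restart on the same cluster hit the same stale schema; with the
// added `update` calls the restarted query picks up the new schema:
val q2 = start()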