
Commit 59b4c8e

alexoss68 authored and allisonport-db committed

Improve row metrics for UPDATE

GitOrigin-RevId: 26ec189bbdc45d795511c6f282e17f911b9a6621

1 parent 2d9968f · commit 59b4c8e

3 files changed: +75 -75 lines changed


core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala (-8)

@@ -185,14 +185,6 @@ object DeltaOperations {
     override val parameters: Map[String, Any] = predicate.map("predicate" -> _).toMap
     override val operationMetrics: Set[String] = DeltaOperationMetrics.UPDATE
 
-    override def transformMetrics(metrics: Map[String, SQLMetric]): Map[String, String] = {
-      val numOutputRows = metrics("numOutputRows").value
-      val numUpdatedRows = metrics("numUpdatedRows").value
-      var strMetrics = super.transformMetrics(metrics)
-      val numCopiedRows = numOutputRows - numUpdatedRows
-      strMetrics += "numCopiedRows" -> numCopiedRows.toString
-      strMetrics
-    }
     override def changesData: Boolean = true
   }
   /** Recorded when the table is created. */
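
For context, the override removed above derived numCopiedRows only at history-recording time, by subtracting the updated-row count from the total row count reported by the write. A minimal standalone sketch of that arithmetic, with hypothetical literal values standing in for the SQLMetric readings (the numbers mirror the updated test further down):

// Hypothetical stand-ins for the SQLMetric values the removed override read:
val numOutputRows = 3L   // all rows written to the rewritten file
val numUpdatedRows = 1L  // rows that matched the UPDATE predicate
val numCopiedRows = numOutputRows - numUpdatedRows  // 2 rows rewritten unchanged

Moving this calculation out of DeltaOperations.Update means the value is now produced by UpdateCommand itself, as shown in the next file.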

core/src/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala (+2)

@@ -58,6 +58,7 @@ case class UpdateCommand(
     "numAddedFiles" -> createMetric(sc, "number of files added."),
     "numRemovedFiles" -> createMetric(sc, "number of files removed."),
     "numUpdatedRows" -> createMetric(sc, "number of rows updated."),
+    "numCopiedRows" -> createMetric(sc, "number of rows copied."),
     "executionTimeMs" -> createMetric(sc, "time taken to execute the entire operation"),
     "scanTimeMs" -> createMetric(sc, "time taken to scan the files for matches"),
     "rewriteTimeMs" -> createMetric(sc, "time taken to rewrite the matched files")
@@ -180,6 +181,7 @@ case class UpdateCommand(
     if (metrics("numUpdatedRows").value == 0 && outputRows != 0) {
       metrics("numUpdatedRows").set(outputRows)
     }
+    metrics("numCopiedRows").set(outputRows - metrics("numUpdatedRows").value)
     txn.registerSQLMetrics(sparkSession, metrics)
     txn.commit(actions, DeltaOperations.Update(condition.map(_.toString)))
     // This is needed to make the SQL metrics visible in the Spark UI
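
With the two lines added above, numCopiedRows becomes a first-class SQLMetric of the command and is set from the command's own row counts just before the transaction commits. A minimal sketch of how such a metric pair behaves, not Delta code, assuming an active SparkSession named spark; the literal numbers are illustrative only:

import org.apache.spark.sql.execution.metric.SQLMetrics

val sc = spark.sparkContext  // assumes an active SparkSession `spark`
val numUpdatedRows = SQLMetrics.createMetric(sc, "number of rows updated.")
val numCopiedRows = SQLMetrics.createMetric(sc, "number of rows copied.")

val outputRows = 3L                                   // rows written to rewritten files
numUpdatedRows.set(1L)                                // rows matching the predicate
numCopiedRows.set(outputRows - numUpdatedRows.value)  // copied = written - updated, i.e. 2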

core/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala (+73 -67)

@@ -713,81 +713,87 @@ trait DescribeDeltaHistorySuiteBase
     }
   }
 
+  def metricsUpdateTest : Unit = withTempDir { tempDir =>
+    // Create the initial table as a single file
+    Seq(1, 2, 5, 11, 21, 3, 4, 6, 9, 7, 8, 0).toDF("key")
+      .withColumn("value", 'key % 2)
+      .write
+      .format("delta")
+      .save(tempDir.getAbsolutePath)
+
+    // append additional data with the same number range to the table.
+    // This data is saved as a separate file as well
+    Seq(15, 16, 17).toDF("key")
+      .withColumn("value", 'key % 2)
+      .repartition(1)
+      .write
+      .format("delta")
+      .mode("append")
+      .save(tempDir.getAbsolutePath)
+    val deltaTable = io.delta.tables.DeltaTable.forPath(spark, tempDir.getAbsolutePath)
+    val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
+    deltaLog.snapshot.numOfFiles
+
+    // update the table
+    deltaTable.update(col("key") === lit("16"), Map("value" -> lit("1")))
+    // The file from the append gets updated but the file from the initial table gets scanned
+    // as well. We want to make sure numCopied rows is calculated from written files and not
+    // scanned files[SC-33980]
+
+    // get operation metrics
+    val operationMetrics = getOperationMetrics(deltaTable.history(1))
+    val expectedMetrics = Map(
+      "numAddedFiles" -> "1",
+      "numRemovedFiles" -> "1",
+      "numUpdatedRows" -> "1",
+      "numCopiedRows" -> "2" // There should be only three rows in total(updated + copied)
+    )
+    checkOperationMetrics(expectedMetrics, operationMetrics, DeltaOperationMetrics.UPDATE)
+    val expectedTimeMetrics = Set("executionTimeMs", "scanTimeMs", "rewriteTimeMs")
+    checkOperationTimeMetricsInvariant(expectedTimeMetrics, operationMetrics)
+  }
+
   test("operation metrics - update") {
-    withSQLConf(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED.key -> "true") {
-      withTempDir { tempDir =>
-        // Create the initial table as a single file
-        Seq(1, 2, 5, 11, 21, 3, 4, 6, 9, 7, 8, 0).toDF("key")
-          .withColumn("value", 'key % 2)
-          .write
-          .format("delta")
-          .save(tempDir.getAbsolutePath)
+    withSQLConf((DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED.key -> "true")) {
+      metricsUpdateTest
+    }
+  }
 
-        // append additional data with the same number range to the table.
-        // This data is saved as a separate file as well
-        Seq(15, 16, 17).toDF("key")
-          .withColumn("value", 'key % 2)
-          .repartition(1)
-          .write
-          .format("delta")
-          .mode("append")
-          .save(tempDir.getAbsolutePath)
-        val deltaTable = io.delta.tables.DeltaTable.forPath(spark, tempDir.getAbsolutePath)
-        val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
-        deltaLog.snapshot.numOfFiles
+  def metricsUpdatePartitionedColumnTest : Unit = {
+    val numRows = 100
+    val numPartitions = 5
+    withTempDir { tempDir =>
+      spark.range(numRows)
+        .withColumn("c1", 'id + 1)
+        .withColumn("c2", 'id % numPartitions)
+        .write
+        .partitionBy("c2")
+        .format("delta")
+        .save(tempDir.getAbsolutePath)
 
-        // update the table
-        deltaTable.update(col("key") === lit("16"), Map("value" -> lit("1")))
-        // The file from the append gets updated but the file from the initial table gets scanned
-        // as well. We want to make sure numCopied rows is calculated from written files and not
-        // scanned files[SC-33980]
+      val deltaTable = io.delta.tables.DeltaTable.forPath(tempDir.getAbsolutePath)
+      val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
+      val numFilesBeforeUpdate = deltaLog.snapshot.numOfFiles
+      deltaTable.update(col("c2") < 1, Map("c2" -> lit("1")))
+      val numFilesAfterUpdate = deltaLog.snapshot.numOfFiles
 
-        // get operation metrics
-        val operationMetrics = getOperationMetrics(deltaTable.history(1))
-        val expectedMetrics = Map(
-          "numAddedFiles" -> "1",
-          "numRemovedFiles" -> "1",
-          "numUpdatedRows" -> "1",
-          "numCopiedRows" -> "2" // There should be only three rows in total(updated + copied)
-        )
-        checkOperationMetrics(expectedMetrics, operationMetrics, DeltaOperationMetrics.UPDATE)
-        val expectedTimeMetrics = Set("executionTimeMs", "scanTimeMs", "rewriteTimeMs")
-        checkOperationTimeMetricsInvariant(expectedTimeMetrics, operationMetrics)
-      }
+      val operationMetrics = getOperationMetrics(deltaTable.history(1))
+      val newFiles = numFilesAfterUpdate - numFilesBeforeUpdate
+      val oldFiles = numFilesBeforeUpdate / numPartitions
+      val addedFiles = newFiles + oldFiles
+      val expectedMetrics = Map(
+        "numUpdatedRows" -> (numRows / numPartitions).toString,
+        "numCopiedRows" -> "0",
+        "numAddedFiles" -> addedFiles.toString,
+        "numRemovedFiles" -> (numFilesBeforeUpdate / numPartitions).toString
+      )
+      checkOperationMetrics(expectedMetrics, operationMetrics, DeltaOperationMetrics.UPDATE)
     }
   }
 
   test("operation metrics - update - partitioned column") {
-    withSQLConf(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED.key -> "true") {
-      val numRows = 100
-      val numPartitions = 5
-      withTempDir { tempDir =>
-        spark.range(numRows)
-          .withColumn("c1", 'id + 1)
-          .withColumn("c2", 'id % numPartitions)
-          .write
-          .partitionBy("c2")
-          .format("delta")
-          .save(tempDir.getAbsolutePath)
-
-        val deltaTable = io.delta.tables.DeltaTable.forPath(tempDir.getAbsolutePath)
-        val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
-        val numFilesBeforeUpdate = deltaLog.snapshot.numOfFiles
-        deltaTable.update(col("c2") < 1, Map("c2" -> lit("1")))
-        val numFilesAfterUpdate = deltaLog.snapshot.numOfFiles
-
-        val operationMetrics = getOperationMetrics(deltaTable.history(1))
-        val newFiles = numFilesAfterUpdate - numFilesBeforeUpdate
-        val oldFiles = numFilesBeforeUpdate / numPartitions
-        val addedFiles = newFiles + oldFiles
-        val expectedMetrics = Map(
-          "numUpdatedRows" -> (numRows / numPartitions).toString,
-          "numCopiedRows" -> "0",
-          "numAddedFiles" -> addedFiles.toString,
-          "numRemovedFiles" -> (numFilesBeforeUpdate / numPartitions).toString
-        )
-        checkOperationMetrics(expectedMetrics, operationMetrics, DeltaOperationMetrics.UPDATE)
-      }
+    withSQLConf((DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED.key -> "true")) {
+      metricsUpdatePartitionedColumnTest
     }
   }
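
As a rough usage sketch, not part of the commit: once this change is in, the copied-row count should appear alongside the other UPDATE metrics in the table history whenever history metrics are enabled (the conf referenced by DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED). The path below is hypothetical and an active SparkSession named spark is assumed:

// Assumes spark.databricks.delta.history.metricsEnabled is set to true
// and an existing Delta table at the hypothetical path /tmp/delta/events.
import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions.{col, lit}

val table = DeltaTable.forPath(spark, "/tmp/delta/events")
table.update(col("key") === lit(16), Map("value" -> lit(1)))
table.history(1)
  .select("operationMetrics")
  .show(truncate = false)
// The operationMetrics map for the UPDATE commit should now include
// numUpdatedRows, numCopiedRows, numAddedFiles and numRemovedFiles.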
