From 59b4c8e876c23404b8a6075c53d691a87896cce9 Mon Sep 17 00:00:00 2001
From: alexoss68
Date: Thu, 10 Mar 2022 09:35:00 -0500
Subject: [PATCH] Improve row metrics for UPDATE

GitOrigin-RevId: 26ec189bbdc45d795511c6f282e17f911b9a6621
---
 .../spark/sql/delta/DeltaOperations.scala     |   8 -
 .../sql/delta/commands/UpdateCommand.scala    |   2 +
 .../sql/delta/DescribeDeltaHistorySuite.scala | 140 +++++++++---------
 3 files changed, 75 insertions(+), 75 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
index d125025b472..6c54d8e40f4 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
@@ -185,14 +185,6 @@ object DeltaOperations {
     override val parameters: Map[String, Any] = predicate.map("predicate" -> _).toMap
     override val operationMetrics: Set[String] = DeltaOperationMetrics.UPDATE
 
-    override def transformMetrics(metrics: Map[String, SQLMetric]): Map[String, String] = {
-      val numOutputRows = metrics("numOutputRows").value
-      val numUpdatedRows = metrics("numUpdatedRows").value
-      var strMetrics = super.transformMetrics(metrics)
-      val numCopiedRows = numOutputRows - numUpdatedRows
-      strMetrics += "numCopiedRows" -> numCopiedRows.toString
-      strMetrics
-    }
     override def changesData: Boolean = true
   }
   /** Recorded when the table is created. */
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala
index 5fa2358cedb..be5dc493e6b 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/UpdateCommand.scala
@@ -58,6 +58,7 @@ case class UpdateCommand(
     "numAddedFiles" -> createMetric(sc, "number of files added."),
     "numRemovedFiles" -> createMetric(sc, "number of files removed."),
     "numUpdatedRows" -> createMetric(sc, "number of rows updated."),
+    "numCopiedRows" -> createMetric(sc, "number of rows copied."),
     "executionTimeMs" -> createMetric(sc, "time taken to execute the entire operation"),
     "scanTimeMs" -> createMetric(sc, "time taken to scan the files for matches"),
     "rewriteTimeMs" -> createMetric(sc, "time taken to rewrite the matched files")
@@ -180,6 +181,7 @@ case class UpdateCommand(
     if (metrics("numUpdatedRows").value == 0 && outputRows != 0) {
       metrics("numUpdatedRows").set(outputRows)
     }
+    metrics("numCopiedRows").set(outputRows - metrics("numUpdatedRows").value)
     txn.registerSQLMetrics(sparkSession, metrics)
     txn.commit(actions, DeltaOperations.Update(condition.map(_.toString)))
     // This is needed to make the SQL metrics visible in the Spark UI
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala
index f3868c00391..1c827a79375 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/DescribeDeltaHistorySuite.scala
@@ -713,81 +713,87 @@ trait DescribeDeltaHistorySuiteBase
     }
   }
 
+  def metricsUpdateTest : Unit = withTempDir { tempDir =>
+    // Create the initial table as a single file
+    Seq(1, 2, 5, 11, 21, 3, 4, 6, 9, 7, 8, 0).toDF("key")
+      .withColumn("value", 'key % 2)
+      .write
+      .format("delta")
+      .save(tempDir.getAbsolutePath)
+
+    // append additional data with the same number range to the table.
+    // This data is saved as a separate file as well
+    Seq(15, 16, 17).toDF("key")
+      .withColumn("value", 'key % 2)
+      .repartition(1)
+      .write
+      .format("delta")
+      .mode("append")
+      .save(tempDir.getAbsolutePath)
+    val deltaTable = io.delta.tables.DeltaTable.forPath(spark, tempDir.getAbsolutePath)
+    val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
+    deltaLog.snapshot.numOfFiles
+
+    // update the table
+    deltaTable.update(col("key") === lit("16"), Map("value" -> lit("1")))
+    // The file from the append gets updated but the file from the initial table gets scanned
+    // as well. We want to make sure numCopied rows is calculated from written files and not
+    // scanned files[SC-33980]
+
+    // get operation metrics
+    val operationMetrics = getOperationMetrics(deltaTable.history(1))
+    val expectedMetrics = Map(
+      "numAddedFiles" -> "1",
+      "numRemovedFiles" -> "1",
+      "numUpdatedRows" -> "1",
+      "numCopiedRows" -> "2" // There should be only three rows in total(updated + copied)
+    )
+    checkOperationMetrics(expectedMetrics, operationMetrics, DeltaOperationMetrics.UPDATE)
+    val expectedTimeMetrics = Set("executionTimeMs", "scanTimeMs", "rewriteTimeMs")
+    checkOperationTimeMetricsInvariant(expectedTimeMetrics, operationMetrics)
+  }
+
   test("operation metrics - update") {
-    withSQLConf(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED.key -> "true") {
-      withTempDir { tempDir =>
-        // Create the initial table as a single file
-        Seq(1, 2, 5, 11, 21, 3, 4, 6, 9, 7, 8, 0).toDF("key")
-          .withColumn("value", 'key % 2)
-          .write
-          .format("delta")
-          .save(tempDir.getAbsolutePath)
+    withSQLConf((DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED.key -> "true")) {
+      metricsUpdateTest
+    }
+  }
 
-        // append additional data with the same number range to the table.
-        // This data is saved as a separate file as well
-        Seq(15, 16, 17).toDF("key")
-          .withColumn("value", 'key % 2)
-          .repartition(1)
-          .write
-          .format("delta")
-          .mode("append")
-          .save(tempDir.getAbsolutePath)
-        val deltaTable = io.delta.tables.DeltaTable.forPath(spark, tempDir.getAbsolutePath)
-        val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
-        deltaLog.snapshot.numOfFiles
+  def metricsUpdatePartitionedColumnTest : Unit = {
+    val numRows = 100
+    val numPartitions = 5
+    withTempDir { tempDir =>
+      spark.range(numRows)
+        .withColumn("c1", 'id + 1)
+        .withColumn("c2", 'id % numPartitions)
+        .write
+        .partitionBy("c2")
+        .format("delta")
+        .save(tempDir.getAbsolutePath)
 
-        // update the table
-        deltaTable.update(col("key") === lit("16"), Map("value" -> lit("1")))
-        // The file from the append gets updated but the file from the initial table gets scanned
-        // as well. We want to make sure numCopied rows is calculated from written files and not
-        // scanned files[SC-33980]
+      val deltaTable = io.delta.tables.DeltaTable.forPath(tempDir.getAbsolutePath)
+      val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
+      val numFilesBeforeUpdate = deltaLog.snapshot.numOfFiles
+      deltaTable.update(col("c2") < 1, Map("c2" -> lit("1")))
+      val numFilesAfterUpdate = deltaLog.snapshot.numOfFiles
-        // get operation metrics
-        val operationMetrics = getOperationMetrics(deltaTable.history(1))
-        val expectedMetrics = Map(
-          "numAddedFiles" -> "1",
-          "numRemovedFiles" -> "1",
-          "numUpdatedRows" -> "1",
-          "numCopiedRows" -> "2" // There should be only three rows in total(updated + copied)
-        )
-        checkOperationMetrics(expectedMetrics, operationMetrics, DeltaOperationMetrics.UPDATE)
-        val expectedTimeMetrics = Set("executionTimeMs", "scanTimeMs", "rewriteTimeMs")
-        checkOperationTimeMetricsInvariant(expectedTimeMetrics, operationMetrics)
-      }
+      val operationMetrics = getOperationMetrics(deltaTable.history(1))
+      val newFiles = numFilesAfterUpdate - numFilesBeforeUpdate
+      val oldFiles = numFilesBeforeUpdate / numPartitions
+      val addedFiles = newFiles + oldFiles
+      val expectedMetrics = Map(
+        "numUpdatedRows" -> (numRows / numPartitions).toString,
+        "numCopiedRows" -> "0",
+        "numAddedFiles" -> addedFiles.toString,
+        "numRemovedFiles" -> (numFilesBeforeUpdate / numPartitions).toString
+      )
+      checkOperationMetrics(expectedMetrics, operationMetrics, DeltaOperationMetrics.UPDATE)
     }
   }
 
   test("operation metrics - update - partitioned column") {
-    withSQLConf(DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED.key -> "true") {
-      val numRows = 100
-      val numPartitions = 5
-      withTempDir { tempDir =>
-        spark.range(numRows)
-          .withColumn("c1", 'id + 1)
-          .withColumn("c2", 'id % numPartitions)
-          .write
-          .partitionBy("c2")
-          .format("delta")
-          .save(tempDir.getAbsolutePath)
-
-        val deltaTable = io.delta.tables.DeltaTable.forPath(tempDir.getAbsolutePath)
-        val deltaLog = DeltaLog.forTable(spark, tempDir.getAbsolutePath)
-        val numFilesBeforeUpdate = deltaLog.snapshot.numOfFiles
-        deltaTable.update(col("c2") < 1, Map("c2" -> lit("1")))
-        val numFilesAfterUpdate = deltaLog.snapshot.numOfFiles
-
-        val operationMetrics = getOperationMetrics(deltaTable.history(1))
-        val newFiles = numFilesAfterUpdate - numFilesBeforeUpdate
-        val oldFiles = numFilesBeforeUpdate / numPartitions
-        val addedFiles = newFiles + oldFiles
-        val expectedMetrics = Map(
-          "numUpdatedRows" -> (numRows / numPartitions).toString,
-          "numCopiedRows" -> "0",
-          "numAddedFiles" -> addedFiles.toString,
-          "numRemovedFiles" -> (numFilesBeforeUpdate / numPartitions).toString
-        )
-        checkOperationMetrics(expectedMetrics, operationMetrics, DeltaOperationMetrics.UPDATE)
-      }
+    withSQLConf((DeltaSQLConf.DELTA_HISTORY_METRICS_ENABLED.key -> "true")) {
+      metricsUpdatePartitionedColumnTest
     }
   }
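
Note (illustration only, not part of the patch): a minimal sketch of how the new metric surfaces once this change is in. The table path and data below are made up for the example, and it assumes the history metrics config (spark.databricks.delta.history.metricsEnabled, i.e. DELTA_HISTORY_METRICS_ENABLED) is left at its default of true.

  import io.delta.tables.DeltaTable
  import org.apache.spark.sql.functions._

  // Write a small Delta table at a hypothetical path, then update a single row.
  spark.range(10)
    .withColumn("value", col("id") % 2)
    .write.format("delta").save("/tmp/delta/update_metrics_demo")
  val table = DeltaTable.forPath(spark, "/tmp/delta/update_metrics_demo")
  table.update(col("id") === lit(7), Map("value" -> lit(1)))

  // The UPDATE entry in the table history should now report numCopiedRows (rows
  // rewritten unchanged into new files) next to numUpdatedRows; with this patch
  // the value is set directly in UpdateCommand rather than derived later from
  // numOutputRows in DeltaOperations.Update.transformMetrics.
  table.history(1)
    .select(explode(col("operationMetrics")))
    .where(col("key").isin("numUpdatedRows", "numCopiedRows", "numAddedFiles", "numRemovedFiles"))
    .show()

The same operationMetrics map is also visible through DESCRIBE HISTORY in SQL.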