From dde57a3b330ed34a9c71c09977c53ee394ef8c31 Mon Sep 17 00:00:00 2001 From: Rahul Shivu Mahadev Date: Mon, 18 Apr 2022 12:25:56 -0700 Subject: [PATCH] Fix generate symlink manifest to delete partitions that contain spaces - Problem : URI paths were being compared with regular paths - Fixed by decoding URI path to get regular path - added unit test GitOrigin-RevId: 6ddf2dd4b2292aa8da7970fe7a37afed2523433c --- .../delta/hooks/GenerateSymlinkManifest.scala | 6 ++- .../DeltaGenerateSymlinkManifestSuite.scala | 49 +++++++++++++++++++ 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/sql/delta/hooks/GenerateSymlinkManifest.scala b/core/src/main/scala/org/apache/spark/sql/delta/hooks/GenerateSymlinkManifest.scala index ddb5350291b..807dca21f77 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/hooks/GenerateSymlinkManifest.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/hooks/GenerateSymlinkManifest.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.delta.hooks // scalastyle:off import.ordering.noEmptyLine +import java.net.URI + import org.apache.spark.sql.delta._ import org.apache.spark.sql.delta.actions._ import org.apache.spark.sql.delta.metering.DeltaLogging @@ -208,7 +210,9 @@ trait GenerateSymlinkManifestImpl extends PostCommitHook with DeltaLogging with new Path(relativeManifestFilePath).getParent.toString // returns "col1=0/col2=0" }.filterNot(_.trim.isEmpty).toSet } else Set.empty[String] - } + }.map(uri => new Path(new URI(uri)).toString) + // paths returned from inputFiles are URI encoded so we need to convert them back to string. + // So that they can be compared with newManifestPartitionRelativePaths in the next step. 
// Delete manifest files for partitions that are not in current and so weren't overwritten val manifestFilePartitionsToDelete = diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaGenerateSymlinkManifestSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaGenerateSymlinkManifestSuite.scala index 037f4bf3879..1b770906526 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaGenerateSymlinkManifestSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaGenerateSymlinkManifestSuite.scala @@ -484,6 +484,55 @@ trait DeltaGenerateSymlinkManifestSuiteBase extends QueryTest } } + Seq(true, false).foreach { useIncremental => + test(s"delete partition column with special char - incremental=$useIncremental") { + + def writePartition(dir: File, partName: String): Unit = { + spark.range(10) + .withColumn("part", lit(partName)) + .repartition(1) + .write + .format("delta") + .mode("append") + .partitionBy("part") + .save(dir.toString) + } + + withTempDir { dir => + // create table and write first manifest + writePartition(dir, "noSpace") + generateSymlinkManifest(dir.toString) + + withIncrementalManifest(dir, useIncremental) { + // 1. test paths with spaces + writePartition(dir, "yes space") + + if (!useIncremental) { generateSymlinkManifest(dir.toString) } + assertManifest(dir, expectSameFiles = true, expectedNumFiles = 2) + + // delete partition + sql(s"""DELETE FROM delta.`${dir.toString}` WHERE part="yes space";""") + + if (!useIncremental) { generateSymlinkManifest(dir.toString) } + assertManifest(dir, expectSameFiles = true, expectedNumFiles = 1) + + // 2. 
test special characters + // scalastyle:off nonascii + writePartition(dir, "库尔 勒") + if (!useIncremental) { generateSymlinkManifest(dir.toString) } + assertManifest(dir, expectSameFiles = true, expectedNumFiles = 2) + + // delete partition + sql(s"""DELETE FROM delta.`${dir.toString}` WHERE part="库尔 勒";""") + // scalastyle:on nonascii + + if (!useIncremental) { generateSymlinkManifest(dir.toString) } + assertManifest(dir, expectSameFiles = true, expectedNumFiles = 1) + } + } + } + } + /** * Assert that the manifest files in the table meet the expectations. * @param tablePath Path of the Delta table