From 0c2a6a17af93f7af0622fe3df9085b75d5b88a0b Mon Sep 17 00:00:00 2001 From: Lin Zhou Date: Tue, 9 Jan 2024 10:04:21 -0800 Subject: [PATCH] [Spark][Sharing] Adds snapshot support for "delta format sharing" Adds snapshot support for "delta format sharing", this is the second PR of issue #2291 - DeltaSharingDataSource with snapshot query support - DeltaSharingDataSourceDeltaSuite - DeltaSharingDataSourceDeltaTestUtils/TestClientForDeltaFormatSharing/TestDeltaSharingFileSystem Closes delta-io/delta#2440 GitOrigin-RevId: a095445b6da809ee9a5b4ece7c38d04a172ff70f --- ...pache.spark.sql.sources.DataSourceRegister | 1 + .../DeltaFormatSharingLimitPushDown.scala | 53 ++ .../spark/DeltaSharingDataSource.scala | 272 +++++++++ .../sharing/spark/DeltaSharingUtils.scala | 54 ++ .../DeltaSharingDataSourceDeltaSuite.scala | 564 ++++++++++++++++++ ...DeltaSharingDataSourceDeltaTestUtils.scala | 538 +++++++++++++++++ .../spark/DeltaSharingFileIndexSuite.scala | 15 +- .../TestClientForDeltaFormatSharing.scala | 270 +++++++++ .../spark/TestDeltaSharingFileSystem.scala | 140 +++++ 9 files changed, 1895 insertions(+), 12 deletions(-) create mode 100644 sharing/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister create mode 100644 sharing/src/main/scala/io/delta/sharing/spark/DeltaFormatSharingLimitPushDown.scala create mode 100644 sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala create mode 100644 sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaSuite.scala create mode 100644 sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaTestUtils.scala create mode 100644 sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala create mode 100644 sharing/src/test/scala/io/delta/sharing/spark/TestDeltaSharingFileSystem.scala diff --git a/sharing/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sharing/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister new file mode 100644 index 00000000000..c81a708f931 --- /dev/null +++ b/sharing/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -0,0 +1 @@ +io.delta.sharing.spark.DeltaSharingDataSource \ No newline at end of file diff --git a/sharing/src/main/scala/io/delta/sharing/spark/DeltaFormatSharingLimitPushDown.scala b/sharing/src/main/scala/io/delta/sharing/spark/DeltaFormatSharingLimitPushDown.scala new file mode 100644 index 00000000000..589df97057a --- /dev/null +++ b/sharing/src/main/scala/io/delta/sharing/spark/DeltaFormatSharingLimitPushDown.scala @@ -0,0 +1,53 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.delta.sharing.spark + +import io.delta.sharing.client.util.ConfUtils + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.IntegerLiteral +import org.apache.spark.sql.catalyst.plans.logical.{LocalLimit, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} + +// A spark rule that applies limit pushdown to DeltaSharingFileIndex, when the config is enabled. +// To allow only fetching needed files from delta sharing server. +object DeltaFormatSharingLimitPushDown extends Rule[LogicalPlan] { + + def setup(spark: SparkSession): Unit = synchronized { + if (!spark.experimental.extraOptimizations.contains(DeltaFormatSharingLimitPushDown)) { + spark.experimental.extraOptimizations ++= Seq(DeltaFormatSharingLimitPushDown) + } + } + + def apply(p: LogicalPlan): LogicalPlan = { + p transform { + case localLimit @ LocalLimit( + literalExpr @ IntegerLiteral(limit), + l @ LogicalRelation( + r @ HadoopFsRelation(remoteIndex: DeltaSharingFileIndex, _, _, _, _, _), + _, + _, + _ + ) + ) if (ConfUtils.limitPushdownEnabled(p.conf) && remoteIndex.limitHint.isEmpty) => + val spark = SparkSession.active + val newRel = r.copy(location = remoteIndex.copy(limitHint = Some(limit)))(spark) + LocalLimit(literalExpr, l.copy(relation = newRel)) + } + } +} diff --git a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala new file mode 100644 index 00000000000..231be28ce50 --- /dev/null +++ b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala @@ -0,0 +1,272 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
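For context, the following is a hedged sketch (not part of this patch) of how the limit pushdown rule above is exercised end to end. The profile path and table coordinates are placeholders, and it assumes the limit-pushdown conf checked by ConfUtils.limitPushdownEnabled is left enabled.

  import org.apache.spark.sql.SparkSession
  import io.delta.sharing.spark.DeltaFormatSharingLimitPushDown

  object LimitPushDownSketch {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder()
        .appName("delta-sharing-limit-pushdown-sketch")
        .master("local[*]")
        .getOrCreate()

      // setup() is idempotent: it appends the rule to
      // spark.experimental.extraOptimizations only if it is not already there.
      DeltaFormatSharingLimitPushDown.setup(spark)

      // With the rule registered, the LIMIT below is rewritten so that the
      // DeltaSharingFileIndex carries limitHint = Some(1), letting the sharing
      // server return only the files needed to satisfy the limit.
      spark.read
        .format("deltaSharing")
        .option("responseFormat", "delta")
        .load("/path/to/profile.share#share1.default.table1") // placeholder path
        .limit(1)
        .show()

      spark.stop()
    }
  }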
+ */ + +package io.delta.sharing.spark + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.delta.{ + DeltaColumnMapping, + DeltaErrors, + DeltaTableUtils => TahoeDeltaTableUtils +} +import org.apache.spark.sql.delta.commands.cdc.CDCReader +import org.apache.spark.sql.delta.metering.DeltaLogging +import org.apache.spark.sql.delta.schema.SchemaUtils +import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSQLConf} +import io.delta.sharing.client.{DeltaSharingClient, DeltaSharingRestClient} +import io.delta.sharing.client.model.{Table => DeltaSharingTable} +import io.delta.sharing.client.util.{ConfUtils, JsonUtils} +import org.apache.hadoop.fs.Path + +import org.apache.spark.SparkEnv +import org.apache.spark.delta.sharing.PreSignedUrlCache +import org.apache.spark.sql.{SparkSession, SQLContext} +import org.apache.spark.sql.execution.datasources.HadoopFsRelation +import org.apache.spark.sql.execution.streaming.Source +import org.apache.spark.sql.sources.{ + BaseRelation, + DataSourceRegister, + RelationProvider, + StreamSourceProvider +} +import org.apache.spark.sql.types.StructType + +/** + * A DataSource for Delta Sharing, used to support all types of queries on a delta sharing table: + * batch, cdf, streaming, time travel, filters, etc. + */ +private[sharing] class DeltaSharingDataSource + extends RelationProvider + with DataSourceRegister + with DeltaLogging { + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String]): BaseRelation = { + DeltaSharingDataSource.setupFileSystem(sqlContext) + val options = new DeltaSharingOptions(parameters) + + val userInputResponseFormat = options.options.get(DeltaSharingOptions.RESPONSE_FORMAT) + if (userInputResponseFormat.isEmpty && !options.readChangeFeed) { + return autoResolveBaseRelationForSnapshotQuery(options) + } + + val path = options.options.getOrElse("path", throw DeltaSharingErrors.pathNotSpecifiedException) + if (options.responseFormat == DeltaSharingOptions.RESPONSE_FORMAT_PARQUET) { + // When user explicitly set responseFormat=parquet, to query shared tables without advanced + // delta features. + logInfo(s"createRelation with parquet format for table path:$path, parameters:$parameters") + val deltaLog = RemoteDeltaLog( + path, + forStreaming = false, + responseFormat = options.responseFormat + ) + deltaLog.createRelation( + options.versionAsOf, + options.timestampAsOf, + options.cdfOptions + ) + } else if (options.responseFormat == DeltaSharingOptions.RESPONSE_FORMAT_DELTA) { + // When user explicitly set responseFormat=delta, to query shared tables with advanced + // delta features. + logInfo(s"createRelation with delta format for table path:$path, parameters:$parameters") + // 1. create delta sharing client + val parsedPath = DeltaSharingRestClient.parsePath(path) + val client = DeltaSharingRestClient( + profileFile = parsedPath.profileFile, + forStreaming = false, + responseFormat = options.responseFormat, + // comma separated delta reader features, used to tell delta sharing server what delta + // reader features the client is able to process. + readerFeatures = DeltaSharingUtils.SUPPORTED_READER_FEATURES.mkString(",") + ) + val dsTable = DeltaSharingTable( + share = parsedPath.share, + schema = parsedPath.schema, + name = parsedPath.table + ) + + // 2. getMetadata for schema to be used in the file index. 
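To make the three entry points of createRelation above concrete, here is a hedged reader-side sketch; tablePath is a placeholder of the form "<profile-file>#<share>.<schema>.<table>" and spark is an active SparkSession.

  val tablePath = "/path/to/profile.share#share1.default.table1" // placeholder

  // No responseFormat and no CDF option: createRelation auto-resolves the format
  // from the server's getMetadata response.
  val autoDf = spark.read.format("deltaSharing").load(tablePath)

  // Explicit parquet: served through RemoteDeltaLog, for shared tables without
  // advanced delta features.
  val parquetDf = spark.read
    .format("deltaSharing")
    .option("responseFormat", "parquet")
    .load(tablePath)

  // Explicit delta: served through DeltaSharingFileIndex and a locally
  // constructed delta log, for tables with features such as deletion vectors.
  val deltaDf = spark.read
    .format("deltaSharing")
    .option("responseFormat", "delta")
    .load(tablePath)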
+ val deltaTableMetadata = DeltaSharingUtils.queryDeltaTableMetadata( + client = client, + table = dsTable, + versionAsOf = options.versionAsOf, + timestampAsOf = options.timestampAsOf + ) + val deltaSharingTableMetadata = DeltaSharingUtils.getDeltaSharingTableMetadata( + table = dsTable, + deltaTableMetadata = deltaTableMetadata + ) + + // 3. Prepare HadoopFsRelation + getHadoopFsRelationForDeltaSnapshotQuery( + path = path, + options = options, + dsTable = dsTable, + client = client, + deltaSharingTableMetadata = deltaSharingTableMetadata + ) + } else { + throw new UnsupportedOperationException( + s"responseformat(${options.responseFormat}) is not supported in delta sharing." + ) + } + } + + /** + * "parquet format sharing" leverages the existing set of remote classes to directly handle the + * list of presigned urls and read data. + * "delta format sharing" instead constructs a local delta log and leverages the delta library to + * read data. + * Firstly we sends a getMetadata call to the delta sharing server the suggested response format + * of the shared table by the server (based on whether there are advanced delta features in the + * shared table), and then decide the code path on the client side. + */ + private def autoResolveBaseRelationForSnapshotQuery( + options: DeltaSharingOptions): BaseRelation = { + val path = options.options.getOrElse("path", throw DeltaSharingErrors.pathNotSpecifiedException) + val parsedPath = DeltaSharingRestClient.parsePath(path) + + val client = DeltaSharingRestClient( + profileFile = parsedPath.profileFile, + forStreaming = false, + // Indicating that the client is able to process response format in both parquet and delta. + responseFormat = s"${DeltaSharingOptions.RESPONSE_FORMAT_PARQUET}," + + s"${DeltaSharingOptions.RESPONSE_FORMAT_DELTA}", + // comma separated delta reader features, used to tell delta sharing server what delta + // reader features the client is able to process. + readerFeatures = DeltaSharingUtils.SUPPORTED_READER_FEATURES.mkString(",") + ) + val dsTable = DeltaSharingTable( + name = parsedPath.table, + schema = parsedPath.schema, + share = parsedPath.share + ) + + val deltaTableMetadata = DeltaSharingUtils.queryDeltaTableMetadata( + client = client, + table = dsTable, + versionAsOf = options.versionAsOf, + timestampAsOf = options.timestampAsOf + ) + + if (deltaTableMetadata.respondedFormat == DeltaSharingOptions.RESPONSE_FORMAT_PARQUET) { + val deltaLog = RemoteDeltaLog( + path = path, + forStreaming = false, + responseFormat = DeltaSharingOptions.RESPONSE_FORMAT_PARQUET, + initDeltaTableMetadata = Some(deltaTableMetadata) + ) + deltaLog.createRelation(options.versionAsOf, options.timestampAsOf, options.cdfOptions) + } else if (deltaTableMetadata.respondedFormat == DeltaSharingOptions.RESPONSE_FORMAT_DELTA) { + val deltaSharingTableMetadata = DeltaSharingUtils.getDeltaSharingTableMetadata( + table = dsTable, + deltaTableMetadata = deltaTableMetadata + ) + val deltaOnlyClient = DeltaSharingRestClient( + profileFile = parsedPath.profileFile, + forStreaming = false, + // Indicating that the client request delta format in response. + responseFormat = DeltaSharingOptions.RESPONSE_FORMAT_DELTA, + // comma separated delta reader features, used to tell delta sharing server what delta + // reader features the client is able to process. 
+ readerFeatures = DeltaSharingUtils.SUPPORTED_READER_FEATURES.mkString(",") + ) + getHadoopFsRelationForDeltaSnapshotQuery( + path = path, + options = options, + dsTable = dsTable, + client = deltaOnlyClient, + deltaSharingTableMetadata = deltaSharingTableMetadata + ) + } else { + throw new UnsupportedOperationException( + s"Unexpected respondedFormat for getMetadata rpc:${deltaTableMetadata.respondedFormat}." + ) + } + } + + /** + * Prepare a HadoopFsRelation for the snapshot query on a delta sharing table. It will contain a + * DeltaSharingFileIndex which is used to handle delta sharing rpc, and construct the local delta + * log, and then build a TahoeFileIndex on top of the delta log. + */ + private def getHadoopFsRelationForDeltaSnapshotQuery( + path: String, + options: DeltaSharingOptions, + dsTable: DeltaSharingTable, + client: DeltaSharingClient, + deltaSharingTableMetadata: DeltaSharingUtils.DeltaSharingTableMetadata): BaseRelation = { + // Prepare DeltaSharingFileIndex + val spark = SparkSession.active + val params = new DeltaSharingFileIndexParams( + new Path(path), + spark, + deltaSharingTableMetadata.metadata, + options + ) + if (ConfUtils.limitPushdownEnabled(spark.sessionState.conf)) { + DeltaFormatSharingLimitPushDown.setup(spark) + } + // limitHint is always None here and will be overridden in DeltaFormatSharingLimitPushDown. + val fileIndex = DeltaSharingFileIndex( + params = params, + table = dsTable, + client = client, + limitHint = None + ) + + // return HadoopFsRelation with the DeltaSharingFileIndex. + HadoopFsRelation( + location = fileIndex, + // This is copied from DeltaLog.buildHadoopFsRelationWithFileIndex. + // Dropping column mapping metadata because it is not relevant for partition schema. + partitionSchema = DeltaColumnMapping.dropColumnMappingMetadata(fileIndex.partitionSchema), + // This is copied from DeltaLog.buildHadoopFsRelationWithFileIndex, original comment: + // We pass all table columns as `dataSchema` so that Spark will preserve the partition + // column locations. Otherwise, for any partition columns not in `dataSchema`, Spark would + // just append them to the end of `dataSchema`. + dataSchema = DeltaColumnMapping.dropColumnMappingMetadata( + TahoeDeltaTableUtils.removeInternalMetadata( + spark, + SchemaUtils.dropNullTypeColumns(deltaSharingTableMetadata.metadata.schema) + ) + ), + bucketSpec = None, + // Handle column mapping metadata in schema. 
+ fileFormat = fileIndex.fileFormat( + deltaSharingTableMetadata.protocol.deltaProtocol, + deltaSharingTableMetadata.metadata.deltaMetadata + ), + options = Map.empty + )(spark) + } + + override def shortName(): String = "deltaSharing" +} + +private[sharing] object DeltaSharingDataSource { + def setupFileSystem(sqlContext: SQLContext): Unit = { + sqlContext.sparkContext.hadoopConfiguration + .setIfUnset("fs.delta-sharing.impl", "io.delta.sharing.client.DeltaSharingFileSystem") + sqlContext.sparkContext.hadoopConfiguration + .setIfUnset( + "fs.delta-sharing-log.impl", + "io.delta.sharing.spark.DeltaSharingLogFileSystem" + ) + PreSignedUrlCache.registerIfNeeded(SparkEnv.get) + } +} diff --git a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingUtils.scala b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingUtils.scala index 4e01408d294..bfd590ca00b 100644 --- a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingUtils.scala +++ b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingUtils.scala @@ -44,6 +44,9 @@ import org.apache.spark.storage.{BlockId, StorageLevel} object DeltaSharingUtils extends Logging { + val SUPPORTED_READER_FEATURES: Seq[String] = + Seq(DeletionVectorsTableFeature.name, ColumnMappingTableFeature.name) + // The prefix will be used for block ids of all blocks that store the delta log in BlockManager. // It's used to ensure delta sharing queries don't mess up with blocks with other applications. val DELTA_SHARING_BLOCK_ID_PREFIX = "test_delta-sharing" @@ -58,6 +61,57 @@ object DeltaSharingUtils extends Logging { metadata: model.DeltaSharingMetadata ) + def queryDeltaTableMetadata( + client: DeltaSharingClient, + table: Table, + versionAsOf: Option[Long] = None, + timestampAsOf: Option[String] = None): DeltaTableMetadata = { + val deltaTableMetadata = client.getMetadata(table, versionAsOf, timestampAsOf) + logInfo( + s"getMetadata returned in ${deltaTableMetadata.respondedFormat} format for table " + + s"$table with v_${versionAsOf.map(_.toString).getOrElse("None")} " + + s"t_${timestampAsOf.getOrElse("None")} from delta sharing server." + ) + deltaTableMetadata + } + + /** + * parse the protocol and metadata from rpc response for getMetadata. + */ + def getDeltaSharingTableMetadata( + table: Table, + deltaTableMetadata: DeltaTableMetadata): DeltaSharingTableMetadata = { + + var metadataOption: Option[model.DeltaSharingMetadata] = None + var protocolOption: Option[model.DeltaSharingProtocol] = None + + deltaTableMetadata.lines + .map( + JsonUtils.fromJson[model.DeltaSharingSingleAction](_).unwrap + ) + .foreach { + case m: model.DeltaSharingMetadata => metadataOption = Some(m) + case p: model.DeltaSharingProtocol => protocolOption = Some(p) + case _ => // ignore other lines + } + + DeltaSharingTableMetadata( + version = deltaTableMetadata.version, + protocol = protocolOption.getOrElse { + throw new IllegalStateException( + s"Failed to get Protocol for ${table.toString}, " + + s"response from server:${deltaTableMetadata.lines}." + ) + }, + metadata = metadataOption.getOrElse { + throw new IllegalStateException( + s"Failed to get Metadata for ${table.toString}, " + + s"response from server:${deltaTableMetadata.lines}." 
+ ) + } + ) + } + private def getTableRefreshResult(tableFiles: DeltaTableFiles): TableRefreshResult = { var minUrlExpiration: Option[Long] = None val idToUrl = tableFiles.lines diff --git a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaSuite.scala b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaSuite.scala new file mode 100644 index 00000000000..3334c76ade5 --- /dev/null +++ b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaSuite.scala @@ -0,0 +1,564 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.sharing.spark + +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest +import org.apache.spark.sql.{DataFrame, QueryTest, Row} +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.delta.sharing.DeltaSharingTestSparkUtils +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.types.{ + DateType, + IntegerType, + LongType, + StringType, + StructType, + TimestampType +} + +trait DeltaSharingDataSourceDeltaSuiteBase + extends QueryTest + with DeltaSQLCommandTest + with DeltaSharingTestSparkUtils + with DeltaSharingDataSourceDeltaTestUtils { + + /** + * metadata tests + */ + test("failed to getMetadata") { + withTempDir { tempDir => + val sharedTableName = "shared_table_broken_json" + + def test(tablePath: String, tableFullName: String): Unit = { + DeltaSharingUtils.overrideIteratorBlock[String]( + blockId = TestClientForDeltaFormatSharing.getBlockId(sharedTableName, "getMetadata"), + values = Seq("bad protocol string", "bad metadata string").toIterator + ) + DeltaSharingUtils.overrideSingleBlock[Long]( + blockId = TestClientForDeltaFormatSharing.getBlockId(sharedTableName, "getTableVersion"), + value = 1 + ) + // JsonParseException on "bad protocol string" + val exception = intercept[com.fasterxml.jackson.core.JsonParseException] { + spark.read.format("deltaSharing").option("responseFormat", "delta").load(tablePath).schema + } + assert(exception.getMessage.contains("Unrecognized token 'bad'")) + + // table_with_broken_protocol + // able to parse as a DeltaSharingSingleAction, but it's an addFile, not metadata. + DeltaSharingUtils.overrideIteratorBlock[String]( + blockId = TestClientForDeltaFormatSharing.getBlockId(sharedTableName, "getMetadata"), + // scalastyle:off line.size.limit + values = Seq( + """{"add": {"path":"random","id":"random","partitionValues":{},"size":1,"motificationTime":1,"dataChange":false}}""" + ).toIterator + ) + val exception2 = intercept[IllegalStateException] { + spark.read.format("deltaSharing").option("responseFormat", "delta").load(tablePath).schema + } + assert( + exception2.getMessage + .contains(s"Failed to get Protocol for $tableFullName") + ) + + // table_with_broken_metadata + // able to parse as a DeltaSharingSingleAction, but it's an addFile, not metadata. 
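Stepping out of the test for a moment: the two DeltaSharingUtils helpers that these failure cases exercise are normally combined as in the hedged sketch below. The profile path is a placeholder; the client and table construction mirrors DeltaSharingDataSource.createRelation.

  import io.delta.sharing.client.DeltaSharingRestClient
  import io.delta.sharing.client.model.{Table => DeltaSharingTable}

  val parsedPath = DeltaSharingRestClient.parsePath(
    "/path/to/profile.share#share1.default.table1") // placeholder
  val client = DeltaSharingRestClient(
    profileFile = parsedPath.profileFile,
    forStreaming = false,
    responseFormat = DeltaSharingOptions.RESPONSE_FORMAT_DELTA,
    readerFeatures = DeltaSharingUtils.SUPPORTED_READER_FEATURES.mkString(","))
  val dsTable = DeltaSharingTable(
    share = parsedPath.share,
    schema = parsedPath.schema,
    name = parsedPath.table)

  val deltaTableMetadata = DeltaSharingUtils.queryDeltaTableMetadata(
    client = client,
    table = dsTable,
    versionAsOf = Some(1L))
  // Splits the returned lines into protocol and metadata, throwing
  // IllegalStateException if either is missing, which is the behavior the
  // assertions above rely on.
  val tableMetadata = DeltaSharingUtils.getDeltaSharingTableMetadata(
    table = dsTable,
    deltaTableMetadata = deltaTableMetadata)
  val sharedSchema = tableMetadata.metadata.schema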
+ DeltaSharingUtils.overrideIteratorBlock[String]( + blockId = TestClientForDeltaFormatSharing.getBlockId(sharedTableName, "getMetadata"), + values = Seq( + """{"protocol":{"minReaderVersion":1}}""" + ).toIterator + ) + val exception3 = intercept[IllegalStateException] { + spark.read.format("deltaSharing").option("responseFormat", "delta").load(tablePath).schema + } + assert( + exception3.getMessage + .contains(s"Failed to get Metadata for $tableFullName") + ) + } + + withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) { + val profileFile = prepareProfileFile(tempDir) + val tableFullName = s"share1.default.$sharedTableName" + test(s"${profileFile.getCanonicalPath}#$tableFullName", tableFullName) + } + } + } + + def assertLimit(tablePath: String, expectedLimit: Seq[Long]): Unit = { + assert(expectedLimit == + TestClientForDeltaFormatSharing.limits.filter(_._1.contains(tablePath)).map(_._2)) + } + + def assertRequestedFormat(tablePath: String, expectedFormat: Seq[String]): Unit = { + assert(expectedFormat == + TestClientForDeltaFormatSharing.requestedFormat.filter(_._1.contains(tablePath)).map(_._2)) + } + /** + * snapshot queries + */ + test("DeltaSharingDataSource able to read simple data") { + withTempDir { tempDir => + val deltaTableName = "delta_table_simple" + withTable(deltaTableName) { + createTable(deltaTableName) + sql( + s"INSERT INTO $deltaTableName" + + """ VALUES (1, "one", "2023-01-01", "2023-01-01 00:00:00"), + |(2, "two", "2023-02-02", "2023-02-02 00:00:00")""".stripMargin + ) + + val expectedSchema: StructType = new StructType() + .add("c1", IntegerType) + .add("c2", StringType) + .add("c3", DateType) + .add("c4", TimestampType) + val expected = Seq( + Row(1, "one", sqlDate("2023-01-01"), sqlTimestamp("2023-01-01 00:00:00")), + Row(2, "two", sqlDate("2023-02-02"), sqlTimestamp("2023-02-02 00:00:00")) + ) + + Seq(true, false).foreach { enableLimitPushdown => + val sharedTableName = s"shared_table_simple_$enableLimitPushdown" + prepareMockedClientAndFileSystemResult(deltaTableName, sharedTableName) + prepareMockedClientGetTableVersion(deltaTableName, sharedTableName) + + def test(tablePath: String, tableName: String): Unit = { + assert( + expectedSchema == spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .load(tablePath) + .schema + ) + val df = + spark.read.format("deltaSharing").option("responseFormat", "delta").load(tablePath) + checkAnswer(df, expected) + assert(df.count() > 0) + assertLimit(tableName, Seq.empty[Long]) + val limitDf = spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .load(tablePath) + .limit(1) + assert(limitDf.collect().size == 1) + assertLimit(tableName, Some(1L).filter(_ => enableLimitPushdown).toSeq) + } + + val limitPushdownConfig = Map( + "spark.delta.sharing.limitPushdown.enabled" -> enableLimitPushdown.toString + ) + withSQLConf((limitPushdownConfig ++ getDeltaSharingClassesSQLConf).toSeq: _*) { + val profileFile = prepareProfileFile(tempDir) + val tableName = s"share1.default.$sharedTableName" + test(s"${profileFile.getCanonicalPath}#$tableName", tableName) + } + } + } + } + } + + test("DeltaSharingDataSource able to auto resolve responseFormat") { + withTempDir { tempDir => + val deltaTableName = "delta_table_auto" + withTable(deltaTableName) { + createSimpleTable(deltaTableName, enableCdf = false) + sql( + s"""INSERT INTO $deltaTableName VALUES (1, "one"), (2, "one")""".stripMargin + ) + sql( + s"""INSERT INTO $deltaTableName VALUES (1, "two"), (2, "two")""".stripMargin + ) + + val 
expectedSchema: StructType = new StructType() + .add("c1", IntegerType) + .add("c2", StringType) + + def testAutoResolve(tablePath: String, tableName: String, expectedFormat: String): Unit = { + assert( + expectedSchema == spark.read + .format("deltaSharing") + .load(tablePath) + .schema + ) + + val deltaDf = spark.read.format("delta").table(deltaTableName) + val sharingDf = spark.read.format("deltaSharing").load(tablePath) + checkAnswer(deltaDf, sharingDf) + assert(sharingDf.count() > 0) + assertLimit(tableName, Seq.empty[Long]) + assertRequestedFormat(tableName, Seq(expectedFormat)) + + val limitDf = spark.read + .format("deltaSharing") + .load(tablePath) + .limit(1) + assert(limitDf.collect().size == 1) + assertLimit(tableName, Seq(1L)) + + val deltaDfV1 = spark.read.format("delta").option("versionAsOf", 1).table(deltaTableName) + val sharingDfV1 = + spark.read.format("deltaSharing").option("versionAsOf", 1).load(tablePath) + checkAnswer(deltaDfV1, sharingDfV1) + assert(sharingDfV1.count() > 0) + assertRequestedFormat(tableName, Seq(expectedFormat)) + } + + // Test for delta format response + val sharedDeltaTable = "shared_delta_table" + prepareMockedClientAndFileSystemResult(deltaTableName, sharedDeltaTable) + prepareMockedClientAndFileSystemResult( + deltaTableName, + sharedDeltaTable, + versionAsOf = Some(1) + ) + prepareMockedClientGetTableVersion(deltaTableName, sharedDeltaTable) + + withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) { + val profileFile = prepareProfileFile(tempDir) + testAutoResolve( + s"${profileFile.getCanonicalPath}#share1.default.$sharedDeltaTable", + s"share1.default.$sharedDeltaTable", + "delta" + ) + } + + // Test for parquet format response + val sharedParquetTable = "shared_parquet_table" + prepareMockedClientAndFileSystemResultForParquet( + deltaTableName, + sharedParquetTable + ) + prepareMockedClientAndFileSystemResultForParquet( + deltaTableName, + sharedParquetTable, + versionAsOf = Some(1) + ) + prepareMockedClientGetTableVersion(deltaTableName, sharedParquetTable) + + withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) { + val profileFile = prepareProfileFile(tempDir) + testAutoResolve( + s"${profileFile.getCanonicalPath}#share1.default.$sharedParquetTable", + s"share1.default.$sharedParquetTable", + "parquet" + ) + } + } + } + } + + test("DeltaSharingDataSource able to read data with filters and select") { + withTempDir { tempDir => + val deltaTableName = "delta_table_filters" + withTable(deltaTableName) { + createSimpleTable(deltaTableName, enableCdf = false) + sql(s"""INSERT INTO $deltaTableName VALUES (1, "first"), (2, "first")""") + sql(s"""INSERT INTO $deltaTableName VALUES (1, "second"), (2, "second")""") + sql(s"""INSERT INTO $deltaTableName VALUES (1, "third"), (2, "third")""") + + val sharedTableName = "shared_table_filters" + prepareMockedClientAndFileSystemResult(deltaTableName, sharedTableName) + prepareMockedClientGetTableVersion(deltaTableName, sharedTableName) + + // The files returned from delta sharing client are the same for these queries. + // This is to test the filters are passed correctly to TahoeLogFileIndex for the local delta + // log. 
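A side note on the table paths used by these tests: prepareProfileFile(tempDir) writes a Delta Sharing profile file, and tables are then addressed as "<profile-path>#<share>.<schema>.<table>". The profile contents are not shown in this patch, so the field names below are an assumption based on the standard Delta Sharing profile format.

  // Assumed profile shape (fields not defined in this patch).
  val exampleProfileJson =
    """{
      |  "shareCredentialsVersion": 1,
      |  "endpoint": "https://sharing.example.com/delta-sharing/",
      |  "bearerToken": "faked-token-for-tests"
      |}""".stripMargin

  // The load path then combines the profile location with the table coordinates,
  // e.g. s"${profileFile.getCanonicalPath}#share1.default.shared_table_filters".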
+ def testFiltersAndSelect(tablePath: String): Unit = { + var expected = Seq(Row(1, "first"), Row(1, "second"), Row(1, "third"), Row(2, "second")) + var df = spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .load(tablePath) + .filter(col("c1") === 1 || col("c2") === "second") + checkAnswer(df, expected) + + expected = Seq(Row(1, "first"), Row(1, "second"), Row(1, "third")) + df = spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .load(tablePath) + .filter(col("c1") === 1) + checkAnswer(df, expected) + + expected = Seq(Row(1, "second"), Row(2, "second")) + df = spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .load(tablePath) + .filter(col("c2") === "second") + checkAnswer(df, expected) + + // with select as well + expected = Seq(Row(1), Row(1), Row(1), Row(2), Row(2), Row(2)) + df = spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .load(tablePath) + .select("c1") + checkAnswer(df, expected) + + expected = Seq( + Row("first"), + Row("first"), + Row("second"), + Row("second"), + Row("third"), + Row("third") + ) + df = spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .load(tablePath) + .select("c2") + checkAnswer(df, expected) + + expected = Seq(Row(1), Row(2)) + df = spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .load(tablePath) + .filter(col("c2") === "second") + .select("c1") + checkAnswer(df, expected) + } + + withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) { + val profileFile = prepareProfileFile(tempDir) + testFiltersAndSelect(s"${profileFile.getCanonicalPath}#share1.default.$sharedTableName") + } + } + } + } + + test("DeltaSharingDataSource able to read data for time travel queries") { + withTempDir { tempDir => + val deltaTableName = "delta_table_time_travel" + withTable(deltaTableName) { + createTable(deltaTableName) + + sql( + s"INSERT INTO $deltaTableName" + + """ VALUES (1, "one", "2023-01-01", "2023-01-01 00:00:00")""".stripMargin + ) + sql( + s"INSERT INTO $deltaTableName" + + """ VALUES (2, "two", "2023-02-02", "2023-02-02 00:00:00")""".stripMargin + ) + sql( + s"INSERT INTO $deltaTableName" + + """ VALUES (3, "three", "2023-03-03", "2023-03-03 00:00:00")""".stripMargin + ) + + val sharedTableNameV1 = "shared_table_v1" + prepareMockedClientAndFileSystemResult( + deltaTable = deltaTableName, + sharedTable = sharedTableNameV1, + versionAsOf = Some(1L) + ) + + def testVersionAsOf1(tablePath: String): Unit = { + val dfV1 = spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .option("versionAsOf", 1) + .load(tablePath) + val expectedV1 = Seq( + Row(1, "one", sqlDate("2023-01-01"), sqlTimestamp("2023-01-01 00:00:00")) + ) + checkAnswer(dfV1, expectedV1) + } + withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) { + val profileFile = prepareProfileFile(tempDir) + testVersionAsOf1(s"${profileFile.getCanonicalPath}#share1.default.$sharedTableNameV1") + } + + // using different table name because spark caches the content read from a file, i.e., + // the delta log from 0.json. + // TODO: figure out how to get a per query id and use it in getCustomTablePath to + // differentiate the same table used in different queries. + // TODO: Also check if it's possible to disable the file cache. 
+ val sharedTableNameV3 = "shared_table_v3" + prepareMockedClientAndFileSystemResult( + deltaTable = deltaTableName, + sharedTable = sharedTableNameV3, + versionAsOf = Some(3L) + ) + + def testVersionAsOf3(tablePath: String): Unit = { + val dfV3 = spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .option("versionAsOf", 3) + .load(tablePath) + val expectedV3 = Seq( + Row(1, "one", sqlDate("2023-01-01"), sqlTimestamp("2023-01-01 00:00:00")), + Row(2, "two", sqlDate("2023-02-02"), sqlTimestamp("2023-02-02 00:00:00")), + Row(3, "three", sqlDate("2023-03-03"), sqlTimestamp("2023-03-03 00:00:00")) + ) + checkAnswer(dfV3, expectedV3) + } + withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) { + val profileFile = prepareProfileFile(tempDir) + testVersionAsOf3(s"${profileFile.getCanonicalPath}#share1.default.$sharedTableNameV3") + } + + val sharedTableNameTs = "shared_table_ts" + // Given the result of delta sharing rpc is mocked, the actual value of the timestampStr + // can be any thing that's valid for DeltaSharingOptions, and formattedTimestamp is the + // parsed result and will be sent in the delta sharing rpc. + val timestampStr = "2023-01-01 00:00:00" + val formattedTimestamp = "2023-01-01T08:00:00Z" + + prepareMockedClientGetTableVersion(deltaTableName, sharedTableNameTs) + prepareMockedClientAndFileSystemResult( + deltaTable = deltaTableName, + sharedTable = sharedTableNameTs, + versionAsOf = None, + timestampAsOf = Some(formattedTimestamp) + ) + + def testTimestampQuery(tablePath: String): Unit = { + val dfTs = spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .option("timestampAsOf", timestampStr) + .load(tablePath) + val expectedTs = Seq( + Row(1, "one", sqlDate("2023-01-01"), sqlTimestamp("2023-01-01 00:00:00")), + Row(2, "two", sqlDate("2023-02-02"), sqlTimestamp("2023-02-02 00:00:00")), + Row(3, "three", sqlDate("2023-03-03"), sqlTimestamp("2023-03-03 00:00:00")) + ) + checkAnswer(dfTs, expectedTs) + } + withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) { + val profileFile = prepareProfileFile(tempDir) + testTimestampQuery(s"${profileFile.getCanonicalPath}#share1.default.$sharedTableNameTs") + } + } + } + } + + test("DeltaSharingDataSource able to read data with more entries") { + withTempDir { tempDir => + val deltaTableName = "delta_table_more" + withTable(deltaTableName) { + createSimpleTable(deltaTableName, enableCdf = false) + // The table operations take about 6~10 seconds. 
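Summarizing the time-travel options exercised by the test above in one hedged reader-side sketch (tablePath is a placeholder; both options are forwarded to the sharing server's getMetadata/getFiles calls so the local delta log is built for that snapshot):

  val tablePath = "/path/to/profile.share#share1.default.table1" // placeholder

  val dfAtV1 = spark.read
    .format("deltaSharing")
    .option("responseFormat", "delta")
    .option("versionAsOf", 1)
    .load(tablePath)

  val dfAtTimestamp = spark.read
    .format("deltaSharing")
    .option("responseFormat", "delta")
    .option("timestampAsOf", "2023-01-01 00:00:00")
    .load(tablePath)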
+ for (i <- 0 to 9) { + val iteration = s"iteration $i" + val valuesBuilder = Seq.newBuilder[String] + for (j <- 0 to 49) { + valuesBuilder += s"""(${i * 10 + j}, "$iteration")""" + } + sql(s"INSERT INTO $deltaTableName VALUES ${valuesBuilder.result().mkString(",")}") + } + + val sharedTableName = "shared_table_more" + prepareMockedClientAndFileSystemResult(deltaTableName, sharedTableName) + prepareMockedClientGetTableVersion(deltaTableName, sharedTableName) + + val expectedSchema: StructType = new StructType() + .add("c1", IntegerType) + .add("c2", StringType) + val expected = spark.read.format("delta").table(deltaTableName) + + def test(tablePath: String): Unit = { + assert( + expectedSchema == spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .load(tablePath) + .schema + ) + val df = + spark.read.format("deltaSharing").option("responseFormat", "delta").load(tablePath) + checkAnswer(df, expected) + } + + withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) { + val profileFile = prepareProfileFile(tempDir) + test(s"${profileFile.getCanonicalPath}#share1.default.$sharedTableName") + } + } + } + } + + test("DeltaSharingDataSource able to read data with join on the same table") { + withTempDir { tempDir => + val deltaTableName = "delta_table_join" + withTable(deltaTableName) { + createSimpleTable(deltaTableName, enableCdf = false) + sql(s"""INSERT INTO $deltaTableName VALUES (1, "first"), (2, "first")""") + sql(s"""INSERT INTO $deltaTableName VALUES (1, "second"), (2, "second")""") + sql(s"""INSERT INTO $deltaTableName VALUES (1, "third"), (2, "third")""") + + val sharedTableName = "shared_table_join" + prepareMockedClientAndFileSystemResult(deltaTableName, sharedTableName) + prepareMockedClientGetTableVersion(deltaTableName, sharedTableName) + prepareMockedClientAndFileSystemResult( + deltaTableName, + sharedTableName, + versionAsOf = Some(1L) + ) + + def testJoin(tablePath: String): Unit = { + // Query the same latest version + val deltaDfLatest = spark.read.format("delta").table(deltaTableName) + val deltaDfV1 = spark.read.format("delta").option("versionAsOf", 1).table(deltaTableName) + val sharingDfLatest = + spark.read.format("deltaSharing").option("responseFormat", "delta").load(tablePath) + val sharingDfV1 = + spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .option("versionAsOf", 1) + .load(tablePath) + + var deltaDfJoined = deltaDfLatest.join(deltaDfLatest, "c1") + var sharingDfJoined = sharingDfLatest.join(sharingDfLatest, "c1") + // CheckAnswer ensures that delta sharing produces the same result as delta. + // The check on the size is used to double check that a valid dataframe is generated. + checkAnswer(deltaDfJoined, sharingDfJoined) + assert(sharingDfJoined.count() > 0) + + // Query the same versionAsOf + deltaDfJoined = deltaDfV1.join(deltaDfV1, "c1") + sharingDfJoined = sharingDfV1.join(sharingDfV1, "c1") + checkAnswer(deltaDfJoined, sharingDfJoined) + assert(sharingDfJoined.count() > 0) + + // Query with different versions + deltaDfJoined = deltaDfLatest.join(deltaDfV1, "c1") + sharingDfJoined = sharingDfLatest.join(sharingDfV1, "c1") + checkAnswer(deltaDfJoined, sharingDfJoined) + // Size is 6 because for each of the 6 rows in latest, there is 1 row with the same c1 + // value in v1. 
+ assert(sharingDfJoined.count() > 0) + } + + withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) { + val profileFile = prepareProfileFile(tempDir) + testJoin(s"${profileFile.getCanonicalPath}#share1.default.$sharedTableName") + } + } + } + } +} + +class DeltaSharingDataSourceDeltaSuite extends DeltaSharingDataSourceDeltaSuiteBase {} diff --git a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaTestUtils.scala b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaTestUtils.scala new file mode 100644 index 00000000000..2aaab54e91b --- /dev/null +++ b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaTestUtils.scala @@ -0,0 +1,538 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.sharing.spark + +import java.io.File +import java.nio.charset.StandardCharsets.UTF_8 + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.delta.{DeltaLog, Snapshot} +import org.apache.spark.sql.delta.actions.{ + Action, + AddCDCFile, + AddFile, + DeletionVectorDescriptor, + Metadata, + RemoveFile +} +import org.apache.spark.sql.delta.deletionvectors.{ + RoaringBitmapArray, + RoaringBitmapArrayFormat +} +import org.apache.spark.sql.delta.util.{FileNames, JsonUtils} +import com.google.common.hash.Hashing +import io.delta.sharing.client.model.{ + AddFile => ClientAddFile, + Metadata => ClientMetadata, + Protocol => ClientProtocol +} +import io.delta.sharing.spark.model.{ + DeltaSharingFileAction, + DeltaSharingMetadata, + DeltaSharingProtocol +} +import org.apache.commons.io.FileUtils +import org.apache.hadoop.fs.Path + +import org.apache.spark.SparkConf +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.test.SharedSparkSession + +trait DeltaSharingDataSourceDeltaTestUtils extends SharedSparkSession { + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.delta.sharing.preSignedUrl.expirationMs", "15000") + .set("spark.delta.sharing.driver.refreshCheckIntervalMs", "5000") + .set("spark.delta.sharing.driver.refreshThresholdMs", "10000") + .set("spark.delta.sharing.driver.accessThresholdToExpireMs", "120000") + } + + private[spark] def removePartitionPrefix(filePath: String): String = { + filePath.split("/").last + } + + private def getResponseDVAndId( + sharedTable: String, + deletionVector: DeletionVectorDescriptor): (DeletionVectorDescriptor, String) = { + if (deletionVector != null) { + if (deletionVector.storageType == DeletionVectorDescriptor.INLINE_DV_MARKER) { + (deletionVector, Hashing.sha256().hashString(deletionVector.uniqueId, UTF_8).toString) + } else { + val dvPath = deletionVector.absolutePath(new Path("not-used")) + ( + deletionVector.copy( + pathOrInlineDv = TestDeltaSharingFileSystem.encode(sharedTable, dvPath.getName), + storageType = DeletionVectorDescriptor.PATH_DV_MARKER + ), + 
Hashing.sha256().hashString(deletionVector.uniqueId, UTF_8).toString + ) + } + } else { + (null, null) + } + } + + private def isDataFile(filePath: String): Boolean = { + filePath.endsWith(".parquet") || filePath.endsWith(".bin") + } + + // Convert from delta AddFile to DeltaSharingFileAction to serialize to json. + private def getDeltaSharingFileActionForAddFile( + addFile: AddFile, + sharedTable: String, + version: Long, + timestamp: Long): DeltaSharingFileAction = { + val parquetFile = removePartitionPrefix(addFile.path) + + val (responseDV, dvFileId) = getResponseDVAndId(sharedTable, addFile.deletionVector) + + DeltaSharingFileAction( + id = Hashing.sha256().hashString(parquetFile, UTF_8).toString, + version = version, + timestamp = timestamp, + deletionVectorFileId = dvFileId, + deltaSingleAction = addFile + .copy( + path = TestDeltaSharingFileSystem.encode(sharedTable, parquetFile), + deletionVector = responseDV + ) + .wrap + ) + } + + // Convert from delta RemoveFile to DeltaSharingFileAction to serialize to json. + // scalastyle:off removeFile + private def getDeltaSharingFileActionForRemoveFile( + removeFile: RemoveFile, + sharedTable: String, + version: Long, + timestamp: Long): DeltaSharingFileAction = { + val parquetFile = removePartitionPrefix(removeFile.path) + + val (responseDV, dvFileId) = getResponseDVAndId(sharedTable, removeFile.deletionVector) + + DeltaSharingFileAction( + id = Hashing.sha256().hashString(parquetFile, UTF_8).toString, + version = version, + timestamp = timestamp, + deletionVectorFileId = dvFileId, + deltaSingleAction = removeFile + .copy( + path = TestDeltaSharingFileSystem.encode(sharedTable, parquetFile), + deletionVector = responseDV + ) + .wrap + ) + // scalastyle:on removeFile + } + + // Reset the result for client.GetTableVersion for the sharedTable based on the latest table + // version of the deltaTable, use BlockManager to store the result. + private[spark] def prepareMockedClientGetTableVersion( + deltaTable: String, + sharedTable: String): Unit = { + val snapshotToUse = getSnapshotToUse(deltaTable, None) + DeltaSharingUtils.overrideSingleBlock[Long]( + blockId = TestClientForDeltaFormatSharing.getBlockId(sharedTable, "getTableVersion"), + value = snapshotToUse.version + ) + } + + def getTimeStampForVersion(deltaTable: String, version: Long): Long = { + val snapshotToUse = getSnapshotToUse(deltaTable, None) + FileUtils + .listFiles(new File(snapshotToUse.deltaLog.logPath.toUri()), null, true) + .asScala + .foreach { f => + if (FileNames.isDeltaFile(new Path(f.getName))) { + if (FileNames.getFileVersion(new Path(f.getName)) == version) { + return f.lastModified + } + } + } + 0 + } + + // Prepare the result(Protocol and Metadata) for client.GetMetadata for the sharedTable based on + // the latest table info of the deltaTable, store them in BlockManager. + private[spark] def prepareMockedClientMetadata(deltaTable: String, sharedTable: String): Unit = { + val snapshotToUse = getSnapshotToUse(deltaTable, None) + val dsProtocol: DeltaSharingProtocol = DeltaSharingProtocol(snapshotToUse.protocol) + val dsMetadata: DeltaSharingMetadata = DeltaSharingMetadata( + deltaMetadata = snapshotToUse.metadata + ) + + // Put the metadata in blockManager for DeltaSharingClient to return for getMetadata. 
+ DeltaSharingUtils.overrideIteratorBlock[String]( + blockId = TestClientForDeltaFormatSharing.getBlockId(sharedTable, "getMetadata"), + values = Seq(dsProtocol.json, dsMetadata.json).toIterator + ) + } + + private def updateAddFileWithInlineDV( + addFile: AddFile, + inlineDvFormat: RoaringBitmapArrayFormat.Value, + bitmap: RoaringBitmapArray): AddFile = { + val dv = DeletionVectorDescriptor.inlineInLog( + bitmap.serializeAsByteArray(inlineDvFormat), + bitmap.cardinality + ) + addFile + .removeRows( + deletionVector = dv, + updateStats = true + ) + ._1 + } + + private def updateDvPathToCount( + addFile: AddFile, + pathToCount: scala.collection.mutable.Map[String, Int]): Unit = { + if (addFile.deletionVector != null && + addFile.deletionVector.storageType != DeletionVectorDescriptor.INLINE_DV_MARKER) { + val dvPath = addFile.deletionVector.pathOrInlineDv + pathToCount.put(dvPath, pathToCount.getOrElse(dvPath, 0) + 1) + } + } + + // Sort by id in decreasing order. + private def deltaSharingFileActionDecreaseOrderFunc( + f1: model.DeltaSharingFileAction, + f2: model.DeltaSharingFileAction): Boolean = { + f1.id > f2.id + } + + // Sort by id in increasing order. + private def deltaSharingFileActionIncreaseOrderFunc( + f1: model.DeltaSharingFileAction, + f2: model.DeltaSharingFileAction): Boolean = { + f1.id < f2.id + } + + private def getSnapshotToUse(deltaTable: String, versionAsOf: Option[Long]): Snapshot = { + val deltaLog = DeltaLog.forTable(spark, new TableIdentifier(deltaTable)) + if (versionAsOf.isDefined) { + deltaLog.getSnapshotAt(versionAsOf.get) + } else { + deltaLog.update() + } + } + + // This function does 2 jobs: + // 1. Prepare the result for functions of delta sharing rest client, i.e., (Protocol, Metadata) + // for getMetadata, (Protocol, Metadata, and list of lines from delta actions) for getFiles, use + // BlockManager to store the data to make them available across different classes. All the lines + // are for responseFormat=parquet. + // 2. Put the parquet file in blockManager for DeltaSharingFileSystem to load bytes out of it. + private[spark] def prepareMockedClientAndFileSystemResultForParquet( + deltaTable: String, + sharedTable: String, + versionAsOf: Option[Long] = None): Unit = { + val lines = Seq.newBuilder[String] + var totalSize = 0L + val clientAddFilesArrayBuffer = ArrayBuffer[ClientAddFile]() + + // To prepare faked delta sharing responses with needed files for DeltaSharingClient. + val snapshotToUse = getSnapshotToUse(deltaTable, versionAsOf) + + snapshotToUse.allFiles.collect().foreach { addFile => + val parquetFile = removePartitionPrefix(addFile.path) + val clientAddFile = ClientAddFile( + url = TestDeltaSharingFileSystem.encode(sharedTable, parquetFile), + id = Hashing.md5().hashString(parquetFile, UTF_8).toString, + partitionValues = addFile.partitionValues, + size = addFile.size, + stats = null, + version = snapshotToUse.version, + timestamp = snapshotToUse.timestamp + ) + totalSize = totalSize + addFile.size + clientAddFilesArrayBuffer += clientAddFile + } + + // Scan through the parquet files of the local delta table, and prepare the data of parquet file + // reading in DeltaSharingFileSystem. + val files = + FileUtils.listFiles(new File(snapshotToUse.deltaLog.dataPath.toUri()), null, true).asScala + files.foreach { f => + val filePath = f.getCanonicalPath + if (isDataFile(filePath)) { + // Put the parquet file in blockManager for DeltaSharingFileSystem to load bytes out of it. 
+ DeltaSharingUtils.overrideIteratorBlock[Byte]( + blockId = TestDeltaSharingFileSystem.getBlockId(sharedTable, f.getName), + values = FileUtils.readFileToByteArray(f).toIterator + ) + } + } + + val clientProtocol = ClientProtocol(minReaderVersion = 1) + // This is specifically to set the size of the metadata. + val deltaMetadata = snapshotToUse.metadata + val clientMetadata = ClientMetadata( + id = deltaMetadata.id, + name = deltaMetadata.name, + description = deltaMetadata.description, + schemaString = deltaMetadata.schemaString, + configuration = deltaMetadata.configuration, + partitionColumns = deltaMetadata.partitionColumns, + size = totalSize + ) + lines += JsonUtils.toJson(clientProtocol.wrap) + lines += JsonUtils.toJson(clientMetadata.wrap) + clientAddFilesArrayBuffer.toSeq.foreach { clientAddFile => + lines += JsonUtils.toJson(clientAddFile.wrap) + } + + // Put the metadata in blockManager for DeltaSharingClient to return metadata when being asked. + DeltaSharingUtils.overrideIteratorBlock[String]( + blockId = TestClientForDeltaFormatSharing.getBlockId( + sharedTableName = sharedTable, + queryType = "getMetadata", + versionAsOf = versionAsOf + ), + values = Seq( + JsonUtils.toJson(clientProtocol.wrap), + JsonUtils.toJson(clientMetadata.wrap) + ).toIterator + ) + + // Put the delta log (list of actions) in blockManager for DeltaSharingClient to return as the + // http response when getFiles is called. + DeltaSharingUtils.overrideIteratorBlock[String]( + blockId = TestClientForDeltaFormatSharing.getBlockId( + sharedTableName = sharedTable, + queryType = "getFiles", + versionAsOf = versionAsOf + ), + values = lines.result().toIterator + ) + } + + // This function does 2 jobs: + // 1. Prepare the result for functions of delta sharing rest client, i.e., (Protocol, Metadata) + // for getMetadata, (Protocol, Metadata, and list of lines from delta actions) for getFiles, use + // BlockManager to store the data to make them available across different classes. + // 2. Put the parquet file in blockManager for DeltaSharingFileSystem to load bytes out of it. + private[spark] def prepareMockedClientAndFileSystemResult( + deltaTable: String, + sharedTable: String, + versionAsOf: Option[Long] = None, + timestampAsOf: Option[String] = None, + inlineDvFormat: Option[RoaringBitmapArrayFormat.Value] = None, + assertMultipleDvsInOneFile: Boolean = false, + reverseFileOrder: Boolean = false): Unit = { + val lines = Seq.newBuilder[String] + var totalSize = 0L + + // To prepare faked delta sharing responses with needed files for DeltaSharingClient. + val snapshotToUse = getSnapshotToUse(deltaTable, versionAsOf) + val fileActionsArrayBuffer = ArrayBuffer[model.DeltaSharingFileAction]() + val dvPathToCount = scala.collection.mutable.Map[String, Int]() + snapshotToUse.allFiles.collect().foreach { addFile => + if (assertMultipleDvsInOneFile) { + updateDvPathToCount(addFile, dvPathToCount) + } + + val updatedAdd = if (inlineDvFormat.isDefined) { + // Remove row 0 and 2 in the AddFile. 
+ updateAddFileWithInlineDV(addFile, inlineDvFormat.get, RoaringBitmapArray(0L, 2L)) + } else { + addFile + } + + val dsAddFile = getDeltaSharingFileActionForAddFile( + updatedAdd, + sharedTable, + snapshotToUse.version, + snapshotToUse.timestamp + ) + totalSize = totalSize + addFile.size + fileActionsArrayBuffer += dsAddFile + } + val fileActionSeq = if (reverseFileOrder) { + fileActionsArrayBuffer.toSeq.sortWith(deltaSharingFileActionDecreaseOrderFunc) + } else { + fileActionsArrayBuffer.toSeq.sortWith(deltaSharingFileActionIncreaseOrderFunc) + } + var previousIdOpt: Option[String] = None + fileActionSeq.foreach { fileAction => + if (reverseFileOrder) { + assert( + // Using < instead of <= because there can be a removeFile and addFile pointing to the + // same parquet file which result in the same file id, since id is a hash of file path. + // This is ok because eventually it can read data out of the correct parquet file. + !previousIdOpt.exists(_ < fileAction.id), + s"fileActions must be in decreasing order by id: ${previousIdOpt} is not smaller than" + + s" ${fileAction.id}." + ) + previousIdOpt = Some(fileAction.id) + } + lines += fileAction.json + } + if (assertMultipleDvsInOneFile) { + assert(dvPathToCount.max._2 > 1) + } + + // Scan through the parquet files of the local delta table, and prepare the data of parquet file + // reading in DeltaSharingFileSystem. + val files = + FileUtils.listFiles(new File(snapshotToUse.deltaLog.dataPath.toUri()), null, true).asScala + files.foreach { f => + val filePath = f.getCanonicalPath + if (isDataFile(filePath)) { + // Put the parquet file in blockManager for DeltaSharingFileSystem to load bytes out of it. + DeltaSharingUtils.overrideIteratorBlock[Byte]( + blockId = TestDeltaSharingFileSystem.getBlockId(sharedTable, f.getName), + values = FileUtils.readFileToByteArray(f).toIterator + ) + } + } + + // This is specifically to set the size of the metadata. + val dsMetadata = DeltaSharingMetadata( + deltaMetadata = snapshotToUse.metadata, + size = totalSize + ) + val dsProtocol = DeltaSharingProtocol(deltaProtocol = snapshotToUse.protocol) + // Put the metadata in blockManager for DeltaSharingClient to return metadata when being asked. + DeltaSharingUtils.overrideIteratorBlock[String]( + blockId = TestClientForDeltaFormatSharing.getBlockId( + sharedTableName = sharedTable, + queryType = "getMetadata", + versionAsOf = versionAsOf, + timestampAsOf = timestampAsOf + ), + values = Seq(dsProtocol.json, dsMetadata.json).toIterator + ) + + lines += dsProtocol.json + lines += dsMetadata.json + // Put the delta log (list of actions) in blockManager for DeltaSharingClient to return as the + // http response when getFiles is called. 
+ DeltaSharingUtils.overrideIteratorBlock[String]( + blockId = TestClientForDeltaFormatSharing.getBlockId( + sharedTableName = sharedTable, + queryType = "getFiles", + versionAsOf = versionAsOf, + timestampAsOf = timestampAsOf + ), + values = lines.result().toIterator + ) + } + + private[spark] def prepareMockedClientAndFileSystemResultForStreaming( + deltaTable: String, + sharedTable: String, + startingVersion: Long, + endingVersion: Long, + assertDVExists: Boolean = false): Unit = { + val actionLines = Seq.newBuilder[String] + + var maxVersion = -1L + var totalSize = 0L + + val deltaLog = DeltaLog.forTable(spark, new TableIdentifier(deltaTable)) + val startingSnapshot = deltaLog.getSnapshotAt(startingVersion) + actionLines += DeltaSharingProtocol(deltaProtocol = startingSnapshot.protocol).json + actionLines += DeltaSharingMetadata( + deltaMetadata = startingSnapshot.metadata, + version = startingVersion + ).json + + val logFiles = + FileUtils.listFiles(new File(deltaLog.logPath.toUri()), null, true).asScala + var dvExists = false + logFiles.foreach { f => + if (FileNames.isDeltaFile(new Path(f.getName))) { + val version = FileNames.getFileVersion(new Path(f.getName)) + if (version >= startingVersion && version <= endingVersion) { + // protocol/metadata are processed from startingSnapshot, only process versions greater + // than startingVersion for real actions and possible metadata changes. + maxVersion = maxVersion.max(version) + val timestamp = f.lastModified + + FileUtils.readLines(f).asScala.foreach { l => + val action = Action.fromJson(l) + action match { + case m: Metadata => + actionLines += DeltaSharingMetadata( + deltaMetadata = m, + version = version + ).json + case addFile: AddFile if addFile.dataChange => + // Convert from delta AddFile to DeltaSharingAddFile to serialize to json. + val dsAddFile = + getDeltaSharingFileActionForAddFile(addFile, sharedTable, version, timestamp) + dvExists = dvExists || (dsAddFile.deletionVectorFileId != null) + totalSize = totalSize + addFile.size + actionLines += dsAddFile.json + case removeFile: RemoveFile if removeFile.dataChange => + // scalastyle:off removeFile + val dsRemoveFile = getDeltaSharingFileActionForRemoveFile( + removeFile, + sharedTable, + version, + timestamp + ) + // scalastyle:on removeFile + dvExists = dvExists || (dsRemoveFile.deletionVectorFileId != null) + totalSize = totalSize + removeFile.size.getOrElse(0L) + actionLines += dsRemoveFile.json + case _ => // ignore all other actions such as CommitInfo. 
+ } + } + } + } + } + val dataFiles = + FileUtils.listFiles(new File(deltaLog.dataPath.toUri()), null, true).asScala + dataFiles.foreach { f => + if (isDataFile(f.getCanonicalPath)) { + DeltaSharingUtils.overrideIteratorBlock[Byte]( + blockId = TestDeltaSharingFileSystem.getBlockId(sharedTable, f.getName), + values = FileUtils.readFileToByteArray(f).toIterator + ) + } + } + + if (assertDVExists) { + assert(dvExists, "There should be DV in the files returned from server.") + } + + DeltaSharingUtils.overrideIteratorBlock[String]( + blockId = TestClientForDeltaFormatSharing.getBlockId( + sharedTable, + s"getFiles_${startingVersion}_$endingVersion" + ), + values = actionLines.result().toIterator + ) + } + + + protected def getDeltaSharingClassesSQLConf: Map[String, String] = { + Map( + "fs.delta-sharing.impl" -> classOf[TestDeltaSharingFileSystem].getName, + "spark.delta.sharing.client.class" -> + classOf[TestClientForDeltaFormatSharing].getName, + "spark.delta.sharing.profile.provider.class" -> + "io.delta.sharing.client.DeltaSharingFileProfileProvider" + ) + } +} diff --git a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingFileIndexSuite.scala b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingFileIndexSuite.scala index 9c1b4036fac..748ed8e4d63 100644 --- a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingFileIndexSuite.scala +++ b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingFileIndexSuite.scala @@ -30,7 +30,7 @@ import io.delta.sharing.client.util.JsonUtils import org.apache.commons.io.FileUtils import org.apache.hadoop.fs.Path -import org.apache.spark.{SparkConf, SparkEnv} +import org.apache.spark.SparkEnv import org.apache.spark.delta.sharing.{PreSignedUrlCache, PreSignedUrlFetcher} import org.apache.spark.sql.{QueryTest, SparkSession} import org.apache.spark.sql.catalyst.expressions.{ @@ -39,7 +39,6 @@ import org.apache.spark.sql.catalyst.expressions.{ Literal => SqlLiteral } import org.apache.spark.sql.delta.sharing.DeltaSharingTestSparkUtils -import org.apache.spark.sql.test.{SharedSparkSession} import org.apache.spark.sql.types.{FloatType, IntegerType} private object TestUtils { @@ -190,7 +189,7 @@ class TestDeltaSharingClientForFileIndex( class DeltaSharingFileIndexSuite extends QueryTest with DeltaSQLCommandTest - with SharedSparkSession + with DeltaSharingDataSourceDeltaTestUtils with DeltaSharingTestSparkUtils { import TestUtils._ @@ -292,14 +291,6 @@ class DeltaSharingFileIndexSuite } } - override protected def sparkConf: SparkConf = { - super.sparkConf - .set("spark.delta.sharing.preSignedUrl.expirationMs", defaultUrlExpirationMs.toString) - .set("spark.delta.sharing.driver.refreshCheckIntervalMs", "1000") - .set("spark.delta.sharing.driver.refreshThresholdMs", "2000") - .set("spark.delta.sharing.driver.accessThresholdToExpireMs", "60000") - } - test("refresh works") { PreSignedUrlCache.registerIfNeeded(SparkEnv.get) @@ -333,7 +324,7 @@ class DeltaSharingFileIndexSuite 1000 ) // sleep for expirationTimeMs to ensure that the urls are refreshed. - Thread.sleep(defaultUrlExpirationMs) + Thread.sleep(15000) // Verify that the url is refreshed as paths(1), not paths(0) anymore. 
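Pulling the harness pieces together, a typical snapshot test in this PR follows roughly the hedged recipe below, inside a suite that mixes in DeltaSharingDataSourceDeltaTestUtils and DeltaSharingTestSparkUtils; the table names are placeholders and the local delta table is assumed to already contain data.

  withTempDir { tempDir =>
    val deltaTableName = "delta_table_example"   // an existing local delta table
    val sharedTableName = "shared_table_example" // the name the mocked server exposes

    // 1. Stage the mocked rpc responses and the parquet bytes in the BlockManager.
    prepareMockedClientGetTableVersion(deltaTableName, sharedTableName)
    prepareMockedClientAndFileSystemResult(deltaTableName, sharedTableName)

    // 2. Route the query through TestClientForDeltaFormatSharing and
    //    TestDeltaSharingFileSystem instead of a real sharing server.
    withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) {
      val profileFile = prepareProfileFile(tempDir)
      val df = spark.read
        .format("deltaSharing")
        .option("responseFormat", "delta")
        .load(s"${profileFile.getCanonicalPath}#share1.default.$sharedTableName")
      assert(df.count() > 0)
    }
  }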
    assert(fetcher.getUrl == paths(1))
diff --git a/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala b/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala
new file mode 100644
index 00000000000..06f8d8776eb
--- /dev/null
+++ b/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala
@@ -0,0 +1,270 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.delta.sharing.spark
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.delta.util.JsonUtils
+import io.delta.sharing.client.{
+  DeltaSharingClient,
+  DeltaSharingProfileProvider,
+  DeltaSharingRestClient
+}
+import io.delta.sharing.client.model.{
+  AddFile => ClientAddFile,
+  DeltaTableFiles,
+  DeltaTableMetadata,
+  SingleAction,
+  Table
+}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.storage.BlockId
+
+/**
+ * A mocked delta sharing client for DeltaFormatSharing.
+ * The test suites need to prepare the mocked delta sharing rpc responses and store them in the
+ * BlockManager. This client then simply loads and returns the stored response for each rpc call.
+ */
+private[spark] class TestClientForDeltaFormatSharing(
+    profileProvider: DeltaSharingProfileProvider,
+    timeoutInSeconds: Int = 120,
+    numRetries: Int = 10,
+    maxRetryDuration: Long = Long.MaxValue,
+    sslTrustAll: Boolean = false,
+    forStreaming: Boolean = false,
+    responseFormat: String = DeltaSharingRestClient.RESPONSE_FORMAT_DELTA,
+    readerFeatures: String = "",
+    queryTablePaginationEnabled: Boolean = false,
+    maxFilesPerReq: Int = 100000)
+    extends DeltaSharingClient {
+
+  assert(
+    responseFormat == DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET ||
+    (readerFeatures.contains("deletionVectors") && readerFeatures.contains("columnMapping")),
+    "deletionVectors and columnMapping should be supported in all types of queries."
+  )
+
+  import TestClientForDeltaFormatSharing._
+
+  override def listAllTables(): Seq[Table] = throw new UnsupportedOperationException("not needed")
+
+  override def getMetadata(
+      table: Table,
+      versionAsOf: Option[Long] = None,
+      timestampAsOf: Option[String] = None): DeltaTableMetadata = {
+    val iterator = SparkEnv.get.blockManager
+      .get[String](getBlockId(table.name, "getMetadata", versionAsOf, timestampAsOf))
+      .map(_.data.asInstanceOf[Iterator[String]])
+      .getOrElse {
+        throw new IllegalStateException(
+          s"getMetadata is missing for: ${table.name}, versionAsOf:$versionAsOf, " +
+            s"timestampAsOf:$timestampAsOf. This shouldn't happen in the unit test."
+        )
+      }
+    // iterator.toSeq doesn't trigger CompletionIterator in BlockManager which releases the reader
+    // lock on the underlying block. iterator hasNext does trigger it.
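+    // Drain the iterator element-by-element below, then branch on the shared table name:
+    // tables whose name contains "shared_parquet_table" are answered in the parquet response
+    // format (parsed protocol and metadata), all others in the delta response format
+    // (raw delta json lines).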
+ val linesBuilder = Seq.newBuilder[String] + while (iterator.hasNext) { + linesBuilder += iterator.next() + } + if (table.name.contains("shared_parquet_table")) { + val lines = linesBuilder.result() + val protocol = JsonUtils.fromJson[SingleAction](lines(0)).protocol + val metadata = JsonUtils.fromJson[SingleAction](lines(1)).metaData + DeltaTableMetadata( + version = versionAsOf.getOrElse(getTableVersion(table)), + protocol = protocol, + metadata = metadata, + respondedFormat = DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET + ) + } else { + DeltaTableMetadata( + version = versionAsOf.getOrElse(getTableVersion(table)), + lines = linesBuilder.result(), + respondedFormat = DeltaSharingRestClient.RESPONSE_FORMAT_DELTA + ) + } + } + + override def getTableVersion(table: Table, startingTimestamp: Option[String] = None): Long = { + val versionOpt = SparkEnv.get.blockManager.getSingle[Long]( + getBlockId(table.name, "getTableVersion") + ) + val version = versionOpt.getOrElse { + throw new IllegalStateException( + s"getTableVersion is missing for: ${table.name}. This shouldn't happen in the unit test." + ) + } + SparkEnv.get.blockManager.releaseLock(getBlockId(table.name, "getTableVersion")) + version + } + + override def getFiles( + table: Table, + predicates: Seq[String], + limit: Option[Long], + versionAsOf: Option[Long], + timestampAsOf: Option[String], + jsonPredicateHints: Option[String], + refreshToken: Option[String] + ): DeltaTableFiles = { + limit.foreach(lim => TestClientForDeltaFormatSharing.limits.put( + s"${table.share}.${table.schema}.${table.name}", lim)) + TestClientForDeltaFormatSharing.requestedFormat.put( + s"${table.share}.${table.schema}.${table.name}", responseFormat) + val iterator = SparkEnv.get.blockManager + .get[String](getBlockId(table.name, "getFiles", versionAsOf, timestampAsOf)) + .map(_.data.asInstanceOf[Iterator[String]]) + .getOrElse { + throw new IllegalStateException( + s"getFiles is missing for: ${table.name} versionAsOf:$versionAsOf, " + + s"timestampAsOf:$timestampAsOf. This shouldn't happen in the unit test." + ) + } + // iterator.toSeq doesn't trigger CompletionIterator in BlockManager which releases the reader + // lock on the underlying block. iterator hasNext does trigger it. + val linesBuilder = Seq.newBuilder[String] + while (iterator.hasNext) { + linesBuilder += iterator.next() + } + if (table.name.contains("shared_parquet_table")) { + val lines = linesBuilder.result() + val protocol = JsonUtils.fromJson[SingleAction](lines(0)).protocol + val metadata = JsonUtils.fromJson[SingleAction](lines(1)).metaData + val files = ArrayBuffer[ClientAddFile]() + lines.drop(2).foreach { line => + val action = JsonUtils.fromJson[SingleAction](line) + if (action.file != null) { + files.append(action.file) + } else { + throw new IllegalStateException(s"Unexpected Line:${line}") + } + } + DeltaTableFiles( + versionAsOf.getOrElse(getTableVersion(table)), + protocol, + metadata, + files.toSeq, + respondedFormat = DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET + ) + } else { + DeltaTableFiles( + version = versionAsOf.getOrElse(getTableVersion(table)), + lines = linesBuilder.result(), + respondedFormat = DeltaSharingRestClient.RESPONSE_FORMAT_DELTA + ) + } + } + + override def getFiles( + table: Table, + startingVersion: Long, + endingVersion: Option[Long] + ): DeltaTableFiles = { + assert( + endingVersion.isDefined, + "endingVersion is not defined. This shouldn't happen in unit test." 
+ ) + val iterator = SparkEnv.get.blockManager + .get[String](getBlockId(table.name, s"getFiles_${startingVersion}_${endingVersion.get}")) + .map(_.data.asInstanceOf[Iterator[String]]) + .getOrElse { + throw new IllegalStateException( + s"getFiles is missing for: ${table.name} with [${startingVersion}, " + + s"${endingVersion.get}]. This shouldn't happen in the unit test." + ) + } + // iterator.toSeq doesn't trigger CompletionIterator in BlockManager which releases the reader + // lock on the underlying block. iterator hasNext does trigger it. + val linesBuilder = Seq.newBuilder[String] + while (iterator.hasNext) { + linesBuilder += iterator.next() + } + DeltaTableFiles( + version = getTableVersion(table), + lines = linesBuilder.result(), + respondedFormat = DeltaSharingRestClient.RESPONSE_FORMAT_DELTA + ) + } + + override def getCDFFiles( + table: Table, + cdfOptions: Map[String, String], + includeHistoricalMetadata: Boolean + ): DeltaTableFiles = { + val suffix = cdfOptions + .get(DeltaSharingOptions.CDF_START_VERSION) + .getOrElse( + cdfOptions.get(DeltaSharingOptions.CDF_START_TIMESTAMP).get + ) + val iterator = SparkEnv.get.blockManager + .get[String]( + getBlockId( + table.name, + s"getCDFFiles_$suffix" + ) + ) + .map( + _.data.asInstanceOf[Iterator[String]] + ) + .getOrElse { + throw new IllegalStateException( + s"getCDFFiles is missing for: ${table.name}. This shouldn't happen in the unit test." + ) + } + // iterator.toSeq doesn't trigger CompletionIterator in BlockManager which releases the reader + // lock on the underlying block. iterator hasNext does trigger it. + val linesBuilder = Seq.newBuilder[String] + while (iterator.hasNext) { + linesBuilder += iterator.next() + } + DeltaTableFiles( + version = getTableVersion(table), + lines = linesBuilder.result(), + respondedFormat = DeltaSharingRestClient.RESPONSE_FORMAT_DELTA + ) + } + + override def getForStreaming(): Boolean = forStreaming + + override def getProfileProvider: DeltaSharingProfileProvider = profileProvider +} + +object TestClientForDeltaFormatSharing { + def getBlockId( + sharedTableName: String, + queryType: String, + versionAsOf: Option[Long] = None, + timestampAsOf: Option[String] = None): BlockId = { + assert(!(versionAsOf.isDefined && timestampAsOf.isDefined)) + val suffix = if (versionAsOf.isDefined) { + s"_v${versionAsOf.get}" + } else if (timestampAsOf.isDefined) { + s"_t${timestampAsOf.get}" + } else { + "" + } + BlockId( + s"${DeltaSharingUtils.DELTA_SHARING_BLOCK_ID_PREFIX}" + + s"_${sharedTableName}_$queryType$suffix" + ) + } + + val limits = scala.collection.mutable.Map[String, Long]() + val requestedFormat = scala.collection.mutable.Map[String, String]() +} diff --git a/sharing/src/test/scala/io/delta/sharing/spark/TestDeltaSharingFileSystem.scala b/sharing/src/test/scala/io/delta/sharing/spark/TestDeltaSharingFileSystem.scala new file mode 100644 index 00000000000..2d372afba75 --- /dev/null +++ b/sharing/src/test/scala/io/delta/sharing/spark/TestDeltaSharingFileSystem.scala @@ -0,0 +1,140 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.sharing.spark + +import java.io.FileNotFoundException +import java.net.{URI, URLDecoder, URLEncoder} +import java.util.concurrent.TimeUnit + +import io.delta.sharing.client.DeltaSharingFileSystem +import org.apache.hadoop.fs._ +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.hadoop.util.Progressable + +import org.apache.spark.SparkEnv +import org.apache.spark.delta.sharing.{PreSignedUrlCache, PreSignedUrlFetcher} +import org.apache.spark.storage.BlockId + +/** + * Read-only file system for DeltaSharingDataSourceDeltaSuite. + * To replace DeltaSharingFileSystem and return the content for parquet files. + */ +private[spark] class TestDeltaSharingFileSystem extends FileSystem { + import TestDeltaSharingFileSystem._ + + private lazy val preSignedUrlCacheRef = PreSignedUrlCache.getEndpointRefInExecutor(SparkEnv.get) + + override def getScheme: String = SCHEME + + override def getUri(): URI = URI.create(s"$SCHEME:///") + + override def open(f: Path, bufferSize: Int): FSDataInputStream = { + val path = DeltaSharingFileSystem.decode(f) + val fetcher = + new PreSignedUrlFetcher( + preSignedUrlCacheRef, + path.tablePath, + path.fileId, + TimeUnit.MINUTES.toMillis(10) + ) + val (tableName, parquetFilePath) = decode(fetcher.getUrl()) + val arrayBuilder = Array.newBuilder[Byte] + val iterator = SparkEnv.get.blockManager + .get[Byte](getBlockId(tableName, parquetFilePath)) + .map( + _.data.asInstanceOf[Iterator[Byte]] + ) + .getOrElse { + throw new FileNotFoundException(f.toString) + } + while (iterator.hasNext) { + arrayBuilder += iterator.next() + } + new FSDataInputStream(new SeekableByteArrayInputStream(arrayBuilder.result())) + } + + override def create( + f: Path, + permission: FsPermission, + overwrite: Boolean, + bufferSize: Int, + replication: Short, + blockSize: Long, + progress: Progressable): FSDataOutputStream = + throw new UnsupportedOperationException("create") + + override def append(f: Path, bufferSize: Int, progress: Progressable): FSDataOutputStream = + throw new UnsupportedOperationException("append") + + override def rename(src: Path, dst: Path): Boolean = + throw new UnsupportedOperationException("rename") + + override def delete(f: Path, recursive: Boolean): Boolean = + throw new UnsupportedOperationException("delete") + + override def listStatus(f: Path): Array[FileStatus] = + throw new UnsupportedOperationException("listStatus") + + override def setWorkingDirectory(new_dir: Path): Unit = + throw new UnsupportedOperationException("setWorkingDirectory") + + override def getWorkingDirectory: Path = new Path(getUri) + + override def mkdirs(f: Path, permission: FsPermission): Boolean = + throw new UnsupportedOperationException("mkdirs") + + override def getFileStatus(f: Path): FileStatus = { + val resolved = makeQualified(f) + new FileStatus(DeltaSharingFileSystem.decode(resolved).fileSize, false, 0, 1, 0, f) + } + + override def close(): Unit = { + super.close() + } +} + +private[spark] object TestDeltaSharingFileSystem { + val SCHEME = "delta-sharing" + + def getBlockId(tableName: String, parquetFilePath: String): BlockId = { + BlockId( + s"${DeltaSharingUtils.DELTA_SHARING_BLOCK_ID_PREFIX}_" + + s"{$tableName}_$parquetFilePath" + ) + } + + // The encoded string is purely for testing purpose to contain the table name and file path, + // which will be decoded and used to find block in block manager. 
+ // In real traffic, it will be a pre-signed url. + def encode(tableName: String, parquetFilePath: String): String = { + val encodedTableName = URLEncoder.encode(tableName, "UTF-8") + val encodedParquetFilePath = URLEncoder.encode(parquetFilePath, "UTF-8") + // SCHEME:/// is needed for making this path an absolute path + s"$SCHEME:///$encodedTableName/$encodedParquetFilePath" + } + + def decode(encodedPath: String): (String, String) = { + val Array(tableName, parquetFilePath) = encodedPath + .stripPrefix(s"$SCHEME:///") + .stripPrefix(s"$SCHEME:/") + .split("/") + .map( + URLDecoder.decode(_, "UTF-8") + ) + (tableName, parquetFilePath) + } +}
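+
+// Illustrative sketch only (hypothetical table/file names, not part of the test fixtures):
+// a round trip through encode/decode. encode URL-encodes both parts and prepends the scheme;
+// decode strips the scheme and splits the two parts back out.
+//   val encoded = TestDeltaSharingFileSystem.encode("my_shared_table", "part-00000.parquet")
+//   // encoded == "delta-sharing:///my_shared_table/part-00000.parquet"
+//   val (tableName, filePath) = TestDeltaSharingFileSystem.decode(encoded)
+//   // tableName == "my_shared_table", filePath == "part-00000.parquet"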