From b6f26473fcfc0fb6730986bf8b6f21d11ff5340b Mon Sep 17 00:00:00 2001 From: Lin Zhou Date: Tue, 9 Jan 2024 10:29:30 -0800 Subject: [PATCH 1/4] add cdf support --- .../DeltaFormatSharingLimitPushDown.scala | 53 ++ .../sharing/spark/DeltaSharingCDFUtils.scala | 105 +++ .../spark/DeltaSharingDataSource.scala | 275 ++++++ .../spark/DeltaSharingLogFileSystem.scala | 227 +++++ .../sharing/spark/DeltaSharingUtils.scala | 101 +++ .../spark/DeltaSharingCDFUtilsSuite.scala | 243 +++++ .../DeltaSharingDataSourceDeltaSuite.scala | 851 ++++++++++++++++++ ...DeltaSharingDataSourceDeltaTestUtils.scala | 654 ++++++++++++++ .../spark/DeltaSharingFileIndexSuite.scala | 15 +- .../TestClientForDeltaFormatSharing.scala | 270 ++++++ .../spark/TestDeltaSharingFileSystem.scala | 140 +++ 11 files changed, 2922 insertions(+), 12 deletions(-) create mode 100644 sharing/src/main/scala/io/delta/sharing/spark/DeltaFormatSharingLimitPushDown.scala create mode 100644 sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingCDFUtils.scala create mode 100644 sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala create mode 100644 sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingCDFUtilsSuite.scala create mode 100644 sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaSuite.scala create mode 100644 sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaTestUtils.scala create mode 100644 sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala create mode 100644 sharing/src/test/scala/io/delta/sharing/spark/TestDeltaSharingFileSystem.scala diff --git a/sharing/src/main/scala/io/delta/sharing/spark/DeltaFormatSharingLimitPushDown.scala b/sharing/src/main/scala/io/delta/sharing/spark/DeltaFormatSharingLimitPushDown.scala new file mode 100644 index 00000000000..589df97057a --- /dev/null +++ b/sharing/src/main/scala/io/delta/sharing/spark/DeltaFormatSharingLimitPushDown.scala @@ -0,0 +1,53 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.sharing.spark + +import io.delta.sharing.client.util.ConfUtils + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.IntegerLiteral +import org.apache.spark.sql.catalyst.plans.logical.{LocalLimit, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} + +// A spark rule that applies limit pushdown to DeltaSharingFileIndex, when the config is enabled. +// To allow only fetching needed files from delta sharing server. 
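+// As an illustrative sketch (not part of the rule itself), a plan such as
+//   LocalLimit(1, LogicalRelation(HadoopFsRelation(DeltaSharingFileIndex(limitHint = None))))
+// is rewritten to
+//   LocalLimit(1, LogicalRelation(HadoopFsRelation(DeltaSharingFileIndex(limitHint = Some(1)))))
+// so the file index can forward the limit hint to the delta sharing server when requesting files.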
+object DeltaFormatSharingLimitPushDown extends Rule[LogicalPlan] { + + def setup(spark: SparkSession): Unit = synchronized { + if (!spark.experimental.extraOptimizations.contains(DeltaFormatSharingLimitPushDown)) { + spark.experimental.extraOptimizations ++= Seq(DeltaFormatSharingLimitPushDown) + } + } + + def apply(p: LogicalPlan): LogicalPlan = { + p transform { + case localLimit @ LocalLimit( + literalExpr @ IntegerLiteral(limit), + l @ LogicalRelation( + r @ HadoopFsRelation(remoteIndex: DeltaSharingFileIndex, _, _, _, _, _), + _, + _, + _ + ) + ) if (ConfUtils.limitPushdownEnabled(p.conf) && remoteIndex.limitHint.isEmpty) => + val spark = SparkSession.active + val newRel = r.copy(location = remoteIndex.copy(limitHint = Some(limit)))(spark) + LocalLimit(literalExpr, l.copy(relation = newRel)) + } + } +} diff --git a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingCDFUtils.scala b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingCDFUtils.scala new file mode 100644 index 00000000000..d6be355562a --- /dev/null +++ b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingCDFUtils.scala @@ -0,0 +1,105 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.sharing.spark + +import java.lang.ref.WeakReference +import java.nio.charset.StandardCharsets.UTF_8 + +import org.apache.spark.sql.delta.catalog.DeltaTableV2 +import com.google.common.hash.Hashing +import io.delta.sharing.client.DeltaSharingClient +import io.delta.sharing.client.model.{Table => DeltaSharingTable} +import org.apache.hadoop.fs.Path + +import org.apache.spark.delta.sharing.CachedTableManager +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.sources.BaseRelation + +object DeltaSharingCDFUtils extends Logging { + + private def getDuration(start: Long): Double = { + (System.currentTimeMillis() - start) / 1000.0 + } + + private[sharing] def prepareCDCRelation( + sqlContext: SQLContext, + options: DeltaSharingOptions, + table: DeltaSharingTable, + client: DeltaSharingClient): BaseRelation = { + val startTime = System.currentTimeMillis() + // 1. Get all files with DeltaSharingClient. + // includeHistoricalMetadata is always set to true, to get the metadata at the startingVersion + // and also any metadata changes between [startingVersion, endingVersion], to put them in the + // delta log. This is to allow delta library to check the metadata change and handle it + // properly -- currently it throws error for column mapping changes. + val deltaTableFiles = + client.getCDFFiles(table, options.cdfOptions, includeHistoricalMetadata = true) + logInfo( + s"Fetched ${deltaTableFiles.lines.size} lines with cdf options ${options.cdfOptions} " + + s"for table ${table} from delta sharing server, took ${getDuration(startTime)}s." + ) + + val path = options.options.getOrElse("path", throw DeltaSharingErrors.pathNotSpecifiedException) + // 2. 
Prepare local delta log + val queryCustomTablePath = client.getProfileProvider.getCustomTablePath(path) + val queryParamsHashId = DeltaSharingUtils.getQueryParamsHashId(options.cdfOptions) + val tablePathWithHashIdSuffix = + DeltaSharingUtils.getTablePathWithIdSuffix(queryCustomTablePath, queryParamsHashId) + val deltaLogMetadata = DeltaSharingLogFileSystem.constructLocalDeltaLogAcrossVersions( + lines = deltaTableFiles.lines, + customTablePath = tablePathWithHashIdSuffix, + startingVersionOpt = None, + endingVersionOpt = None + ) + + // 3. Register parquet file id to url mapping + CachedTableManager.INSTANCE.register( + // Using path instead of queryCustomTablePath because it will be customized within + // CachedTableManager. + tablePath = DeltaSharingUtils.getTablePathWithIdSuffix(path, queryParamsHashId), + idToUrl = deltaLogMetadata.idToUrl, + refs = Seq(new WeakReference(this)), + profileProvider = client.getProfileProvider, + refresher = DeltaSharingUtils.getRefresherForGetCDFFiles( + client = client, + table = table, + cdfOptions = options.cdfOptions + ), + expirationTimestamp = + if (CachedTableManager.INSTANCE + .isValidUrlExpirationTime(deltaLogMetadata.minUrlExpirationTimestamp)) { + deltaLogMetadata.minUrlExpirationTimestamp.get + } else { + System.currentTimeMillis() + CachedTableManager.INSTANCE.preSignedUrlExpirationMs + }, + refreshToken = None + ) + + // 4. return Delta + val localDeltaCdfOptions = Map( + DeltaSharingOptions.CDF_START_VERSION -> deltaLogMetadata.minVersion.toString, + DeltaSharingOptions.CDF_END_VERSION -> deltaLogMetadata.maxVersion.toString, + DeltaSharingOptions.CDF_READ_OPTION -> "true" + ) + DeltaTableV2( + spark = sqlContext.sparkSession, + path = DeltaSharingLogFileSystem.encode(tablePathWithHashIdSuffix), + options = localDeltaCdfOptions + ).toBaseRelation + } +} diff --git a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala new file mode 100644 index 00000000000..28fa1eb1206 --- /dev/null +++ b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala @@ -0,0 +1,275 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.delta.sharing.spark + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.delta.{ + DeltaColumnMapping, + DeltaErrors, + DeltaTableUtils => TahoeDeltaTableUtils +} +import org.apache.spark.sql.delta.commands.cdc.CDCReader +import org.apache.spark.sql.delta.metering.DeltaLogging +import org.apache.spark.sql.delta.schema.SchemaUtils +import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSQLConf} +import io.delta.sharing.client.{DeltaSharingClient, DeltaSharingRestClient} +import io.delta.sharing.client.model.{Table => DeltaSharingTable} +import io.delta.sharing.client.util.{ConfUtils, JsonUtils} +import org.apache.hadoop.fs.Path + +import org.apache.spark.SparkEnv +import org.apache.spark.delta.sharing.PreSignedUrlCache +import org.apache.spark.sql.{SparkSession, SQLContext} +import org.apache.spark.sql.execution.datasources.HadoopFsRelation +import org.apache.spark.sql.execution.streaming.Source +import org.apache.spark.sql.sources.{ + BaseRelation, + DataSourceRegister, + RelationProvider, + StreamSourceProvider +} +import org.apache.spark.sql.types.StructType + +/** + * A DataSource for Delta Sharing, used to support all types of queries on a delta sharing table: + * batch, cdf, streaming, time travel, filters, etc. + */ +private[sharing] class DeltaSharingDataSource + extends RelationProvider + with DataSourceRegister + with DeltaLogging { + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String]): BaseRelation = { + DeltaSharingDataSource.setupFileSystem(sqlContext) + val options = new DeltaSharingOptions(parameters) + + val userInputResponseFormat = options.options.get(DeltaSharingOptions.RESPONSE_FORMAT) + if (userInputResponseFormat.isEmpty && !options.readChangeFeed) { + return autoResolveBaseRelationForSnapshotQuery(options) + } + + val path = options.options.getOrElse("path", throw DeltaSharingErrors.pathNotSpecifiedException) + if (options.responseFormat == DeltaSharingOptions.RESPONSE_FORMAT_PARQUET) { + // When user explicitly set responseFormat=parquet, to query shared tables without advanced + // delta features. + logInfo(s"createRelation with parquet format for table path:$path, parameters:$parameters") + val deltaLog = RemoteDeltaLog( + path, + forStreaming = false, + responseFormat = options.responseFormat + ) + deltaLog.createRelation( + options.versionAsOf, + options.timestampAsOf, + options.cdfOptions + ) + } else if (options.responseFormat == DeltaSharingOptions.RESPONSE_FORMAT_DELTA) { + // When user explicitly set responseFormat=delta, to query shared tables with advanced + // delta features. + logInfo(s"createRelation with delta format for table path:$path, parameters:$parameters") + // 1. create delta sharing client + val parsedPath = DeltaSharingRestClient.parsePath(path) + val client = DeltaSharingRestClient( + profileFile = parsedPath.profileFile, + forStreaming = false, + responseFormat = options.responseFormat, + // comma separated delta reader features, used to tell delta sharing server what delta + // reader features the client is able to process. + readerFeatures = DeltaSharingUtils.SUPPORTED_READER_FEATURES.mkString(",") + ) + val dsTable = DeltaSharingTable( + share = parsedPath.share, + schema = parsedPath.schema, + name = parsedPath.table + ) + + if (options.readChangeFeed) { + return DeltaSharingCDFUtils.prepareCDCRelation(sqlContext, options, dsTable, client) + } + // 2. getMetadata for schema to be used in the file index. 
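+      // The rpc returns delta-format json lines, typically one protocol line and one metadata
+      // line, e.g. (illustrative only, most fields elided):
+      //   {"protocol":{"deltaProtocol":{"minReaderVersion":1,"minWriterVersion":1}}}
+      //   {"metaData":{"deltaMetadata":{"id":"...","schemaString":"...","partitionColumns":[]}}}
+      // getDeltaSharingTableMetadata below parses these lines into the protocol and metadata
+      // used to build the DeltaSharingFileIndex.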
+ val deltaTableMetadata = DeltaSharingUtils.queryDeltaTableMetadata( + client = client, + table = dsTable, + versionAsOf = options.versionAsOf, + timestampAsOf = options.timestampAsOf + ) + val deltaSharingTableMetadata = DeltaSharingUtils.getDeltaSharingTableMetadata( + table = dsTable, + deltaTableMetadata = deltaTableMetadata + ) + + // 3. Prepare HadoopFsRelation + getHadoopFsRelationForDeltaSnapshotQuery( + path = path, + options = options, + dsTable = dsTable, + client = client, + deltaSharingTableMetadata = deltaSharingTableMetadata + ) + } else { + throw new UnsupportedOperationException( + s"responseformat(${options.responseFormat}) is not supported in delta sharing." + ) + } + } + + /** + * "parquet format sharing" leverages the existing set of remote classes to directly handle the + * list of presigned urls and read data. + * "delta format sharing" instead constructs a local delta log and leverages the delta library to + * read data. + * Firstly we sends a getMetadata call to the delta sharing server the suggested response format + * of the shared table by the server (based on whether there are advanced delta features in the + * shared table), and then decide the code path on the client side. + */ + private def autoResolveBaseRelationForSnapshotQuery( + options: DeltaSharingOptions): BaseRelation = { + val path = options.options.getOrElse("path", throw DeltaSharingErrors.pathNotSpecifiedException) + val parsedPath = DeltaSharingRestClient.parsePath(path) + + val client = DeltaSharingRestClient( + profileFile = parsedPath.profileFile, + forStreaming = false, + // Indicating that the client is able to process response format in both parquet and delta. + responseFormat = s"${DeltaSharingOptions.RESPONSE_FORMAT_PARQUET}," + + s"${DeltaSharingOptions.RESPONSE_FORMAT_DELTA}", + // comma separated delta reader features, used to tell delta sharing server what delta + // reader features the client is able to process. + readerFeatures = DeltaSharingUtils.SUPPORTED_READER_FEATURES.mkString(",") + ) + val dsTable = DeltaSharingTable( + name = parsedPath.table, + schema = parsedPath.schema, + share = parsedPath.share + ) + + val deltaTableMetadata = DeltaSharingUtils.queryDeltaTableMetadata( + client = client, + table = dsTable, + versionAsOf = options.versionAsOf, + timestampAsOf = options.timestampAsOf + ) + + if (deltaTableMetadata.respondedFormat == DeltaSharingOptions.RESPONSE_FORMAT_PARQUET) { + val deltaLog = RemoteDeltaLog( + path = path, + forStreaming = false, + responseFormat = DeltaSharingOptions.RESPONSE_FORMAT_PARQUET, + initDeltaTableMetadata = Some(deltaTableMetadata) + ) + deltaLog.createRelation(options.versionAsOf, options.timestampAsOf, options.cdfOptions) + } else if (deltaTableMetadata.respondedFormat == DeltaSharingOptions.RESPONSE_FORMAT_DELTA) { + val deltaSharingTableMetadata = DeltaSharingUtils.getDeltaSharingTableMetadata( + table = dsTable, + deltaTableMetadata = deltaTableMetadata + ) + val deltaOnlyClient = DeltaSharingRestClient( + profileFile = parsedPath.profileFile, + forStreaming = false, + // Indicating that the client request delta format in response. + responseFormat = DeltaSharingOptions.RESPONSE_FORMAT_DELTA, + // comma separated delta reader features, used to tell delta sharing server what delta + // reader features the client is able to process. 
+ readerFeatures = DeltaSharingUtils.SUPPORTED_READER_FEATURES.mkString(",") + ) + getHadoopFsRelationForDeltaSnapshotQuery( + path = path, + options = options, + dsTable = dsTable, + client = deltaOnlyClient, + deltaSharingTableMetadata = deltaSharingTableMetadata + ) + } else { + throw new UnsupportedOperationException( + s"Unexpected respondedFormat for getMetadata rpc:${deltaTableMetadata.respondedFormat}." + ) + } + } + + /** + * Prepare a HadoopFsRelation for the snapshot query on a delta sharing table. It will contain a + * DeltaSharingFileIndex which is used to handle delta sharing rpc, and construct the local delta + * log, and then build a TahoeFileIndex on top of the delta log. + */ + private def getHadoopFsRelationForDeltaSnapshotQuery( + path: String, + options: DeltaSharingOptions, + dsTable: DeltaSharingTable, + client: DeltaSharingClient, + deltaSharingTableMetadata: DeltaSharingUtils.DeltaSharingTableMetadata): BaseRelation = { + // Prepare DeltaSharingFileIndex + val spark = SparkSession.active + val params = new DeltaSharingFileIndexParams( + new Path(path), + spark, + deltaSharingTableMetadata.metadata, + options + ) + if (ConfUtils.limitPushdownEnabled(spark.sessionState.conf)) { + DeltaFormatSharingLimitPushDown.setup(spark) + } + // limitHint is always None here and will be overridden in DeltaFormatSharingLimitPushDown. + val fileIndex = DeltaSharingFileIndex( + params = params, + table = dsTable, + client = client, + limitHint = None + ) + + // return HadoopFsRelation with the DeltaSharingFileIndex. + HadoopFsRelation( + location = fileIndex, + // This is copied from DeltaLog.buildHadoopFsRelationWithFileIndex. + // Dropping column mapping metadata because it is not relevant for partition schema. + partitionSchema = DeltaColumnMapping.dropColumnMappingMetadata(fileIndex.partitionSchema), + // This is copied from DeltaLog.buildHadoopFsRelationWithFileIndex, original comment: + // We pass all table columns as `dataSchema` so that Spark will preserve the partition + // column locations. Otherwise, for any partition columns not in `dataSchema`, Spark would + // just append them to the end of `dataSchema`. + dataSchema = DeltaColumnMapping.dropColumnMappingMetadata( + TahoeDeltaTableUtils.removeInternalMetadata( + spark, + SchemaUtils.dropNullTypeColumns(deltaSharingTableMetadata.metadata.schema) + ) + ), + bucketSpec = None, + // Handle column mapping metadata in schema. 
+ fileFormat = fileIndex.fileFormat( + deltaSharingTableMetadata.protocol.deltaProtocol, + deltaSharingTableMetadata.metadata.deltaMetadata + ), + options = Map.empty + )(spark) + } + + override def shortName(): String = "deltaSharing" +} + +private[sharing] object DeltaSharingDataSource { + def setupFileSystem(sqlContext: SQLContext): Unit = { + sqlContext.sparkContext.hadoopConfiguration + .setIfUnset("fs.delta-sharing.impl", "io.delta.sharing.client.DeltaSharingFileSystem") + sqlContext.sparkContext.hadoopConfiguration + .setIfUnset( + "fs.delta-sharing-log.impl", + "io.delta.sharing.spark.DeltaSharingLogFileSystem" + ) + PreSignedUrlCache.registerIfNeeded(SparkEnv.get) + } +} diff --git a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingLogFileSystem.scala b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingLogFileSystem.scala index 7214f6368b5..50f9c99b3b1 100644 --- a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingLogFileSystem.scala +++ b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingLogFileSystem.scala @@ -353,6 +353,233 @@ private[sharing] object DeltaSharingLogFileSystem extends Logging { } + /** + * Construct local delta log based on lines returned from delta sharing server. + * + * @param lines a list of delta actions, to be processed and put in the local delta log, + * each action contains a version field to indicate the version of log to + * put it in. + * @param customTablePath query customized table path, used to construct action.path field for + * DeltaSharingFileSystem + * @param startingVersionOpt If set, used to construct the delta file (.json log file) from the + * given startingVersion. This is needed by DeltaSharingSource to + * construct the delta log for the rpc no matter if there are files in + * that version or not, so DeltaSource can read delta actions from the + * starting version (instead from checkpoint). + * @param endingVersionOpt If set, used to construct the delta file (.json log file) until the + * given endingVersion. This is needed by DeltaSharingSource to construct + * the delta log for the rpc no matter if there are files in that version + * or not. + * NOTE: DeltaSource will not advance the offset if there are no files in + * a version of the delta log, but we still create the delta log file for + * that version to avoid missing delta log (json) files. + * @return ConstructedDeltaLogMetadata, which contains 3 fields: + * - idToUrl: mapping from file id to pre-signed url + * - minUrlExpirationTimestamp timestamp indicating the when to refresh pre-signed urls. + * Both are used to register to CachedTableManager. + * - maxVersion: the max version returned in the http response, used by + * DeltaSharingSource to quickly understand the progress of rpcs from the server. + */ + def constructLocalDeltaLogAcrossVersions( + lines: Seq[String], + customTablePath: String, + startingVersionOpt: Option[Long], + endingVersionOpt: Option[Long]): ConstructedDeltaLogMetadata = { + val startTime = System.currentTimeMillis() + assert( + startingVersionOpt.isDefined == endingVersionOpt.isDefined, + s"startingVersionOpt($startingVersionOpt) and endingVersionOpt($endingVersionOpt) should be" + + " both defined or not." + ) + if (startingVersionOpt.isDefined) { + assert( + startingVersionOpt.get <= endingVersionOpt.get, + s"startingVersionOpt($startingVersionOpt) must be smaller than " + + s"endingVersionOpt($endingVersionOpt)." 
+ ) + } + var minVersion = Long.MaxValue + var maxVersion = 0L + var minUrlExpirationTimestamp: Option[Long] = None + val idToUrl = scala.collection.mutable.Map[String, String]() + val versionToDeltaSharingFileActions = + scala.collection.mutable.Map[Long, ArrayBuffer[model.DeltaSharingFileAction]]() + val versionToMetadata = scala.collection.mutable.Map[Long, model.DeltaSharingMetadata]() + val versionToJsonLogBuilderMap = scala.collection.mutable.Map[Long, ArrayBuffer[String]]() + val versionToJsonLogSize = scala.collection.mutable.Map[Long, Long]().withDefaultValue(0L) + var numFileActionsInMinVersion = 0 + val versionToTimestampMap = scala.collection.mutable.Map[Long, Long]() + var startingMetadataLineOpt: Option[String] = None + var startingProtocolLineOpt: Option[String] = None + + lines.foreach { line => + val action = JsonUtils.fromJson[model.DeltaSharingSingleAction](line).unwrap + action match { + case fileAction: model.DeltaSharingFileAction => + minVersion = minVersion.min(fileAction.version) + maxVersion = maxVersion.max(fileAction.version) + // Store file actions in an array to sort them based on id later. + versionToDeltaSharingFileActions.getOrElseUpdate( + fileAction.version, + ArrayBuffer[model.DeltaSharingFileAction]() + ) += fileAction + case metadata: model.DeltaSharingMetadata => + if (metadata.version != null) { + // This is to handle the cdf and streaming query result. + minVersion = minVersion.min(metadata.version) + maxVersion = maxVersion.max(metadata.version) + versionToMetadata(metadata.version) = metadata + if (metadata.version == minVersion) { + startingMetadataLineOpt = Some(metadata.deltaMetadata.json + "\n") + } + } else { + // This is to handle the snapshot query result from DeltaSharingSource. + startingMetadataLineOpt = Some(metadata.deltaMetadata.json + "\n") + } + case protocol: model.DeltaSharingProtocol => + startingProtocolLineOpt = Some(protocol.deltaProtocol.json + "\n") + case _ => // do nothing, ignore the line. + } + } + + if (startingVersionOpt.isDefined) { + minVersion = minVersion.min(startingVersionOpt.get) + } else if (minVersion == Long.MaxValue) { + // This means there are no files returned from server for this cdf request. + // A 0.json file will be prepared with metadata and protocol only. + minVersion = 0 + } + if (endingVersionOpt.isDefined) { + maxVersion = maxVersion.max(endingVersionOpt.get) + } + // Store the starting protocol and metadata in the minVersion.json. + val protocolAndMetadataStr = startingMetadataLineOpt.getOrElse("") + startingProtocolLineOpt + .getOrElse("") + versionToJsonLogBuilderMap.getOrElseUpdate( + minVersion, + ArrayBuffer[String]() + ) += protocolAndMetadataStr + versionToJsonLogSize(minVersion) += protocolAndMetadataStr.length + numFileActionsInMinVersion = versionToDeltaSharingFileActions + .getOrElseUpdate(minVersion, ArrayBuffer[model.DeltaSharingFileAction]()) + .size + + // Write metadata to the delta log json file. + versionToMetadata.foreach { + case (version, metadata) => + if (version != minVersion) { + val metadataStr = metadata.deltaMetadata.json + "\n" + versionToJsonLogBuilderMap.getOrElseUpdate( + version, + ArrayBuffer[String]() + ) += metadataStr + versionToJsonLogSize(version) += metadataStr.length + } + } + // Write file actions to the delta log json file. 
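+    // High level summary of the loop below: for each version the file actions are sorted by id,
+    // the action path is rewritten (getActionWithDeltaSharingPath) so reads go through
+    // DeltaSharingFileSystem, idToUrl records the file-id -> pre-signed-url mapping registered by
+    // the caller with CachedTableManager, and the smallest url expiration timestamp is tracked to
+    // decide when the urls need to be refreshed.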
+ var previousIdOpt: Option[String] = None + versionToDeltaSharingFileActions.foreach { + case (version, actions) => + previousIdOpt = None + actions.toSeq.sortWith(deltaSharingFileActionIncreaseOrderFunc).foreach { fileAction => + assert( + // Using > instead of >= because there can be a removeFile and addFile pointing to the + // same parquet file which result in the same file id, since id is a hash of file path. + // This is ok because eventually it can read data out of the correct parquet file. + !previousIdOpt.exists(_ > fileAction.id), + s"fileActions must be in increasing order by id: ${previousIdOpt} is not smaller than" + + s" ${fileAction.id}, in version:$version." + ) + previousIdOpt = Some(fileAction.id) + + // 1. build it to url mapping + idToUrl(fileAction.id) = fileAction.path + if (requiresIdToUrlForDV(fileAction.getDeletionVectorOpt)) { + idToUrl(fileAction.deletionVectorFileId) = + fileAction.getDeletionVectorOpt.get.pathOrInlineDv + } + + // 2. prepare json log content. + versionToTimestampMap.getOrElseUpdate(version, fileAction.timestamp) + val actionJsonStr = getActionWithDeltaSharingPath(fileAction, customTablePath) + "\n" + versionToJsonLogBuilderMap.getOrElseUpdate( + version, + ArrayBuffer[String]() + ) += actionJsonStr + versionToJsonLogSize(version) += actionJsonStr.length + + // 3. process expiration timestamp + if (fileAction.expirationTimestamp != null) { + minUrlExpirationTimestamp = minUrlExpirationTimestamp + .filter(_ < fileAction.expirationTimestamp) + .orElse(Some(fileAction.expirationTimestamp)) + } + } + } + + val encodedTablePath = DeltaSharingLogFileSystem.encode(customTablePath) + val deltaLogPath = s"${encodedTablePath.toString}/_delta_log" + val fileSizeTsSeq = Seq.newBuilder[DeltaSharingLogFileStatus] + + if (minVersion > 0) { + // If the minVersion is not 0 in the response, then prepare checkpoint at minVersion - 1: + // need to prepare two files: 1) (minVersion-1).checkpoint.parquet 2) _last_checkpoint + val checkpointVersion = minVersion - 1 + + // 1) store the checkpoint byte array in BlockManager for future read. 
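+      // The checkpoint content is the fixed FAKE_CHECKPOINT_BYTE_ARRAY; it exists so that the
+      // locally constructed delta log is readable from minVersion without the earlier json files.
+      // As an illustration, if minVersion is 4, this prepares 3.checkpoint.parquet and a
+      // _last_checkpoint whose content is {"version":3,"size":<size of the fake checkpoint>}.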
+ val checkpointParquetFileName = + FileNames.checkpointFileSingular(new Path(deltaLogPath), checkpointVersion).toString + fileSizeTsSeq += DeltaSharingLogFileStatus( + path = checkpointParquetFileName, + size = FAKE_CHECKPOINT_BYTE_ARRAY.size, + modificationTime = 0L + ) + + // 2) Prepare the content for _last_checkpoint + val lastCheckpointContent = + s"""{"version":${checkpointVersion},"size":${FAKE_CHECKPOINT_BYTE_ARRAY.size}}""" + val lastCheckpointPath = new Path(deltaLogPath, "_last_checkpoint").toString + fileSizeTsSeq += DeltaSharingLogFileStatus( + path = lastCheckpointPath, + size = lastCheckpointContent.length, + modificationTime = 0L + ) + DeltaSharingUtils.overrideSingleBlock[String]( + blockId = getDeltaSharingLogBlockId(lastCheckpointPath), + value = lastCheckpointContent + ) + } + + for (version <- minVersion to maxVersion) { + val jsonFilePath = FileNames.deltaFile(new Path(deltaLogPath), version).toString + DeltaSharingUtils.overrideIteratorBlock[String]( + getDeltaSharingLogBlockId(jsonFilePath), + versionToJsonLogBuilderMap.getOrElse(version, Seq.empty).toIterator + ) + fileSizeTsSeq += DeltaSharingLogFileStatus( + path = jsonFilePath, + size = versionToJsonLogSize.getOrElse(version, 0), + modificationTime = versionToTimestampMap.get(version).getOrElse(0L) + ) + } + + DeltaSharingUtils.overrideIteratorBlock[DeltaSharingLogFileStatus]( + getDeltaSharingLogBlockId(deltaLogPath), + fileSizeTsSeq.result().toIterator + ) + logInfo( + s"It takes ${(System.currentTimeMillis() - startTime) / 1000.0}s to construct delta log" + + s"for $customTablePath from $minVersion to $maxVersion, with ${idToUrl.toMap.size} urls." + ) + ConstructedDeltaLogMetadata( + idToUrl = idToUrl.toMap, + minUrlExpirationTimestamp = minUrlExpirationTimestamp, + numFileActionsInMinVersionOpt = Some(numFileActionsInMinVersion), + minVersion = minVersion, + maxVersion = maxVersion + ) + } + /** Set the modificationTime to zero, this is to align with the time returned from * DeltaSharingFileSystem.getFileStatus */ diff --git a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingUtils.scala b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingUtils.scala index 4e01408d294..0d23577a3a7 100644 --- a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingUtils.scala +++ b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingUtils.scala @@ -44,6 +44,9 @@ import org.apache.spark.storage.{BlockId, StorageLevel} object DeltaSharingUtils extends Logging { + val SUPPORTED_READER_FEATURES: Seq[String] = + Seq(DeletionVectorsTableFeature.name, ColumnMappingTableFeature.name) + // The prefix will be used for block ids of all blocks that store the delta log in BlockManager. // It's used to ensure delta sharing queries don't mess up with blocks with other applications. val DELTA_SHARING_BLOCK_ID_PREFIX = "test_delta-sharing" @@ -58,6 +61,57 @@ object DeltaSharingUtils extends Logging { metadata: model.DeltaSharingMetadata ) + def queryDeltaTableMetadata( + client: DeltaSharingClient, + table: Table, + versionAsOf: Option[Long] = None, + timestampAsOf: Option[String] = None): DeltaTableMetadata = { + val deltaTableMetadata = client.getMetadata(table, versionAsOf, timestampAsOf) + logInfo( + s"getMetadata returned in ${deltaTableMetadata.respondedFormat} format for table " + + s"$table with v_${versionAsOf.map(_.toString).getOrElse("None")} " + + s"t_${timestampAsOf.getOrElse("None")} from delta sharing server." 
+ ) + deltaTableMetadata + } + + /** + * parse the protocol and metadata from rpc response for getMetadata. + */ + def getDeltaSharingTableMetadata( + table: Table, + deltaTableMetadata: DeltaTableMetadata): DeltaSharingTableMetadata = { + + var metadataOption: Option[model.DeltaSharingMetadata] = None + var protocolOption: Option[model.DeltaSharingProtocol] = None + + deltaTableMetadata.lines + .map( + JsonUtils.fromJson[model.DeltaSharingSingleAction](_).unwrap + ) + .foreach { + case m: model.DeltaSharingMetadata => metadataOption = Some(m) + case p: model.DeltaSharingProtocol => protocolOption = Some(p) + case _ => // ignore other lines + } + + DeltaSharingTableMetadata( + version = deltaTableMetadata.version, + protocol = protocolOption.getOrElse { + throw new IllegalStateException( + s"Failed to get Protocol for ${table.toString}, " + + s"response from server:${deltaTableMetadata.lines}." + ) + }, + metadata = metadataOption.getOrElse { + throw new IllegalStateException( + s"Failed to get Metadata for ${table.toString}, " + + s"response from server:${deltaTableMetadata.lines}." + ) + } + ) + } + private def getTableRefreshResult(tableFiles: DeltaTableFiles): TableRefreshResult = { var minUrlExpiration: Option[Long] = None val idToUrl = tableFiles.lines @@ -108,6 +162,44 @@ object DeltaSharingUtils extends Logging { } } + /** + * Get the refresher function for a delta sharing table who calls client.getCDFFiles with the + * provided parameters. + * + * @return A refresher function used by the CachedTableManager to refresh urls. + */ + def getRefresherForGetCDFFiles( + client: DeltaSharingClient, + table: Table, + cdfOptions: Map[String, String]): RefresherFunction = { (_: Option[String]) => + { + val tableFiles = client.getCDFFiles( + table = table, + cdfOptions = cdfOptions, + includeHistoricalMetadata = true + ) + getTableRefreshResult(tableFiles) + } + } + + /** + * Get the refresher function for a delta sharing table who calls client.getFiles with the + * provided parameters. + * + * @return A refresher function used by the CachedTableManager to refresh urls. + */ + def getRefresherForGetFilesWithStartingVersion( + client: DeltaSharingClient, + table: Table, + startingVersion: Long, + endingVersion: Option[Long]): RefresherFunction = { (_: Option[String]) => + { + val tableFiles = client + .getFiles(table = table, startingVersion = startingVersion, endingVersion = endingVersion) + getTableRefreshResult(tableFiles) + } + } + def overrideSingleBlock[T: ClassTag](blockId: BlockId, value: T): Unit = { assert( blockId.name.startsWith(DELTA_SHARING_BLOCK_ID_PREFIX), @@ -151,6 +243,15 @@ object DeltaSharingUtils extends Logging { Hashing.sha256().hashString(fullQueryString, UTF_8).toString } + // Get a query hash id based on the query parameters: cdfOptions. + // The id concatenated with table name and used in local DeltaLoc and CachedTableManager. + // This is to uniquely identify the delta sharing table used twice in the same query but with + // different query parameters, so we can differentiate their delta log and entries in the + // CachedTableManager. + private[sharing] def getQueryParamsHashId(cdfOptions: Map[String, String]): String = { + Hashing.sha256().hashString(cdfOptions.toString, UTF_8).toString + } + // Concatenate table path with an id as a suffix, to uniquely identify a delta sharing table and // its corresponding delta log in a query. 
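+  // For example, prepareCDCRelation appends the same suffix (getQueryParamsHashId of the cdf
+  // options) both to the local delta log path and to the path registered with CachedTableManager,
+  // so the constructed log and the url cache entries stay consistent for a given query.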
private[sharing] def getTablePathWithIdSuffix(customTablePath: String, id: String): String = { diff --git a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingCDFUtilsSuite.scala b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingCDFUtilsSuite.scala new file mode 100644 index 00000000000..aa8d54d2b48 --- /dev/null +++ b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingCDFUtilsSuite.scala @@ -0,0 +1,243 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.sharing.spark + +import java.io.File + +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest +import io.delta.sharing.client.{ + DeltaSharingClient, + DeltaSharingProfileProvider, + DeltaSharingRestClient +} +import io.delta.sharing.client.model.{DeltaTableFiles, DeltaTableMetadata, Table} +import org.apache.commons.io.FileUtils +import org.apache.hadoop.fs.Path + +import org.apache.spark.{SparkConf, SparkEnv} +import org.apache.spark.delta.sharing.{PreSignedUrlCache, PreSignedUrlFetcher} +import org.apache.spark.sql.{QueryTest, SparkSession} +import org.apache.spark.sql.delta.sharing.DeltaSharingTestSparkUtils +import org.apache.spark.sql.test.{SharedSparkSession} + +private object CDFTesTUtils { + val paths = Seq("http://path1", "http://path2") + + val SparkConfForReturnExpTime = "spark.delta.sharing.fileindexsuite.returnexptime" + + // 10 seconds + val expirationTimeMs = 10000 + + def getExpirationTimestampStr(returnExpTime: Boolean): String = { + if (returnExpTime) { + s""""expirationTimestamp":${System.currentTimeMillis() + expirationTimeMs},""" + } else { + "" + } + } + + // scalastyle:off line.size.limit + val fileStr1Id = "11d9b72771a72f178a6f2839f7f08528" + val metaDataStr = + """{"metaData":{"size":809,"deltaMetadata":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c2"],"configuration":{"delta.enableChangeDataFeed":"true"},"createdTime":1691734718560}}}""" + def getAddFileStr1(path: String, returnExpTime: Boolean = false): String = { + s"""{"file":{"id":"11d9b72771a72f178a6f2839f7f08528",${getExpirationTimestampStr( + returnExpTime + )}"deltaSingleAction":{"add":{"path":"${path}",""" + """"partitionValues":{"c2":"one"},"size":809,"modificationTime":1691734726073,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"c1\":1,\"c2\":\"one\"},\"maxValues\":{\"c1\":2,\"c2\":\"one\"},\"nullCount\":{\"c1\":0,\"c2\":0}}","tags":{"INSERTION_TIME":"1691734726073000","MIN_INSERTION_TIME":"1691734726073000","MAX_INSERTION_TIME":"1691734726073000","OPTIMIZE_TARGET_SIZE":"268435456"}}}}}""" + } + def getAddFileStr2(returnExpTime: Boolean = false): String = { + s"""{"file":{"id":"22d9b72771a72f178a6f2839f7f08529",${getExpirationTimestampStr( + returnExpTime + )}""" + 
""""deltaSingleAction":{"add":{"path":"http://path2","partitionValues":{"c2":"two"},"size":809,"modificationTime":1691734726073,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"c1\":1,\"c2\":\"two\"},\"maxValues\":{\"c1\":2,\"c2\":\"two\"},\"nullCount\":{\"c1\":0,\"c2\":0}}","tags":{"INSERTION_TIME":"1691734726073000","MIN_INSERTION_TIME":"1691734726073000","MAX_INSERTION_TIME":"1691734726073000","OPTIMIZE_TARGET_SIZE":"268435456"}}}}}""" + } + // scalastyle:on line.size.limit +} + +/** + * A mocked delta sharing client for unit tests. + */ +class TestDeltaSharingClientForCDFUtils( + profileProvider: DeltaSharingProfileProvider, + timeoutInSeconds: Int = 120, + numRetries: Int = 10, + maxRetryDuration: Long = Long.MaxValue, + sslTrustAll: Boolean = false, + forStreaming: Boolean = false, + responseFormat: String = DeltaSharingRestClient.RESPONSE_FORMAT_DELTA, + readerFeatures: String = "", + queryTablePaginationEnabled: Boolean = false, + maxFilesPerReq: Int = 100000) + extends DeltaSharingClient { + + import CDFTesTUtils._ + + private lazy val returnExpirationTimestamp = SparkSession.active.sessionState.conf + .getConfString( + SparkConfForReturnExpTime + ) + .toBoolean + + var numGetFileCalls: Int = -1 + + override def listAllTables(): Seq[Table] = throw new UnsupportedOperationException("not needed") + + override def getMetadata( + table: Table, + versionAsOf: Option[Long], + timestampAsOf: Option[String]): DeltaTableMetadata = { + throw new UnsupportedOperationException("getMetadata is not supported now.") + } + + override def getTableVersion(table: Table, startingTimestamp: Option[String] = None): Long = { + throw new UnsupportedOperationException("getTableVersion is not supported now.") + } + + override def getFiles( + table: Table, + predicates: Seq[String], + limit: Option[Long], + versionAsOf: Option[Long], + timestampAsOf: Option[String], + jsonPredicateHints: Option[String], + refreshToken: Option[String] + ): DeltaTableFiles = { + throw new UnsupportedOperationException("getFiles is not supported now.") + } + + override def getFiles( + table: Table, + startingVersion: Long, + endingVersion: Option[Long] + ): DeltaTableFiles = { + throw new UnsupportedOperationException(s"getFiles with startingVersion($startingVersion)") + } + + override def getCDFFiles( + table: Table, + cdfOptions: Map[String, String], + includeHistoricalMetadata: Boolean + ): DeltaTableFiles = { + numGetFileCalls += 1 + DeltaTableFiles( + version = 0, + lines = Seq[String]( + """{"protocol":{"deltaProtocol":{"minReaderVersion": 1, "minWriterVersion": 1}}}""", + metaDataStr, + getAddFileStr1(paths(numGetFileCalls.min(1)), returnExpirationTimestamp), + getAddFileStr2(returnExpirationTimestamp) + ), + respondedFormat = DeltaSharingRestClient.RESPONSE_FORMAT_DELTA + ) + } + + override def getForStreaming(): Boolean = forStreaming + + override def getProfileProvider: DeltaSharingProfileProvider = profileProvider +} + +class DeltaSharingCDFUtilsSuite + extends QueryTest + with DeltaSQLCommandTest + with SharedSparkSession + with DeltaSharingTestSparkUtils { + + import CDFTesTUtils._ + + private val shareName = "share" + private val schemaName = "default" + private val sharedTableName = "table" + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.delta.sharing.preSignedUrl.expirationMs", expirationTimeMs.toString) + .set("spark.delta.sharing.driver.refreshCheckIntervalMs", "1000") + .set("spark.delta.sharing.driver.refreshThresholdMs", "2000") + 
.set("spark.delta.sharing.driver.accessThresholdToExpireMs", "60000") + } + + test("refresh works") { + PreSignedUrlCache.registerIfNeeded(SparkEnv.get) + + withTempDir { tempDir => + val profileFile = new File(tempDir, "foo.share") + FileUtils.writeStringToFile( + profileFile, + s"""{ + | "shareCredentialsVersion": 1, + | "endpoint": "https://localhost:12345/not-used-endpoint", + | "bearerToken": "mock" + |}""".stripMargin, + "utf-8" + ) + + def test(): Unit = { + val profilePath = profileFile.getCanonicalPath + val tablePath = new Path(s"$profilePath#$shareName.$schemaName.$sharedTableName") + val client = DeltaSharingRestClient(profilePath, false, "delta") + val dsTable = Table(share = shareName, schema = schemaName, name = sharedTableName) + + val options = new DeltaSharingOptions(Map("path" -> tablePath.toString)) + DeltaSharingCDFUtils.prepareCDCRelation( + SparkSession.active.sqlContext, + options, + dsTable, + client + ) + + val preSignedUrlCacheRef = PreSignedUrlCache.getEndpointRefInExecutor(SparkEnv.get) + val path = options.options.getOrElse( + "path", + throw DeltaSharingErrors.pathNotSpecifiedException + ) + val fetcher = new PreSignedUrlFetcher( + preSignedUrlCacheRef, + DeltaSharingUtils.getTablePathWithIdSuffix( + path, + DeltaSharingUtils.getQueryParamsHashId(options.cdfOptions) + ), + fileStr1Id, + 1000 + ) + // sleep for expirationTimeMs to ensure that the urls are refreshed. + Thread.sleep(expirationTimeMs) + + // Verify that the url is refreshed as paths(1), not paths(0) anymore. + assert(fetcher.getUrl == paths(1)) + } + + withSQLConf( + "spark.delta.sharing.client.class" -> classOf[TestDeltaSharingClientForCDFUtils].getName, + "fs.delta-sharing-log.impl" -> classOf[DeltaSharingLogFileSystem].getName, + "spark.delta.sharing.profile.provider.class" -> + "io.delta.sharing.client.DeltaSharingFileProfileProvider", + SparkConfForReturnExpTime -> "true" + ) { + test() + } + + withSQLConf( + "spark.delta.sharing.client.class" -> classOf[TestDeltaSharingClientForCDFUtils].getName, + "fs.delta-sharing-log.impl" -> classOf[DeltaSharingLogFileSystem].getName, + "spark.delta.sharing.profile.provider.class" -> + "io.delta.sharing.client.DeltaSharingFileProfileProvider", + SparkConfForReturnExpTime -> "false" + ) { + test() + } + } + } +} diff --git a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaSuite.scala b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaSuite.scala new file mode 100644 index 00000000000..13d0fca3fcf --- /dev/null +++ b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaSuite.scala @@ -0,0 +1,851 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.delta.sharing.spark + +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest +import org.apache.spark.sql.{DataFrame, QueryTest, Row} +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.delta.sharing.DeltaSharingTestSparkUtils +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.types.{ + DateType, + IntegerType, + LongType, + StringType, + StructType, + TimestampType +} + +trait DeltaSharingDataSourceDeltaSuiteBase + extends QueryTest + with DeltaSQLCommandTest + with DeltaSharingTestSparkUtils + with DeltaSharingDataSourceDeltaTestUtils { + + /** + * metadata tests + */ + test("failed to getMetadata") { + withTempDir { tempDir => + val sharedTableName = "shared_table_broken_json" + + def test(tablePath: String, tableFullName: String): Unit = { + DeltaSharingUtils.overrideIteratorBlock[String]( + blockId = TestClientForDeltaFormatSharing.getBlockId(sharedTableName, "getMetadata"), + values = Seq("bad protocol string", "bad metadata string").toIterator + ) + DeltaSharingUtils.overrideSingleBlock[Long]( + blockId = TestClientForDeltaFormatSharing.getBlockId(sharedTableName, "getTableVersion"), + value = 1 + ) + // JsonParseException on "bad protocol string" + val exception = intercept[com.fasterxml.jackson.core.JsonParseException] { + spark.read.format("deltaSharing").option("responseFormat", "delta").load(tablePath).schema + } + assert(exception.getMessage.contains("Unrecognized token 'bad'")) + + // table_with_broken_protocol + // able to parse as a DeltaSharingSingleAction, but it's an addFile, not metadata. + DeltaSharingUtils.overrideIteratorBlock[String]( + blockId = TestClientForDeltaFormatSharing.getBlockId(sharedTableName, "getMetadata"), + // scalastyle:off line.size.limit + values = Seq( + """{"add": {"path":"random","id":"random","partitionValues":{},"size":1,"motificationTime":1,"dataChange":false}}""" + ).toIterator + ) + val exception2 = intercept[IllegalStateException] { + spark.read.format("deltaSharing").option("responseFormat", "delta").load(tablePath).schema + } + assert( + exception2.getMessage + .contains(s"Failed to get Protocol for $tableFullName") + ) + + // table_with_broken_metadata + // able to parse as a DeltaSharingSingleAction, but it's an addFile, not metadata. 
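+        // (The mocked lines below actually contain only a protocol action and no metaData, so
+        // parsing succeeds but getDeltaSharingTableMetadata cannot find a Metadata action and
+        // throws "Failed to get Metadata".)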
+ DeltaSharingUtils.overrideIteratorBlock[String]( + blockId = TestClientForDeltaFormatSharing.getBlockId(sharedTableName, "getMetadata"), + values = Seq( + """{"protocol":{"minReaderVersion":1}}""" + ).toIterator + ) + val exception3 = intercept[IllegalStateException] { + spark.read.format("deltaSharing").option("responseFormat", "delta").load(tablePath).schema + } + assert( + exception3.getMessage + .contains(s"Failed to get Metadata for $tableFullName") + ) + } + + withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) { + val profileFile = prepareProfileFile(tempDir) + val tableFullName = s"share1.default.$sharedTableName" + test(s"${profileFile.getCanonicalPath}#$tableFullName", tableFullName) + } + } + } + + def assertLimit(tablePath: String, expectedLimit: Seq[Long]): Unit = { + assert(expectedLimit == + TestClientForDeltaFormatSharing.limits.filter(_._1.contains(tablePath)).map(_._2)) + } + + def assertRequestedFormat(tablePath: String, expectedFormat: Seq[String]): Unit = { + assert(expectedFormat == + TestClientForDeltaFormatSharing.requestedFormat.filter(_._1.contains(tablePath)).map(_._2)) + } + /** + * snapshot queries + */ + test("DeltaSharingDataSource able to read simple data") { + withTempDir { tempDir => + val deltaTableName = "delta_table_simple" + withTable(deltaTableName) { + createTable(deltaTableName) + sql( + s"INSERT INTO $deltaTableName" + + """ VALUES (1, "one", "2023-01-01", "2023-01-01 00:00:00"), + |(2, "two", "2023-02-02", "2023-02-02 00:00:00")""".stripMargin + ) + + val expectedSchema: StructType = new StructType() + .add("c1", IntegerType) + .add("c2", StringType) + .add("c3", DateType) + .add("c4", TimestampType) + val expected = Seq( + Row(1, "one", sqlDate("2023-01-01"), sqlTimestamp("2023-01-01 00:00:00")), + Row(2, "two", sqlDate("2023-02-02"), sqlTimestamp("2023-02-02 00:00:00")) + ) + + Seq(true, false).foreach { enableLimitPushdown => + val sharedTableName = s"shared_table_simple_$enableLimitPushdown" + prepareMockedClientAndFileSystemResult(deltaTableName, sharedTableName) + prepareMockedClientGetTableVersion(deltaTableName, sharedTableName) + + def test(tablePath: String, tableName: String): Unit = { + assert( + expectedSchema == spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .load(tablePath) + .schema + ) + val df = + spark.read.format("deltaSharing").option("responseFormat", "delta").load(tablePath) + checkAnswer(df, expected) + assert(df.count() > 0) + assertLimit(tableName, Seq.empty[Long]) + val limitDf = spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .load(tablePath) + .limit(1) + assert(limitDf.collect().size == 1) + assertLimit(tableName, Some(1L).filter(_ => enableLimitPushdown).toSeq) + } + + val limitPushdownConfig = Map( + "spark.delta.sharing.limitPushdown.enabled" -> enableLimitPushdown.toString + ) + withSQLConf((limitPushdownConfig ++ getDeltaSharingClassesSQLConf).toSeq: _*) { + val profileFile = prepareProfileFile(tempDir) + val tableName = s"share1.default.$sharedTableName" + test(s"${profileFile.getCanonicalPath}#$tableName", tableName) + } + } + } + } + } + + test("DeltaSharingDataSource able to auto resolve responseFormat") { + withTempDir { tempDir => + val deltaTableName = "delta_table_auto" + withTable(deltaTableName) { + createSimpleTable(deltaTableName, enableCdf = false) + sql( + s"""INSERT INTO $deltaTableName VALUES (1, "one"), (2, "one")""".stripMargin + ) + sql( + s"""INSERT INTO $deltaTableName VALUES (1, "two"), (2, "two")""".stripMargin + ) + + val 
expectedSchema: StructType = new StructType() + .add("c1", IntegerType) + .add("c2", StringType) + + def testAutoResolve(tablePath: String, tableName: String, expectedFormat: String): Unit = { + assert( + expectedSchema == spark.read + .format("deltaSharing") + .load(tablePath) + .schema + ) + + val deltaDf = spark.read.format("delta").table(deltaTableName) + val sharingDf = spark.read.format("deltaSharing").load(tablePath) + checkAnswer(deltaDf, sharingDf) + assert(sharingDf.count() > 0) + assertLimit(tableName, Seq.empty[Long]) + assertRequestedFormat(tableName, Seq(expectedFormat)) + + val limitDf = spark.read + .format("deltaSharing") + .load(tablePath) + .limit(1) + assert(limitDf.collect().size == 1) + assertLimit(tableName, Seq(1L)) + + val deltaDfV1 = spark.read.format("delta").option("versionAsOf", 1).table(deltaTableName) + val sharingDfV1 = + spark.read.format("deltaSharing").option("versionAsOf", 1).load(tablePath) + checkAnswer(deltaDfV1, sharingDfV1) + assert(sharingDfV1.count() > 0) + assertRequestedFormat(tableName, Seq(expectedFormat)) + } + + // Test for delta format response + val sharedDeltaTable = "shared_delta_table" + prepareMockedClientAndFileSystemResult(deltaTableName, sharedDeltaTable) + prepareMockedClientAndFileSystemResult( + deltaTableName, + sharedDeltaTable, + versionAsOf = Some(1) + ) + prepareMockedClientGetTableVersion(deltaTableName, sharedDeltaTable) + + withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) { + val profileFile = prepareProfileFile(tempDir) + testAutoResolve( + s"${profileFile.getCanonicalPath}#share1.default.$sharedDeltaTable", + s"share1.default.$sharedDeltaTable", + "delta" + ) + } + + // Test for parquet format response + val sharedParquetTable = "shared_parquet_table" + prepareMockedClientAndFileSystemResultForParquet( + deltaTableName, + sharedParquetTable + ) + prepareMockedClientAndFileSystemResultForParquet( + deltaTableName, + sharedParquetTable, + versionAsOf = Some(1) + ) + prepareMockedClientGetTableVersion(deltaTableName, sharedParquetTable) + + withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) { + val profileFile = prepareProfileFile(tempDir) + testAutoResolve( + s"${profileFile.getCanonicalPath}#share1.default.$sharedParquetTable", + s"share1.default.$sharedParquetTable", + "parquet" + ) + } + } + } + } + + test("DeltaSharingDataSource able to read data with filters and select") { + withTempDir { tempDir => + val deltaTableName = "delta_table_filters" + withTable(deltaTableName) { + createSimpleTable(deltaTableName, enableCdf = false) + sql(s"""INSERT INTO $deltaTableName VALUES (1, "first"), (2, "first")""") + sql(s"""INSERT INTO $deltaTableName VALUES (1, "second"), (2, "second")""") + sql(s"""INSERT INTO $deltaTableName VALUES (1, "third"), (2, "third")""") + + val sharedTableName = "shared_table_filters" + prepareMockedClientAndFileSystemResult(deltaTableName, sharedTableName) + prepareMockedClientGetTableVersion(deltaTableName, sharedTableName) + + // The files returned from delta sharing client are the same for these queries. + // This is to test the filters are passed correctly to TahoeLogFileIndex for the local delta + // log. 
+ def testFiltersAndSelect(tablePath: String): Unit = { + var expected = Seq(Row(1, "first"), Row(1, "second"), Row(1, "third"), Row(2, "second")) + var df = spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .load(tablePath) + .filter(col("c1") === 1 || col("c2") === "second") + checkAnswer(df, expected) + + expected = Seq(Row(1, "first"), Row(1, "second"), Row(1, "third")) + df = spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .load(tablePath) + .filter(col("c1") === 1) + checkAnswer(df, expected) + + expected = Seq(Row(1, "second"), Row(2, "second")) + df = spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .load(tablePath) + .filter(col("c2") === "second") + checkAnswer(df, expected) + + // with select as well + expected = Seq(Row(1), Row(1), Row(1), Row(2), Row(2), Row(2)) + df = spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .load(tablePath) + .select("c1") + checkAnswer(df, expected) + + expected = Seq( + Row("first"), + Row("first"), + Row("second"), + Row("second"), + Row("third"), + Row("third") + ) + df = spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .load(tablePath) + .select("c2") + checkAnswer(df, expected) + + expected = Seq(Row(1), Row(2)) + df = spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .load(tablePath) + .filter(col("c2") === "second") + .select("c1") + checkAnswer(df, expected) + } + + withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) { + val profileFile = prepareProfileFile(tempDir) + testFiltersAndSelect(s"${profileFile.getCanonicalPath}#share1.default.$sharedTableName") + } + } + } + } + + test("DeltaSharingDataSource able to read data for time travel queries") { + withTempDir { tempDir => + val deltaTableName = "delta_table_time_travel" + withTable(deltaTableName) { + createTable(deltaTableName) + + sql( + s"INSERT INTO $deltaTableName" + + """ VALUES (1, "one", "2023-01-01", "2023-01-01 00:00:00")""".stripMargin + ) + sql( + s"INSERT INTO $deltaTableName" + + """ VALUES (2, "two", "2023-02-02", "2023-02-02 00:00:00")""".stripMargin + ) + sql( + s"INSERT INTO $deltaTableName" + + """ VALUES (3, "three", "2023-03-03", "2023-03-03 00:00:00")""".stripMargin + ) + + val sharedTableNameV1 = "shared_table_v1" + prepareMockedClientAndFileSystemResult( + deltaTable = deltaTableName, + sharedTable = sharedTableNameV1, + versionAsOf = Some(1L) + ) + + def testVersionAsOf1(tablePath: String): Unit = { + val dfV1 = spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .option("versionAsOf", 1) + .load(tablePath) + val expectedV1 = Seq( + Row(1, "one", sqlDate("2023-01-01"), sqlTimestamp("2023-01-01 00:00:00")) + ) + checkAnswer(dfV1, expectedV1) + } + withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) { + val profileFile = prepareProfileFile(tempDir) + testVersionAsOf1(s"${profileFile.getCanonicalPath}#share1.default.$sharedTableNameV1") + } + + // using different table name because spark caches the content read from a file, i.e., + // the delta log from 0.json. + // TODO: figure out how to get a per query id and use it in getCustomTablePath to + // differentiate the same table used in different queries. + // TODO: Also check if it's possible to disable the file cache. 
+        val sharedTableNameV3 = "shared_table_v3"
+        prepareMockedClientAndFileSystemResult(
+          deltaTable = deltaTableName,
+          sharedTable = sharedTableNameV3,
+          versionAsOf = Some(3L)
+        )
+
+        def testVersionAsOf3(tablePath: String): Unit = {
+          val dfV3 = spark.read
+            .format("deltaSharing")
+            .option("responseFormat", "delta")
+            .option("versionAsOf", 3)
+            .load(tablePath)
+          val expectedV3 = Seq(
+            Row(1, "one", sqlDate("2023-01-01"), sqlTimestamp("2023-01-01 00:00:00")),
+            Row(2, "two", sqlDate("2023-02-02"), sqlTimestamp("2023-02-02 00:00:00")),
+            Row(3, "three", sqlDate("2023-03-03"), sqlTimestamp("2023-03-03 00:00:00"))
+          )
+          checkAnswer(dfV3, expectedV3)
+        }
+        withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) {
+          val profileFile = prepareProfileFile(tempDir)
+          testVersionAsOf3(s"${profileFile.getCanonicalPath}#share1.default.$sharedTableNameV3")
+        }
+
+        val sharedTableNameTs = "shared_table_ts"
+        // Given the result of the delta sharing rpc is mocked, the actual value of timestampStr
+        // can be anything that's valid for DeltaSharingOptions; formattedTimestamp is the parsed
+        // result that will be sent in the delta sharing rpc.
+        val timestampStr = "2023-01-01 00:00:00"
+        val formattedTimestamp = "2023-01-01T08:00:00Z"
+
+        prepareMockedClientGetTableVersion(deltaTableName, sharedTableNameTs)
+        prepareMockedClientAndFileSystemResult(
+          deltaTable = deltaTableName,
+          sharedTable = sharedTableNameTs,
+          versionAsOf = None,
+          timestampAsOf = Some(formattedTimestamp)
+        )
+
+        def testTimestampQuery(tablePath: String): Unit = {
+          val dfTs = spark.read
+            .format("deltaSharing")
+            .option("responseFormat", "delta")
+            .option("timestampAsOf", timestampStr)
+            .load(tablePath)
+          val expectedTs = Seq(
+            Row(1, "one", sqlDate("2023-01-01"), sqlTimestamp("2023-01-01 00:00:00")),
+            Row(2, "two", sqlDate("2023-02-02"), sqlTimestamp("2023-02-02 00:00:00")),
+            Row(3, "three", sqlDate("2023-03-03"), sqlTimestamp("2023-03-03 00:00:00"))
+          )
+          checkAnswer(dfTs, expectedTs)
+        }
+        withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) {
+          val profileFile = prepareProfileFile(tempDir)
+          testTimestampQuery(s"${profileFile.getCanonicalPath}#share1.default.$sharedTableNameTs")
+        }
+      }
+    }
+  }
+
+  test("DeltaSharingDataSource able to read data with more entries") {
+    withTempDir { tempDir =>
+      val deltaTableName = "delta_table_more"
+      withTable(deltaTableName) {
+        createSimpleTable(deltaTableName, enableCdf = false)
+        // The table operations take about 6~10 seconds.
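+        // Each iteration of the loop below issues one multi-row INSERT, e.g. for i = 0:
+        //   INSERT INTO delta_table_more VALUES (0, "iteration 0"), (1, "iteration 0"), ..., (49, "iteration 0")
+        // for a total of 10 commits with 50 rows each.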
+ for (i <- 0 to 9) { + val iteration = s"iteration $i" + val valuesBuilder = Seq.newBuilder[String] + for (j <- 0 to 49) { + valuesBuilder += s"""(${i * 10 + j}, "$iteration")""" + } + sql(s"INSERT INTO $deltaTableName VALUES ${valuesBuilder.result().mkString(",")}") + } + + val sharedTableName = "shared_table_more" + prepareMockedClientAndFileSystemResult(deltaTableName, sharedTableName) + prepareMockedClientGetTableVersion(deltaTableName, sharedTableName) + + val expectedSchema: StructType = new StructType() + .add("c1", IntegerType) + .add("c2", StringType) + val expected = spark.read.format("delta").table(deltaTableName) + + def test(tablePath: String): Unit = { + assert( + expectedSchema == spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .load(tablePath) + .schema + ) + val df = + spark.read.format("deltaSharing").option("responseFormat", "delta").load(tablePath) + checkAnswer(df, expected) + } + + withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) { + val profileFile = prepareProfileFile(tempDir) + test(s"${profileFile.getCanonicalPath}#share1.default.$sharedTableName") + } + } + } + } + + test("DeltaSharingDataSource able to read data with join on the same table") { + withTempDir { tempDir => + val deltaTableName = "delta_table_join" + withTable(deltaTableName) { + createSimpleTable(deltaTableName, enableCdf = false) + sql(s"""INSERT INTO $deltaTableName VALUES (1, "first"), (2, "first")""") + sql(s"""INSERT INTO $deltaTableName VALUES (1, "second"), (2, "second")""") + sql(s"""INSERT INTO $deltaTableName VALUES (1, "third"), (2, "third")""") + + val sharedTableName = "shared_table_join" + prepareMockedClientAndFileSystemResult(deltaTableName, sharedTableName) + prepareMockedClientGetTableVersion(deltaTableName, sharedTableName) + prepareMockedClientAndFileSystemResult( + deltaTableName, + sharedTableName, + versionAsOf = Some(1L) + ) + + def testJoin(tablePath: String): Unit = { + // Query the same latest version + val deltaDfLatest = spark.read.format("delta").table(deltaTableName) + val deltaDfV1 = spark.read.format("delta").option("versionAsOf", 1).table(deltaTableName) + val sharingDfLatest = + spark.read.format("deltaSharing").option("responseFormat", "delta").load(tablePath) + val sharingDfV1 = + spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .option("versionAsOf", 1) + .load(tablePath) + + var deltaDfJoined = deltaDfLatest.join(deltaDfLatest, "c1") + var sharingDfJoined = sharingDfLatest.join(sharingDfLatest, "c1") + // CheckAnswer ensures that delta sharing produces the same result as delta. + // The check on the size is used to double check that a valid dataframe is generated. + checkAnswer(deltaDfJoined, sharingDfJoined) + assert(sharingDfJoined.count() > 0) + + // Query the same versionAsOf + deltaDfJoined = deltaDfV1.join(deltaDfV1, "c1") + sharingDfJoined = sharingDfV1.join(sharingDfV1, "c1") + checkAnswer(deltaDfJoined, sharingDfJoined) + assert(sharingDfJoined.count() > 0) + + // Query with different versions + deltaDfJoined = deltaDfLatest.join(deltaDfV1, "c1") + sharingDfJoined = sharingDfLatest.join(sharingDfV1, "c1") + checkAnswer(deltaDfJoined, sharingDfJoined) + // Size is 6 because for each of the 6 rows in latest, there is 1 row with the same c1 + // value in v1. 
+          assert(sharingDfJoined.count() > 0)
+        }
+
+        withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) {
+          val profileFile = prepareProfileFile(tempDir)
+          testJoin(s"${profileFile.getCanonicalPath}#share1.default.$sharedTableName")
+        }
+      }
+    }
+  }
+
+  test("DeltaSharingDataSource able to read empty data") {
+    withTempDir { tempDir =>
+      val deltaTableName = "delta_table_empty"
+      withTable(deltaTableName) {
+        createSimpleTable(deltaTableName, enableCdf = true)
+        sql(s"""INSERT INTO $deltaTableName VALUES (1, "first"), (2, "first")""")
+        sql(s"""INSERT INTO $deltaTableName VALUES (1, "second"), (2, "second")""")
+        sql(s"DELETE FROM $deltaTableName WHERE c1 <= 2")
+        // This command is just to create an empty table version at version 4.
+        spark.sql(s"ALTER TABLE $deltaTableName SET TBLPROPERTIES('delta.minReaderVersion' = 1)")
+
+        val sharedTableName = "shared_table_empty"
+        prepareMockedClientAndFileSystemResult(deltaTableName, sharedTableName)
+        prepareMockedClientGetTableVersion(deltaTableName, sharedTableName)
+
+        def testEmpty(tablePath: String): Unit = {
+          val deltaDf = spark.read.format("delta").table(deltaTableName)
+          val sharingDf =
+            spark.read.format("deltaSharing").option("responseFormat", "delta").load(tablePath)
+          checkAnswer(deltaDf, sharingDf)
+          assert(sharingDf.count() == 0)
+
+          val deltaCdfDf = spark.read
+            .format("delta")
+            .option("readChangeFeed", "true")
+            .option("startingVersion", 4)
+            .table(deltaTableName)
+          val sharingCdfDf = spark.read
+            .format("deltaSharing")
+            .option("responseFormat", "delta")
+            .option("readChangeFeed", "true")
+            .option("startingVersion", 4)
+            .load(tablePath)
+          checkAnswer(deltaCdfDf, sharingCdfDf)
+          assert(sharingCdfDf.count() == 0)
+        }
+
+        // There's only a metadata change but no actual files in version 4.
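+        // So the mocked getCDFFiles response prepared below is expected to contain only the
+        // protocol and metadata lines for version 4, which is why the CDF dataframe checked in
+        // testEmpty is empty.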
+ prepareMockedClientAndFileSystemResultForCdf( + deltaTableName, + sharedTableName, + startingVersion = 4 + ) + + withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) { + val profileFile = prepareProfileFile(tempDir) + testEmpty(s"${profileFile.getCanonicalPath}#share1.default.$sharedTableName") + } + } + } + } + + /** + * cdf queries + */ + test("DeltaSharingDataSource able to read data for simple cdf query") { + withTempDir { tempDir => + val deltaTableName = "delta_table_cdf" + withTable(deltaTableName) { + sql(s""" + |CREATE TABLE $deltaTableName (c1 INT, c2 STRING) USING DELTA PARTITIONED BY (c2) + |TBLPROPERTIES (delta.enableChangeDataFeed = true) + |""".stripMargin) + // 2 inserts in version 1, 1 with c1=2 + sql(s"""INSERT INTO $deltaTableName VALUES (1, "one"), (2, "two")""") + // 1 insert in version 2, 0 with c1=2 + sql(s"""INSERT INTO $deltaTableName VALUES (3, "two")""") + // 0 operations in version 3 + sql(s"""OPTIMIZE $deltaTableName""") + // 2 updates in version 4, 2 with c1=2 + sql(s"""UPDATE $deltaTableName SET c2="new two" where c1=2""") + // 1 delete in version 5, 1 with c1=2 + sql(s"""DELETE FROM $deltaTableName WHERE c1 = 2""") + + val sharedTableName = "shard_table_cdf" + prepareMockedClientGetTableVersion(deltaTableName, sharedTableName) + + Seq(0, 1, 2, 3, 4, 5).foreach { startingVersion => + val ts = getTimeStampForVersion(deltaTableName, startingVersion) + val startingTimestamp = DateTimeUtils.toJavaTimestamp(ts * 1000).toInstant.toString + prepareMockedClientAndFileSystemResultForCdf( + deltaTableName, + sharedTableName, + startingVersion, + Some(startingTimestamp) + ) + + def test(tablePath: String): Unit = { + val expectedSchema: StructType = new StructType() + .add("c1", IntegerType) + .add("c2", StringType) + .add("_change_type", StringType) + .add("_commit_version", LongType) + .add("_commit_timestamp", TimestampType) + val schema = spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .option("readChangeFeed", "true") + .option("startingVersion", startingVersion) + .load(tablePath) + .schema + assert(expectedSchema == schema) + + val expected = spark.read + .format("delta") + .option("readChangeFeed", "true") + .option("startingVersion", startingVersion) + .table(deltaTableName) + val df = spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .option("readChangeFeed", "true") + .option("startingVersion", startingVersion) + .load(tablePath) + checkAnswer(df, expected) + assert(df.count() > 0) + } + + def testFiltersAndSelect(tablePath: String): Unit = { + val expectedSchema: StructType = new StructType() + .add("c2", StringType) + .add("_change_type", StringType) + .add("_commit_version", LongType) + val schema = spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .option("readChangeFeed", "true") + .option("startingVersion", startingVersion) + .load(tablePath) + .select("c2", "_change_type", "_commit_version") + .schema + assert(expectedSchema == schema) + + val expected = spark.read + .format("delta") + .option("readChangeFeed", "true") + .option("startingVersion", startingVersion) + .table(deltaTableName) + .select("c2", "_change_type", "_commit_version") + val dfVersion = spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .option("readChangeFeed", "true") + .option("startingVersion", startingVersion) + .load(tablePath) + .select("c2", "_change_type", "_commit_version") + checkAnswer(dfVersion, expected) + val dfTime = spark.read + .format("deltaSharing") + 
.option("responseFormat", "delta") + .option("readChangeFeed", "true") + .option("startingTimestamp", startingTimestamp) + .load(tablePath) + .select("c2", "_change_type", "_commit_version") + checkAnswer(dfTime, expected) + assert(dfTime.count() > 0) + + val expectedFiltered = spark.read + .format("delta") + .option("readChangeFeed", "true") + .option("startingVersion", startingVersion) + .table(deltaTableName) + .select("c2", "_change_type", "_commit_version") + .filter(col("c1") === 2) + val dfFiltered = spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .option("readChangeFeed", "true") + .option("startingVersion", startingVersion) + .load(tablePath) + .select("c2", "_change_type", "_commit_version") + .filter(col("c1") === 2) + checkAnswer(dfFiltered, expectedFiltered) + assert(dfFiltered.count() > 0) + } + + withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) { + val profileFile = prepareProfileFile(tempDir) + test(profileFile.getCanonicalPath + s"#share1.default.$sharedTableName") + testFiltersAndSelect( + profileFile.getCanonicalPath + s"#share1.default.$sharedTableName" + ) + } + } + + // test join on the same table in cdf query + def testJoin(tablePath: String): Unit = { + val deltaV0 = spark.read + .format("delta") + .option("readChangeFeed", "true") + .option("startingVersion", 0) + .table(deltaTableName) + val deltaV3 = spark.read + .format("delta") + .option("readChangeFeed", "true") + .option("startingVersion", 3) + .table(deltaTableName) + val sharingV0 = spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .option("readChangeFeed", "true") + .option("startingVersion", 0) + .load(tablePath) + val sharingV3 = spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .option("readChangeFeed", "true") + .option("startingVersion", 3) + .load(tablePath) + + def testJoinedDf( + deltaLeft: DataFrame, + deltaRight: DataFrame, + sharingLeft: DataFrame, + sharingRight: DataFrame, + expectedSize: Int): Unit = { + val deltaJoined = deltaLeft.join(deltaRight, usingColumns = Seq("c1", "c2")) + val sharingJoined = sharingLeft.join(sharingRight, usingColumns = Seq("c1", "c2")) + checkAnswer(deltaJoined, sharingJoined) + assert(sharingJoined.count() > 0) + } + testJoinedDf(deltaV0, deltaV0, sharingV0, sharingV0, 10) + testJoinedDf(deltaV3, deltaV3, sharingV3, sharingV3, 5) + testJoinedDf(deltaV0, deltaV3, sharingV0, sharingV3, 6) + } + + withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) { + val profileFile = prepareProfileFile(tempDir) + testJoin(profileFile.getCanonicalPath + s"#share1.default.$sharedTableName") + } + } + } + } + + test("DeltaSharingDataSource able to read data for cdf query with more entries") { + withTempDir { tempDir => + val deltaTableName = "delta_table_cdf_more" + withTable(deltaTableName) { + createSimpleTable(deltaTableName, enableCdf = true) + // The table operations take about 20~30 seconds. 
+ for (i <- 0 to 9) { + val iteration = s"iteration $i" + val valuesBuilder = Seq.newBuilder[String] + for (j <- 0 to 49) { + valuesBuilder += s"""(${i * 10 + j}, "$iteration")""" + } + sql(s"INSERT INTO $deltaTableName VALUES ${valuesBuilder.result().mkString(",")}") + sql(s"""UPDATE $deltaTableName SET c1 = c1 + 100 where c2 = "${iteration}"""") + sql(s"""DELETE FROM $deltaTableName where c2 = "${iteration}"""") + } + + val sharedTableName = "shard_table_cdf_more" + prepareMockedClientGetTableVersion(deltaTableName, sharedTableName) + Seq(0, 10, 20, 30).foreach { startingVersion => + prepareMockedClientAndFileSystemResultForCdf( + deltaTableName, + sharedTableName, + startingVersion + ) + + val expected = spark.read + .format("delta") + .option("readChangeFeed", "true") + .option("startingVersion", startingVersion) + .table(deltaTableName) + + def test(tablePath: String): Unit = { + val df = spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .option("readChangeFeed", "true") + .option("startingVersion", startingVersion) + .load(tablePath) + checkAnswer(df, expected) + assert(df.count() > 0) + } + + withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) { + val profileFile = prepareProfileFile(tempDir) + test(profileFile.getCanonicalPath + s"#share1.default.$sharedTableName") + } + } + } + } + } +} + +class DeltaSharingDataSourceDeltaSuite extends DeltaSharingDataSourceDeltaSuiteBase {} diff --git a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaTestUtils.scala b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaTestUtils.scala new file mode 100644 index 00000000000..56a1c180943 --- /dev/null +++ b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaTestUtils.scala @@ -0,0 +1,654 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.delta.sharing.spark + +import java.io.File +import java.nio.charset.StandardCharsets.UTF_8 + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.delta.{DeltaLog, Snapshot} +import org.apache.spark.sql.delta.actions.{ + Action, + AddCDCFile, + AddFile, + DeletionVectorDescriptor, + Metadata, + RemoveFile +} +import org.apache.spark.sql.delta.deletionvectors.{ + RoaringBitmapArray, + RoaringBitmapArrayFormat +} +import org.apache.spark.sql.delta.util.{FileNames, JsonUtils} +import com.google.common.hash.Hashing +import io.delta.sharing.client.model.{ + AddFile => ClientAddFile, + Metadata => ClientMetadata, + Protocol => ClientProtocol +} +import io.delta.sharing.spark.model.{ + DeltaSharingFileAction, + DeltaSharingMetadata, + DeltaSharingProtocol +} +import org.apache.commons.io.FileUtils +import org.apache.hadoop.fs.Path + +import org.apache.spark.SparkConf +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.test.SharedSparkSession + +trait DeltaSharingDataSourceDeltaTestUtils extends SharedSparkSession { + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.delta.sharing.preSignedUrl.expirationMs", "15000") + .set("spark.delta.sharing.driver.refreshCheckIntervalMs", "5000") + .set("spark.delta.sharing.driver.refreshThresholdMs", "10000") + .set("spark.delta.sharing.driver.accessThresholdToExpireMs", "120000") + } + + private[spark] def removePartitionPrefix(filePath: String): String = { + filePath.split("/").last + } + + private def getResponseDVAndId( + sharedTable: String, + deletionVector: DeletionVectorDescriptor): (DeletionVectorDescriptor, String) = { + if (deletionVector != null) { + if (deletionVector.storageType == DeletionVectorDescriptor.INLINE_DV_MARKER) { + (deletionVector, Hashing.sha256().hashString(deletionVector.uniqueId, UTF_8).toString) + } else { + val dvPath = deletionVector.absolutePath(new Path("not-used")) + ( + deletionVector.copy( + pathOrInlineDv = TestDeltaSharingFileSystem.encode(sharedTable, dvPath.getName), + storageType = DeletionVectorDescriptor.PATH_DV_MARKER + ), + Hashing.sha256().hashString(deletionVector.uniqueId, UTF_8).toString + ) + } + } else { + (null, null) + } + } + + private def isDataFile(filePath: String): Boolean = { + filePath.endsWith(".parquet") || filePath.endsWith(".bin") + } + + // Convert from delta AddFile to DeltaSharingFileAction to serialize to json. + private def getDeltaSharingFileActionForAddFile( + addFile: AddFile, + sharedTable: String, + version: Long, + timestamp: Long): DeltaSharingFileAction = { + val parquetFile = removePartitionPrefix(addFile.path) + + val (responseDV, dvFileId) = getResponseDVAndId(sharedTable, addFile.deletionVector) + + DeltaSharingFileAction( + id = Hashing.sha256().hashString(parquetFile, UTF_8).toString, + version = version, + timestamp = timestamp, + deletionVectorFileId = dvFileId, + deltaSingleAction = addFile + .copy( + path = TestDeltaSharingFileSystem.encode(sharedTable, parquetFile), + deletionVector = responseDV + ) + .wrap + ) + } + + // Convert from delta RemoveFile to DeltaSharingFileAction to serialize to json. 
+ // scalastyle:off removeFile + private def getDeltaSharingFileActionForRemoveFile( + removeFile: RemoveFile, + sharedTable: String, + version: Long, + timestamp: Long): DeltaSharingFileAction = { + val parquetFile = removePartitionPrefix(removeFile.path) + + val (responseDV, dvFileId) = getResponseDVAndId(sharedTable, removeFile.deletionVector) + + DeltaSharingFileAction( + id = Hashing.sha256().hashString(parquetFile, UTF_8).toString, + version = version, + timestamp = timestamp, + deletionVectorFileId = dvFileId, + deltaSingleAction = removeFile + .copy( + path = TestDeltaSharingFileSystem.encode(sharedTable, parquetFile), + deletionVector = responseDV + ) + .wrap + ) + // scalastyle:on removeFile + } + + // Reset the result for client.GetTableVersion for the sharedTable based on the latest table + // version of the deltaTable, use BlockManager to store the result. + private[spark] def prepareMockedClientGetTableVersion( + deltaTable: String, + sharedTable: String): Unit = { + val snapshotToUse = getSnapshotToUse(deltaTable, None) + DeltaSharingUtils.overrideSingleBlock[Long]( + blockId = TestClientForDeltaFormatSharing.getBlockId(sharedTable, "getTableVersion"), + value = snapshotToUse.version + ) + } + + def getTimeStampForVersion(deltaTable: String, version: Long): Long = { + val snapshotToUse = getSnapshotToUse(deltaTable, None) + FileUtils + .listFiles(new File(snapshotToUse.deltaLog.logPath.toUri()), null, true) + .asScala + .foreach { f => + if (FileNames.isDeltaFile(new Path(f.getName))) { + if (FileNames.getFileVersion(new Path(f.getName)) == version) { + return f.lastModified + } + } + } + 0 + } + + // Prepare the result(Protocol and Metadata) for client.GetMetadata for the sharedTable based on + // the latest table info of the deltaTable, store them in BlockManager. + private[spark] def prepareMockedClientMetadata(deltaTable: String, sharedTable: String): Unit = { + val snapshotToUse = getSnapshotToUse(deltaTable, None) + val dsProtocol: DeltaSharingProtocol = DeltaSharingProtocol(snapshotToUse.protocol) + val dsMetadata: DeltaSharingMetadata = DeltaSharingMetadata( + deltaMetadata = snapshotToUse.metadata + ) + + // Put the metadata in blockManager for DeltaSharingClient to return for getMetadata. + DeltaSharingUtils.overrideIteratorBlock[String]( + blockId = TestClientForDeltaFormatSharing.getBlockId(sharedTable, "getMetadata"), + values = Seq(dsProtocol.json, dsMetadata.json).toIterator + ) + } + + private def updateAddFileWithInlineDV( + addFile: AddFile, + inlineDvFormat: RoaringBitmapArrayFormat.Value, + bitmap: RoaringBitmapArray): AddFile = { + val dv = DeletionVectorDescriptor.inlineInLog( + bitmap.serializeAsByteArray(inlineDvFormat), + bitmap.cardinality + ) + addFile + .removeRows( + deletionVector = dv, + updateStats = true + ) + ._1 + } + + private def updateDvPathToCount( + addFile: AddFile, + pathToCount: scala.collection.mutable.Map[String, Int]): Unit = { + if (addFile.deletionVector != null && + addFile.deletionVector.storageType != DeletionVectorDescriptor.INLINE_DV_MARKER) { + val dvPath = addFile.deletionVector.pathOrInlineDv + pathToCount.put(dvPath, pathToCount.getOrElse(dvPath, 0) + 1) + } + } + + // Sort by id in decreasing order. + private def deltaSharingFileActionDecreaseOrderFunc( + f1: model.DeltaSharingFileAction, + f2: model.DeltaSharingFileAction): Boolean = { + f1.id > f2.id + } + + // Sort by id in increasing order. 
+ private def deltaSharingFileActionIncreaseOrderFunc( + f1: model.DeltaSharingFileAction, + f2: model.DeltaSharingFileAction): Boolean = { + f1.id < f2.id + } + + private def getSnapshotToUse(deltaTable: String, versionAsOf: Option[Long]): Snapshot = { + val deltaLog = DeltaLog.forTable(spark, new TableIdentifier(deltaTable)) + if (versionAsOf.isDefined) { + deltaLog.getSnapshotAt(versionAsOf.get) + } else { + deltaLog.update() + } + } + + // This function does 2 jobs: + // 1. Prepare the result for functions of delta sharing rest client, i.e., (Protocol, Metadata) + // for getMetadata, (Protocol, Metadata, and list of lines from delta actions) for getFiles, use + // BlockManager to store the data to make them available across different classes. All the lines + // are for responseFormat=parquet. + // 2. Put the parquet file in blockManager for DeltaSharingFileSystem to load bytes out of it. + private[spark] def prepareMockedClientAndFileSystemResultForParquet( + deltaTable: String, + sharedTable: String, + versionAsOf: Option[Long] = None): Unit = { + val lines = Seq.newBuilder[String] + var totalSize = 0L + val clientAddFilesArrayBuffer = ArrayBuffer[ClientAddFile]() + + // To prepare faked delta sharing responses with needed files for DeltaSharingClient. + val snapshotToUse = getSnapshotToUse(deltaTable, versionAsOf) + + snapshotToUse.allFiles.collect().foreach { addFile => + val parquetFile = removePartitionPrefix(addFile.path) + val clientAddFile = ClientAddFile( + url = TestDeltaSharingFileSystem.encode(sharedTable, parquetFile), + id = Hashing.md5().hashString(parquetFile, UTF_8).toString, + partitionValues = addFile.partitionValues, + size = addFile.size, + stats = null, + version = snapshotToUse.version, + timestamp = snapshotToUse.timestamp + ) + totalSize = totalSize + addFile.size + clientAddFilesArrayBuffer += clientAddFile + } + + // Scan through the parquet files of the local delta table, and prepare the data of parquet file + // reading in DeltaSharingFileSystem. + val files = + FileUtils.listFiles(new File(snapshotToUse.deltaLog.dataPath.toUri()), null, true).asScala + files.foreach { f => + val filePath = f.getCanonicalPath + if (isDataFile(filePath)) { + // Put the parquet file in blockManager for DeltaSharingFileSystem to load bytes out of it. + DeltaSharingUtils.overrideIteratorBlock[Byte]( + blockId = TestDeltaSharingFileSystem.getBlockId(sharedTable, f.getName), + values = FileUtils.readFileToByteArray(f).toIterator + ) + } + } + + val clientProtocol = ClientProtocol(minReaderVersion = 1) + // This is specifically to set the size of the metadata. + val deltaMetadata = snapshotToUse.metadata + val clientMetadata = ClientMetadata( + id = deltaMetadata.id, + name = deltaMetadata.name, + description = deltaMetadata.description, + schemaString = deltaMetadata.schemaString, + configuration = deltaMetadata.configuration, + partitionColumns = deltaMetadata.partitionColumns, + size = totalSize + ) + lines += JsonUtils.toJson(clientProtocol.wrap) + lines += JsonUtils.toJson(clientMetadata.wrap) + clientAddFilesArrayBuffer.toSeq.foreach { clientAddFile => + lines += JsonUtils.toJson(clientAddFile.wrap) + } + + // Put the metadata in blockManager for DeltaSharingClient to return metadata when being asked. 
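+    // The block id is keyed on the shared table name, the query type and the optional
+    // versionAsOf/timestampAsOf suffix (see TestClientForDeltaFormatSharing.getBlockId), e.g.
+    // roughly <prefix>_shared_parquet_table_getMetadata_v1, so different time travel versions
+    // of the same shared table get their own mocked responses.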
+ DeltaSharingUtils.overrideIteratorBlock[String]( + blockId = TestClientForDeltaFormatSharing.getBlockId( + sharedTableName = sharedTable, + queryType = "getMetadata", + versionAsOf = versionAsOf + ), + values = Seq( + JsonUtils.toJson(clientProtocol.wrap), + JsonUtils.toJson(clientMetadata.wrap) + ).toIterator + ) + + // Put the delta log (list of actions) in blockManager for DeltaSharingClient to return as the + // http response when getFiles is called. + DeltaSharingUtils.overrideIteratorBlock[String]( + blockId = TestClientForDeltaFormatSharing.getBlockId( + sharedTableName = sharedTable, + queryType = "getFiles", + versionAsOf = versionAsOf + ), + values = lines.result().toIterator + ) + } + + // This function does 2 jobs: + // 1. Prepare the result for functions of delta sharing rest client, i.e., (Protocol, Metadata) + // for getMetadata, (Protocol, Metadata, and list of lines from delta actions) for getFiles, use + // BlockManager to store the data to make them available across different classes. + // 2. Put the parquet file in blockManager for DeltaSharingFileSystem to load bytes out of it. + private[spark] def prepareMockedClientAndFileSystemResult( + deltaTable: String, + sharedTable: String, + versionAsOf: Option[Long] = None, + timestampAsOf: Option[String] = None, + inlineDvFormat: Option[RoaringBitmapArrayFormat.Value] = None, + assertMultipleDvsInOneFile: Boolean = false, + reverseFileOrder: Boolean = false): Unit = { + val lines = Seq.newBuilder[String] + var totalSize = 0L + + // To prepare faked delta sharing responses with needed files for DeltaSharingClient. + val snapshotToUse = getSnapshotToUse(deltaTable, versionAsOf) + val fileActionsArrayBuffer = ArrayBuffer[model.DeltaSharingFileAction]() + val dvPathToCount = scala.collection.mutable.Map[String, Int]() + snapshotToUse.allFiles.collect().foreach { addFile => + if (assertMultipleDvsInOneFile) { + updateDvPathToCount(addFile, dvPathToCount) + } + + val updatedAdd = if (inlineDvFormat.isDefined) { + // Remove row 0 and 2 in the AddFile. + updateAddFileWithInlineDV(addFile, inlineDvFormat.get, RoaringBitmapArray(0L, 2L)) + } else { + addFile + } + + val dsAddFile = getDeltaSharingFileActionForAddFile( + updatedAdd, + sharedTable, + snapshotToUse.version, + snapshotToUse.timestamp + ) + totalSize = totalSize + addFile.size + fileActionsArrayBuffer += dsAddFile + } + val fileActionSeq = if (reverseFileOrder) { + fileActionsArrayBuffer.toSeq.sortWith(deltaSharingFileActionDecreaseOrderFunc) + } else { + fileActionsArrayBuffer.toSeq.sortWith(deltaSharingFileActionIncreaseOrderFunc) + } + var previousIdOpt: Option[String] = None + fileActionSeq.foreach { fileAction => + if (reverseFileOrder) { + assert( + // Using < instead of <= because there can be a removeFile and addFile pointing to the + // same parquet file which result in the same file id, since id is a hash of file path. + // This is ok because eventually it can read data out of the correct parquet file. + !previousIdOpt.exists(_ < fileAction.id), + s"fileActions must be in decreasing order by id: ${previousIdOpt} is not smaller than" + + s" ${fileAction.id}." + ) + previousIdOpt = Some(fileAction.id) + } + lines += fileAction.json + } + if (assertMultipleDvsInOneFile) { + assert(dvPathToCount.max._2 > 1) + } + + // Scan through the parquet files of the local delta table, and prepare the data of parquet file + // reading in DeltaSharingFileSystem. 
+ val files = + FileUtils.listFiles(new File(snapshotToUse.deltaLog.dataPath.toUri()), null, true).asScala + files.foreach { f => + val filePath = f.getCanonicalPath + if (isDataFile(filePath)) { + // Put the parquet file in blockManager for DeltaSharingFileSystem to load bytes out of it. + DeltaSharingUtils.overrideIteratorBlock[Byte]( + blockId = TestDeltaSharingFileSystem.getBlockId(sharedTable, f.getName), + values = FileUtils.readFileToByteArray(f).toIterator + ) + } + } + + // This is specifically to set the size of the metadata. + val dsMetadata = DeltaSharingMetadata( + deltaMetadata = snapshotToUse.metadata, + size = totalSize + ) + val dsProtocol = DeltaSharingProtocol(deltaProtocol = snapshotToUse.protocol) + // Put the metadata in blockManager for DeltaSharingClient to return metadata when being asked. + DeltaSharingUtils.overrideIteratorBlock[String]( + blockId = TestClientForDeltaFormatSharing.getBlockId( + sharedTableName = sharedTable, + queryType = "getMetadata", + versionAsOf = versionAsOf, + timestampAsOf = timestampAsOf + ), + values = Seq(dsProtocol.json, dsMetadata.json).toIterator + ) + + lines += dsProtocol.json + lines += dsMetadata.json + // Put the delta log (list of actions) in blockManager for DeltaSharingClient to return as the + // http response when getFiles is called. + DeltaSharingUtils.overrideIteratorBlock[String]( + blockId = TestClientForDeltaFormatSharing.getBlockId( + sharedTableName = sharedTable, + queryType = "getFiles", + versionAsOf = versionAsOf, + timestampAsOf = timestampAsOf + ), + values = lines.result().toIterator + ) + } + + private[spark] def prepareMockedClientAndFileSystemResultForStreaming( + deltaTable: String, + sharedTable: String, + startingVersion: Long, + endingVersion: Long, + assertDVExists: Boolean = false): Unit = { + val actionLines = Seq.newBuilder[String] + + var maxVersion = -1L + var totalSize = 0L + + val deltaLog = DeltaLog.forTable(spark, new TableIdentifier(deltaTable)) + val startingSnapshot = deltaLog.getSnapshotAt(startingVersion) + actionLines += DeltaSharingProtocol(deltaProtocol = startingSnapshot.protocol).json + actionLines += DeltaSharingMetadata( + deltaMetadata = startingSnapshot.metadata, + version = startingVersion + ).json + + val logFiles = + FileUtils.listFiles(new File(deltaLog.logPath.toUri()), null, true).asScala + var dvExists = false + logFiles.foreach { f => + if (FileNames.isDeltaFile(new Path(f.getName))) { + val version = FileNames.getFileVersion(new Path(f.getName)) + if (version >= startingVersion && version <= endingVersion) { + // protocol/metadata are processed from startingSnapshot, only process versions greater + // than startingVersion for real actions and possible metadata changes. + maxVersion = maxVersion.max(version) + val timestamp = f.lastModified + + FileUtils.readLines(f).asScala.foreach { l => + val action = Action.fromJson(l) + action match { + case m: Metadata => + actionLines += DeltaSharingMetadata( + deltaMetadata = m, + version = version + ).json + case addFile: AddFile if addFile.dataChange => + // Convert from delta AddFile to DeltaSharingAddFile to serialize to json. 
+ val dsAddFile = + getDeltaSharingFileActionForAddFile(addFile, sharedTable, version, timestamp) + dvExists = dvExists || (dsAddFile.deletionVectorFileId != null) + totalSize = totalSize + addFile.size + actionLines += dsAddFile.json + case removeFile: RemoveFile if removeFile.dataChange => + // scalastyle:off removeFile + val dsRemoveFile = getDeltaSharingFileActionForRemoveFile( + removeFile, + sharedTable, + version, + timestamp + ) + // scalastyle:on removeFile + dvExists = dvExists || (dsRemoveFile.deletionVectorFileId != null) + totalSize = totalSize + removeFile.size.getOrElse(0L) + actionLines += dsRemoveFile.json + case _ => // ignore all other actions such as CommitInfo. + } + } + } + } + } + val dataFiles = + FileUtils.listFiles(new File(deltaLog.dataPath.toUri()), null, true).asScala + dataFiles.foreach { f => + if (isDataFile(f.getCanonicalPath)) { + DeltaSharingUtils.overrideIteratorBlock[Byte]( + blockId = TestDeltaSharingFileSystem.getBlockId(sharedTable, f.getName), + values = FileUtils.readFileToByteArray(f).toIterator + ) + } + } + + if (assertDVExists) { + assert(dvExists, "There should be DV in the files returned from server.") + } + + DeltaSharingUtils.overrideIteratorBlock[String]( + blockId = TestClientForDeltaFormatSharing.getBlockId( + sharedTable, + s"getFiles_${startingVersion}_$endingVersion" + ), + values = actionLines.result().toIterator + ) + } + + private[spark] def prepareMockedClientAndFileSystemResultForCdf( + deltaTable: String, + sharedTable: String, + startingVersion: Long, + startingTimestamp: Option[String] = None, + inlineDvFormat: Option[RoaringBitmapArrayFormat.Value] = None, + assertMultipleDvsInOneFile: Boolean = false): Unit = { + val actionLines = Seq.newBuilder[String] + + var maxVersion = -1L + var totalSize = 0L + + val deltaLog = DeltaLog.forTable(spark, new TableIdentifier(deltaTable)) + val startingSnapshot = deltaLog.getSnapshotAt(startingVersion) + actionLines += DeltaSharingProtocol(deltaProtocol = startingSnapshot.protocol).json + actionLines += DeltaSharingMetadata( + deltaMetadata = startingSnapshot.metadata, + version = startingVersion + ).json + + val dvPathToCount = scala.collection.mutable.Map[String, Int]() + val files = + FileUtils.listFiles(new File(deltaLog.logPath.toUri()), null, true).asScala + files.foreach { f => + if (FileNames.isDeltaFile(new Path(f.getName))) { + val version = FileNames.getFileVersion(new Path(f.getName)) + if (version >= startingVersion) { + // protocol/metadata are processed from startingSnapshot, only process versions greater + // than startingVersion for real actions and possible metadata changes. + maxVersion = maxVersion.max(version) + val timestamp = f.lastModified + FileUtils.readLines(f).asScala.foreach { l => + val action = Action.fromJson(l) + action match { + case m: Metadata => + actionLines += DeltaSharingMetadata( + deltaMetadata = m, + version = version + ).json + case addFile: AddFile if addFile.dataChange => + if (assertMultipleDvsInOneFile) { + updateDvPathToCount(addFile, dvPathToCount) + } + val updatedAdd = if (inlineDvFormat.isDefined) { + // Remove row 0 and 1 in the AddFile. 
+ updateAddFileWithInlineDV(addFile, inlineDvFormat.get, RoaringBitmapArray(0L, 1L)) + } else { + addFile + } + val dsAddFile = + getDeltaSharingFileActionForAddFile(updatedAdd, sharedTable, version, timestamp) + totalSize = totalSize + updatedAdd.size + actionLines += dsAddFile.json + case removeFile: RemoveFile if removeFile.dataChange => + // scalastyle:off removeFile + val dsRemoveFile = getDeltaSharingFileActionForRemoveFile( + removeFile, + sharedTable, + version, + timestamp + ) + // scalastyle:on removeFile + totalSize = totalSize + removeFile.size.getOrElse(0L) + actionLines += dsRemoveFile.json + case cdcFile: AddCDCFile => + val parquetFile = removePartitionPrefix(cdcFile.path) + + // Convert from delta AddCDCFile to DeltaSharingFileAction to serialize to json. + val dsCDCFile = DeltaSharingFileAction( + id = Hashing.sha256().hashString(parquetFile, UTF_8).toString, + version = version, + timestamp = timestamp, + deltaSingleAction = cdcFile + .copy( + path = TestDeltaSharingFileSystem.encode(sharedTable, parquetFile) + ) + .wrap + ) + totalSize = totalSize + cdcFile.size + actionLines += dsCDCFile.json + case _ => // ignore other lines + } + } + } + } + } + val dataFiles = + FileUtils.listFiles(new File(deltaLog.dataPath.toUri()), null, true).asScala + dataFiles.foreach { f => + val filePath = f.getCanonicalPath + if (isDataFile(filePath)) { + DeltaSharingUtils.overrideIteratorBlock[Byte]( + blockId = TestDeltaSharingFileSystem.getBlockId(sharedTable, f.getName), + values = FileUtils.readFileToByteArray(f).toIterator + ) + } + } + + if (assertMultipleDvsInOneFile) { + assert(dvPathToCount.max._2 > 1) + } + + DeltaSharingUtils.overrideIteratorBlock[String]( + blockId = + TestClientForDeltaFormatSharing.getBlockId(sharedTable, s"getCDFFiles_$startingVersion"), + values = actionLines.result().toIterator + ) + if (startingTimestamp.isDefined) { + DeltaSharingUtils.overrideIteratorBlock[String]( + blockId = TestClientForDeltaFormatSharing.getBlockId( + sharedTable, + s"getCDFFiles_${startingTimestamp.get}" + ), + values = actionLines.result().toIterator + ) + } + } + protected def getDeltaSharingClassesSQLConf: Map[String, String] = { + Map( + "fs.delta-sharing.impl" -> classOf[TestDeltaSharingFileSystem].getName, + "spark.delta.sharing.client.class" -> + classOf[TestClientForDeltaFormatSharing].getName, + "spark.delta.sharing.profile.provider.class" -> + "io.delta.sharing.client.DeltaSharingFileProfileProvider" + ) + } +} diff --git a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingFileIndexSuite.scala b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingFileIndexSuite.scala index 9c1b4036fac..748ed8e4d63 100644 --- a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingFileIndexSuite.scala +++ b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingFileIndexSuite.scala @@ -30,7 +30,7 @@ import io.delta.sharing.client.util.JsonUtils import org.apache.commons.io.FileUtils import org.apache.hadoop.fs.Path -import org.apache.spark.{SparkConf, SparkEnv} +import org.apache.spark.SparkEnv import org.apache.spark.delta.sharing.{PreSignedUrlCache, PreSignedUrlFetcher} import org.apache.spark.sql.{QueryTest, SparkSession} import org.apache.spark.sql.catalyst.expressions.{ @@ -39,7 +39,6 @@ import org.apache.spark.sql.catalyst.expressions.{ Literal => SqlLiteral } import org.apache.spark.sql.delta.sharing.DeltaSharingTestSparkUtils -import org.apache.spark.sql.test.{SharedSparkSession} import org.apache.spark.sql.types.{FloatType, IntegerType} private object 
TestUtils { @@ -190,7 +189,7 @@ class TestDeltaSharingClientForFileIndex( class DeltaSharingFileIndexSuite extends QueryTest with DeltaSQLCommandTest - with SharedSparkSession + with DeltaSharingDataSourceDeltaTestUtils with DeltaSharingTestSparkUtils { import TestUtils._ @@ -292,14 +291,6 @@ class DeltaSharingFileIndexSuite } } - override protected def sparkConf: SparkConf = { - super.sparkConf - .set("spark.delta.sharing.preSignedUrl.expirationMs", defaultUrlExpirationMs.toString) - .set("spark.delta.sharing.driver.refreshCheckIntervalMs", "1000") - .set("spark.delta.sharing.driver.refreshThresholdMs", "2000") - .set("spark.delta.sharing.driver.accessThresholdToExpireMs", "60000") - } - test("refresh works") { PreSignedUrlCache.registerIfNeeded(SparkEnv.get) @@ -333,7 +324,7 @@ class DeltaSharingFileIndexSuite 1000 ) // sleep for expirationTimeMs to ensure that the urls are refreshed. - Thread.sleep(defaultUrlExpirationMs) + Thread.sleep(15000) // Verify that the url is refreshed as paths(1), not paths(0) anymore. assert(fetcher.getUrl == paths(1)) diff --git a/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala b/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala new file mode 100644 index 00000000000..06f8d8776eb --- /dev/null +++ b/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala @@ -0,0 +1,270 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.sharing.spark + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.delta.util.JsonUtils +import io.delta.sharing.client.{ + DeltaSharingClient, + DeltaSharingProfileProvider, + DeltaSharingRestClient +} +import io.delta.sharing.client.model.{ + AddFile => ClientAddFile, + DeltaTableFiles, + DeltaTableMetadata, + SingleAction, + Table +} + +import org.apache.spark.SparkEnv +import org.apache.spark.storage.BlockId + +/** + * A mocked delta sharing client for DeltaFormatSharing. + * The test suite need to prepare the mocked delta sharing rpc response and store them in + * BlockManager. Then this client will just load the response of return upon rpc call. + */ +private[spark] class TestClientForDeltaFormatSharing( + profileProvider: DeltaSharingProfileProvider, + timeoutInSeconds: Int = 120, + numRetries: Int = 10, + maxRetryDuration: Long = Long.MaxValue, + sslTrustAll: Boolean = false, + forStreaming: Boolean = false, + responseFormat: String = DeltaSharingRestClient.RESPONSE_FORMAT_DELTA, + readerFeatures: String = "", + queryTablePaginationEnabled: Boolean = false, + maxFilesPerReq: Int = 100000) + extends DeltaSharingClient { + + assert( + responseFormat == DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET || + (readerFeatures.contains("deletionVectors") && readerFeatures.contains("columnMapping")), + "deletionVectors and columnMapping should be supported in all types of queries." 
+ ) + + import TestClientForDeltaFormatSharing._ + + override def listAllTables(): Seq[Table] = throw new UnsupportedOperationException("not needed") + + override def getMetadata( + table: Table, + versionAsOf: Option[Long] = None, + timestampAsOf: Option[String] = None): DeltaTableMetadata = { + val iterator = SparkEnv.get.blockManager + .get[String](getBlockId(table.name, "getMetadata", versionAsOf, timestampAsOf)) + .map(_.data.asInstanceOf[Iterator[String]]) + .getOrElse { + throw new IllegalStateException( + s"getMetadata is missing for: ${table.name}, versionAsOf:$versionAsOf, " + + s"timestampAsOf:$timestampAsOf. This shouldn't happen in the unit test." + ) + } + // iterator.toSeq doesn't trigger CompletionIterator in BlockManager which releases the reader + // lock on the underlying block. iterator hasNext does trigger it. + val linesBuilder = Seq.newBuilder[String] + while (iterator.hasNext) { + linesBuilder += iterator.next() + } + if (table.name.contains("shared_parquet_table")) { + val lines = linesBuilder.result() + val protocol = JsonUtils.fromJson[SingleAction](lines(0)).protocol + val metadata = JsonUtils.fromJson[SingleAction](lines(1)).metaData + DeltaTableMetadata( + version = versionAsOf.getOrElse(getTableVersion(table)), + protocol = protocol, + metadata = metadata, + respondedFormat = DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET + ) + } else { + DeltaTableMetadata( + version = versionAsOf.getOrElse(getTableVersion(table)), + lines = linesBuilder.result(), + respondedFormat = DeltaSharingRestClient.RESPONSE_FORMAT_DELTA + ) + } + } + + override def getTableVersion(table: Table, startingTimestamp: Option[String] = None): Long = { + val versionOpt = SparkEnv.get.blockManager.getSingle[Long]( + getBlockId(table.name, "getTableVersion") + ) + val version = versionOpt.getOrElse { + throw new IllegalStateException( + s"getTableVersion is missing for: ${table.name}. This shouldn't happen in the unit test." + ) + } + SparkEnv.get.blockManager.releaseLock(getBlockId(table.name, "getTableVersion")) + version + } + + override def getFiles( + table: Table, + predicates: Seq[String], + limit: Option[Long], + versionAsOf: Option[Long], + timestampAsOf: Option[String], + jsonPredicateHints: Option[String], + refreshToken: Option[String] + ): DeltaTableFiles = { + limit.foreach(lim => TestClientForDeltaFormatSharing.limits.put( + s"${table.share}.${table.schema}.${table.name}", lim)) + TestClientForDeltaFormatSharing.requestedFormat.put( + s"${table.share}.${table.schema}.${table.name}", responseFormat) + val iterator = SparkEnv.get.blockManager + .get[String](getBlockId(table.name, "getFiles", versionAsOf, timestampAsOf)) + .map(_.data.asInstanceOf[Iterator[String]]) + .getOrElse { + throw new IllegalStateException( + s"getFiles is missing for: ${table.name} versionAsOf:$versionAsOf, " + + s"timestampAsOf:$timestampAsOf. This shouldn't happen in the unit test." + ) + } + // iterator.toSeq doesn't trigger CompletionIterator in BlockManager which releases the reader + // lock on the underlying block. iterator hasNext does trigger it. 
+ val linesBuilder = Seq.newBuilder[String] + while (iterator.hasNext) { + linesBuilder += iterator.next() + } + if (table.name.contains("shared_parquet_table")) { + val lines = linesBuilder.result() + val protocol = JsonUtils.fromJson[SingleAction](lines(0)).protocol + val metadata = JsonUtils.fromJson[SingleAction](lines(1)).metaData + val files = ArrayBuffer[ClientAddFile]() + lines.drop(2).foreach { line => + val action = JsonUtils.fromJson[SingleAction](line) + if (action.file != null) { + files.append(action.file) + } else { + throw new IllegalStateException(s"Unexpected Line:${line}") + } + } + DeltaTableFiles( + versionAsOf.getOrElse(getTableVersion(table)), + protocol, + metadata, + files.toSeq, + respondedFormat = DeltaSharingRestClient.RESPONSE_FORMAT_PARQUET + ) + } else { + DeltaTableFiles( + version = versionAsOf.getOrElse(getTableVersion(table)), + lines = linesBuilder.result(), + respondedFormat = DeltaSharingRestClient.RESPONSE_FORMAT_DELTA + ) + } + } + + override def getFiles( + table: Table, + startingVersion: Long, + endingVersion: Option[Long] + ): DeltaTableFiles = { + assert( + endingVersion.isDefined, + "endingVersion is not defined. This shouldn't happen in unit test." + ) + val iterator = SparkEnv.get.blockManager + .get[String](getBlockId(table.name, s"getFiles_${startingVersion}_${endingVersion.get}")) + .map(_.data.asInstanceOf[Iterator[String]]) + .getOrElse { + throw new IllegalStateException( + s"getFiles is missing for: ${table.name} with [${startingVersion}, " + + s"${endingVersion.get}]. This shouldn't happen in the unit test." + ) + } + // iterator.toSeq doesn't trigger CompletionIterator in BlockManager which releases the reader + // lock on the underlying block. iterator hasNext does trigger it. + val linesBuilder = Seq.newBuilder[String] + while (iterator.hasNext) { + linesBuilder += iterator.next() + } + DeltaTableFiles( + version = getTableVersion(table), + lines = linesBuilder.result(), + respondedFormat = DeltaSharingRestClient.RESPONSE_FORMAT_DELTA + ) + } + + override def getCDFFiles( + table: Table, + cdfOptions: Map[String, String], + includeHistoricalMetadata: Boolean + ): DeltaTableFiles = { + val suffix = cdfOptions + .get(DeltaSharingOptions.CDF_START_VERSION) + .getOrElse( + cdfOptions.get(DeltaSharingOptions.CDF_START_TIMESTAMP).get + ) + val iterator = SparkEnv.get.blockManager + .get[String]( + getBlockId( + table.name, + s"getCDFFiles_$suffix" + ) + ) + .map( + _.data.asInstanceOf[Iterator[String]] + ) + .getOrElse { + throw new IllegalStateException( + s"getCDFFiles is missing for: ${table.name}. This shouldn't happen in the unit test." + ) + } + // iterator.toSeq doesn't trigger CompletionIterator in BlockManager which releases the reader + // lock on the underlying block. iterator hasNext does trigger it. 
+ val linesBuilder = Seq.newBuilder[String] + while (iterator.hasNext) { + linesBuilder += iterator.next() + } + DeltaTableFiles( + version = getTableVersion(table), + lines = linesBuilder.result(), + respondedFormat = DeltaSharingRestClient.RESPONSE_FORMAT_DELTA + ) + } + + override def getForStreaming(): Boolean = forStreaming + + override def getProfileProvider: DeltaSharingProfileProvider = profileProvider +} + +object TestClientForDeltaFormatSharing { + def getBlockId( + sharedTableName: String, + queryType: String, + versionAsOf: Option[Long] = None, + timestampAsOf: Option[String] = None): BlockId = { + assert(!(versionAsOf.isDefined && timestampAsOf.isDefined)) + val suffix = if (versionAsOf.isDefined) { + s"_v${versionAsOf.get}" + } else if (timestampAsOf.isDefined) { + s"_t${timestampAsOf.get}" + } else { + "" + } + BlockId( + s"${DeltaSharingUtils.DELTA_SHARING_BLOCK_ID_PREFIX}" + + s"_${sharedTableName}_$queryType$suffix" + ) + } + + val limits = scala.collection.mutable.Map[String, Long]() + val requestedFormat = scala.collection.mutable.Map[String, String]() +} diff --git a/sharing/src/test/scala/io/delta/sharing/spark/TestDeltaSharingFileSystem.scala b/sharing/src/test/scala/io/delta/sharing/spark/TestDeltaSharingFileSystem.scala new file mode 100644 index 00000000000..2d372afba75 --- /dev/null +++ b/sharing/src/test/scala/io/delta/sharing/spark/TestDeltaSharingFileSystem.scala @@ -0,0 +1,140 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.sharing.spark + +import java.io.FileNotFoundException +import java.net.{URI, URLDecoder, URLEncoder} +import java.util.concurrent.TimeUnit + +import io.delta.sharing.client.DeltaSharingFileSystem +import org.apache.hadoop.fs._ +import org.apache.hadoop.fs.permission.FsPermission +import org.apache.hadoop.util.Progressable + +import org.apache.spark.SparkEnv +import org.apache.spark.delta.sharing.{PreSignedUrlCache, PreSignedUrlFetcher} +import org.apache.spark.storage.BlockId + +/** + * Read-only file system for DeltaSharingDataSourceDeltaSuite. + * To replace DeltaSharingFileSystem and return the content for parquet files. 
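+ * Parquet file bytes are served from the driver's BlockManager: open() resolves the
+ * pre-signed-url placeholder (produced by TestDeltaSharingFileSystem.encode) into a
+ * (table name, file name) pair and reads the corresponding block.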
+ */ +private[spark] class TestDeltaSharingFileSystem extends FileSystem { + import TestDeltaSharingFileSystem._ + + private lazy val preSignedUrlCacheRef = PreSignedUrlCache.getEndpointRefInExecutor(SparkEnv.get) + + override def getScheme: String = SCHEME + + override def getUri(): URI = URI.create(s"$SCHEME:///") + + override def open(f: Path, bufferSize: Int): FSDataInputStream = { + val path = DeltaSharingFileSystem.decode(f) + val fetcher = + new PreSignedUrlFetcher( + preSignedUrlCacheRef, + path.tablePath, + path.fileId, + TimeUnit.MINUTES.toMillis(10) + ) + val (tableName, parquetFilePath) = decode(fetcher.getUrl()) + val arrayBuilder = Array.newBuilder[Byte] + val iterator = SparkEnv.get.blockManager + .get[Byte](getBlockId(tableName, parquetFilePath)) + .map( + _.data.asInstanceOf[Iterator[Byte]] + ) + .getOrElse { + throw new FileNotFoundException(f.toString) + } + while (iterator.hasNext) { + arrayBuilder += iterator.next() + } + new FSDataInputStream(new SeekableByteArrayInputStream(arrayBuilder.result())) + } + + override def create( + f: Path, + permission: FsPermission, + overwrite: Boolean, + bufferSize: Int, + replication: Short, + blockSize: Long, + progress: Progressable): FSDataOutputStream = + throw new UnsupportedOperationException("create") + + override def append(f: Path, bufferSize: Int, progress: Progressable): FSDataOutputStream = + throw new UnsupportedOperationException("append") + + override def rename(src: Path, dst: Path): Boolean = + throw new UnsupportedOperationException("rename") + + override def delete(f: Path, recursive: Boolean): Boolean = + throw new UnsupportedOperationException("delete") + + override def listStatus(f: Path): Array[FileStatus] = + throw new UnsupportedOperationException("listStatus") + + override def setWorkingDirectory(new_dir: Path): Unit = + throw new UnsupportedOperationException("setWorkingDirectory") + + override def getWorkingDirectory: Path = new Path(getUri) + + override def mkdirs(f: Path, permission: FsPermission): Boolean = + throw new UnsupportedOperationException("mkdirs") + + override def getFileStatus(f: Path): FileStatus = { + val resolved = makeQualified(f) + new FileStatus(DeltaSharingFileSystem.decode(resolved).fileSize, false, 0, 1, 0, f) + } + + override def close(): Unit = { + super.close() + } +} + +private[spark] object TestDeltaSharingFileSystem { + val SCHEME = "delta-sharing" + + def getBlockId(tableName: String, parquetFilePath: String): BlockId = { + BlockId( + s"${DeltaSharingUtils.DELTA_SHARING_BLOCK_ID_PREFIX}_" + + s"{$tableName}_$parquetFilePath" + ) + } + + // The encoded string is purely for testing purpose to contain the table name and file path, + // which will be decoded and used to find block in block manager. + // In real traffic, it will be a pre-signed url. 
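+  // For example (illustrative values only), encode("shared_table_more", "part-00000.parquet")
+  // returns "delta-sharing:///shared_table_more/part-00000.parquet", and decode reverses it
+  // back into the (tableName, parquetFilePath) pair used to look up the block above.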
+  def encode(tableName: String, parquetFilePath: String): String = {
+    val encodedTableName = URLEncoder.encode(tableName, "UTF-8")
+    val encodedParquetFilePath = URLEncoder.encode(parquetFilePath, "UTF-8")
+    // SCHEME:/// is needed for making this path an absolute path
+    s"$SCHEME:///$encodedTableName/$encodedParquetFilePath"
+  }
+
+  def decode(encodedPath: String): (String, String) = {
+    val Array(tableName, parquetFilePath) = encodedPath
+      .stripPrefix(s"$SCHEME:///")
+      .stripPrefix(s"$SCHEME:/")
+      .split("/")
+      .map(
+        URLDecoder.decode(_, "UTF-8")
+      )
+    (tableName, parquetFilePath)
+  }
+}

From 0bbfb9eaec9dce8bfc18d5e99b5afdd5844b4eb7 Mon Sep 17 00:00:00 2001
From: Lin Zhou
Date: Tue, 9 Jan 2024 23:47:45 -0800
Subject: [PATCH 2/4] resolve PR comments

---
 .../io/delta/sharing/spark/DeltaSharingCDFUtils.scala   | 9 ++++++++-
 .../io/delta/sharing/spark/DeltaSharingDataSource.scala | 2 +-
 .../delta/sharing/spark/DeltaSharingLogFileSystem.scala | 2 +-
 .../delta/sharing/spark/DeltaSharingCDFUtilsSuite.scala | 2 +-
 4 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingCDFUtils.scala b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingCDFUtils.scala
index d6be355562a..7b8a8294c67 100644
--- a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingCDFUtils.scala
+++ b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingCDFUtils.scala
@@ -36,7 +36,12 @@ object DeltaSharingCDFUtils extends Logging {
     (System.currentTimeMillis() - start) / 1000.0
   }
 
-  private[sharing] def prepareCDCRelation(
+  /**
+   * Prepares the BaseRelation for cdf queries on a delta sharing table. Since there's no limit
+   * pushdown or filter pushdown involved, it will first fetch all the files from the delta
+   * sharing server, prepare the local delta log, and leverage DeltaTableV2 to produce the relation.
+   */
+  private[sharing] def prepareCDFRelation(
       sqlContext: SQLContext,
       options: DeltaSharingOptions,
       table: DeltaSharingTable,
@@ -73,6 +78,8 @@ object DeltaSharingCDFUtils extends Logging {
       // CachedTableManager.
       tablePath = DeltaSharingUtils.getTablePathWithIdSuffix(path, queryParamsHashId),
       idToUrl = deltaLogMetadata.idToUrl,
+      // A weak reference is needed by the CachedTableManager to decide whether the query is done
+      // and it's ok to clean up the id to url mapping for this table.
       refs = Seq(new WeakReference(this)),
       profileProvider = client.getProfileProvider,
       refresher = DeltaSharingUtils.getRefresherForGetCDFFiles(
diff --git a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala
index 28fa1eb1206..c76b9389d49 100644
--- a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala
+++ b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingDataSource.scala
@@ -100,7 +100,7 @@ private[sharing] class DeltaSharingDataSource
       )
 
       if (options.readChangeFeed) {
-        return DeltaSharingCDFUtils.prepareCDCRelation(sqlContext, options, dsTable, client)
+        return DeltaSharingCDFUtils.prepareCDFRelation(sqlContext, options, dsTable, client)
       }
       // 2. getMetadata for schema to be used in the file index.
val deltaTableMetadata = DeltaSharingUtils.queryDeltaTableMetadata( diff --git a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingLogFileSystem.scala b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingLogFileSystem.scala index 50f9c99b3b1..02f9d9e0462 100644 --- a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingLogFileSystem.scala +++ b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingLogFileSystem.scala @@ -354,7 +354,7 @@ private[sharing] object DeltaSharingLogFileSystem extends Logging { /** - * Construct local delta log based on lines returned from delta sharing server. + * Construct local delta log based on delta log actions returned from delta sharing server. * * @param lines a list of delta actions, to be processed and put in the local delta log, * each action contains a version field to indicate the version of log to diff --git a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingCDFUtilsSuite.scala b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingCDFUtilsSuite.scala index aa8d54d2b48..61d5fc1f40b 100644 --- a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingCDFUtilsSuite.scala +++ b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingCDFUtilsSuite.scala @@ -191,7 +191,7 @@ class DeltaSharingCDFUtilsSuite val dsTable = Table(share = shareName, schema = schemaName, name = sharedTableName) val options = new DeltaSharingOptions(Map("path" -> tablePath.toString)) - DeltaSharingCDFUtils.prepareCDCRelation( + DeltaSharingCDFUtils.prepareCDFRelation( SparkSession.active.sqlContext, options, dsTable, From bb5018772fab8c20d9ceed2f8177cd519bc2f18c Mon Sep 17 00:00:00 2001 From: Lin Zhou Date: Wed, 10 Jan 2024 00:10:19 -0800 Subject: [PATCH 3/4] remove --- .../sharing/spark/DeltaSharingDataSourceDeltaTestUtils.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaTestUtils.scala b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaTestUtils.scala index 601a6297ac6..cc29d6d9976 100644 --- a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaTestUtils.scala +++ b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaTestUtils.scala @@ -622,7 +622,6 @@ trait DeltaSharingDataSourceDeltaTestUtils extends SharedSparkSession { ) } } - if (assertMultipleDvsInOneFile) { assert(dvPathToCount.max._2 > 1) } From 82d6d6e52b6837c859ecc65bf3031b90c1735e5e Mon Sep 17 00:00:00 2001 From: Lin Zhou Date: Wed, 10 Jan 2024 00:36:10 -0800 Subject: [PATCH 4/4] fix --- .../sharing/spark/DeltaSharingDataSourceDeltaTestUtils.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaTestUtils.scala b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaTestUtils.scala index cc29d6d9976..601a6297ac6 100644 --- a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaTestUtils.scala +++ b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaTestUtils.scala @@ -622,6 +622,7 @@ trait DeltaSharingDataSourceDeltaTestUtils extends SharedSparkSession { ) } } + if (assertMultipleDvsInOneFile) { assert(dvPathToCount.max._2 > 1) }