[SC-86940][DELTA] Fix potential checkpoint corruption issue for GCS in OSS Delta

This PR moves the checkpoint write to a separate thread for GCS, to work around a potential checkpoint corruption issue that can occur when the writing thread is interrupted.

GitOrigin-RevId: 418df2441923c85c6415f9baec157c923c4c3dca
zsxwing authored and mengtong-db committed Nov 5, 2021
1 parent 315a4a0 commit 95e9076
Showing 3 changed files with 169 additions and 63 deletions.
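Note: the fix routes the write through `DeltaFileOperations.runInNewThread`, whose implementation is not part of this diff. A minimal sketch of what such a helper can look like, assuming it only needs to run the block on a dedicated thread, join on it, and rethrow any failure (the code below is illustrative, not the actual Delta helper):

    // Illustrative sketch only -- not the actual DeltaFileOperations.runInNewThread.
    // Running `body` on a fresh thread means an interrupt delivered to the calling
    // (committing) thread cannot reach the thread that holds the GCS upload.
    def runInNewThread[T](threadName: String)(body: => T): T = {
      @volatile var result: Either[Throwable, T] = null
      val thread = new Thread(threadName) {
        override def run(): Unit = {
          result = try Right(body) catch { case t: Throwable => Left(t) }
        }
      }
      thread.setDaemon(true)
      thread.start()
      thread.join() // note: join() itself may throw InterruptedException in the caller
      result match {
        case Right(value) => value
        case Left(t) => throw t
      }
    }

Because the upload happens on a thread that nothing else interrupts, a `Thread.interrupt()` aimed at the committing task can, at worst, interrupt the `join()`; the GCS request itself runs to completion.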
45 changes: 3 additions & 42 deletions contribs/src/test/scala/io/delta/storage/GCSLogStoreSuite.scala
@@ -21,7 +21,7 @@ import java.net.URI
import org.apache.hadoop.fs.{FSDataOutputStream, Path, RawLocalFileSystem}
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.util.Progressable
- import org.apache.spark.sql.delta.{FakeFileSystem, LogStoreSuiteBase}
+ import org.apache.spark.sql.delta.{FakeFileSystem, FakeGCSFileSystem, LogStoreSuiteBase}

class GCSLogStoreSuite extends LogStoreSuiteBase {

@@ -36,9 +36,9 @@ class GCSLogStoreSuite extends LogStoreSuiteBase {

test("gcs write should happen in a new thread") {
withTempDir { tempDir =>
- // Use `FakeGSCFileSystem` to verify we write in the correct thread.
+ // Use `FakeGCSFileSystem` to verify we write in the correct thread.
withSQLConf(
"fs.gs.impl" -> classOf[FakeGSCFileSystem].getName,
"fs.gs.impl" -> classOf[FakeGCSFileSystem].getName,
"fs.gs.impl.disable.cache" -> "true") {
val store = createLogStore(spark)
store.write(
@@ -50,42 +50,3 @@ }
}
}
}

- /**
- * A fake GCS file system to verify delta commits are written in a separate gcs thread.
- */
- class FakeGSCFileSystem extends RawLocalFileSystem {
- override def getScheme: String = "gs"
- override def getUri: URI = URI.create("gs:/")
-
- private def assertGCSThread(f: Path): Unit = {
- if (f.getName.contains(".json")) {
- assert(
- Thread.currentThread().getName.contains("delta-gcs-"),
- s"writing $f was happening in non gcs thread: ${Thread.currentThread()}")
- }
- }
-
- override def create(
- f: Path,
- permission: FsPermission,
- overwrite: Boolean,
- bufferSize: Int,
- replication: Short,
- blockSize: Long,
- progress: Progressable): FSDataOutputStream = {
- assertGCSThread(f)
- super.create(f, permission, overwrite, bufferSize, replication, blockSize, progress)
- }
-
- override def create(
- f: Path,
- overwrite: Boolean,
- bufferSize: Int,
- replication: Short,
- blockSize: Long,
- progress: Progressable): FSDataOutputStream = {
- assertGCSThread(f)
- super.create(f, overwrite, bufferSize, replication, blockSize, progress)
- }
- }
78 changes: 57 additions & 21 deletions core/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.delta.storage.LogStore
import org.apache.spark.sql.delta.util.DeltaFileOperations
import org.apache.spark.sql.delta.util.FileNames._
import org.apache.spark.sql.delta.util.JsonUtils
+ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{JobConf, TaskAttemptContextImpl, TaskAttemptID}
import org.apache.hadoop.mapreduce.{Job, TaskType}
@@ -285,28 +286,44 @@ object Checkpoints extends DeltaLogging {
} else {
path
}
- try {
- val writer = factory.newInstance(
- writtenPath,
- schema,
- new TaskAttemptContextImpl(
- new JobConf(serConf.value),
- new TaskAttemptID("", 0, TaskType.REDUCE, 0, 0)))
-
- iter.foreach { row =>
- checkpointSize.add(1)
- writer.write(row)
- }
- writer.close()
- } catch {
- case e: org.apache.hadoop.fs.FileAlreadyExistsException if !useRename =>
- val p = new Path(writtenPath)
- if (p.getFileSystem(serConf.value).exists(p)) {
- // The file has been written by a zombie task. We can just use this checkpoint file
- // rather than failing a Delta commit.
- } else {
- throw e
- }
- }
+ val writeAction = () => {
+ try {
+ val writer = factory.newInstance(
+ writtenPath,
+ schema,
+ new TaskAttemptContextImpl(
+ new JobConf(serConf.value),
+ new TaskAttemptID("", 0, TaskType.REDUCE, 0, 0)))
+
+ iter.foreach { row =>
+ checkpointSize.add(1)
+ writer.write(row)
+ }
+ // Note: `writer.close()` is not put in a `finally` clause because we don't want to
+ // close it when an exception happens. Closing the file would flush the content to the
+ // storage and create an incomplete file. A concurrent reader might see it and fail.
+ // This would leak resources but we don't have a way to abort the storage request here.
+ writer.close()
+ } catch {
+ case e: org.apache.hadoop.fs.FileAlreadyExistsException if !useRename =>
+ val p = new Path(writtenPath)
+ if (p.getFileSystem(serConf.value).exists(p)) {
+ // The file has been written by a zombie task. We can just use this checkpoint file
+ // rather than failing a Delta commit.
+ } else {
+ throw e
+ }
+ }
+ }
+ if (isGCSPath(serConf.value, new Path(writtenPath))) {
+ // GCS may upload an incomplete file when the current thread is interrupted, hence we move
+ // the write to a new thread so that the write cannot be interrupted.
+ // TODO Remove this hack when the GCS Hadoop connector fixes the issue.
+ DeltaFileOperations.runInNewThread("delta-gcs-checkpoint-write") {
+ writeAction()
+ }
+ } else {
+ writeAction()
+ }
Iterator(writtenPath)
}.collect().head
@@ -343,6 +360,25 @@
CheckpointMetaData(snapshot.version, checkpointSize.value, None)
}

+ // scalastyle:off line.size.limit
+ /**
+ * All GCS paths can only have the scheme of "gs". Note: the scheme checking is case insensitive.
+ * See:
+ * - https://github.com/databricks/hadoop-connectors/blob/master/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemBase.java#L493
+ * - https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/v2.2.3/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageFileSystem.java#L88
+ */
+ // scalastyle:on line.size.limit
+ private[delta] def isGCSPath(hadoopConf: Configuration, path: Path): Boolean = {
+ val scheme = path.toUri.getScheme
+ if (scheme != null) {
+ scheme.equalsIgnoreCase("gs")
+ } else {
+ // When the scheme is not available in the path, we check the file system scheme resolved from
+ // the path.
+ path.getFileSystem(hadoopConf).getScheme.equalsIgnoreCase("gs")
+ }
+ }
+
/**
* Modify the contents of the add column based on the table properties
*/
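The comment above `writer.close()` in the new `writeAction` records a deliberate design choice worth spelling out. A hypothetical variant (standalone sketch, not code from this change; plain java.io is used for brevity) that closes the writer in a `finally` block would flush the buffer on failure and publish a truncated file that a concurrent reader could mistake for a complete checkpoint:

    import java.io.{BufferedWriter, FileWriter}

    // Hypothetical anti-pattern, NOT the Delta code: `finally` also runs when a
    // write fails, so close() flushes partial content to storage and leaves a
    // visible, incomplete file behind.
    def writeRows(path: String, rows: Iterator[String]): Unit = {
      val writer = new BufferedWriter(new FileWriter(path))
      try {
        rows.foreach(row => writer.write(row))
      } finally {
        writer.close() // on failure this still flushes whatever was buffered
      }
    }

This is why the diff calls `writer.close()` only on the success path and knowingly leaks the handle when an exception occurs.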
109 changes: 109 additions & 0 deletions core/src/test/scala/org/apache/spark/sql/delta/CheckpointsSuite.scala
@@ -0,0 +1,109 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

import java.net.URI

import org.apache.spark.sql.delta.storage.LocalLogStore
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataOutputStream, Path, RawLocalFileSystem}
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.util.Progressable

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SharedSparkSession

class CheckpointsSuite extends QueryTest with SharedSparkSession {

protected override def sparkConf = {
// Set the gs LogStore impl to `LocalLogStore` so that it will work with `FakeGCSFileSystem`.
// The default one is `HDFSLogStore` which requires a `FileContext` but we don't have one.
super.sparkConf.set("spark.delta.logStore.gs.impl", classOf[LocalLogStore].getName)
}

test("SC-86940: isGCSPath") {
val conf = new Configuration()
assert(Checkpoints.isGCSPath(conf, new Path("gs://foo/bar")))
// Scheme is case insensitive
assert(Checkpoints.isGCSPath(conf, new Path("Gs://foo/bar")))
assert(Checkpoints.isGCSPath(conf, new Path("GS://foo/bar")))
assert(Checkpoints.isGCSPath(conf, new Path("gS://foo/bar")))
assert(!Checkpoints.isGCSPath(conf, new Path("non-gs://foo/bar")))
assert(!Checkpoints.isGCSPath(conf, new Path("/foo")))
// Set the default file system and verify we can detect it
conf.set("fs.defaultFS", "gs://foo/")
conf.set("fs.gs.impl", classOf[FakeGCSFileSystem].getName)
conf.set("fs.gs.impl.disable.cache", "true")
assert(Checkpoints.isGCSPath(conf, new Path("/foo")))
}

test("SC-86940: writing a GCS checkpoint should happen in a new thread") {
withTempDir { tempDir =>
val path = tempDir.getCanonicalPath
spark.range(1).write.format("delta").save(path)

// Use `FakeGCSFileSystem` which will verify we write in a separate gcs thread.
withSQLConf(
"fs.gs.impl" -> classOf[FakeGCSFileSystem].getName,
"fs.gs.impl.disable.cache" -> "true") {
DeltaLog.clearCache()
val gsPath = new Path(s"gs://${tempDir.getCanonicalPath}")
val deltaLog = DeltaLog.forTable(spark, gsPath)
deltaLog.checkpoint()
}
}
}
}

/**
* A fake GCS file system to verify delta commits are written in a separate gcs thread.
*/
class FakeGCSFileSystem extends RawLocalFileSystem {
override def getScheme: String = "gs"
override def getUri: URI = URI.create("gs:/")

private def assertGCSThread(f: Path): Unit = {
if (f.getName.contains(".json") || f.getName.contains(".checkpoint")) {
assert(
Thread.currentThread().getName.contains("delta-gcs-"),
s"writing $f was happening in non gcs thread: ${Thread.currentThread()}")
}
}

override def create(
f: Path,
permission: FsPermission,
overwrite: Boolean,
bufferSize: Int,
replication: Short,
blockSize: Long,
progress: Progressable): FSDataOutputStream = {
assertGCSThread(f)
super.create(f, permission, overwrite, bufferSize, replication, blockSize, progress)
}

override def create(
f: Path,
overwrite: Boolean,
bufferSize: Int,
replication: Short,
blockSize: Long,
progress: Progressable): FSDataOutputStream = {
assertGCSThread(f)
super.create(f, overwrite, bufferSize, replication, blockSize, progress)
}
}
