
Commit c156c98

jkylling authored and scottsand-db committed
Use startAfter in S3SingleDriverLogStore.listFrom
## Description

The current implementation of `S3SingleDriverLogStore.listFrom` lists the entire content of the parent directory and filters the result. This can take a long time if the parent directory contains a lot of files. In practice, this can happen for _delta_log folders with a lot of commits.

We change the implementation to use the [startAfter](https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/s3/model/ListObjectsV2Request.html#startAfter--) parameter so that the S3 list response only contains keys lexicographically greater than or equal to the resolved path. This usually reduces the number of S3 list requests from `size of _delta_log / 1000` to 1.

This resolves #1191. I've tested the patch briefly with the sample test described in #1191. The [previous iteration of this patch](jkylling@ec998ee) has been tested a bit more. Correctness has not been tested thoroughly.

## Does this PR introduce _any_ user-facing changes?

No

Closes #1210

Signed-off-by: Scott Sandre <[email protected]>

GitOrigin-RevId: 2a0d1279655672cbdffd5604b7d7d781556888b9
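To make the opt-in concrete, here is a minimal, hypothetical sketch of enabling the new listing path from plain Java. It assumes the delta-storage `LogStore` API shape (a `Configuration`-based constructor and `listFrom(Path, Configuration)` returning a `CloseableIterator`); the bucket and table names are made up, and in a Spark job the same property would normally be supplied as `spark.hadoop.delta.enableFastS3AListFrom=true`:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;

import io.delta.storage.CloseableIterator;
import io.delta.storage.S3SingleDriverLogStore;

public class FastListFromExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Opt in to the startAfter-based listing added by this commit; it defaults to false.
        conf.setBoolean("delta.enableFastS3AListFrom", true);

        // Hypothetical bucket/table; requires hadoop-aws (S3AFileSystem) on the classpath.
        Path firstCommitToRead =
            new Path("s3a://my-bucket/my-table/_delta_log/00000000000000000042.json");

        S3SingleDriverLogStore logStore = new S3SingleDriverLogStore(conf);
        // With the flag enabled, listFrom issues ListObjectsV2 requests that start just before
        // the requested key instead of listing the whole _delta_log and filtering client side.
        try (CloseableIterator<FileStatus> it = logStore.listFrom(firstCommitToRead, conf)) {
            it.forEachRemaining(status -> System.out.println(status.getPath()));
        }
    }
}
```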
1 parent 4a401ea commit c156c98

File tree

6 files changed: +297 −1 lines changed


build.sbt (+1)
@@ -171,6 +171,7 @@ lazy val storage = (project in file("storage"))
     // User can provide any 2.x or 3.x version. We don't use any new fancy APIs. Watch out for
     // versions with known vulnerabilities.
     "org.apache.hadoop" % "hadoop-common" % "3.3.1" % "provided",
+    "org.apache.hadoop" % "hadoop-aws" % "3.3.1" % "provided",
 
     // Test Deps
     "org.scalatest" %% "scalatest" % "3.2.11" % "test",

run-integration-tests.py (+27)
@@ -179,6 +179,23 @@ def run_dynamodb_logstore_integration_tests(root_dir, version, test_name, extra_
         raise
 
 
+def run_s3_log_store_util_integration_tests():
+    print("\n\n##### Running S3LogStoreUtil tests #####")
+
+    env = { "S3_LOG_STORE_UTIL_TEST_ENABLED": "true" }
+    assert os.environ.get("S3_LOG_STORE_UTIL_TEST_BUCKET") is not None, "S3_LOG_STORE_UTIL_TEST_BUCKET must be set"
+    assert os.environ.get("S3_LOG_STORE_UTIL_TEST_RUN_UID") is not None, "S3_LOG_STORE_UTIL_TEST_RUN_UID must be set"
+
+    try:
+        cmd = ["build/sbt", "project storage", "testOnly -- -n IntegrationTest"]
+        print("\nRunning IntegrationTests of storage\n=====================")
+        print("Command: %s" % " ".join(cmd))
+        run_cmd(cmd, stream_output=True, env=env)
+    except:
+        print("Failed IntegrationTests")
+        raise
+
+
 def run_pip_installation_tests(root_dir, version, use_testpypi, extra_maven_repo):
     print("\n\n##### Running pip installation tests on version %s #####" % str(version))
     clear_artifact_cache()
@@ -292,6 +309,12 @@ def __exit__(self, tpe, value, traceback):
         default=False,
         action="store_true",
         help="Run only Scala tests")
+    parser.add_argument(
+        "--s3-log-store-util-only",
+        required=False,
+        default=False,
+        action="store_true",
+        help="Run only S3LogStoreUtil tests")
     parser.add_argument(
         "--scala-version",
         required=False,
@@ -370,6 +393,10 @@ def __exit__(self, tpe, value, traceback):
                                     args.dbb_packages, args.dbb_conf, args.use_local)
         quit()
 
+    if args.s3_log_store_util_only:
+        run_s3_log_store_util_integration_tests()
+        quit()
+
     if run_scala:
         run_scala_integration_tests(root_dir, args.version, args.test, args.maven_repo,
                                     args.scala_version, args.use_local)

storage/src/main/java/io/delta/storage/S3SingleDriverLogStore.java (+25 −1)
@@ -32,10 +32,13 @@
 import com.google.common.cache.CacheBuilder;
 import com.google.common.io.CountingOutputStream;
 import io.delta.storage.internal.FileNameUtils;
+import io.delta.storage.internal.S3LogStoreUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
 
 /**
  * Single Spark-driver/JVM LogStore implementation for S3.
@@ -61,6 +64,17 @@
  */
 public class S3SingleDriverLogStore extends HadoopFileSystemLogStore {
 
+    /**
+     * Enables a faster implementation of listFrom by setting the startAfter parameter in S3 list
+     * requests. The feature is enabled by setting the property delta.enableFastS3AListFrom in the
+     * Hadoop configuration.
+     *
+     * This feature requires the Hadoop file system used for S3 paths to be castable to
+     * org.apache.hadoop.fs.s3a.S3AFileSystem.
+     */
+    private final boolean enableFastListFrom
+        = initHadoopConf().getBoolean("delta.enableFastS3AListFrom", false);
+
     ///////////////////////////
     // Static Helper Methods //
     ///////////////////////////
@@ -223,8 +237,18 @@ private Iterator<FileStatus> listFromInternal(
             );
         }
 
+        FileStatus[] statuses;
+        if (
+            // LocalFileSystem and RawLocalFileSystem checks are needed for tests to pass
+            fs instanceof LocalFileSystem || fs instanceof RawLocalFileSystem || !enableFastListFrom
+        ) {
+            statuses = fs.listStatus(parentPath);
+        } else {
+            statuses = S3LogStoreUtil.s3ListFromArray(fs, resolvedPath, parentPath);
+        }
+
         final List<FileStatus> listedFromFs = Arrays
-            .stream(fs.listStatus(parentPath))
+            .stream(statuses)
             .filter(s -> s.getPath().getName().compareTo(resolvedPath.getName()) >= 0)
             .collect(Collectors.toList());

storage/src/main/java/io/delta/storage/internal/S3LogStoreUtil.java (new file, +116)
@@ -0,0 +1,116 @@
+/*
+ * Copyright (2022) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.delta.storage.internal;
+
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.s3a.*;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
+
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_MAX_PAGING_KEYS;
+import static org.apache.hadoop.fs.s3a.Constants.MAX_PAGING_KEYS;
+import static org.apache.hadoop.fs.s3a.S3AUtils.iteratorToStatuses;
+
+/**
+ * Static utility methods for the S3SingleDriverLogStore.
+ *
+ * Used to trick the class loader so we can use methods of org.apache.hadoop:hadoop-aws
+ * without needing to load this as a dependency for tests in core.
+ */
+public final class S3LogStoreUtil {
+    private S3LogStoreUtil() {}
+
+    private static PathFilter ACCEPT_ALL = new PathFilter() {
+        @Override
+        public boolean accept(Path file) {
+            return true;
+        }
+
+        @Override
+        public String toString() {
+            return "ACCEPT_ALL";
+        }
+    };
+
+    /**
+     * Uses the S3ListRequest.v2 interface with the startAfter parameter to only list files
+     * which are lexicographically greater than resolvedPath.
+     */
+    private static RemoteIterator<S3AFileStatus> s3ListFrom(
+            S3AFileSystem s3afs,
+            Path resolvedPath,
+            Path parentPath) throws IOException {
+        int maxKeys = S3AUtils.intOption(s3afs.getConf(), MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
+        Listing listing = s3afs.getListing();
+        // List files lexicographically after resolvedPath inclusive within the same directory
+        return listing.createFileStatusListingIterator(resolvedPath,
+            S3ListRequest.v2(
+                new ListObjectsV2Request()
+                    .withBucketName(s3afs.getBucket())
+                    .withMaxKeys(maxKeys)
+                    .withPrefix(s3afs.pathToKey(parentPath))
+                    .withStartAfter(keyBefore(s3afs.pathToKey(resolvedPath)))
+            ), ACCEPT_ALL,
+            new Listing.AcceptAllButSelfAndS3nDirs(parentPath)
+        );
+    }
+
+    /**
+     * Uses the S3ListRequest.v2 interface with the startAfter parameter to only list files
+     * which are lexicographically greater than resolvedPath.
+     *
+     * Wraps s3ListFrom in an array. Contained in this class to avoid contaminating other
+     * classes with dependencies on recent Hadoop versions.
+     *
+     * TODO: Remove this method when iterators are used everywhere.
+     */
+    public static FileStatus[] s3ListFromArray(
+            FileSystem fs,
+            Path resolvedPath,
+            Path parentPath) throws IOException {
+        S3AFileSystem s3afs;
+        try {
+            s3afs = (S3AFileSystem) fs;
+        } catch (ClassCastException e) {
+            throw new UnsupportedOperationException(
+                "The Hadoop file system used for the S3LogStore must be castable to " +
+                "org.apache.hadoop.fs.s3a.S3AFileSystem.", e);
+        }
+        return iteratorToStatuses(S3LogStoreUtil.s3ListFrom(s3afs, resolvedPath, parentPath), new HashSet<>());
+    }
+
+    /**
+     * Get the key which is lexicographically right before key.
+     * If the key is empty return null.
+     * If the key ends in a null byte, remove the last byte.
+     * Otherwise, subtract one from the last byte.
+     */
+    static String keyBefore(String key) {
+        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
+        if (bytes.length == 0) return null;
+        if (bytes[bytes.length - 1] > 0) {
+            bytes[bytes.length - 1] -= 1;
+            return new String(bytes, StandardCharsets.UTF_8);
+        } else {
+            return new String(bytes, 0, bytes.length - 1, StandardCharsets.UTF_8);
+        }
+    }
+}
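Why the helper decrements the last byte, shown as a quick illustrative sketch (not part of the commit): S3's start-after parameter is exclusive, so passing the resolved key itself would skip the very first commit file the caller asked for. keyBefore returns a key that is strictly smaller, which makes the listing inclusive; listFromInternal still filters by file name, so any key that happens to sort between the two strings is harmless. The class below is hypothetical and sits in the same package only because keyBefore is package-private.

```java
package io.delta.storage.internal;

// Hypothetical demo class, not part of the commit.
public class KeyBeforeExample {
    public static void main(String[] args) {
        // Hypothetical _delta_log key for commit 42.
        String resolved = "my-table/_delta_log/00000000000000000042.json";

        // The last byte 'n' is decremented to 'm', producing a strictly smaller key.
        String startAfter = S3LogStoreUtil.keyBefore(resolved);
        System.out.println(startAfter); // my-table/_delta_log/00000000000000000042.jsom

        // An exclusive start-after on this key therefore still returns ...42.json itself,
        // followed by ...43.json, ...44.json, and so on.
    }
}
```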
storage/src/test/scala/io/delta/storage/integration/S3LogStoreUtilIntegrationTest.scala (new file, +104)
@@ -0,0 +1,104 @@
+package io.delta.storage.integration
+
+import io.delta.storage.internal.{FileNameUtils, S3LogStoreUtil}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.s3a.S3AFileSystem
+import org.scalatest.Tag
+import org.scalatest.funsuite.AnyFunSuite
+
+import java.net.URI
+import scala.math.max
+import scala.math.ceil
+
+/**
+ * These integration tests are executed by setting the
+ * environment variables
+ *   S3_LOG_STORE_UTIL_TEST_BUCKET=some-s3-bucket-name
+ *   S3_LOG_STORE_UTIL_TEST_RUN_UID=some-uuid-for-test-run
+ * and running
+ *   python run-integration-tests.py --s3-log-store-util-only
+ *
+ * Alternatively you can set the environment variables
+ *   S3_LOG_STORE_UTIL_TEST_ENABLED=true
+ *   S3_LOG_STORE_UTIL_TEST_BUCKET=some-s3-bucket-name
+ *   S3_LOG_STORE_UTIL_TEST_RUN_UID=some-uuid-for-test-run
+ * and run the tests in this suite using your preferred
+ * test execution mechanism (e.g., the IDE or sbt).
+ *
+ * S3_LOG_STORE_UTIL_TEST_BUCKET is the name of the S3 bucket used for the test.
+ * S3_LOG_STORE_UTIL_TEST_RUN_UID is a prefix for all keys used in the test.
+ * This is useful for isolating multiple test runs.
+ */
+class S3LogStoreUtilIntegrationTest extends AnyFunSuite {
+  private val runIntegrationTests: Boolean =
+    Option(System.getenv("S3_LOG_STORE_UTIL_TEST_ENABLED")).exists(_.toBoolean)
+  private val bucket = System.getenv("S3_LOG_STORE_UTIL_TEST_BUCKET")
+  private val testRunUID =
+    System.getenv("S3_LOG_STORE_UTIL_TEST_RUN_UID") // Prefix for all S3 keys in the current run
+  private lazy val fs: S3AFileSystem = {
+    val fs = new S3AFileSystem()
+    fs.initialize(new URI(s"s3a://$bucket"), configuration)
+    fs
+  }
+  private val maxKeys = 2
+  private val configuration = new Configuration()
+  configuration.set( // for local testing only
+    "fs.s3a.aws.credentials.provider",
+    "com.amazonaws.auth.profile.ProfileCredentialsProvider",
+  )
+  configuration.set("fs.s3a.paging.maximum", maxKeys.toString)
+
+  private def touch(key: String) {
+    fs.create(new Path(s"s3a://$bucket/$key")).close()
+  }
+
+  private def key(table: String, version: Int): String =
+    s"$testRunUID/$table/_delta_log/%020d.json".format(version)
+
+  private def path(table: String, version: Int): Path =
+    new Path(s"s3a://$bucket/${key(table, version)}")
+
+  private def version(path: Path): Long = FileNameUtils.deltaVersion(path)
+
+  private val integrationTestTag = Tag("IntegrationTest")
+
+  def integrationTest(name: String)(testFun: => Any): Unit =
+    if (runIntegrationTests) test(name, integrationTestTag)(testFun)
+
+  def testCase(testName: String, numKeys: Int): Unit = integrationTest(testName) {
+    // Setup delta log
+    (1 to numKeys).foreach(v => touch(s"$testRunUID/$testName/_delta_log/%020d.json".format(v)))
+
+    // Check number of S3 requests and correct listing
+    (1 to numKeys + 2).foreach(v => {
+      val startCount = fs.getIOStatistics.counters().get("object_list_request") +
+        fs.getIOStatistics.counters().get("object_continue_list_request")
+      val resolvedPath = path(testName, v)
+      val response = S3LogStoreUtil.s3ListFromArray(fs, resolvedPath, resolvedPath.getParent)
+      val endCount = fs.getIOStatistics.counters().get("object_list_request") +
+        fs.getIOStatistics.counters().get("object_continue_list_request")
+      // Check that we don't do more S3 list requests than necessary
+      val numberOfKeysToList = numKeys - (v - 1)
+      val optimalNumberOfListRequests =
+        max(ceil(numberOfKeysToList / maxKeys.toDouble).toInt, 1)
+      val actualNumberOfListRequests = endCount - startCount
+      assert(optimalNumberOfListRequests == actualNumberOfListRequests)
+      // Check that we get consecutive versions from v to the max version. The smallest version is 1
+      assert((max(1, v) to numKeys) == response.map(r => version(r.getPath)).toSeq)
+    })
+  }
+
+  integrationTest("setup empty delta log") {
+    touch(s"$testRunUID/empty/some.json")
+  }
+
+  testCase("empty", 0)
+
+  testCase("small", 1)
+
+  testCase("medium", maxKeys)
+
+  testCase("large", 10 * maxKeys)
+}
storage/src/test/scala/io/delta/storage/internal/S3LogStoreUtilTest.scala (new file, +24)
@@ -0,0 +1,24 @@
+package io.delta.storage.internal
+
+import org.scalatest.funsuite.AnyFunSuite
+
+class S3LogStoreUtilTest extends AnyFunSuite {
+  test("keyBefore") {
+    assert("a" == S3LogStoreUtil.keyBefore("b"))
+    assert("aa/aa" == S3LogStoreUtil.keyBefore("aa/ab"))
+    assert(Seq(1.toByte, 1.toByte)
+      == S3LogStoreUtil.keyBefore(new String(Seq(1.toByte, 2.toByte).toArray)).getBytes.toList)
+  }
+
+  test("keyBefore with emojis") {
+    assert("♥a" == S3LogStoreUtil.keyBefore("♥b"))
+  }
+
+  test("keyBefore with zero bytes") {
+    assert("abc" == S3LogStoreUtil.keyBefore("abc\u0000"))
+  }
+
+  test("keyBefore with empty key") {
+    assert(null == S3LogStoreUtil.keyBefore(""))
+  }
+}
