
Commit a31d0bc

Fix config for S3 multi-cluster mode
- Problem 1: there was a mismatch between the config params in the documentation (`io.delta.storage.S3DynamoDBLogStore.ddb.tableName`) and the config params actually used in the code and in the integration test (`io.delta.storage.ddb.tableName`).
- Solution 1: include the `S3DynamoDBLogStore` string in the config keys used in the code.
- Problem 2: the `io.delta.storage` prefix did not work when specified using `--conf`. It did work with `spark.conf.set()` or `SparkSession.builder.config()`, but not with `--conf`.
- Solution 2: we now allow two prefixes (see the sketch below):
  - `spark.io.delta.storage.S3DynamoDBLogStore.$key`: works in all contexts (`--conf`, `spark.conf.set()`, etc.).
  - `io.delta.storage.S3DynamoDBLogStore.$key`: the original prefix. It can still be used by delta-standalone, Flink, and Hive, which use hadoopConfs directly and do not need a prefix starting with `spark`.
- Resolves #1094.

Changes:

- Update the config for S3 multi-cluster mode (i.e. `S3DynamoDBLogStore`) to match the public documentation; the configs were missing the string literal `S3DynamoDBLogStore` in the conf prefix.
- Now supports two confs: `spark.io.delta.storage.S3DynamoDBLogStore.$key` and `io.delta.storage.S3DynamoDBLogStore.$key`.
- Added a unit test for the two new confs.

Testing:

- Ran the integration test using an existing and a new S3 table, and an existing and a new DynamoDB table.
- Ran manual tests using locally published jars with pyspark, spark-shell, and spark-submit (spark-submit via the integration test).
- Tested using `--conf` as well as `spark.conf.set()` and `SparkSession.builder.config()`.

User-facing changes? Not really; this just fixes a just-released, experimental LogStore config key.

Closes #1095

Signed-off-by: Scott Sandre <[email protected]>

GitOrigin-RevId: e5db1e6b0dfe958e3234644462891f269313ca33
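For illustration only (not part of the commit), a minimal sketch of setting the new spark-prefixed keys from Scala; the app name, table name, and region below are placeholders:

```scala
import org.apache.spark.sql.SparkSession

// Spark-prefixed form: works in all contexts, including `--conf` on spark-submit, e.g.
//   --conf spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName=my_ddb_table
val spark = SparkSession.builder()
  .appName("s3-dynamodb-logstore-example")  // placeholder app name
  .config("spark.delta.logStore.s3a.impl", "io.delta.storage.S3DynamoDBLogStore")
  .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", "my_ddb_table")  // placeholder
  .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", "us-west-2")        // placeholder
  .getOrCreate()
```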
Parent: e9d6435

File tree: 4 files changed, +122 -62 lines


core/src/test/scala/org/apache/spark/sql/delta/LogStoreSuite.scala (-39)

@@ -497,45 +497,6 @@ class FailingRenameAbstractFileSystem(uri: URI, conf: org.apache.hadoop.conf.Con
   }
 }
 
-///////////////////////////////////////////////////////////////////////////////
-// Fake LogStore class & test suite to check that hadoopConf is set properly //
-///////////////////////////////////////////////////////////////////////////////
-
-class FakePublicLogStore(initHadoopConf: Configuration)
-  extends io.delta.storage.HDFSLogStore(initHadoopConf) {
-
-  assert(initHadoopConf.get("spark.delta.storage.custom.key") == "foo")
-  assert(initHadoopConf.get("this.is.a.non.spark.prefix.key") == "bar")
-}
-
-/**
- * We want to ensure that, to set configuration values for the Java LogStore implementations,
- * users can simply use `--conf $key=$value`, instead of `--conf spark.hadoop.$key=$value`.
- *
- * We also want to test that users can use a non-Spark prefix, so that our public, Java LogStore
- * implementations are not coupled to Spark-related conf keys.
- */
-class CorrectHadoopConfLogStoreSuite
-  extends SparkFunSuite
-  with LocalSparkSession
-  with LogStoreProvider {
-
-  test("java LogStore is instantiated with hadoopConf with SQLConf values") {
-    val sparkConf = new SparkConf()
-      .setMaster("local")
-      // equivalent to --conf spark.delta.storage.custom.key=foo
-      .set("spark.delta.storage.custom.key", "foo")
-      .set("spark.delta.logStore.class", classOf[FakePublicLogStore].getName)
-      .set("this.is.a.non.spark.prefix.key", "bar")
-
-    withSparkSession(SparkSession.builder.config(sparkConf).getOrCreate()) { spark =>
-      // this will instantiate the FakePublicLogStore above. If its assertion fails,
-      // then this test will fail
-      createLogStore(spark)
-    }
-  }
-}
-
 ////////////////////////////////////////////////////////////////////
 // Public LogStore (Java) suite tests from delta-storage artifact //
 ////////////////////////////////////////////////////////////////////

storage-s3-dynamodb/integration_tests/dynamodb_logstore.py (+13 -6)

@@ -45,7 +45,7 @@
 export DELTA_NUM_ROWS=16
 
 ./run-integration-tests.py --run-storage-s3-dynamodb-integration-tests \
-    --dbb-packages org.apache.hadoop:hadoop-aws:3.3.1,com.amazonaws:aws-java-sdk-bundle:1.12.142 \
+    --dbb-packages org.apache.hadoop:hadoop-aws:3.3.1 \
     --dbb-conf spark.jars.ivySettings=/workspace/ivy.settings \
     spark.driver.extraJavaOptions=-Dlog4j.configuration=file:debug/log4j.properties
 """
@@ -60,6 +60,7 @@
 delta_storage = os.environ.get("DELTA_STORAGE", "io.delta.storage.S3DynamoDBLogStore")
 dynamo_table_name = os.environ.get("DELTA_DYNAMO_TABLE", "delta_log_test")
 dynamo_region = os.environ.get("DELTA_DYNAMO_REGION", "us-west-2")
+# used only by FailingS3DynamoDBLogStore
 dynamo_error_rates = os.environ.get("DELTA_DYNAMO_ERROR_RATES", "")
 
 if delta_table_path is None:
@@ -85,20 +86,26 @@
     .appName("utilities") \
     .master("local[*]") \
     .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
-    .config("spark.delta.logStore.class", delta_storage) \
-    .config("io.delta.storage.ddb.tableName", dynamo_table_name) \
-    .config("io.delta.storage.ddb.region", dynamo_region) \
-    .config("io.delta.storage.errorRates", dynamo_error_rates) \
+    .config("spark.delta.logStore.s3.impl", delta_storage) \
+    .config("spark.delta.logStore.s3a.impl", delta_storage) \
+    .config("spark.delta.logStore.s3n.impl", delta_storage) \
+    .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", dynamo_table_name) \
+    .config("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", dynamo_region) \
+    .config("spark.io.delta.storage.S3DynamoDBLogStore.errorRates", dynamo_error_rates) \
+    .config("spark.io.delta.storage.S3DynamoDBLogStore.provisionedThroughput.rcu", 12) \
+    .config("spark.io.delta.storage.S3DynamoDBLogStore.provisionedThroughput.wcu", 13) \
     .getOrCreate()
 
-# spark.sparkContext.setLogLevel("INFO")
+spark.sparkContext.setLogLevel("INFO")
 
 data = spark.createDataFrame([], "id: int, a: int")
+print("writing:", data.collect())
 data.write.format("delta").mode("overwrite").partitionBy("id").save(delta_table_path)
 
 
 def write_tx(n):
     data = spark.createDataFrame([[n, n]], "id: int, a: int")
+    print("writing:", data.collect())
     data.write.format("delta").mode("append").partitionBy("id").save(delta_table_path)
 
 

storage-s3-dynamodb/src/main/java/io/delta/storage/S3DynamoDBLogStore.java (+49 -17)

@@ -71,12 +71,19 @@ public class S3DynamoDBLogStore extends BaseExternalLogStore {
     private static final Logger LOG = LoggerFactory.getLogger(S3DynamoDBLogStore.class);
 
     /**
-     * Configuration keys for the DynamoDB client
+     * Configuration keys for the DynamoDB client.
+     *
+     * Keys are either of the form $SPARK_CONF_PREFIX.$CONF or $BASE_CONF_PREFIX.$CONF,
+     * e.g. spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName
+     * or io.delta.storage.S3DynamoDBLogStore.ddb.tableName
      */
-    private static final String CONF_PREFIX = "io.delta.storage.";
-    private static final String DBB_CLIENT_TABLE = "ddb.tableName";
-    private static final String DBB_CLIENT_REGION = "ddb.region";
-    private static final String DBB_CLIENT_CREDENTIALS_PROVIDER = "credentials.provider";
+    public static final String SPARK_CONF_PREFIX = "spark.io.delta.storage.S3DynamoDBLogStore";
+    public static final String BASE_CONF_PREFIX = "io.delta.storage.S3DynamoDBLogStore";
+    public static final String DDB_CLIENT_TABLE = "ddb.tableName";
+    public static final String DDB_CLIENT_REGION = "ddb.region";
+    public static final String DDB_CLIENT_CREDENTIALS_PROVIDER = "credentials.provider";
+    public static final String DDB_CREATE_TABLE_RCU = "provisionedThroughput.rcu";
+    public static final String DDB_CREATE_TABLE_WCU = "provisionedThroughput.wcu";
 
     /**
      * DynamoDB table attribute keys
@@ -98,13 +105,13 @@ public class S3DynamoDBLogStore extends BaseExternalLogStore {
     public S3DynamoDBLogStore(Configuration hadoopConf) throws IOException {
         super(hadoopConf);
 
-        tableName = getParam(hadoopConf, DBB_CLIENT_TABLE, "delta_log");
+        tableName = getParam(hadoopConf, DDB_CLIENT_TABLE, "delta_log");
         credentialsProviderName = getParam(
             hadoopConf,
-            DBB_CLIENT_CREDENTIALS_PROVIDER,
+            DDB_CLIENT_CREDENTIALS_PROVIDER,
             "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"
         );
-        regionName = getParam(hadoopConf, DBB_CLIENT_REGION, "us-east-1");
+        regionName = getParam(hadoopConf, DDB_CLIENT_REGION, "us-east-1");
         LOG.info("using tableName {}", tableName);
         LOG.info("using credentialsProviderName {}", credentialsProviderName);
         LOG.info("using regionName {}", regionName);
@@ -221,12 +228,12 @@ private void tryEnsureTableExists(Configuration hadoopConf) throws IOException {
             TableDescription descr = result.getTable();
             status = descr.getTableStatus();
         } catch (ResourceNotFoundException e) {
-            final long rcu = Long.parseLong(getParam(hadoopConf, "provisionedThroughput.rcu", "5"));
-            final long wcu = Long.parseLong(getParam(hadoopConf, "provisionedThroughput.wcu", "5"));
+            final long rcu = Long.parseLong(getParam(hadoopConf, DDB_CREATE_TABLE_RCU, "5"));
+            final long wcu = Long.parseLong(getParam(hadoopConf, DDB_CREATE_TABLE_WCU, "5"));
 
             LOG.info(
-                "DynamoDB table `{}` in region `{}` does not exist."
-                + "Creating it now with provisioned throughput of {} RCUs and {} WCUs.",
+                "DynamoDB table `{}` in region `{}` does not exist. " +
+                "Creating it now with provisioned throughput of {} RCUs and {} WCUs.",
                 tableName, regionName, rcu, wcu);
             try {
                 client.createTable(
@@ -289,10 +296,35 @@ private AmazonDynamoDBClient getClient() throws java.io.IOException {
         }
     }
 
-    protected String getParam(Configuration config, String name, String defaultValue) {
-        return config.get(
-            String.format("%s%s", CONF_PREFIX, name),
-            defaultValue
-        );
+    /**
+     * Get the hadoopConf param $name that is prefixed either with $SPARK_CONF_PREFIX or
+     * $BASE_CONF_PREFIX.
+     *
+     * If two parameters exist, one for each prefix, then an IllegalArgumentException is thrown.
+     *
+     * If no parameters exist, then the $defaultValue is returned.
+     */
+    protected static String getParam(Configuration hadoopConf, String name, String defaultValue) {
+        final String sparkPrefixKey = String.format("%s.%s", SPARK_CONF_PREFIX, name);
+        final String basePrefixKey = String.format("%s.%s", BASE_CONF_PREFIX, name);
+
+        final String sparkPrefixVal = hadoopConf.get(sparkPrefixKey);
+        final String basePrefixVal = hadoopConf.get(basePrefixKey);
+
+        if (sparkPrefixVal != null &&
+            basePrefixVal != null &&
+            !sparkPrefixVal.equals(basePrefixVal)) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "Configuration properties `%s=%s` and `%s=%s` have different values. " +
+                    "Please set only one.",
+                    sparkPrefixKey, sparkPrefixVal, basePrefixKey, basePrefixVal
+                )
+            );
+        }
+
+        if (sparkPrefixVal != null) return sparkPrefixVal;
+        if (basePrefixVal != null) return basePrefixVal;
+        return defaultValue;
     }
 }
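As a companion to the diff above, here is a minimal sketch (not part of the commit) of how an engine that works with a Hadoop `Configuration` directly, such as delta-standalone, Flink, or Hive per the commit message, can keep using the original base prefix; the table name and region are placeholders:

```scala
import org.apache.hadoop.conf.Configuration

// Base-prefixed form, set directly on the Hadoop configuration (no `spark.` prefix needed).
val hadoopConf = new Configuration()
hadoopConf.set("io.delta.storage.S3DynamoDBLogStore.ddb.tableName", "my_ddb_table")  // placeholder
hadoopConf.set("io.delta.storage.S3DynamoDBLogStore.ddb.region", "us-west-2")        // placeholder

// getParam resolves either prefix; if both prefixes are set to different values it throws
// an IllegalArgumentException (see the unit test in ExternalLogStoreSuite below).
```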

storage-s3-dynamodb/src/test/scala/io/delta/storage/ExternalLogStoreSuite.scala (+60)

@@ -18,7 +18,9 @@ package io.delta.storage
 
 import java.net.URI
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
+import org.scalatest.funsuite.AnyFunSuite
 
 import org.apache.spark.sql.delta.FakeFileSystem
 import org.apache.spark.sql.delta.util.FileNames
@@ -138,6 +140,64 @@ class ExternalLogStoreSuite extends org.apache.spark.sql.delta.PublicLogStoreSui
   protected def shouldUseRenameToWriteCheckpoint: Boolean = false
 }
 
+///////////////////////////////////
+// S3DynamoDBLogStore Test Suite //
+///////////////////////////////////
+
+class S3DynamoDBLogStoreSuite extends AnyFunSuite {
+  test("getParam") {
+    import S3DynamoDBLogStore._
+
+    val sparkPrefixKey = "spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName"
+    val basePrefixKey = "io.delta.storage.S3DynamoDBLogStore.ddb.tableName"
+
+    // Sanity check
+    require(sparkPrefixKey == SPARK_CONF_PREFIX + "." + DDB_CLIENT_TABLE)
+    require(basePrefixKey == BASE_CONF_PREFIX + "." + DDB_CLIENT_TABLE)
+
+    // Case 1: no parameters exist, should use default
+    assert(getParam(new Configuration(), DDB_CLIENT_TABLE, "default_table") == "default_table")
+
+    // Case 2: spark-prefix param only
+    {
+      val hadoopConf = new Configuration()
+      hadoopConf.set(sparkPrefixKey, "some_other_table_2")
+      assert(getParam(hadoopConf, DDB_CLIENT_TABLE, "default_table") == "some_other_table_2")
+    }
+
+    // Case 3: base-prefix param only
+    {
+      val hadoopConf = new Configuration()
+      hadoopConf.set(basePrefixKey, "some_other_table_3")
+      assert(getParam(hadoopConf, DDB_CLIENT_TABLE, "default_table") == "some_other_table_3")
+    }
+
+    // Case 4: both params set, same value
+    {
+      val hadoopConf = new Configuration()
+      hadoopConf.set(sparkPrefixKey, "some_other_table_4")
+      hadoopConf.set(basePrefixKey, "some_other_table_4")
+      assert(getParam(hadoopConf, DDB_CLIENT_TABLE, "default_table") == "some_other_table_4")
+    }
+
+    // Case 5: both param set, different value
+    {
+      val hadoopConf = new Configuration()
+      hadoopConf.set(sparkPrefixKey, "some_other_table_5a")
+      hadoopConf.set(basePrefixKey, "some_other_table_5b")
+      val e = intercept[IllegalArgumentException] {
+        getParam(hadoopConf, DDB_CLIENT_TABLE, "default_table")
+      }.getMessage
+      assert(e == (s"Configuration properties `$sparkPrefixKey=some_other_table_5a` and " +
+        s"`$basePrefixKey=some_other_table_5b` have different values. Please set only one."))
+    }
+  }
+}
+
+////////////////////////////////
+// File System Helper Classes //
+////////////////////////////////
+
 /**
  * This utility enables failure simulation on file system.
  * Providing a matching suffix results in an exception being
