-
Notifications
You must be signed in to change notification settings - Fork 5.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for S3 EncryptionMaterialsProvider to PrestoS3FileSystem #3802
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,7 +29,10 @@ | |
import com.amazonaws.regions.Regions; | ||
import com.amazonaws.services.s3.AmazonS3; | ||
import com.amazonaws.services.s3.AmazonS3Client; | ||
import com.amazonaws.services.s3.AmazonS3EncryptionClient; | ||
import com.amazonaws.services.s3.model.AmazonS3Exception; | ||
import com.amazonaws.services.s3.model.CryptoConfiguration; | ||
import com.amazonaws.services.s3.model.EncryptionMaterialsProvider; | ||
import com.amazonaws.services.s3.model.GetObjectRequest; | ||
import com.amazonaws.services.s3.model.ListObjectsRequest; | ||
import com.amazonaws.services.s3.model.ObjectListing; | ||
|
@@ -49,6 +52,7 @@ | |
import io.airlift.log.Logger; | ||
import io.airlift.units.DataSize; | ||
import io.airlift.units.Duration; | ||
import org.apache.hadoop.conf.Configurable; | ||
import org.apache.hadoop.conf.Configuration; | ||
import org.apache.hadoop.fs.BlockLocation; | ||
import org.apache.hadoop.fs.BufferedFSInputStream; | ||
|
@@ -121,6 +125,7 @@ public static PrestoS3FileSystemStats getFileSystemStats() | |
public static final String S3_MULTIPART_MIN_FILE_SIZE = "presto.s3.multipart.min-file-size"; | ||
public static final String S3_MULTIPART_MIN_PART_SIZE = "presto.s3.multipart.min-part-size"; | ||
public static final String S3_USE_INSTANCE_CREDENTIALS = "presto.s3.use-instance-credentials"; | ||
public static final String S3_ENCRYPTION_MATERIALS_PROVIDER = "presto.s3.encryption-materials-provider"; | ||
|
||
private static final DataSize BLOCK_SIZE = new DataSize(32, MEGABYTE); | ||
private static final DataSize MAX_SKIP_SIZE = new DataSize(1, MEGABYTE); | ||
|
@@ -264,14 +269,20 @@ public FileStatus getFileStatus(Path path) | |
} | ||
|
||
return new FileStatus( | ||
metadata.getContentLength(), | ||
getObjectSize(metadata), | ||
false, | ||
1, | ||
BLOCK_SIZE.toBytes(), | ||
lastModifiedTime(metadata), | ||
qualifiedPath(path)); | ||
} | ||
|
||
private static long getObjectSize(ObjectMetadata metadata) | ||
{ | ||
String unencryptedContentLength = metadata.getUserMetadata().get("x-amz-unencrypted-content-length"); | ||
return unencryptedContentLength != null ? Long.parseLong(unencryptedContentLength) : metadata.getContentLength(); | ||
} | ||
|
||
@Override | ||
public FSDataInputStream open(Path path, int bufferSize) | ||
throws IOException | ||
|
@@ -444,6 +455,9 @@ private Iterator<LocatedFileStatus> statusFromPrefixes(List<String> prefixes) | |
|
||
private Iterator<LocatedFileStatus> statusFromObjects(List<S3ObjectSummary> objects) | ||
{ | ||
// NOTE: for encrypted objects, S3ObjectSummary.size() used below is NOT correct, | ||
// however, to get the correct size we'd need to make an additional request to get | ||
// user metadata, and in this case it doesn't matter. | ||
return objects.stream() | ||
.filter(object -> !object.getKey().endsWith("/")) | ||
.map(object -> new FileStatus( | ||
|
@@ -552,7 +566,14 @@ private static String keyFromPath(Path path) | |
private AmazonS3Client createAmazonS3Client(URI uri, Configuration hadoopConfig, ClientConfiguration clientConfig) | ||
{ | ||
AWSCredentialsProvider credentials = getAwsCredentialsProvider(uri, hadoopConfig); | ||
AmazonS3Client client = new AmazonS3Client(credentials, clientConfig, METRIC_COLLECTOR); | ||
EncryptionMaterialsProvider emp = createEncryptionMaterialsProvider(hadoopConfig); | ||
AmazonS3Client client; | ||
if (emp != null) { | ||
client = new AmazonS3EncryptionClient(credentials, emp, clientConfig, new CryptoConfiguration(), METRIC_COLLECTOR); | ||
} | ||
else { | ||
client = new AmazonS3Client(credentials, clientConfig, METRIC_COLLECTOR); | ||
} | ||
|
||
// use local region when running inside of EC2 | ||
Region region = Regions.getCurrentRegion(); | ||
|
@@ -562,6 +583,26 @@ private AmazonS3Client createAmazonS3Client(URI uri, Configuration hadoopConfig, | |
return client; | ||
} | ||
|
||
private EncryptionMaterialsProvider createEncryptionMaterialsProvider(Configuration hadoopConfig) | ||
{ | ||
EncryptionMaterialsProvider emp = null; | ||
String empClassName = hadoopConfig.get(S3_ENCRYPTION_MATERIALS_PROVIDER); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you have a custom implementation of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes — we implement an encryption materials provider that connects to our internal KMS, it reads initialization config from the hadoop config passed in. This works if you place your EMP jars in the $PRESTO_HOME/plugin/hive-hadoop2 directory. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Configuring this via the Hadoop config system doesn't fit with the way Presto and the Hive connector do configuration. We have first class config support in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK – I didn't know about HiveClientConfig and HiveClientConfigurationUpdater, I'll make changes there to add the encryption materials provider classname. I would still pass the hadoop config into the EMP implementation though (if it implements Configurable), as the specific configs for a given EMP will be different for each implementation. Does that work? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have a better understanding now. Since this requires a custom JAR and additional Hadoop configuration, we can add this as-is without the config going into There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice! Thanks. 
-nate From: David Phillips <[email protected]> In presto-hive/src/main/java/com/facebook/presto/hive/PrestoS3FileSystem.java (https://github.com/facebook/presto/pull/3802#discussion_r43166192):
I have a better understanding now. Since this requires a custom JAR and additional Hadoop configuration, we can add this as-is without the config going into HiveClientConfig. Thanks for the explanation! — CONFIDENTIALITY NOTICE: This e-mail and any attachments are for the exclusive and confidential use of the intended recipient and may constitute non-public information. If you received this e-mail in error, disclosing, copying, distributing or taking any action in reliance of this e-mail is strictly prohibited and may be unlawful. Instead, please notify us immediately by return e-mail and promptly delete this message and its attachments from your computer system. We do not waive any work product or other applicable legal privilege(s) by the transmission of this message. |
||
if (empClassName != null) { | ||
try { | ||
Class empClass = Class.forName(empClassName); | ||
emp = (EncryptionMaterialsProvider) empClass.newInstance(); | ||
if (emp instanceof Configurable) { | ||
((Configurable) emp).setConf(hadoopConfig); | ||
} | ||
} | ||
catch (Exception x) { | ||
throw new RuntimeException("Unable to load or create S3 encryption materials provider " + empClassName + ": " + x, x); | ||
} | ||
} | ||
|
||
return emp; | ||
} | ||
|
||
private AWSCredentialsProvider getAwsCredentialsProvider(URI uri, Configuration conf) | ||
{ | ||
// first try credentials from URI or static properties | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this "doesn't matter" because the encrypted size is always larger? The Hive connector computes splits based on this size, so if the encrypted size was smaller, it would be possible to not generate a split for part of the file and thus miss some data.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct — doesn't matter because the encrypted size is always larger, and in this case since it's just used for the split calc. It matters most when trying to seek within a given file since seeking in S3 must be done relative to the unencrypted size (for instance, to read the last X bytes of a file to pull out metadata the reported size must be the unencrypted size).