Skip to content

Commit

Permalink
feat(transfer-manager): add ParallelUploadConfig.Builder#setUploadBlo…
Browse files Browse the repository at this point in the history
…bInfoFactory (#2936)

When uploading a file from the file system, there are scenarios in which a job needs to customize the actual BlobInfo used to upload to GCS.

Add the new UploadBlobInfoFactory which allows a user to produce their own BlobInfo instance given the bucketName and fileName. When producing the BlobInfo the application can also customize other metadata fields.

A few convenience adapter methods are available in UploadBlobInfoFactory to simplify common operations.

Fixes #2638
  • Loading branch information
BenWhitehead authored Feb 18, 2025
1 parent e8ba858 commit 86e9ae8
Show file tree
Hide file tree
Showing 6 changed files with 357 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage.transfermanager;

/**
 * Unchecked exception whose message reports that the bucket name found in a produced BlobInfo
 * differs from the bucket name taken from the config.
 */
public final class BucketNameMismatchException extends RuntimeException {

  /**
   * Creates the exception with a message containing both bucket names.
   *
   * @param actual the bucket name found in the produced BlobInfo
   * @param expected the bucket name from the config
   */
  public BucketNameMismatchException(String actual, String expected) {
    super(describeMismatch(actual, expected));
  }

  /** Renders the exception message in the form {@code "... (actual != expected)"}. */
  private static String describeMismatch(String actual, String expected) {
    StringBuilder message =
        new StringBuilder(
            "Bucket name in produced BlobInfo did not match bucket name from config. (");
    message.append(actual).append(" != ").append(expected).append(")");
    return message.toString();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import org.checkerframework.checker.nullness.qual.NonNull;

/**
Expand All @@ -33,19 +35,19 @@
public final class ParallelUploadConfig {

private final boolean skipIfExists;
@NonNull private final String prefix;
@NonNull private final String bucketName;
@NonNull private final UploadBlobInfoFactory uploadBlobInfoFactory;

@NonNull private final List<BlobWriteOption> writeOptsPerRequest;

private ParallelUploadConfig(
boolean skipIfExists,
@NonNull String prefix,
@NonNull String bucketName,
@NonNull UploadBlobInfoFactory uploadBlobInfoFactory,
@NonNull List<BlobWriteOption> writeOptsPerRequest) {
this.skipIfExists = skipIfExists;
this.prefix = prefix;
this.bucketName = bucketName;
this.uploadBlobInfoFactory = uploadBlobInfoFactory;
this.writeOptsPerRequest = applySkipIfExists(skipIfExists, writeOptsPerRequest);
}

Expand All @@ -63,9 +65,26 @@ public boolean isSkipIfExists() {
* A common prefix that will be applied to all object paths in the destination bucket
*
* @see Builder#setPrefix(String)
* @see Builder#setUploadBlobInfoFactory(UploadBlobInfoFactory)
* @see UploadBlobInfoFactory#prefixObjectNames(String)
*/
public @NonNull String getPrefix() {
return prefix;
if (uploadBlobInfoFactory instanceof PrefixObjectNames) {
PrefixObjectNames prefixObjectNames = (PrefixObjectNames) uploadBlobInfoFactory;
return prefixObjectNames.prefix;
}
return "";
}

/**
* Returns the {@link UploadBlobInfoFactory} which will be used to produce a {@link BlobInfo}
* based on a provided bucket name and file name.
*
* @return the factory held by this config
* @see Builder#setUploadBlobInfoFactory(UploadBlobInfoFactory)
* @since 2.49.0
*/
public @NonNull UploadBlobInfoFactory getUploadBlobInfoFactory() {
return uploadBlobInfoFactory;
}

/**
Expand Down Expand Up @@ -96,22 +115,22 @@ public boolean equals(Object o) {
}
ParallelUploadConfig that = (ParallelUploadConfig) o;
return skipIfExists == that.skipIfExists
&& prefix.equals(that.prefix)
&& bucketName.equals(that.bucketName)
&& uploadBlobInfoFactory.equals(that.uploadBlobInfoFactory)
&& writeOptsPerRequest.equals(that.writeOptsPerRequest);
}

@Override
public int hashCode() {
return Objects.hash(skipIfExists, prefix, bucketName, writeOptsPerRequest);
return Objects.hash(skipIfExists, bucketName, uploadBlobInfoFactory, writeOptsPerRequest);
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("skipIfExists", skipIfExists)
.add("prefix", prefix)
.add("bucketName", bucketName)
.add("uploadBlobInfoFactory", uploadBlobInfoFactory)
.add("writeOptsPerRequest", writeOptsPerRequest)
.toString();
}
Expand All @@ -137,13 +156,13 @@ private static List<BlobWriteOption> applySkipIfExists(
public static final class Builder {

private boolean skipIfExists;
private @NonNull String prefix;
private @NonNull String bucketName;
private @NonNull UploadBlobInfoFactory uploadBlobInfoFactory;
private @NonNull List<BlobWriteOption> writeOptsPerRequest;

private Builder() {
this.prefix = "";
this.bucketName = "";
this.uploadBlobInfoFactory = UploadBlobInfoFactory.defaultInstance();
this.writeOptsPerRequest = ImmutableList.of();
}

Expand All @@ -162,11 +181,37 @@ public Builder setSkipIfExists(boolean skipIfExists) {
/**
* Sets a common prefix that will be applied to all object paths in the destination bucket.
*
* <p><i>NOTE</i>: this method and {@link #setUploadBlobInfoFactory(UploadBlobInfoFactory)} are
* mutually exclusive, and last invocation "wins".
*
* @return the builder instance with the value for prefix modified.
* @see ParallelUploadConfig#getPrefix()
* @see ParallelUploadConfig.Builder#setUploadBlobInfoFactory(UploadBlobInfoFactory)
* @see UploadBlobInfoFactory#prefixObjectNames(String)
*/
public Builder setPrefix(@NonNull String prefix) {
this.prefix = prefix;
this.uploadBlobInfoFactory = UploadBlobInfoFactory.prefixObjectNames(prefix);
return this;
}

/**
* Sets an {@link UploadBlobInfoFactory} which can be used to produce a custom {@link BlobInfo}
* based on a provided bucket name and file name.
*
* <p>The bucket name in the returned BlobInfo MUST be equal to the value provided to {@link
* #setBucketName(String)}; if it is not, that upload will fail with a {@link
* TransferStatus#FAILED_TO_START} and a {@link BucketNameMismatchException}.
*
* <p><i>NOTE</i>: this method and {@link #setPrefix(String)} are mutually exclusive, and the last
* invocation "wins".
*
* @param uploadBlobInfoFactory the factory to use when producing the BlobInfo for an upload
* @return the builder instance with the value for uploadBlobInfoFactory modified.
* @see ParallelUploadConfig#getPrefix()
* @see ParallelUploadConfig#getUploadBlobInfoFactory()
* @since 2.49.0
*/
public Builder setUploadBlobInfoFactory(@NonNull UploadBlobInfoFactory uploadBlobInfoFactory) {
this.uploadBlobInfoFactory = uploadBlobInfoFactory;
return this;
}

Expand Down Expand Up @@ -199,10 +244,99 @@ public Builder setWriteOptsPerRequest(@NonNull List<BlobWriteOption> writeOptsPe
* @return {@link ParallelUploadConfig}
*/
public ParallelUploadConfig build() {
checkNotNull(prefix);
checkNotNull(bucketName);
checkNotNull(uploadBlobInfoFactory);
checkNotNull(writeOptsPerRequest);
return new ParallelUploadConfig(skipIfExists, prefix, bucketName, writeOptsPerRequest);
return new ParallelUploadConfig(
skipIfExists, bucketName, uploadBlobInfoFactory, writeOptsPerRequest);
}
}

/**
 * Factory which produces the {@link BlobInfo} to use for an upload, given a bucket name and a
 * file name.
 *
 * <p>The interface has a single abstract method, so it can be implemented with a lambda.
 */
@FunctionalInterface
public interface UploadBlobInfoFactory {

  /**
   * Method to produce a {@link BlobInfo} to be used for the upload to Cloud Storage.
   *
   * <p>The bucket name in the returned BlobInfo MUST be equal to the value provided to the {@link
   * ParallelUploadConfig.Builder#setBucketName(String)}; if it is not, that upload will fail with
   * a {@link TransferStatus#FAILED_TO_START} and a {@link BucketNameMismatchException}.
   *
   * @param bucketName The name of the bucket to be uploaded to. The value provided here will be
   *     the value from {@link ParallelUploadConfig#getBucketName()}.
   * @param fileName The String representation of the absolute path of the file to be uploaded
   * @return The instance of {@link BlobInfo} that should be used to upload the file to Cloud
   *     Storage.
   */
  BlobInfo apply(String bucketName, String fileName);

  /**
   * Adapter factory to provide the same semantics as if using {@link Builder#setPrefix(String)}
   *
   * @param prefix the prefix to place in front of each file name
   * @return a factory which prefixes object names with {@code prefix}
   */
  static UploadBlobInfoFactory prefixObjectNames(String prefix) {
    return new PrefixObjectNames(prefix);
  }

  /** The default instance which applies no modification to the provided {@code fileName} */
  static UploadBlobInfoFactory defaultInstance() {
    return DefaultUploadBlobInfoFactory.INSTANCE;
  }

  /**
   * Convenience method to "lift" a {@link Function} that transforms the file name to an {@link
   * UploadBlobInfoFactory}
   *
   * @param fileNameTransformer function applied to the file name to produce the object name
   * @return a factory which builds a {@link BlobInfo} from the bucket name and the transformed
   *     file name
   * @throws NullPointerException if {@code fileNameTransformer} is null
   */
  static UploadBlobInfoFactory transformFileName(Function<String, String> fileNameTransformer) {
    // Reject null here; otherwise the failure would only surface when the factory is applied.
    Objects.requireNonNull(fileNameTransformer, "fileNameTransformer must be non null");
    return (b, f) -> BlobInfo.newBuilder(b, fileNameTransformer.apply(f)).build();
  }
}

private static final class DefaultUploadBlobInfoFactory implements UploadBlobInfoFactory {
private static final DefaultUploadBlobInfoFactory INSTANCE = new DefaultUploadBlobInfoFactory();

private DefaultUploadBlobInfoFactory() {}

@Override
public BlobInfo apply(String bucketName, String fileName) {
return BlobInfo.newBuilder(bucketName, fileName).build();
}
}

/**
 * {@link UploadBlobInfoFactory} which places a fixed prefix in front of each file name to form
 * the object name.
 *
 * <p>Instances are value-like: two instances are equal when their prefixes are equal.
 */
private static final class PrefixObjectNames implements UploadBlobInfoFactory {
  private final String prefix;

  /**
   * @param prefix the prefix to place in front of each file name
   * @throws NullPointerException if {@code prefix} is null
   */
  private PrefixObjectNames(String prefix) {
    // Fail fast: a null prefix would otherwise be concatenated as the literal text "null" in
    // apply, and would be returned as-is from code reading this field.
    this.prefix = Objects.requireNonNull(prefix, "prefix must be non null");
  }

  /**
   * Builds a {@link BlobInfo} whose name is the prefix followed by {@code fileName}. A {@code "/"}
   * is inserted between the two only when {@code fileName} does not already start with one.
   */
  @Override
  public BlobInfo apply(String bucketName, String fileName) {
    String separator = "";
    if (!fileName.startsWith("/")) {
      separator = "/";
    }
    return BlobInfo.newBuilder(bucketName, prefix + separator + fileName).build();
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof PrefixObjectNames)) {
      return false;
    }
    PrefixObjectNames that = (PrefixObjectNames) o;
    return Objects.equals(prefix, that.prefix);
  }

  @Override
  public int hashCode() {
    return Objects.hashCode(prefix);
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(this).add("prefix", prefix).toString();
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,17 @@ public void close() throws Exception {
List<ApiFuture<UploadResult>> uploadTasks = new ArrayList<>();
for (Path file : files) {
if (Files.isDirectory(file)) throw new IllegalStateException("Directories are not supported");
String blobName = TransferManagerUtils.createBlobName(config, file);
BlobInfo blobInfo = BlobInfo.newBuilder(config.getBucketName(), blobName).build();
String bucketName = config.getBucketName();
BlobInfo blobInfo =
config.getUploadBlobInfoFactory().apply(bucketName, file.toAbsolutePath().toString());
if (!blobInfo.getBucket().equals(bucketName)) {
uploadTasks.add(
ApiFutures.immediateFuture(
UploadResult.newBuilder(blobInfo, TransferStatus.FAILED_TO_START)
.setException(new BucketNameMismatchException(blobInfo.getBucket(), bucketName))
.build()));
continue;
}
if (transferManagerConfig.isAllowParallelCompositeUpload()
&& qos.parallelCompositeUpload(Files.size(file))) {
ParallelCompositeUploadCallable callable =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,6 @@ final class TransferManagerUtils {

private TransferManagerUtils() {}

/**
 * Computes the blob name for {@code file}: the file's string form, preceded by the config's prefix
 * when that prefix is non-empty.
 */
static String createBlobName(ParallelUploadConfig config, Path file) {
  final String configuredPrefix = config.getPrefix();
  return configuredPrefix.isEmpty() ? file.toString() : configuredPrefix.concat(file.toString());
}

static Path createDestPath(ParallelDownloadConfig config, BlobInfo originalBlob) {
Path newPath =
config
Expand Down
Loading

0 comments on commit 86e9ae8

Please sign in to comment.