Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(2727): handle both Files and Inputstreams for uploads #2728

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.cloud.storage.StorageException;
import com.google.common.io.ByteStreams;

import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
Expand All @@ -31,12 +34,12 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class ParallelCompositeUploadCallable implements Callable<UploadResult> {
final class ParallelCompositeUploadCallable<T> implements Callable<UploadResult> {
private final Storage storage;

private final BlobInfo originalBlob;

private final Path sourceFile;
private final T source;

private final ParallelUploadConfig parallelUploadConfig;

Expand All @@ -45,12 +48,12 @@ final class ParallelCompositeUploadCallable implements Callable<UploadResult> {
public ParallelCompositeUploadCallable(
Storage storage,
BlobInfo originalBlob,
Path sourceFile,
T source,
ParallelUploadConfig parallelUploadConfig,
BlobWriteOption[] opts) {
this.storage = storage;
this.originalBlob = originalBlob;
this.sourceFile = sourceFile;
this.source = source;
this.parallelUploadConfig = parallelUploadConfig;
this.opts = opts;
}
Expand All @@ -61,9 +64,17 @@ public UploadResult call() {

private UploadResult uploadPCU() {
BlobWriteSession session = storage.blobWriteSession(originalBlob, opts);
try (WritableByteChannel writableByteChannel = session.open();
FileChannel fc = FileChannel.open(sourceFile, StandardOpenOption.READ)) {
ByteStreams.copy(fc, writableByteChannel);
try (WritableByteChannel writableByteChannel = session.open()) {
if (source instanceof Path) {
try (FileChannel fc = FileChannel.open((Path) source, StandardOpenOption.READ)) {
ByteStreams.copy(fc, writableByteChannel);
}
} else if (source instanceof InputStream) {
InputStream inputStream = (InputStream) source;
ByteStreams.copy(inputStream, Channels.newOutputStream(writableByteChannel));
} else {
throw new IllegalArgumentException(String.format("Unsupported source type %s", source.getClass().getName()));
}
} catch (StorageException e) {
if (parallelUploadConfig.isSkipIfExists() && e.getCode() == 412) {
return UploadResult.newBuilder(originalBlob, TransferStatus.SKIPPED)
Expand All @@ -85,7 +96,7 @@ private UploadResult uploadPCU() {
return UploadResult.newBuilder(originalBlob, TransferStatus.SUCCESS)
.setUploadedBlob(newBlob)
.build();
} catch (InterruptedException | ExecutionException | TimeoutException e) {
} catch (InterruptedException | ExecutionException | TimeoutException | IllegalArgumentException e) {
return UploadResult.newBuilder(originalBlob, TransferStatus.FAILED_TO_FINISH)
.setException(e)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@

import com.google.cloud.storage.BlobInfo;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;

import org.checkerframework.checker.nullness.qual.NonNull;

/**
Expand Down Expand Up @@ -88,4 +91,35 @@ public interface TransferManager extends AutoCloseable {
*/
@NonNull
DownloadJob downloadBlobs(List<BlobInfo> blobs, ParallelDownloadConfig config);

/**
* Uploads a list of input streams in parallel. This operation will not block the invoking thread,
* awaiting results should be done on the returned UploadJob.
*
* <p>Accepts a {@link ParallelUploadConfig} which defines the constraints of parallel uploads or
* predefined defaults.
*
* <p>Example of creating a parallel upload with Transfer Manager.
*
* <pre>{@code
* Storage storage = StorageOptions.getDefaultInstance().getService();
* String bucketName = "my-unique-bucket";
* String sourceFileName = "file.txt";
* Blob blob = storage.get(bucketName, sourceFileName)
* InputStream inputStream = Channels.newInputStream(blob.reader());
* List<Path> streams = Map.of(sourceFileName, inputStream);
*
* ParallelUploadConfig parallelUploadConfig =
* ParallelUploadConfig.newBuilder()
* .setBucketName(bucketName)
* .build();
*
* UploadJob uploadedFiles = transferManager.uploadFiles(streams, config);
*
* }</pre>
*
* @return an {@link UploadJob}
*/
@NonNull
UploadJob uploadFiles(Map<String, InputStream> streams, ParallelUploadConfig config) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.api.core.ListenableFutureToApiFuture;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.cloud.Tuple;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.BlobWriteSessionConfigs;
Expand All @@ -36,15 +37,19 @@
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BinaryOperator;
import java.util.function.Function;

import org.checkerframework.checker.nullness.qual.NonNull;

final class TransferManagerImpl implements TransferManager {
Expand Down Expand Up @@ -105,31 +110,18 @@ public void close() throws Exception {
@Override
public @NonNull UploadJob uploadFiles(List<Path> files, ParallelUploadConfig config)
throws IOException {
Storage.BlobWriteOption[] opts =
config.getWriteOptsPerRequest().toArray(new BlobWriteOption[0]);
List<ApiFuture<UploadResult>> uploadTasks = new ArrayList<>();
for (Path file : files) {
return uploadBySource(files, config, file -> {
if (Files.isDirectory(file)) throw new IllegalStateException("Directories are not supported");
String blobName = TransferManagerUtils.createBlobName(config, file);
BlobInfo blobInfo = BlobInfo.newBuilder(config.getBucketName(), blobName).build();
if (transferManagerConfig.isAllowParallelCompositeUpload()
&& qos.parallelCompositeUpload(Files.size(file))) {
ParallelCompositeUploadCallable callable =
new ParallelCompositeUploadCallable(storage, blobInfo, file, config, opts);
SettableApiFuture<UploadResult> resultFuture = SettableApiFuture.create();
pcuQueue.add(new PendingPcuTask(callable, resultFuture));
uploadTasks.add(resultFuture);
schedulePcuPoller();
} else {
UploadCallable callable =
new UploadCallable(transferManagerConfig, storage, blobInfo, file, config, opts);
uploadTasks.add(convert(executor.submit(callable)));
try {
return new UploadSource<>(
TransferManagerUtils.createBlobName(config, file),
file,
qos.parallelCompositeUpload(Files.size(file))
);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return UploadJob.newBuilder()
.setParallelUploadConfig(config)
.setUploadResults(ImmutableList.copyOf(uploadTasks))
.build();
});
}

@Override
Expand Down Expand Up @@ -185,6 +177,50 @@ public void close() throws Exception {
.build();
}

@Override
public @NonNull UploadJob uploadFiles(Map<String, InputStream> streams, ParallelUploadConfig config) throws IOException {
return uploadBySource(streams.entrySet(), config, entry -> new UploadSource<>(
entry.getKey(), entry.getValue(), true
));
}

private <K, V> @NonNull UploadJob uploadBySource(Iterable<K> source, ParallelUploadConfig config, Function<K, UploadSource<V>> sourceExtractor) throws IOException {
Storage.BlobWriteOption[] opts = config.getWriteOptsPerRequest().toArray(new BlobWriteOption[0]);
List<ApiFuture<UploadResult>> uploadTasks = new ArrayList<>();

for (K entry : source) {
UploadSource<V> uploadSource = sourceExtractor.apply(entry);
Tuple<ApiFuture<UploadResult>, Boolean> uploadResult = upload(uploadSource.getBlobName(), uploadSource.getSource(), uploadSource.isParallelCompositeUpload(), opts, config);
uploadTasks.add(uploadResult.x());
if (uploadResult.y()) {
schedulePcuPoller();
}
}

return UploadJob.newBuilder()
.setParallelUploadConfig(config)
.setUploadResults(ImmutableList.copyOf(uploadTasks))
.build();
}

public @NonNull <T> Tuple<ApiFuture<UploadResult>, Boolean> upload(String blobName, T source,
boolean isParallelCompositeUpload,
Storage.BlobWriteOption[] opts,
ParallelUploadConfig config) {
BlobInfo blobInfo = BlobInfo.newBuilder(config.getBucketName(), blobName).build();
if (transferManagerConfig.isAllowParallelCompositeUpload() && isParallelCompositeUpload) {
ParallelCompositeUploadCallable<T> callable =
new ParallelCompositeUploadCallable<>(storage, blobInfo, source, config, opts);
SettableApiFuture<UploadResult> resultFuture = SettableApiFuture.create();
pcuQueue.add(new PendingPcuTask(callable, resultFuture));
return Tuple.of(resultFuture, true);
} else {
UploadCallable<T> callable =
new UploadCallable<>(storage, blobInfo, source, config, opts);
return Tuple.of(convert(executor.submit(callable)), false);
}
}

private void schedulePcuPoller() {
if (pcuPoller == null) {
synchronized (pcuPollerSync) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,44 +21,50 @@
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.cloud.storage.StorageException;

import java.io.InputStream;
import java.nio.file.Path;
import java.util.concurrent.Callable;

final class UploadCallable implements Callable<UploadResult> {
private final TransferManagerConfig transferManagerConfig;
final class UploadCallable<T> implements Callable<UploadResult> {
private final Storage storage;

private final BlobInfo originalBlob;

private final Path sourceFile;
private final T source;

private final ParallelUploadConfig parallelUploadConfig;

private final Storage.BlobWriteOption[] opts;

public UploadCallable(
TransferManagerConfig transferManagerConfig,
Storage storage,
BlobInfo originalBlob,
Path sourceFile,
T source,
ParallelUploadConfig parallelUploadConfig,
BlobWriteOption[] opts) {
this.transferManagerConfig = transferManagerConfig;
this.storage = storage;
this.originalBlob = originalBlob;
this.sourceFile = sourceFile;
this.source = source;
this.parallelUploadConfig = parallelUploadConfig;
this.opts = opts;
}

public UploadResult call() throws Exception {
public UploadResult call() {
// TODO: Check for chunking
return uploadWithoutChunking();
}

private UploadResult uploadWithoutChunking() {
try {
Blob from = storage.createFrom(originalBlob, sourceFile, opts);
Blob from;
if (source instanceof Path) {
from = storage.createFrom(originalBlob, (Path) source, opts);
} else if (source instanceof InputStream) {
from = storage.createFrom(originalBlob, (InputStream) source, opts);
} else {
throw new IllegalArgumentException("Unsupported source type: " + source.getClass().getName());
}
return UploadResult.newBuilder(originalBlob, TransferStatus.SUCCESS)
.setUploadedBlob(from.asBlobInfo())
.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.google.cloud.storage.transfermanager;

final class UploadSource<T> {
private final String blobName;
private final T source;
private final boolean isParallelCompositeUpload;

UploadSource(String blobName, T source, boolean isParallelCompositeUpload) {
this.blobName = blobName;
this.source = source;
this.isParallelCompositeUpload = isParallelCompositeUpload;
}

public String getBlobName() {
return blobName;
}

public T getSource() {
return source;
}

public boolean isParallelCompositeUpload() {
return isParallelCompositeUpload;
}
}