diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/ParallelCompositeUploadCallable.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/ParallelCompositeUploadCallable.java index d306c4a6cf..3f4d8f0080 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/ParallelCompositeUploadCallable.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/ParallelCompositeUploadCallable.java @@ -22,6 +22,9 @@ import com.google.cloud.storage.Storage.BlobWriteOption; import com.google.cloud.storage.StorageException; import com.google.common.io.ByteStreams; + +import java.io.InputStream; +import java.nio.channels.Channels; import java.nio.channels.FileChannel; import java.nio.channels.WritableByteChannel; import java.nio.file.Path; @@ -31,12 +34,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -final class ParallelCompositeUploadCallable implements Callable { +final class ParallelCompositeUploadCallable implements Callable { private final Storage storage; private final BlobInfo originalBlob; - private final Path sourceFile; + private final T source; private final ParallelUploadConfig parallelUploadConfig; @@ -45,12 +48,12 @@ final class ParallelCompositeUploadCallable implements Callable { public ParallelCompositeUploadCallable( Storage storage, BlobInfo originalBlob, - Path sourceFile, + T source, ParallelUploadConfig parallelUploadConfig, BlobWriteOption[] opts) { this.storage = storage; this.originalBlob = originalBlob; - this.sourceFile = sourceFile; + this.source = source; this.parallelUploadConfig = parallelUploadConfig; this.opts = opts; } @@ -61,9 +64,17 @@ public UploadResult call() { private UploadResult uploadPCU() { BlobWriteSession session = storage.blobWriteSession(originalBlob, opts); - try (WritableByteChannel writableByteChannel = session.open(); - FileChannel fc = FileChannel.open(sourceFile, StandardOpenOption.READ)) { - ByteStreams.copy(fc, writableByteChannel); + try (WritableByteChannel writableByteChannel = session.open()) { + if (source instanceof Path) { + try (FileChannel fc = FileChannel.open((Path) source, StandardOpenOption.READ)) { + ByteStreams.copy(fc, writableByteChannel); + } + } else if (source instanceof InputStream) { + InputStream inputStream = (InputStream) source; + ByteStreams.copy(inputStream, Channels.newOutputStream(writableByteChannel)); + } else { + throw new IllegalArgumentException(String.format("Unsupported source type %s", source.getClass().getName())); + } } catch (StorageException e) { if (parallelUploadConfig.isSkipIfExists() && e.getCode() == 412) { return UploadResult.newBuilder(originalBlob, TransferStatus.SKIPPED) @@ -85,7 +96,7 @@ private UploadResult uploadPCU() { return UploadResult.newBuilder(originalBlob, TransferStatus.SUCCESS) .setUploadedBlob(newBlob) .build(); - } catch (InterruptedException | ExecutionException | TimeoutException e) { + } catch (InterruptedException | ExecutionException | TimeoutException | IllegalArgumentException e) { return UploadResult.newBuilder(originalBlob, TransferStatus.FAILED_TO_FINISH) .setException(e) .build(); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManager.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManager.java index d840523a2b..a24c1333d9 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManager.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManager.java @@ -18,8 +18,11 @@ import com.google.cloud.storage.BlobInfo; import java.io.IOException; +import java.io.InputStream; import java.nio.file.Path; import java.util.List; +import java.util.Map; + import org.checkerframework.checker.nullness.qual.NonNull; /** @@ -88,4 +91,35 @@ public interface TransferManager extends AutoCloseable { */ @NonNull DownloadJob downloadBlobs(List blobs, ParallelDownloadConfig config); + + /** + * Uploads a list of input streams in parallel. This operation will not block the invoking thread, + * awaiting results should be done on the returned UploadJob. + * + *

Accepts a {@link ParallelUploadConfig} which defines the constraints of parallel uploads or + * predefined defaults. + * + *

Example of creating a parallel upload with Transfer Manager. + * + *

{@code
+   * Storage storage = StorageOptions.getDefaultInstance().getService();
+   * String bucketName = "my-unique-bucket";
+   * String sourceFileName = "file.txt";
+   * Blob blob = storage.get(bucketName, sourceFileName)
+   * InputStream inputStream = Channels.newInputStream(blob.reader());
+   * List streams = Map.of(sourceFileName, inputStream);
+   *
+   * ParallelUploadConfig parallelUploadConfig =
+   *           ParallelUploadConfig.newBuilder()
+   *               .setBucketName(bucketName)
+   *               .build();
+   *
+   * UploadJob uploadedFiles = transferManager.uploadFiles(streams, config);
+   *
+   * }
+ * + * @return an {@link UploadJob} + */ + @NonNull + UploadJob uploadFiles(Map streams, ParallelUploadConfig config) throws IOException; } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerImpl.java index 57e56ea201..8ee15297a4 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerImpl.java @@ -21,6 +21,7 @@ import com.google.api.core.ListenableFutureToApiFuture; import com.google.api.core.SettableApiFuture; import com.google.api.gax.rpc.FixedHeaderProvider; +import com.google.cloud.Tuple; import com.google.cloud.storage.BlobId; import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.BlobWriteSessionConfigs; @@ -36,15 +37,19 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import java.io.IOException; +import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Deque; import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.BinaryOperator; +import java.util.function.Function; + import org.checkerframework.checker.nullness.qual.NonNull; final class TransferManagerImpl implements TransferManager { @@ -105,31 +110,18 @@ public void close() throws Exception { @Override public @NonNull UploadJob uploadFiles(List files, ParallelUploadConfig config) throws IOException { - Storage.BlobWriteOption[] opts = - config.getWriteOptsPerRequest().toArray(new BlobWriteOption[0]); - List> uploadTasks = new ArrayList<>(); - for (Path file : files) { + return uploadBySource(files, config, file -> { if (Files.isDirectory(file)) throw new IllegalStateException("Directories are not supported"); - String blobName = TransferManagerUtils.createBlobName(config, file); - BlobInfo blobInfo = BlobInfo.newBuilder(config.getBucketName(), blobName).build(); - if (transferManagerConfig.isAllowParallelCompositeUpload() - && qos.parallelCompositeUpload(Files.size(file))) { - ParallelCompositeUploadCallable callable = - new ParallelCompositeUploadCallable(storage, blobInfo, file, config, opts); - SettableApiFuture resultFuture = SettableApiFuture.create(); - pcuQueue.add(new PendingPcuTask(callable, resultFuture)); - uploadTasks.add(resultFuture); - schedulePcuPoller(); - } else { - UploadCallable callable = - new UploadCallable(transferManagerConfig, storage, blobInfo, file, config, opts); - uploadTasks.add(convert(executor.submit(callable))); + try { + return new UploadSource<>( + TransferManagerUtils.createBlobName(config, file), + file, + qos.parallelCompositeUpload(Files.size(file)) + ); + } catch (IOException e) { + throw new RuntimeException(e); } - } - return UploadJob.newBuilder() - .setParallelUploadConfig(config) - .setUploadResults(ImmutableList.copyOf(uploadTasks)) - .build(); + }); } @Override @@ -185,6 +177,50 @@ public void close() throws Exception { .build(); } + @Override + public @NonNull UploadJob uploadFiles(Map streams, ParallelUploadConfig config) throws IOException { + return uploadBySource(streams.entrySet(), config, entry -> new UploadSource<>( + entry.getKey(), entry.getValue(), true + )); + } + + private @NonNull UploadJob uploadBySource(Iterable source, ParallelUploadConfig config, Function> sourceExtractor) throws IOException { + Storage.BlobWriteOption[] opts = config.getWriteOptsPerRequest().toArray(new BlobWriteOption[0]); + List> uploadTasks = new ArrayList<>(); + + for (K entry : source) { + UploadSource uploadSource = sourceExtractor.apply(entry); + Tuple, Boolean> uploadResult = upload(uploadSource.getBlobName(), uploadSource.getSource(), uploadSource.isParallelCompositeUpload(), opts, config); + uploadTasks.add(uploadResult.x()); + if (uploadResult.y()) { + schedulePcuPoller(); + } + } + + return UploadJob.newBuilder() + .setParallelUploadConfig(config) + .setUploadResults(ImmutableList.copyOf(uploadTasks)) + .build(); + } + + public @NonNull Tuple, Boolean> upload(String blobName, T source, + boolean isParallelCompositeUpload, + Storage.BlobWriteOption[] opts, + ParallelUploadConfig config) { + BlobInfo blobInfo = BlobInfo.newBuilder(config.getBucketName(), blobName).build(); + if (transferManagerConfig.isAllowParallelCompositeUpload() && isParallelCompositeUpload) { + ParallelCompositeUploadCallable callable = + new ParallelCompositeUploadCallable<>(storage, blobInfo, source, config, opts); + SettableApiFuture resultFuture = SettableApiFuture.create(); + pcuQueue.add(new PendingPcuTask(callable, resultFuture)); + return Tuple.of(resultFuture, true); + } else { + UploadCallable callable = + new UploadCallable<>(storage, blobInfo, source, config, opts); + return Tuple.of(convert(executor.submit(callable)), false); + } + } + private void schedulePcuPoller() { if (pcuPoller == null) { synchronized (pcuPollerSync) { diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadCallable.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadCallable.java index c2b2ff2169..283075acbe 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadCallable.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadCallable.java @@ -21,44 +21,50 @@ import com.google.cloud.storage.Storage; import com.google.cloud.storage.Storage.BlobWriteOption; import com.google.cloud.storage.StorageException; + +import java.io.InputStream; import java.nio.file.Path; import java.util.concurrent.Callable; -final class UploadCallable implements Callable { - private final TransferManagerConfig transferManagerConfig; +final class UploadCallable implements Callable { private final Storage storage; private final BlobInfo originalBlob; - private final Path sourceFile; + private final T source; private final ParallelUploadConfig parallelUploadConfig; private final Storage.BlobWriteOption[] opts; public UploadCallable( - TransferManagerConfig transferManagerConfig, Storage storage, BlobInfo originalBlob, - Path sourceFile, + T source, ParallelUploadConfig parallelUploadConfig, BlobWriteOption[] opts) { - this.transferManagerConfig = transferManagerConfig; this.storage = storage; this.originalBlob = originalBlob; - this.sourceFile = sourceFile; + this.source = source; this.parallelUploadConfig = parallelUploadConfig; this.opts = opts; } - public UploadResult call() throws Exception { + public UploadResult call() { // TODO: Check for chunking return uploadWithoutChunking(); } private UploadResult uploadWithoutChunking() { try { - Blob from = storage.createFrom(originalBlob, sourceFile, opts); + Blob from; + if (source instanceof Path) { + from = storage.createFrom(originalBlob, (Path) source, opts); + } else if (source instanceof InputStream) { + from = storage.createFrom(originalBlob, (InputStream) source, opts); + } else { + throw new IllegalArgumentException("Unsupported source type: " + source.getClass().getName()); + } return UploadResult.newBuilder(originalBlob, TransferStatus.SUCCESS) .setUploadedBlob(from.asBlobInfo()) .build(); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadSource.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadSource.java new file mode 100644 index 0000000000..bd1f5c6284 --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/UploadSource.java @@ -0,0 +1,25 @@ +package com.google.cloud.storage.transfermanager; + +final class UploadSource { + private final String blobName; + private final T source; + private final boolean isParallelCompositeUpload; + + UploadSource(String blobName, T source, boolean isParallelCompositeUpload) { + this.blobName = blobName; + this.source = source; + this.isParallelCompositeUpload = isParallelCompositeUpload; + } + + public String getBlobName() { + return blobName; + } + + public T getSource() { + return source; + } + + public boolean isParallelCompositeUpload() { + return isParallelCompositeUpload; + } +} \ No newline at end of file