Skip to content

Commit

Permalink
Only create container in AzureStorage for write operations (apache#16558)
Browse files Browse the repository at this point in the history

* Remove unused constants

* Refactor getBlockBlobLength

* Better link

* Upper-case log

* Mark defaultStorageAccount nullable

This is the case if you do not use Azure for deep-storage but ingest from Azure blobs.

* Do not always create a new container if it doesn't exist

Specifically, only create a container if uploading a blob or writing a blob stream

* Add lots of comments, group methods

* Revert "Mark defaultStorageAccount nullable"

* Add mockito for junit

* Add extra test

* Add comment

Thanks George.

* Pass blockSize as Long

* Test more branches...
  • Loading branch information
amaechler authored Jun 7, 2024
1 parent efe9079 commit e6a82e8
Show file tree
Hide file tree
Showing 9 changed files with 421 additions and 191 deletions.
6 changes: 6 additions & 0 deletions extensions-core/azure-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,12 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.druid.data.input.azure;

import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.specialized.BlockBlobClient;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
Expand Down Expand Up @@ -161,12 +160,7 @@ public Iterator<LocationWithSize> getDescriptorIteratorForPrefixes(List<URI> pre
public long getObjectSize(CloudObjectLocation location)
{
try {
final BlockBlobClient blobWithAttributes = storage.getBlockBlobReferenceWithAttributes(
location.getBucket(),
location.getPath()
);

return blobWithAttributes.getProperties().getBlobSize();
return storage.getBlockBlobLength(location.getBucket(), location.getPath());
}
catch (BlobStorageException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.druid.data.input.azure;

import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.specialized.BlockBlobClient;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
Expand Down Expand Up @@ -180,12 +179,7 @@ public long getObjectSize(CloudObjectLocation location)
try {
AzureStorage azureStorage = new AzureStorage(azureIngestClientFactory, location.getBucket());
Pair<String, String> locationInfo = getContainerAndPathFromObjectLocation(location);
final BlockBlobClient blobWithAttributes = azureStorage.getBlockBlobReferenceWithAttributes(
locationInfo.lhs,
locationInfo.rhs
);

return blobWithAttributes.getProperties().getBlobSize();
return azureStorage.getBlockBlobLength(locationInfo.lhs, locationInfo.rhs);
}
catch (BlobStorageException e) {
throw new RuntimeException(e);
Expand Down Expand Up @@ -246,7 +240,9 @@ public String toString()
public static Pair<String, String> getContainerAndPathFromObjectLocation(CloudObjectLocation location)
{
String[] pathParts = location.getPath().split("/", 2);
// If there is no path specified, use a empty path as azure will throw a exception that is more clear than a index error.

// If there is no path specified, use an empty path as Azure will throw an exception
// that is more clear than an index error.
return Pair.of(pathParts[0], pathParts.length == 2 ? pathParts[1] : "");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
public class AzureDataSegmentKiller implements DataSegmentKiller
{
private static final Logger log = new Logger(AzureDataSegmentKiller.class);
private static final Integer MAX_MULTI_OBJECT_DELETE_SIZE = 256; // https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id

private final AzureDataSegmentConfig segmentConfig;
private final AzureInputDataConfig inputDataConfig;
Expand Down
Loading

0 comments on commit e6a82e8

Please sign in to comment.