Skip to content

Commit

Permalink
branch-3.0: [fix] let backup work on azure #46411 (#46445)
Browse files (browse the repository at this point in the history)
Cherry-picked from #46411

Co-authored-by: Yongqiang YANG <[email protected]>
  • Loading branch information
github-actions[bot] and dataroaring authored Jan 10, 2025
1 parent a70a6b7 commit 9b375be
Show file tree
Hide file tree
Showing 10 changed files with 71 additions and 28 deletions.
2 changes: 1 addition & 1 deletion be/src/io/fs/azure_obj_storage_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ ObjectStorageResponse AzureObjStorageClient::list_objects(const ObjectStoragePat
return _client->ListBlobs(list_opts);
});
get_file_file(resp);
while (!resp.NextPageToken->empty()) {
while (resp.NextPageToken.HasValue()) {
list_opts.ContinuationToken = resp.NextPageToken;
resp = s3_get_rate_limit([&]() {
SCOPED_BVAR_LATENCY(s3_bvar::s3_list_latency);
Expand Down
4 changes: 2 additions & 2 deletions be/src/util/s3_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,8 @@ std::shared_ptr<io::ObjStorageClient> S3ClientFactory::_create_azure_client(
std::make_shared<Azure::Storage::StorageSharedKeyCredential>(s3_conf.ak, s3_conf.sk);

const std::string container_name = s3_conf.bucket;
const std::string uri = fmt::format("{}://{}.blob.core.windows.net/{}",
config::s3_client_http_scheme, s3_conf.ak, container_name);
const std::string uri =
fmt::format("{}://{}.blob.core.windows.net/{}", "https", s3_conf.ak, container_name);

auto containerClient = std::make_shared<Azure::Storage::Blobs::BlobContainerClient>(uri, cred);
LOG_INFO("create one azure client with {}", s3_conf.to_string());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ public TStorageBackendType toThrift() {
return TStorageBackendType.JFS;
case LOCAL:
return TStorageBackendType.LOCAL;
// deprecated
case AZURE:
return TStorageBackendType.AZURE;
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ private static Map<String, String> getBeAWSPropertiesFromS3(Map<String, String>
if (properties.containsKey(PropertyConverter.USE_PATH_STYLE)) {
beProperties.put(PropertyConverter.USE_PATH_STYLE, properties.get(PropertyConverter.USE_PATH_STYLE));
}
if (properties.containsKey(S3Properties.PROVIDER)) {
beProperties.put(S3Properties.PROVIDER, properties.get(S3Properties.PROVIDER));
}
return beProperties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.azure.storage.blob.batch.BlobBatch;
import com.azure.storage.blob.batch.BlobBatchClient;
import com.azure.storage.blob.batch.BlobBatchClientBuilder;
import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobProperties;
import com.azure.storage.blob.models.BlobStorageException;
Expand All @@ -53,6 +54,7 @@
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
Expand Down Expand Up @@ -196,6 +198,9 @@ public Status deleteObject(String remotePath) {
LOG.info("delete file " + remotePath + " success");
return Status.OK;
} catch (BlobStorageException e) {
if (e.getErrorCode() == BlobErrorCode.BLOB_NOT_FOUND) {
return Status.OK;
}
return new Status(
Status.ErrCode.COMMON_ERROR,
"get file from azure error: " + e.getServiceMessage());
Expand Down Expand Up @@ -331,6 +336,7 @@ public Status globList(String remotePath, List<RemoteFile> result, boolean fileN
LOG.info("path pattern {}", pathPattern.toString());
PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + pathPattern.toString());

HashSet<String> directorySet = new HashSet<>();
String listPrefix = getLongestPrefix(globPath);
LOG.info("azure glob list prefix is {}", listPrefix);
ListBlobsOptions options = new ListBlobsOptions().setPrefix(listPrefix);
Expand All @@ -343,18 +349,36 @@ public Status globList(String remotePath, List<RemoteFile> result, boolean fileN
elementCnt++;
java.nio.file.Path blobPath = Paths.get(blobItem.getName());

if (!matcher.matches(blobPath)) {
continue;
boolean isPrefix = false;
while (blobPath.normalize().toString().startsWith(listPrefix)) {
if (LOG.isDebugEnabled()) {
LOG.debug("get blob {}", blobPath.normalize().toString());
}
if (!matcher.matches(blobPath)) {
isPrefix = true;
blobPath = blobPath.getParent();
continue;
}
if (directorySet.contains(blobPath.normalize().toString())) {
break;
}
if (isPrefix) {
directorySet.add(blobPath.normalize().toString());
}

matchCnt++;
RemoteFile remoteFile = new RemoteFile(
fileNameOnly ? blobPath.getFileName().toString() : constructS3Path(blobPath.toString(),
uri.getBucket()),
!isPrefix,
isPrefix ? -1 : blobItem.getProperties().getContentLength(),
isPrefix ? -1 : blobItem.getProperties().getContentLength(),
isPrefix ? 0 : blobItem.getProperties().getLastModified().getSecond());
result.add(remoteFile);

blobPath = blobPath.getParent();
isPrefix = true;
}
matchCnt++;
RemoteFile remoteFile = new RemoteFile(
fileNameOnly ? blobPath.getFileName().toString() : constructS3Path(blobPath.toString(),
uri.getBucket()),
!blobItem.isPrefix(),
blobItem.isPrefix() ? -1 : blobItem.getProperties().getContentLength(),
blobItem.getProperties().getContentLength(),
blobItem.getProperties().getLastModified().getSecond());
result.add(remoteFile);
}
newContinuationToken = pagedResponse.getContinuationToken();
} while (newContinuationToken != null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ protected void setProperties(Map<String, String> properties) {
.equalsIgnoreCase("true");
forceParsingByStandardUri = this.properties.getOrDefault(PropertyConverter.FORCE_PARSING_BY_STANDARD_URI,
"false").equalsIgnoreCase("true");

String endpoint = this.properties.get(S3Properties.ENDPOINT);
String region = this.properties.get(S3Properties.REGION);

this.properties.put(S3Properties.REGION, PropertyConverter.checkRegion(endpoint, region, S3Properties.REGION));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.doris.fs.remote;

import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.StorageBackend.StorageType;
import org.apache.doris.backup.Status;
import org.apache.doris.common.UserException;
Expand All @@ -35,13 +34,13 @@ public class AzureFileSystem extends ObjFileSystem {
private static final Logger LOG = LogManager.getLogger(AzureFileSystem.class);

public AzureFileSystem(Map<String, String> properties) {
super(StorageType.AZURE.name(), StorageType.AZURE, new AzureObjStorage(properties));
super(StorageType.AZURE.name(), StorageType.S3, new AzureObjStorage(properties));
initFsProperties();
}

@VisibleForTesting
public AzureFileSystem(AzureObjStorage storage) {
super(StorageBackend.StorageType.AZURE.name(), StorageBackend.StorageType.AZURE, storage);
super(StorageType.AZURE.name(), StorageType.S3, storage);
initFsProperties();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@
import org.apache.doris.datasource.trinoconnector.TrinoConnectorExternalDatabase;
import org.apache.doris.datasource.trinoconnector.TrinoConnectorExternalTable;
import org.apache.doris.fs.PersistentFileSystem;
import org.apache.doris.fs.remote.AzureFileSystem;
import org.apache.doris.fs.remote.BrokerFileSystem;
import org.apache.doris.fs.remote.ObjFileSystem;
import org.apache.doris.fs.remote.S3FileSystem;
Expand Down Expand Up @@ -566,7 +567,8 @@ public class GsonUtils {
.registerSubtype(JFSFileSystem.class, JFSFileSystem.class.getSimpleName())
.registerSubtype(OFSFileSystem.class, OFSFileSystem.class.getSimpleName())
.registerSubtype(ObjFileSystem.class, ObjFileSystem.class.getSimpleName())
.registerSubtype(S3FileSystem.class, S3FileSystem.class.getSimpleName());
.registerSubtype(S3FileSystem.class, S3FileSystem.class.getSimpleName())
.registerSubtype(AzureFileSystem.class, AzureFileSystem.class.getSimpleName());

private static RuntimeTypeAdapterFactory<org.apache.doris.backup.AbstractJob>
jobBackupTypeAdapterFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public void testS3RepositoryPropertiesConverter() throws Exception {
CreateRepositoryStmt analyzedStmtNew = createStmt(s3RepoNew);
Assertions.assertEquals(analyzedStmtNew.getProperties().size(), 3);
Repository repositoryNew = getRepository(analyzedStmtNew, "s3_repo_new");
Assertions.assertEquals(repositoryNew.getRemoteFileSystem().getProperties().size(), 4);
Assertions.assertEquals(repositoryNew.getRemoteFileSystem().getProperties().size(), 5);
}

private static Repository getRepository(CreateRepositoryStmt analyzedStmt, String name) throws DdlException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand All @@ -58,19 +59,20 @@ public I(String pattern, long expectedMatchSize) {
}

public static List<I> genInputs() {
// refer genObjectKeys
List<I> inputs = new ArrayList<I>();
inputs.add(new I("s3://gavin-test-jp/azure-test/1/*/tmp*", 8196L));
inputs.add(new I("s3://gavin-test-jp/azure-test/1/tmp*", 4098L));
inputs.add(new I("s3://gavin-test-jp/azure-test/1/*tmp*", 4098L));
inputs.add(new I("s3://gavin-test-jp/azure-test/1/**/tmp*", 20490L));
inputs.add(new I("s3://gavin-test-jp/azure-test/**/tmp*", 32784L));
inputs.add(new I("s3://gavin-test-jp/azure-test/*", 0L)); // no files at 1st level
inputs.add(new I("s3://gavin-test-jp/azure-test/*", 3L)); // no files at 1st level; the 3 matches are the directories 1, 2 and 3
inputs.add(new I("s3://gavin-test-jp/azure-test/2/*", 4098L));
inputs.add(new I("s3://gavin-test-jp/azure-test/2*/*", 4098L));
inputs.add(new I("s3://gavin-test-jp/azure-test/2/*I*", 591L));
inputs.add(new I("s3://gavin-test-jp/azure-test/1", 0L));
inputs.add(new I("s3://gavin-test-jp/azure-test/2", 0L));
inputs.add(new I("s3://gavin-test-jp/azure-test/3", 0L));
inputs.add(new I("s3://gavin-test-jp/azure-test/1", 1L));
inputs.add(new I("s3://gavin-test-jp/azure-test/2", 1L));
inputs.add(new I("s3://gavin-test-jp/azure-test/3", 1L));
inputs.add(new I("s3://gavin-test-jp/azure-test/1/tmp.k*", 61L));
inputs.add(new I("s3://gavin-test-jp/azure-test/1/tmp.[a-z]*", 1722L));
inputs.add(new I("s3://gavin-test-jp/azure-test/[12]/tmp.[a-z]*", 3444L));
Expand Down Expand Up @@ -120,13 +122,13 @@ public void testGlobListWithMockedAzureStorage() {
boolean fileNameOnly = false;
// FIXME(gavin): Mock the result returned from azure blob to make this UT work when no aksk and network
Status st = azs.globList(i.pattern, result, fileNameOnly);
System.out.println("testGlobListWithMockedAzureStorage pattern: " + i.pattern + " matched " + result.size());
Assertions.assertTrue(st.ok());
Assertions.assertEquals(i.expectedMatchSize, result.size());
for (int j = 0; j < result.size() && j < 10; ++j) {
System.out.println(result.get(j).getName());
}
System.out.println("pattern: " + i.pattern + " matched " + result.size());
System.out.println("====================");

});
}

Expand All @@ -136,13 +138,20 @@ public void testFsGlob() {
String pattern = i.pattern.substring(19); // remove prefix s3://gavin-test-jp/
PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + pattern);
List<String> matchedPaths = new ArrayList<>();
HashSet<String> directories = new HashSet<>();
for (String p : genObjectKeys()) {
java.nio.file.Path blobPath = Paths.get(p);
if (!matcher.matches(blobPath)) {
continue;

while (blobPath != null) {
if (matcher.matches(blobPath) && !directories.contains(blobPath.toString())) {
matchedPaths.add(blobPath.toString());
directories.add(blobPath.toString());
}
blobPath = blobPath.getParent();
}
matchedPaths.add(p);
}
System.out.println("pattern: " + i.pattern + " matched " + matchedPaths.size());
System.out.println("====================");
Assertions.assertEquals(i.expectedMatchSize, matchedPaths.size());
}
}
Expand Down

0 comments on commit 9b375be

Please sign in to comment.