Skip to content

Commit

Permalink
[Enhancement] Try refresh Iceberg table on file not found
Browse files Browse the repository at this point in the history
Signed-off-by: Samrose Ahmed <[email protected]>
  • Loading branch information
Samrose-Ahmed committed Aug 7, 2024
1 parent 6de3ab7 commit 2c316ce
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 0 deletions.
3 changes: 3 additions & 0 deletions be/src/formats/parquet/file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,9 @@ Status FileReader::get_next(ChunkPtr* chunk) {
return Status::OK();
}
} else {
if (status.code() == TStatusCode::REMOTE_FILE_NOT_FOUND) {
return status;
}
auto s = strings::Substitute("FileReader::get_next failed. reason = $0, file = $1", status.to_string(),
_file->filename());
LOG(WARNING) << s;
Expand Down
3 changes: 3 additions & 0 deletions be/src/io/s3_input_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
namespace starrocks::io {

inline Status make_error_status(const Aws::S3::S3Error& error) {
if (error.GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) {
return Status::RemoteFileNotFound("S3 file does not exist");
}
return Status::IOError(fmt::format(
"BE access S3 file failed, SdkResponseCode={}, SdkErrorType={}, SdkErrorMessage={}",
static_cast<int>(error.GetResponseCode()), static_cast<int>(error.GetErrorType()), error.GetMessage()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,10 @@ public HDFSScanNodePredicates getScanNodePredicates() {
return scanNodePredicates;
}

public IcebergTable getIcebergTable() {
return icebergTable;
}

@Override
protected String debugString() {
MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@

import com.google.common.collect.ImmutableSet;
import com.starrocks.catalog.HiveTable;
import com.starrocks.catalog.IcebergTable;
import com.starrocks.common.Config;
import com.starrocks.common.UserException;
import com.starrocks.common.profile.Tracers;
import com.starrocks.common.util.DebugUtil;
import com.starrocks.connector.ConnectorMetadata;
import com.starrocks.connector.exception.RemoteFileNotFoundException;
import com.starrocks.planner.HdfsScanNode;
import com.starrocks.planner.IcebergScanNode;
import com.starrocks.planner.ScanNode;
import com.starrocks.rpc.RpcException;
import com.starrocks.server.CatalogMgr;
Expand Down Expand Up @@ -79,6 +81,33 @@ private static void handleRemoteFileNotFound(RemoteFileNotFoundException e, Retr
// clear query level metadata cache
metadata.clear();
}
} else if (scanNode instanceof IcebergScanNode) {
IcebergTable icebergTable = ((IcebergScanNode) scanNode).getIcebergTable();
String catalogName = icebergTable.getCatalogName();
if (CatalogMgr.isExternalCatalog(catalogName)) {
existExternalCatalog = true;
ConnectorMetadata metadata = GlobalStateMgr.getCurrentState().getMetadataMgr()
.getOptionalMetadata(icebergTable.getCatalogName()).get();
// refresh catalog level metadata cache
metadata.refreshTable(icebergTable.getRemoteDbName(), icebergTable, new ArrayList<>(), true);
// clear query level metadata cache
metadata.clear();
// Replan
try {
context.execPlan = StatementPlanner.plan(context.parsedStmt, context.connectContext);
} catch (Exception planEx) {
// throw original
if (LOG.isDebugEnabled()) {
ConnectContext connectContext = context.connectContext;
LOG.debug("encounter exception when retry, [QueryId={}] [SQL={}], ",
DebugUtil.printId(connectContext.getExecutionId()),
context.parsedStmt.getOrigStmt() == null ? "" :
context.parsedStmt.getOrigStmt().originStmt,
planEx);
}
throw e;
}
}
}
}
if (!existExternalCatalog) {
Expand Down

0 comments on commit 2c316ce

Please sign in to comment.