Skip to content

Commit

Permalink
Pipe: Fix BatchActivateTemplateStatement is not handled correctly whe…
Browse files Browse the repository at this point in the history
…n some of the timeseries already exists (#12587)
  • Loading branch information
Caideyipi authored May 24, 2024
1 parent 1517e81 commit aa2687c
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.receiver.visitor;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.db.exception.LoadRuntimeOutOfMemoryException;
import org.apache.iotdb.db.exception.sql.SemanticException;
Expand All @@ -44,13 +45,14 @@
*/
public class PipeStatementExceptionVisitor extends StatementVisitor<TSStatus, Exception> {
@Override
public TSStatus visitNode(StatementNode node, Exception context) {
public TSStatus visitNode(final StatementNode node, final Exception context) {
return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode())
.setMessage(context.getMessage());
}

@Override
public TSStatus visitLoadFile(LoadTsFileStatement loadTsFileStatement, Exception context) {
public TSStatus visitLoadFile(
final LoadTsFileStatement loadTsFileStatement, final Exception context) {
if (context instanceof LoadRuntimeOutOfMemoryException) {
return new TSStatus(
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
Expand All @@ -63,63 +65,78 @@ public TSStatus visitLoadFile(LoadTsFileStatement loadTsFileStatement, Exception
}

@Override
public TSStatus visitCreateTimeseries(CreateTimeSeriesStatement statement, Exception context) {
public TSStatus visitCreateTimeseries(
final CreateTimeSeriesStatement statement, final Exception context) {
return visitGeneralCreateTimeSeries(statement, context);
}

@Override
public TSStatus visitCreateAlignedTimeseries(
CreateAlignedTimeSeriesStatement statement, Exception context) {
final CreateAlignedTimeSeriesStatement statement, final Exception context) {
return visitGeneralCreateTimeSeries(statement, context);
}

@Override
public TSStatus visitCreateMultiTimeseries(
CreateMultiTimeSeriesStatement statement, Exception context) {
final CreateMultiTimeSeriesStatement statement, final Exception context) {
return visitGeneralCreateTimeSeries(statement, context);
}

@Override
public TSStatus visitInternalCreateTimeseries(
InternalCreateTimeSeriesStatement statement, Exception context) {
final InternalCreateTimeSeriesStatement statement, final Exception context) {
return visitGeneralCreateTimeSeries(statement, context);
}

@Override
public TSStatus visitInternalCreateMultiTimeSeries(
InternalCreateMultiTimeSeriesStatement statement, Exception context) {
final InternalCreateMultiTimeSeriesStatement statement, final Exception context) {
return visitGeneralCreateTimeSeries(statement, context);
}

private TSStatus visitGeneralCreateTimeSeries(Statement statement, Exception context) {
private TSStatus visitGeneralCreateTimeSeries(
final Statement statement, final Exception context) {
if (context instanceof SemanticException) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
} else if (isAutoCreateConflict(context)) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getCause().getMessage());
}
return visitStatement(statement, context);
}

@Override
public TSStatus visitActivateTemplate(
ActivateTemplateStatement activateTemplateStatement, Exception context) {
final ActivateTemplateStatement activateTemplateStatement, final Exception context) {
return visitGeneralActivateTemplate(activateTemplateStatement, context);
}

@Override
public TSStatus visitBatchActivateTemplate(
BatchActivateTemplateStatement batchActivateTemplateStatement, Exception context) {
final BatchActivateTemplateStatement batchActivateTemplateStatement,
final Exception context) {
return visitGeneralActivateTemplate(batchActivateTemplateStatement, context);
}

// InternalBatchActivateTemplateNode is converted to BatchActivateTemplateStatement
// No need to handle InternalBatchActivateTemplateStatement

private TSStatus visitGeneralActivateTemplate(
Statement activateTemplateStatement, Exception context) {
final Statement activateTemplateStatement, final Exception context) {
if (context instanceof MetadataException || context instanceof StatementAnalyzeException) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
} else if (isAutoCreateConflict(context)) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getCause().getMessage());
}
return visitStatement(activateTemplateStatement, context);
}

private boolean isAutoCreateConflict(final Exception e) {
return e instanceof RuntimeException
&& e.getCause() instanceof IoTDBException
&& e.getCause().getMessage().contains("already been created as database");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,16 @@ public TSStatus visitActivateTemplate(
@Override
public TSStatus visitBatchActivateTemplate(
final BatchActivateTemplateStatement batchActivateTemplateStatement, final TSStatus context) {
if (context.getCode() == TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
for (final TSStatus status : context.getSubStatus()) {
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& status.getCode() != TSStatusCode.TEMPLATE_IS_IN_USE.getStatusCode()) {
return visitStatement(batchActivateTemplateStatement, context);
}
}
return new TSStatus(TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(context.getMessage());
}
return visitGeneralActivateTemplate(batchActivateTemplateStatement, context);
}

Expand Down

0 comments on commit aa2687c

Please sign in to comment.