INSERT/REPLACE complex target column types are validated against source input expressions (apache#16223)

* fix
* fix
* address review comments
* fix
* simplify tests
* fix complex type nullability issue
* address review comments
* address test review comments
* fix checkstyle
zachjsh authored Apr 16, 2024
1 parent cf841b8 commit a5428e7
Showing 6 changed files with 575 additions and 611 deletions.
@@ -31,7 +31,6 @@
import org.apache.druid.metadata.TestDerbyConnector.DerbyConnectorRule5;
import org.apache.druid.sql.calcite.CalciteCatalogInsertTest;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
-import org.apache.druid.sql.calcite.table.DatasourceTable;
import org.apache.druid.sql.calcite.util.SqlTestFramework;
import org.junit.jupiter.api.extension.RegisterExtension;

@@ -69,8 +68,8 @@ public void finalizeTestFramework(SqlTestFramework sqlTestFramework)

public void buildDatasources()
{
-resolvedTables.forEach((datasourceName, datasourceTable) -> {
-DatasourceFacade catalogMetadata = ((DatasourceTable) datasourceTable).effectiveMetadata().catalogMetadata();
+RESOLVED_TABLES.forEach((datasourceName, datasourceTable) -> {
+DatasourceFacade catalogMetadata = datasourceTable.effectiveMetadata().catalogMetadata();
TableBuilder tableBuilder = TableBuilder.datasource(datasourceName, catalogMetadata.segmentGranularityString());
catalogMetadata.columnFacades().forEach(
columnFacade -> {
@@ -92,14 +91,6 @@ public void buildDatasources()

createTableMetadata(tableBuilder.build());
});
-DatasourceFacade catalogMetadata =
-((DatasourceTable) resolvedTables.get("foo")).effectiveMetadata().catalogMetadata();
-TableBuilder tableBuilder = TableBuilder.datasource("foo", catalogMetadata.segmentGranularityString());
-catalogMetadata.columnFacades().forEach(
-columnFacade -> {
-tableBuilder.column(columnFacade.spec().name(), columnFacade.spec().dataType());
-}
-);
}

private void createTableMetadata(TableMetadata table)
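Both catalog test fixtures build their datasources the same way. For orientation, a minimal sketch of that pattern, assuming only the TableBuilder and createTableMetadata calls visible in the hunks above and that the builder methods chain; it sits inside the test fixture, and the granularity string and column name/type strings are illustrative values, not taken from the patch:

```java
// Hypothetical sketch of the buildDatasources() pattern: build catalog
// metadata for a datasource, then register it with the test catalog.
// "P1D", "__time"/"TIMESTAMP", and "nested_col"/"COMPLEX<json>" are
// illustrative; the real tests derive them from the resolved catalog tables.
TableMetadata table = TableBuilder
    .datasource("foo", "P1D")                // datasource name + segment granularity
    .column("__time", "TIMESTAMP")           // column name + declared SQL storage type
    .column("nested_col", "COMPLEX<json>")   // complex storage type, as exercised by this PR
    .build();
createTableMetadata(table);
```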
@@ -31,7 +31,6 @@
import org.apache.druid.metadata.TestDerbyConnector.DerbyConnectorRule5;
import org.apache.druid.sql.calcite.CalciteCatalogReplaceTest;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
-import org.apache.druid.sql.calcite.table.DatasourceTable;
import org.apache.druid.sql.calcite.util.SqlTestFramework;
import org.junit.jupiter.api.extension.RegisterExtension;

@@ -68,8 +67,8 @@ public void finalizeTestFramework(SqlTestFramework sqlTestFramework)

public void buildDatasources()
{
-resolvedTables.forEach((datasourceName, datasourceTable) -> {
-DatasourceFacade catalogMetadata = ((DatasourceTable) datasourceTable).effectiveMetadata().catalogMetadata();
+RESOLVED_TABLES.forEach((datasourceName, datasourceTable) -> {
+DatasourceFacade catalogMetadata = datasourceTable.effectiveMetadata().catalogMetadata();
TableBuilder tableBuilder = TableBuilder.datasource(datasourceName, catalogMetadata.segmentGranularityString());
catalogMetadata.columnFacades().forEach(
columnFacade -> {
@@ -92,7 +91,7 @@ public void buildDatasources()
createTableMetadata(tableBuilder.build());
});
DatasourceFacade catalogMetadata =
((DatasourceTable) resolvedTables.get("foo")).effectiveMetadata().catalogMetadata();
RESOLVED_TABLES.get("foo").effectiveMetadata().catalogMetadata();
TableBuilder tableBuilder = TableBuilder.datasource("foo", catalogMetadata.segmentGranularityString());
catalogMetadata.columnFacades().forEach(
columnFacade -> {
@@ -41,7 +41,6 @@
import org.apache.calcite.sql.SqlWindow;
import org.apache.calcite.sql.SqlWith;
import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.type.SqlTypeUtil;
import org.apache.calcite.sql.validate.IdentifierNamespace;
@@ -54,19 +53,21 @@
import org.apache.calcite.util.Static;
import org.apache.calcite.util.Util;
import org.apache.druid.catalog.model.facade.DatasourceFacade;
-import org.apache.druid.common.config.NullHandling;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.error.InvalidSqlInput;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.Types;
+import org.apache.druid.segment.column.ValueType;
import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
import org.apache.druid.sql.calcite.parser.ExternalDestinationSqlIdentifier;
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.calcite.table.DatasourceTable;
+import org.apache.druid.sql.calcite.table.RowSignatures;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.ArrayList;
@@ -474,19 +475,11 @@ private RelDataType validateTargetType(
fields.add(Pair.of(colName, sourceField.getType()));
continue;
}
-SqlTypeName sqlTypeName = SqlTypeName.get(definedCol.sqlStorageType());
-RelDataType relType = typeFactory.createSqlType(sqlTypeName);
-if (NullHandling.replaceWithDefault() && !SqlTypeFamily.STRING.contains(relType)) {
-fields.add(Pair.of(
-colName,
-relType
-));
-} else {
-fields.add(Pair.of(
-colName,
-typeFactory.createTypeWithNullability(relType, true)
-));
-}
+RelDataType relType = computeTypeForDefinedCol(definedCol, sourceField);
+fields.add(Pair.of(
+colName,
+typeFactory.createTypeWithNullability(relType, sourceField.getType().isNullable())
+));
}

// Perform the SQL-standard check: that the SELECT column can be
@@ -516,8 +509,14 @@ protected void checkTypeAssignment(
RelDataType targetFieldRelDataType = targetFields.get(i).getType();
ColumnType sourceFieldColumnType = Calcites.getColumnTypeForRelDataType(sourceFielRelDataType);
ColumnType targetFieldColumnType = Calcites.getColumnTypeForRelDataType(targetFieldRelDataType);

-if (targetFieldColumnType != ColumnType.leastRestrictiveType(targetFieldColumnType, sourceFieldColumnType)) {
+try {
+if (!Objects.equals(
+targetFieldColumnType,
+ColumnType.leastRestrictiveType(targetFieldColumnType, sourceFieldColumnType))) {
+throw new Types.IncompatibleTypeException(targetFieldColumnType, sourceFieldColumnType);
+}
+}
+catch (Types.IncompatibleTypeException e) {
SqlNode node = getNthExpr(query, i, sourceCount);
String targetTypeString;
String sourceTypeString;
@@ -534,12 +533,39 @@
Static.RESOURCE.typeNotAssignable(
targetFields.get(i).getName(), targetTypeString,
sourceFields.get(i).getName(), sourceTypeString));

}
}
+// the call to base class definition will insert implicit casts / coercions where needed.
+super.checkTypeAssignment(sourceScope, table, sourceRowType, targetRowType, query);
}

+protected RelDataType computeTypeForDefinedCol(
+final DatasourceFacade.ColumnFacade definedCol,
+final RelDataTypeField sourceField
+)
+{
+SqlTypeName sqlTypeName = SqlTypeName.get(definedCol.sqlStorageType());
+RelDataType relType;
+if (sqlTypeName != null) {
+relType = typeFactory.createSqlType(sqlTypeName);
+} else {
+ColumnType columnType = ColumnType.fromString(definedCol.sqlStorageType());
+if (columnType != null && columnType.getType().equals(ValueType.COMPLEX)) {
+relType = RowSignatures.makeComplexType(typeFactory, columnType, sourceField.getType().isNullable());
+} else {
+relType = RowSignatures.columnTypeToRelDataType(
+typeFactory,
+columnType,
+// this nullability is ignored for complex types for some reason, hence the check for complex above.
+sourceField.getType().isNullable()
+);
+}
+}
+
+return relType;
+}

/**
* Locates the n'th expression in an INSERT or UPDATE query.
*
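The assignment rule added in checkTypeAssignment reduces to a leastRestrictiveType comparison. A minimal standalone sketch of that rule, assuming (consistently with the catch block above) that ColumnType.leastRestrictiveType throws Types.IncompatibleTypeException when the two types share no common type; isAssignable is an illustrative name, not part of the patch:

```java
import java.util.Objects;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.Types;

class AssignmentRuleSketch
{
  // A source column is writable into a target column only when the least
  // restrictive common type of the pair is the target type itself: a LONG
  // source widens into a DOUBLE target, but no implicit cast can turn a
  // primitive source into a COMPLEX<json> target (or vice versa).
  static boolean isAssignable(ColumnType targetType, ColumnType sourceType)
  {
    try {
      return Objects.equals(
          targetType,
          ColumnType.leastRestrictiveType(targetType, sourceType)
      );
    }
    catch (Types.IncompatibleTypeException e) {
      // No common type at all, so definitely not assignable.
      return false;
    }
  }
}
```

When the comparison fails, the validator reports Calcite's standard "type not assignable" error against the offending SELECT expression (the Static.RESOURCE.typeNotAssignable call above); when it passes, the call into the base class inserts any implicit casts that are still needed.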