From 8d5a7634aee4864b5ffe45e76e0689f9ddef1076 Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Mon, 24 Jun 2024 20:27:23 +0530 Subject: [PATCH] Fix empty datasource schema on the broker --- ...-compose.centralized-datasource-schema.yml | 2 +- .../druid/testing/utils/DataLoaderHelper.java | 4 +++ .../testing/utils/SqlTestQueryHelper.java | 23 ++++++++++++++++ .../schema/BrokerSegmentMetadataCache.java | 7 +++++ .../BrokerSegmentMetadataCacheTest.java | 26 +++++++++++++++++++ 5 files changed, 61 insertions(+), 1 deletion(-) diff --git a/integration-tests/docker/docker-compose.centralized-datasource-schema.yml b/integration-tests/docker/docker-compose.centralized-datasource-schema.yml index 39ce98b1302b..e89e49bc1327 100644 --- a/integration-tests/docker/docker-compose.centralized-datasource-schema.yml +++ b/integration-tests/docker/docker-compose.centralized-datasource-schema.yml @@ -81,7 +81,7 @@ services: service: druid-broker environment: - DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP} - - druid_sql_planner_metadataRefreshPeriod=PT20S + - druid_sql_planner_metadataRefreshPeriod=PT30S - druid_sql_planner_disableSegmentMetadataQueries=true depends_on: - druid-coordinator diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/DataLoaderHelper.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/DataLoaderHelper.java index 692ab962e62d..67317f3e9118 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/DataLoaderHelper.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/DataLoaderHelper.java @@ -23,6 +23,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.testing.clients.CoordinatorResourceTestClient; +import org.testng.Assert; public final class DataLoaderHelper { @@ -50,6 +51,9 @@ public void waitUntilDatasourceIsReady(String datasource) () -> sqlTestQueryHelper.isDatasourceLoadedInSQL(datasource), StringUtils.format("Waiting for [%s] to be ready for SQL queries", datasource) ); + + Assert.assertTrue(sqlTestQueryHelper.verifyTimeColumnIsPresent(datasource)); + LOG.info("Datasource [%s] ready for SQL queries", datasource); } } diff --git a/integration-tests/src/main/java/org/apache/druid/testing/utils/SqlTestQueryHelper.java b/integration-tests/src/main/java/org/apache/druid/testing/utils/SqlTestQueryHelper.java index 962b4a103d06..06a1f680b700 100644 --- a/integration-tests/src/main/java/org/apache/druid/testing/utils/SqlTestQueryHelper.java +++ b/integration-tests/src/main/java/org/apache/druid/testing/utils/SqlTestQueryHelper.java @@ -69,4 +69,27 @@ public boolean isDatasourceLoadedInSQL(String datasource) return false; } } + + public boolean verifyTimeColumnIsPresent(String datasource) + { + final SqlQuery query = new SqlQuery( + "SELECT __time FROM \"" + datasource + "\" LIMIT 1", + null, + false, + false, + false, + null, + null + ); + + try { + //noinspection unchecked + queryClient.query(getQueryURL(broker), query); + return true; + } + catch (Exception e) { + LOG.debug(e, "Check query failed"); + return false; + } + } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java index 0573d8d49ee1..5b67b019288b 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java @@ -246,6 +246,13 @@ public void refresh(final Set segmentsToRefresh, final Set da continue; } + if (rowSignature.getColumnNames().isEmpty()) { + // this case could arise when metadata refresh is disabled on broker + // and a new datasource is added + log.info("datasource [%s] has empty signature.", dataSource); + continue; + } + final PhysicalDatasourceMetadata physicalDatasourceMetadata = dataSourceMetadataFactory.build(dataSource, rowSignature); updateDSMetadata(dataSource, physicalDatasourceMetadata); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java index f8660b63494a..e44a6130b6b9 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java @@ -54,6 +54,7 @@ import org.apache.druid.segment.IndexBuilder; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexStorageAdapter; +import org.apache.druid.segment.Segment; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.incremental.IncrementalIndexSchema; @@ -86,6 +87,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import javax.xml.crypto.Data; import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -1027,4 +1029,28 @@ public void testInvokeSegmentSchemaAnnounced() throws InterruptedException buildSchemaMarkAndTableLatch(); serverView.invokeSegmentSchemasAnnouncedDummy(); } + + @Test + public void testNoDatasourceSchemaWhenNoSegmentMetadata() throws InterruptedException, IOException + { + BrokerSegmentMetadataCacheConfig config = new BrokerSegmentMetadataCacheConfig(); + config.setDisableSegmentMetadataQueries(true); + + BrokerSegmentMetadataCache schema = buildSchemaMarkAndTableLatch( + config, + new NoopCoordinatorClient() + ); + + schema.start(); + schema.awaitInitialization(); + + List segments = schema.getSegmentMetadataSnapshot().values() + .stream() + .map(AvailableSegmentMetadata::getSegment) + .collect(Collectors.toList()); + + schema.refresh(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()), Collections.singleton("foo")); + + Assert.assertNull(schema.getDatasource("foo")); + } }