Skip to content

Commit

Permalink
Fix empty datasource schema on the broker
Browse files Browse the repository at this point in the history
  • Loading branch information
findingrish committed Jun 24, 2024
1 parent 4a63cc8 commit 8d5a763
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ services:
service: druid-broker
environment:
- DRUID_INTEGRATION_TEST_GROUP=${DRUID_INTEGRATION_TEST_GROUP}
- druid_sql_planner_metadataRefreshPeriod=PT20S
- druid_sql_planner_metadataRefreshPeriod=PT30S
- druid_sql_planner_disableSegmentMetadataQueries=true
depends_on:
- druid-coordinator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.testng.Assert;

public final class DataLoaderHelper
{
Expand Down Expand Up @@ -50,6 +51,9 @@ public void waitUntilDatasourceIsReady(String datasource)
() -> sqlTestQueryHelper.isDatasourceLoadedInSQL(datasource),
StringUtils.format("Waiting for [%s] to be ready for SQL queries", datasource)
);

Assert.assertTrue(sqlTestQueryHelper.verifyTimeColumnIsPresent(datasource));

LOG.info("Datasource [%s] ready for SQL queries", datasource);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,27 @@ public boolean isDatasourceLoadedInSQL(String datasource)
return false;
}
}

public boolean verifyTimeColumnIsPresent(String datasource)
{
final SqlQuery query = new SqlQuery(
"SELECT __time FROM \"" + datasource + "\" LIMIT 1",
null,
false,
false,
false,
null,
null
);

try {
//noinspection unchecked
queryClient.query(getQueryURL(broker), query);
return true;
}
catch (Exception e) {
LOG.debug(e, "Check query failed");
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,13 @@ public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> da
continue;
}

if (rowSignature.getColumnNames().isEmpty()) {
// this case could arise when metadata refresh is disabled on broker
// and a new datasource is added
log.info("datasource [%s] has empty signature.", dataSource);
continue;
}

final PhysicalDatasourceMetadata physicalDatasourceMetadata = dataSourceMetadataFactory.build(dataSource, rowSignature);
updateDSMetadata(dataSource, physicalDatasourceMetadata);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
Expand Down Expand Up @@ -86,6 +87,7 @@
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

import javax.xml.crypto.Data;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -1027,4 +1029,28 @@ public void testInvokeSegmentSchemaAnnounced() throws InterruptedException
buildSchemaMarkAndTableLatch();
serverView.invokeSegmentSchemasAnnouncedDummy();
}

@Test
public void testNoDatasourceSchemaWhenNoSegmentMetadata() throws InterruptedException, IOException
{
BrokerSegmentMetadataCacheConfig config = new BrokerSegmentMetadataCacheConfig();
config.setDisableSegmentMetadataQueries(true);

BrokerSegmentMetadataCache schema = buildSchemaMarkAndTableLatch(
config,
new NoopCoordinatorClient()
);

schema.start();
schema.awaitInitialization();

List<DataSegment> segments = schema.getSegmentMetadataSnapshot().values()
.stream()
.map(AvailableSegmentMetadata::getSegment)
.collect(Collectors.toList());

schema.refresh(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()), Collections.singleton("foo"));

Assert.assertNull(schema.getDatasource("foo"));
}
}

0 comments on commit 8d5a763

Please sign in to comment.