Skip to content

Commit

Permalink
Web console: Fixed sampling for delta source in classic data loader and MSQ (apache#17160)
Browse files Browse the repository at this point in the history

Added the delta tile to MSQ and made sure it works both there and in the classic data loader. It is special because it always assumes a Parquet format, so it has to skip some steps in the code. Now the framework is in place to add more tiles that force a specific format.
  • Loading branch information
vogievetsky authored Sep 27, 2024
1 parent dc223f2 commit 7417ead
Show file tree
Hide file tree
Showing 10 changed files with 151 additions and 136 deletions.
4 changes: 2 additions & 2 deletions web-console/script/druid
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ function _build_distribution() {
(
# Add HEAD as an allowed HTTP method since this is how we check when the Druid service is ready.
cd "$(_get_code_root)" \
&& mvn -Pdist,skip-static-checks,skip-tests -Dmaven.javadoc.skip=true -q -T1C install \
&& mvn -Pdist,bundle-contrib-exts,skip-static-checks,skip-tests -Dforbiddenapis.skip=true -Dcheckstyle.skip=true -Dpmd.skip=true -Dmaven.javadoc.skip=true -Danimal.sniffer.skip=true -Denforcer.skip=true -Dcyclonedx.skip=true -q -T1C install \
&& cd distribution/target \
&& tar xzf "apache-druid-$(_get_druid_version)-bin.tar.gz" \
&& cd apache-druid-$(_get_druid_version) \
&& mkdir -p extensions/druid-testing-tools \
&& cp "$(_get_code_root)/extensions-core/testing-tools/target/druid-testing-tools-$(_get_druid_version).jar" extensions/druid-testing-tools/ \
&& mkdir -p extensions/druid-compressed-bigdecimal \
&& cp "$(_get_code_root)/extensions-contrib/compressed-bigdecimal/target/druid-compressed-bigdecimal-$(_get_druid_version).jar" extensions/druid-compressed-bigdecimal/ \
&& echo -e "\n\ndruid.extensions.loadList=[\"druid-hdfs-storage\", \"druid-kafka-indexing-service\", \"druid-multi-stage-query\", \"druid-testing-tools\", \"druid-bloom-filter\", \"druid-datasketches\", \"druid-histogram\", \"druid-stats\", \"druid-compressed-bigdecimal\"]" >> conf/druid/auto/_common/common.runtime.properties \
&& echo -e "\n\ndruid.extensions.loadList=[\"druid-hdfs-storage\", \"druid-kafka-indexing-service\", \"druid-multi-stage-query\", \"druid-testing-tools\", \"druid-bloom-filter\", \"druid-datasketches\", \"druid-histogram\", \"druid-stats\", \"druid-compressed-bigdecimal\", \"druid-parquet-extensions\", \"druid-deltalake-extensions\"]" >> conf/druid/auto/_common/common.runtime.properties \
&& echo -e "\n\ndruid.server.http.allowedHttpMethods=[\"HEAD\"]" >> conf/druid/auto/_common/common.runtime.properties \
&& echo -e "\n\ndruid.export.storage.baseDir=/" >> conf/druid/auto/_common/common.runtime.properties \
)
Expand Down
5 changes: 2 additions & 3 deletions web-console/src/components/json-input/json-input.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ import AceEditor from 'react-ace';

import './json-input.scss';

function parseHjson(str: string) {
// Throwing on empty input is more consistent with how JSON.parse works
if (str.trim() === '') throw new Error('empty hjson');
function parseHjson(str: string): any {
  // Blank input is treated as "no value" (undefined) rather than an error,
  // unlike JSON.parse which would throw on an empty string.
  if (!str.trim()) return;
  return Hjson.parse(str);
}

Expand Down
10 changes: 8 additions & 2 deletions web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,10 @@ export function isDruidSource(spec: Partial<IngestionSpec>): boolean {
return deepGet(spec, 'spec.ioConfig.inputSource.type') === 'druid';
}

export function isFixedFormatSource(spec: Partial<IngestionSpec>): boolean {
  // Some input sources imply their own data format (e.g. delta always reads
  // Parquet), so the format-selection step can be skipped for them.
  const inputSourceType = deepGet(spec, 'spec.ioConfig.inputSource.type');
  return oneOf(inputSourceType, 'druid', 'delta');
}

export function getPossibleSystemFieldsForSpec(spec: Partial<IngestionSpec>): string[] {
const inputSource = deepGet(spec, 'spec.ioConfig.inputSource');
if (!inputSource) return [];
Expand Down Expand Up @@ -1064,7 +1068,6 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F
label: 'Delta filter',
type: 'json',
placeholder: '{"type": "=", "column": "name", "value": "foo"}',
defaultValue: {},
info: (
<>
<ExternalLink
Expand All @@ -1081,7 +1084,7 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F
label: 'Delta snapshot version',
type: 'number',
placeholder: '(latest)',
defaultValue: {},
zeroMeansUndefined: true,
info: (
<>
The snapshot version to read from the Delta table. By default, the latest snapshot is
Expand Down Expand Up @@ -1616,6 +1619,9 @@ export function guessDataSourceNameFromInputSource(inputSource: InputSource): st
return actualPath ? actualPath.path : uriPath ? filenameFromPath(uriPath) : undefined;
}

case 'delta':
return inputSource.tablePath ? filenameFromPath(inputSource.tablePath) : undefined;

case 'http':
return Array.isArray(inputSource.uris) ? filenameFromPath(inputSource.uris[0]) : undefined;

Expand Down
3 changes: 1 addition & 2 deletions web-console/src/druid-models/input-source/input-source.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,6 @@ export const INPUT_SOURCE_FIELDS: Field<InputSource>[] = [
type: 'json',
placeholder: '{"type": "=", "column": "name", "value": "foo"}',
defined: typeIsKnown(KNOWN_TYPES, 'delta'),
required: false,
info: (
<>
<ExternalLink href={`${getLink('DOCS')}/ingestion/input-sources/#delta-filter-object`}>
Expand All @@ -668,8 +667,8 @@ export const INPUT_SOURCE_FIELDS: Field<InputSource>[] = [
label: 'Delta snapshot version',
type: 'number',
placeholder: '(latest)',
zeroMeansUndefined: true,
defined: typeIsKnown(KNOWN_TYPES, 'delta'),
required: false,
info: (
<>
The snapshot version to read from the Delta table. By default, the latest snapshot is read.
Expand Down
13 changes: 9 additions & 4 deletions web-console/src/utils/sampler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/

import { dedupe, F, SqlExpression, SqlFunction } from '@druid-toolkit/query';
import type { CancelToken } from 'axios';
import * as JSONBig from 'json-bigint-native';

import type {
Expand All @@ -40,6 +41,7 @@ import {
getSpecType,
getTimestampSchema,
isDruidSource,
isFixedFormatSource,
PLACEHOLDER_TIMESTAMP_SPEC,
REINDEX_TIMESTAMP_SPEC,
TIME_COLUMN,
Expand Down Expand Up @@ -187,12 +189,15 @@ export async function getProxyOverlordModules(): Promise<string[]> {
export async function postToSampler(
sampleSpec: SampleSpec,
forStr: string,
cancelToken?: CancelToken,
): Promise<SampleResponse> {
sampleSpec = fixSamplerLookups(fixSamplerTypes(sampleSpec));

let sampleResp: any;
try {
sampleResp = await Api.instance.post(`/druid/indexer/v1/sampler?for=${forStr}`, sampleSpec);
sampleResp = await Api.instance.post(`/druid/indexer/v1/sampler?for=${forStr}`, sampleSpec, {
cancelToken,
});
} catch (e) {
throw new Error(getDruidErrorMessage(e));
}
Expand Down Expand Up @@ -269,8 +274,7 @@ export async function sampleForConnect(
sampleStrategy,
);

const reingestMode = isDruidSource(spec);
if (!reingestMode) {
if (!isFixedFormatSource(spec)) {
ioConfig = deepSet(
ioConfig,
'inputFormat',
Expand All @@ -282,6 +286,7 @@ export async function sampleForConnect(
);
}

const reingestMode = isDruidSource(spec);
const sampleSpec: SampleSpec = {
type: samplerType,
spec: {
Expand All @@ -290,7 +295,7 @@ export async function sampleForConnect(
dataSchema: {
dataSource: 'sample',
timestampSpec: reingestMode ? REINDEX_TIMESTAMP_SPEC : PLACEHOLDER_TIMESTAMP_SPEC,
dimensionsSpec: {},
dimensionsSpec: { useSchemaDiscovery: true },
granularitySpec: {
rollup: false,
},
Expand Down
77 changes: 45 additions & 32 deletions web-console/src/views/load-data-view/load-data-view.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ import {
invalidPartitionConfig,
isDruidSource,
isEmptyIngestionSpec,
isFixedFormatSource,
isKafkaOrKinesis,
isStreamingSpec,
issueWithIoConfig,
Expand Down Expand Up @@ -267,26 +268,27 @@ function showBlankLine(line: SampleEntry): string {

function formatSampleEntries(
sampleEntries: SampleEntry[],
specialSource: undefined | 'druid' | 'kafka' | 'kinesis',
): string {
if (!sampleEntries.length) return 'No data returned from sampler';
specialSource: undefined | 'fixedFormat' | 'druid' | 'kafka' | 'kinesis',
): string[] {
if (!sampleEntries.length) return ['No data returned from sampler'];

switch (specialSource) {
case 'fixedFormat':
return sampleEntries.map(l => JSONBig.stringify(l.parsed));

case 'druid':
return sampleEntries.map(showDruidLine).join('\n');
return sampleEntries.map(showDruidLine);

case 'kafka':
return sampleEntries.map(showKafkaLine).join('\n');
return sampleEntries.map(showKafkaLine);

case 'kinesis':
return sampleEntries.map(showKinesisLine).join('\n');
return sampleEntries.map(showKinesisLine);

default:
return (
sampleEntries.every(l => !l.parsed)
? sampleEntries.map(showBlankLine)
: sampleEntries.map(showRawLine)
).join('\n');
return sampleEntries.every(l => !l.parsed)
? sampleEntries.map(showBlankLine)
: sampleEntries.map(showRawLine);
}
}

Expand Down Expand Up @@ -553,18 +555,19 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat

isStepEnabled(step: Step): boolean {
const { spec, cacheRows } = this.state;
const druidSource = isDruidSource(spec);
const ioConfig: IoConfig = deepGet(spec, 'spec.ioConfig') || EMPTY_OBJECT;

switch (step) {
case 'connect':
return Boolean(spec.type);

case 'parser':
return Boolean(!druidSource && spec.type && !issueWithIoConfig(ioConfig));
return Boolean(!isFixedFormatSource(spec) && spec.type && !issueWithIoConfig(ioConfig));

case 'timestamp':
return Boolean(!druidSource && cacheRows && deepGet(spec, 'spec.dataSchema.timestampSpec'));
return Boolean(
!isDruidSource(spec) && cacheRows && deepGet(spec, 'spec.dataSchema.timestampSpec'),
);

case 'transform':
case 'filter':
Expand Down Expand Up @@ -1258,7 +1261,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
const deltaState: Partial<LoadDataViewState> = {
inputQueryState: new QueryState({ data: sampleResponse }),
};
if (isDruidSource(spec)) {
if (isFixedFormatSource(spec)) {
deltaState.cacheRows = getCacheRowsFromSampleResponse(sampleResponse);
}
this.setState(deltaState as LoadDataViewState);
Expand All @@ -1270,8 +1273,15 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
const specType = getSpecType(spec);
const ioConfig: IoConfig = deepGet(spec, 'spec.ioConfig') || EMPTY_OBJECT;
const inlineMode = deepGet(spec, 'spec.ioConfig.inputSource.type') === 'inline';
const fixedFormatSource = isFixedFormatSource(spec);
const druidSource = isDruidSource(spec);
const specialSource = druidSource ? 'druid' : isKafkaOrKinesis(specType) ? specType : undefined;
const specialSource = druidSource
? 'druid'
: fixedFormatSource
? 'fixedFormat'
: isKafkaOrKinesis(specType)
? specType
: undefined;

let mainFill: JSX.Element | string;
if (inlineMode) {
Expand Down Expand Up @@ -1303,7 +1313,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
<TextArea
className="raw-lines"
readOnly
value={formatSampleEntries(inputData, specialSource)}
value={formatSampleEntries(inputData, specialSource).join('\n')}
/>
)}
{inputQueryState.isLoading() && <Loader />}
Expand Down Expand Up @@ -1375,7 +1385,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
</div>
{this.renderNextBar({
disabled: !inputQueryState.data,
nextStep: druidSource ? 'transform' : 'parser',
nextStep: druidSource ? 'transform' : fixedFormatSource ? 'timestamp' : 'parser',
onNextStep: () => {
if (!inputQueryState.data) return false;
const inputData = inputQueryState.data;
Expand Down Expand Up @@ -1423,6 +1433,15 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
}
}

this.updateSpec(fillDataSourceNameIfNeeded(newSpec));
}
if (fixedFormatSource) {
const newSpec = deepSet(
spec,
'spec.dataSchema.timestampSpec',
getTimestampSpec(inputQueryState.data),
);

this.updateSpec(fillDataSourceNameIfNeeded(newSpec));
} else {
const issue = issueWithSampleData(
Expand Down Expand Up @@ -1675,21 +1694,15 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
disabled: !parserQueryState.data,
onNextStep: () => {
if (!parserQueryState.data) return false;
let possibleTimestampSpec: TimestampSpec;
if (isDruidSource(spec)) {
possibleTimestampSpec = {
column: TIME_COLUMN,
format: 'auto',
};
} else {
possibleTimestampSpec = getTimestampSpec(parserQueryState.data);
}

if (possibleTimestampSpec) {
const newSpec = deepSet(spec, 'spec.dataSchema.timestampSpec', possibleTimestampSpec);
this.updateSpec(newSpec);
}
const possibleTimestampSpec = isDruidSource(spec)
? {
column: TIME_COLUMN,
format: 'auto',
}
: getTimestampSpec(parserQueryState.data);

const newSpec = deepSet(spec, 'spec.dataSchema.timestampSpec', possibleTimestampSpec);
this.updateSpec(newSpec);
return true;
},
})}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,6 @@ export const SqlDataLoaderView = React.memo(function SqlDataLoaderView(
<TitleFrame title="Load data" subtitle="Select input type">
<InputSourceStep
initInputSource={inputSource}
mode="sampler"
onSet={(inputSource, inputFormat) => {
setExternalConfigStep({ inputSource, inputFormat });
}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ export const ConnectExternalDataDialog = React.memo(function ConnectExternalData
) : (
<InputSourceStep
initInputSource={inputSource}
mode="sampler"
onSet={(inputSource, inputFormat, partitionedByHint) => {
setExternalConfigStep({ inputSource, inputFormat, partitionedByHint });
}}
Expand Down
Loading

0 comments on commit 7417ead

Please sign in to comment.