diff --git a/web-console/src/druid-models/ingestion-spec/ingestion-spec.spec.ts b/web-console/src/druid-models/ingestion-spec/ingestion-spec.spec.ts index febc76139189..d58e8c077924 100644 --- a/web-console/src/druid-models/ingestion-spec/ingestion-spec.spec.ts +++ b/web-console/src/druid-models/ingestion-spec/ingestion-spec.spec.ts @@ -24,6 +24,7 @@ import { cleanSpec, guessColumnTypeFromInput, guessColumnTypeFromSampleResponse, + guessKafkaInputFormat, guessSimpleInputFormat, updateSchemaWithSample, upgradeSpec, @@ -669,6 +670,36 @@ describe('ingestion-spec', () => { }); }); }); + + describe('guessKafkaInputFormat', () => { + const sample = [ + { + 'kafka.timestamp': 1710962988515, + 'kafka.topic': 'kttm2', + 'raw': + '{"timestamp":"2019-08-25T00:00:00.031Z","session":"S56194838","number":"16","event":{"type":"PercentClear","percentage":55},"agent":{"type":"Browser","category":"Personal computer","browser":"Chrome","browser_version":"76.0.3809.100","os":"Windows 7","platform":"Windows"},"client_ip":"181.13.41.82","geo_ip":{"continent":"South America","country":"Argentina","region":"Santa Fe","city":"Rosario"},"language":["es","es-419"],"adblock_list":"NoAdblock","app_version":"1.9.6","path":"http://www.koalastothemax.com/","loaded_image":"http://www.koalastothemax.com/img/koalas2.jpg","referrer":"Direct","referrer_host":"Direct","server_ip":"172.31.57.89","screen":"1680x1050","window":"1680x939","session_length":76261,"timezone":"N/A","timezone_offset":"180"}', + }, + { + 'kafka.timestamp': 1710962988518, + 'kafka.topic': 'kttm2', + 'raw': + '{"timestamp":"2019-08-25T00:00:00.059Z","session":"S46093731","number":"24","event":{"type":"PercentClear","percentage":85},"agent":{"type":"Mobile Browser","category":"Smartphone","browser":"Chrome Mobile","browser_version":"50.0.2661.89","os":"Android","platform":"Android"},"client_ip":"177.242.100.0","geo_ip":{"continent":"North America","country":"Mexico","region":"Chihuahua","city":"Nuevo Casas Grandes"},"language":["en","es","es-419","es-MX"],"adblock_list":"NoAdblock","app_version":"1.9.6","path":"https://koalastothemax.com/","loaded_image":"https://koalastothemax.com/img/koalas1.jpg","referrer":"https://www.google.com/","referrer_host":"www.google.com","server_ip":"172.31.11.5","screen":"320x570","window":"540x743","session_length":252689,"timezone":"CDT","timezone_offset":"300"}', + }, + ]; + + it('works when single topic', () => { + expect(guessKafkaInputFormat(sample, false)).toEqual({ type: 'json' }); + }); + + it('works when multi-topic', () => { + expect(guessKafkaInputFormat(sample, true)).toEqual({ + type: 'kafka', + valueFormat: { + type: 'json', + }, + }); + }); + }); }); describe('spec utils', () => { diff --git a/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx b/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx index ec765b9c3459..205d287ac2bb 100644 --- a/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx +++ b/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx @@ -2418,7 +2418,10 @@ export function fillInputFormatIfNeeded( spec, 'spec.ioConfig.inputFormat', getSpecType(spec) === 'kafka' - ? guessKafkaInputFormat(filterMap(sampleResponse.data, l => l.input)) + ? guessKafkaInputFormat( + filterMap(sampleResponse.data, l => l.input), + typeof deepGet(spec, 'spec.ioConfig.topicPattern') === 'string', + ) : guessSimpleInputFormat( filterMap(sampleResponse.data, l => l.input?.raw), isStreamingSpec(spec), @@ -2430,15 +2433,27 @@ function noNumbers(xs: string[]): boolean { return xs.every(x => isNaN(Number(x))); } -export function guessKafkaInputFormat(sampleRaw: Record[]): InputFormat { +export function guessKafkaInputFormat( + sampleRaw: Record[], + multiTopic: boolean, +): InputFormat { const hasHeader = sampleRaw.some(x => Object.keys(x).some(k => k.startsWith('kafka.header.'))); const keys = filterMap(sampleRaw, x => x['kafka.key']); - const payloads = filterMap(sampleRaw, x => x.raw); + const valueFormat = guessSimpleInputFormat( + filterMap(sampleRaw, x => x.raw), + true, + ); + + if (!hasHeader && !keys.length && !multiTopic) { + // No headers or keys and just a single topic means do not pick the 'kafka' format by default as it is less performant + return valueFormat; + } + return { type: 'kafka', headerFormat: hasHeader ? { type: 'string' } : undefined, keyFormat: keys.length ? guessSimpleInputFormat(keys, true) : undefined, - valueFormat: guessSimpleInputFormat(payloads, true), + valueFormat, }; }