From 93dc27bf09d677a7b08e67ab90d4823a1c246919 Mon Sep 17 00:00:00 2001 From: Arya <90748009+aryamohanan@users.noreply.github.com> Date: Tue, 10 Sep 2024 21:11:01 +0530 Subject: [PATCH] fix(kafka): enforced string format for Kafka trace headers and dropped binary support (#1296) BREAKING CHANGE: - Removed the ability to configure the header format; headers will always be sent in 'string' format. - Removed support for 'binary' format and code related to sending headers in 'binary' or 'both' formats. refs INSTA-809 --- packages/aws-fargate/test/using_api/test.js | 16 +- .../src/announceCycle/unannounced.js | 7 +- .../test/announceCycle/unannounced_test.js | 9 +- packages/collector/test/apps/agentStub.js | 8 +- .../collector/test/apps/agentStubControls.js | 3 - .../test/tracing/messaging/kafkajs/test.js | 315 +++++++----------- .../tracing/messaging/node-rdkafka/test.js | 9 +- packages/core/src/tracing/cls.js | 3 +- packages/core/src/tracing/constants.js | 25 +- packages/core/src/tracing/index.js | 1 - .../instrumentation/messaging/kafkaJs.js | 125 +------ .../instrumentation/messaging/rdkafka.js | 46 +-- packages/core/src/util/normalizeConfig.js | 34 +- packages/core/test/tracing/index_test.js | 3 +- .../core/test/util/normalizeConfig_test.js | 57 ---- .../google-cloud-run/test/using_api/test.js | 18 +- 16 files changed, 148 insertions(+), 531 deletions(-) diff --git a/packages/aws-fargate/test/using_api/test.js b/packages/aws-fargate/test/using_api/test.js index f917b26d17..e21682b165 100644 --- a/packages/aws-fargate/test/using_api/test.js +++ b/packages/aws-fargate/test/using_api/test.js @@ -106,20 +106,10 @@ describe('Using the API', function () { return retry(async () => { expect(response).to.be.an('object'); expect(response.message).to.equal('Hello Fargate!'); - - // During phase 1 of the Kafka header migration (October 2022 - October 2023) there will be a debug log about - // ignoring the option 'both' for rdkafka. We do not care about that log message in this test. - const debug = response.logs.debug.filter(msg => !msg.includes('Ignoring configuration or default value')); - - // As part of the Kafka header migration phase 2, we have added warning logs regarding the removal of the option - // to configure Kafka header formats. This test skips the warning message, and the warning itself will be removed - // in the next major release. - const warn = response.logs.warn.filter(msg => !msg.includes('Kafka header format')); - expect(debug).to.contain('Sending data to Instana (/serverless/metrics).'); - expect(debug).to.contain('Sent data to Instana (/serverless/metrics).'); - + expect(response.logs.debug).to.contain('Sending data to Instana (/serverless/metrics).'); + expect(response.logs.debug).to.contain('Sent data to Instana (/serverless/metrics).'); expect(response.logs.info).to.be.empty; - expect(warn).to.deep.equal([ + expect(response.logs.warn).to.deep.equal([ 'INSTANA_DISABLE_CA_CHECK is set, which means that the server certificate will not be verified against the ' + 'list of known CAs. This makes your service vulnerable to MITM attacks when connecting to Instana. This ' + 'setting should never be used in production, unless you use our on-premises product and are unable to ' + diff --git a/packages/collector/src/announceCycle/unannounced.js b/packages/collector/src/announceCycle/unannounced.js index 7f010e804d..dc22d6850a 100644 --- a/packages/collector/src/announceCycle/unannounced.js +++ b/packages/collector/src/announceCycle/unannounced.js @@ -49,7 +49,6 @@ const maxRetryDelay = 60 * 1000; // one minute /** * @typedef {Object} KafkaTracingConfig * @property {boolean} [trace-correlation] - * @property {string} [header-format] */ module.exports = { @@ -192,11 +191,7 @@ function applyKafkaTracingConfiguration(agentResponse) { traceCorrelation: kafkaTracingConfigFromAgent['trace-correlation'] != null ? kafkaTracingConfigFromAgent['trace-correlation'] - : tracingConstants.kafkaTraceCorrelationDefault, - headerFormat: - kafkaTracingConfigFromAgent['header-format'] != null - ? kafkaTracingConfigFromAgent['header-format'] - : tracingConstants.kafkaHeaderFormatDefault + : tracingConstants.kafkaTraceCorrelationDefault }; ensureNestedObjectExists(agentOpts.config, ['tracing', 'kafka']); agentOpts.config.tracing.kafka = kafkaTracingConfig; diff --git a/packages/collector/test/announceCycle/unannounced_test.js b/packages/collector/test/announceCycle/unannounced_test.js index c5bfc3f132..37fbf9e790 100644 --- a/packages/collector/test/announceCycle/unannounced_test.js +++ b/packages/collector/test/announceCycle/unannounced_test.js @@ -153,8 +153,7 @@ describe('unannounced state', () => { expect(agentOptsStub.config).to.deep.equal({ tracing: { kafka: { - traceCorrelation: constants.kafkaTraceCorrelationDefault, - headerFormat: constants.kafkaHeaderFormatDefault + traceCorrelation: constants.kafkaTraceCorrelationDefault } } }); @@ -167,8 +166,7 @@ describe('unannounced state', () => { prepareAnnounceResponse({ tracing: { kafka: { - 'trace-correlation': false, - 'header-format': 'string' + 'trace-correlation': false } } }); @@ -177,8 +175,7 @@ describe('unannounced state', () => { expect(agentOptsStub.config).to.deep.equal({ tracing: { kafka: { - traceCorrelation: false, - headerFormat: 'string' + traceCorrelation: false } } }); diff --git a/packages/collector/test/apps/agentStub.js b/packages/collector/test/apps/agentStub.js index a5d0d649e2..dd0269484c 100644 --- a/packages/collector/test/apps/agentStub.js +++ b/packages/collector/test/apps/agentStub.js @@ -38,7 +38,6 @@ const enableSpanBatching = process.env.ENABLE_SPANBATCHING === 'true'; const kafkaTraceCorrelation = process.env.KAFKA_TRACE_CORRELATION ? process.env.KAFKA_TRACE_CORRELATION === 'true' : null; -const kafkaHeaderFormat = process.env.KAFKA_HEADER_FORMAT; let discoveries = {}; let rejectAnnounceAttempts = 0; @@ -87,21 +86,18 @@ app.put('/com.instana.plugin.nodejs.discovery', (req, res) => { } }; - if (kafkaTraceCorrelation != null || kafkaHeaderFormat || extraHeaders.length > 0 || enableSpanBatching) { + if (kafkaTraceCorrelation != null || extraHeaders.length > 0 || enableSpanBatching) { response.tracing = {}; if (extraHeaders.length > 0) { response.tracing['extra-http-headers'] = extraHeaders; } - if (kafkaTraceCorrelation != null || kafkaHeaderFormat) { + if (kafkaTraceCorrelation != null) { response.tracing.kafka = {}; if (kafkaTraceCorrelation != null) { response.tracing.kafka['trace-correlation'] = kafkaTraceCorrelation; } - if (kafkaHeaderFormat) { - response.tracing.kafka['header-format'] = kafkaHeaderFormat; - } } if (enableSpanBatching) { diff --git a/packages/collector/test/apps/agentStubControls.js b/packages/collector/test/apps/agentStubControls.js index 930c393358..2d59175437 100644 --- a/packages/collector/test/apps/agentStubControls.js +++ b/packages/collector/test/apps/agentStubControls.js @@ -42,9 +42,6 @@ class AgentStubControls { if (opts.kafkaConfig.traceCorrelation != null) { env.KAFKA_TRACE_CORRELATION = opts.kafkaConfig.traceCorrelation.toString(); } - if (opts.kafkaConfig.headerFormat) { - env.KAFKA_HEADER_FORMAT = opts.kafkaConfig.headerFormat; - } } this.agentStub = spawn('node', [path.join(__dirname, 'agentStub.js')], { diff --git a/packages/collector/test/tracing/messaging/kafkajs/test.js b/packages/collector/test/tracing/messaging/kafkajs/test.js index 63ba89b63b..950f8b6077 100644 --- a/packages/collector/test/tracing/messaging/kafkajs/test.js +++ b/packages/collector/test/tracing/messaging/kafkajs/test.js @@ -37,136 +37,127 @@ mochaSuiteFn('tracing/kafkajs', function () { const nextUseEachBatch = getCircularList([false, true]); const nextError = getCircularList([false, 'consumer']); - ['binary', 'string', 'both'].forEach(headerFormat => { - describe(`header format: ${headerFormat}`, function () { - [false, true].forEach(useSendBatch => { - const useEachBatch = nextUseEachBatch(); - const error = nextError(); - - describe( - `kafkajs (header format: ${headerFormat}, ${useSendBatch ? 'sendBatch' : 'sendMessage'} => ` + - `${useEachBatch ? 'eachBatch' : 'eachMessage'}, error: ${error})`, - () => { - let producerControls; - let consumerControls; - - before(async () => { - consumerControls = new ProcessControls({ - appPath: path.join(__dirname, 'consumer'), - useGlobalAgent: true - }); - - producerControls = new ProcessControls({ - appPath: path.join(__dirname, 'producer'), - useGlobalAgent: true, - env: { - INSTANA_KAFKA_HEADER_FORMAT: headerFormat - } - }); - - await consumerControls.startAndWaitForAgentConnection(); - await producerControls.startAndWaitForAgentConnection(); - }); + [false, true].forEach(useSendBatch => { + const useEachBatch = nextUseEachBatch(); + const error = nextError(); - beforeEach(async () => { - await agentControls.clearReceivedTraceData(); - }); + describe( + `kafkajs, ${useSendBatch ? 'sendBatch' : 'sendMessage'} => ` + + `${useEachBatch ? 'eachBatch' : 'eachMessage'}, error: ${error})`, + () => { + let producerControls; + let consumerControls; - after(async () => { - await producerControls.stop(); - await consumerControls.stop(); - }); + before(async () => { + consumerControls = new ProcessControls({ + appPath: path.join(__dirname, 'consumer'), + useGlobalAgent: true + }); - beforeEach(async () => { - await resetMessages(consumerControls); - }); + producerControls = new ProcessControls({ + appPath: path.join(__dirname, 'producer'), + useGlobalAgent: true + }); - afterEach(async () => { - await resetMessages(consumerControls); - }); + await consumerControls.startAndWaitForAgentConnection(); + await producerControls.startAndWaitForAgentConnection(); + }); + + beforeEach(async () => { + await agentControls.clearReceivedTraceData(); + }); + + after(async () => { + await producerControls.stop(); + await consumerControls.stop(); + }); + + beforeEach(async () => { + await resetMessages(consumerControls); + }); + + afterEach(async () => { + await resetMessages(consumerControls); + }); + + it(`must trace sending and receiving and keep trace continuity, ${ + useSendBatch ? 'sendBatch' : 'sendMessage' + } => ${useEachBatch ? 'eachBatch' : 'eachMessage'}, error: ${error})`, async () => { + const parameters = { + error, + useSendBatch, + useEachBatch + }; - it(`must trace sending and receiving and keep trace continuity (header format: ${headerFormat}, ${ - useSendBatch ? 'sendBatch' : 'sendMessage' - } => ${useEachBatch ? 'eachBatch' : 'eachMessage'}, error: ${error})`, async () => { - const parameters = { - headerFormat, + await producerControls.sendRequest({ + method: 'POST', + path: '/send-messages', + simple: true, + body: JSON.stringify({ + key: 'someKey', + value: 'someMessage', + error, + useSendBatch, + useEachBatch + }), + headers: { + 'Content-Type': 'application/json' + } + }); + + await retry(async () => { + const messages = await getMessages(consumerControls); + checkMessages(messages, parameters); + const spans = await agentControls.getSpans(); + const httpEntry = verifyHttpEntry(spans); + verifyKafkaExits(spans, httpEntry, parameters); + verifyFollowUpHttpExit(spans, httpEntry); + }); + }); + + if (error === false) { + // we do not need dedicated suppression tests for error conditions + it('must not trace when suppressed', async () => { + const parameters = { error, useSendBatch, useEachBatch }; + + await producerControls.sendRequest({ + method: 'POST', + path: '/send-messages', + simple: true, + suppressTracing: true, + body: JSON.stringify({ + key: 'someKey', + value: 'someMessage', error, useSendBatch, useEachBatch - }; - - await producerControls.sendRequest({ - method: 'POST', - path: '/send-messages', - simple: true, - body: JSON.stringify({ - key: 'someKey', - value: 'someMessage', - error, - useSendBatch, - useEachBatch - }), - headers: { - 'Content-Type': 'application/json' - } - }); - - await retry(async () => { - const messages = await getMessages(consumerControls); - checkMessages(messages, parameters); - const spans = await agentControls.getSpans(); - const httpEntry = verifyHttpEntry(spans); - verifyKafkaExits(spans, httpEntry, parameters); - verifyFollowUpHttpExit(spans, httpEntry); - }); + }), + headers: { + 'Content-Type': 'application/json' + } }); - if (error === false) { - // we do not need dedicated suppression tests for error conditions - it(`must not trace when suppressed (header format: ${headerFormat})`, async () => { - const parameters = { headerFormat, error, useSendBatch, useEachBatch }; - - await producerControls.sendRequest({ - method: 'POST', - path: '/send-messages', - simple: true, - suppressTracing: true, - body: JSON.stringify({ - key: 'someKey', - value: 'someMessage', - error, - useSendBatch, - useEachBatch - }), - headers: { - 'Content-Type': 'application/json' - } - }); - - await retry(async () => { - const messages = await getMessages(consumerControls); - checkMessages(messages, parameters); - await delay(1000); - const spans = await agentControls.getSpans(); - expect(spans).to.have.lengthOf(0); - }); - }); - } - } - ); - }); - }); + await retry(async () => { + const messages = await getMessages(consumerControls); + checkMessages(messages, parameters); + await delay(1000); + const spans = await agentControls.getSpans(); + expect(spans).to.have.lengthOf(0); + }); + }); + } + } + ); }); }); describe('with error in producer ', function () { - const headerFormat = 'string'; const error = 'producer'; const useEachBatch = false; [false, true].forEach(useSendBatch => { describe( - `kafkajs (header format: ${headerFormat}, ${useSendBatch ? 'sendBatch' : 'sendMessage'} => ` + + `kafkajs, ${useSendBatch ? 'sendBatch' : 'sendMessage'} => ` + `${useEachBatch ? 'eachBatch' : 'eachMessage'}, error: ${error})`, () => { let producerControls; @@ -174,10 +165,7 @@ mochaSuiteFn('tracing/kafkajs', function () { before(async () => { producerControls = new ProcessControls({ appPath: path.join(__dirname, 'producer'), - useGlobalAgent: true, - env: { - INSTANA_KAFKA_HEADER_FORMAT: headerFormat - } + useGlobalAgent: true }); await producerControls.startAndWaitForAgentConnection(); @@ -195,7 +183,6 @@ mochaSuiteFn('tracing/kafkajs', function () { useSendBatch ? 'sendBatch' : 'sendMessage' }, error: ${error})`, async () => { const parameters = { - headerFormat, error, useSendBatch, useEachBatch @@ -275,7 +262,7 @@ mochaSuiteFn('tracing/kafkajs', function () { it('must trace sending and receiving but will not keep trace continuity', async () => { const parameters = { - headerFormat: 'correlation-disabled', + kafkaCorrelation: 'correlation-disabled', useSendBatch, useEachBatch }; @@ -306,7 +293,7 @@ mochaSuiteFn('tracing/kafkajs', function () { }); it('must not trace Kafka exits when suppressed (but will trace Kafka entries)', async () => { - const parameters = { headerFormat: 'correlation-disabled', useSendBatch, useEachBatch }; + const parameters = { kafkaCorrelation: 'correlation-disabled', useSendBatch, useEachBatch }; await producerControls.sendRequest({ method: 'POST', @@ -341,69 +328,6 @@ mochaSuiteFn('tracing/kafkajs', function () { }); }); - describe('header format from agent config', function () { - const headerFormat = 'string'; - const customAgentControls = new AgentStubControls(); - let consumerControls; - let producerControls; - - before(async () => { - await customAgentControls.startAgent({ - kafkaConfig: { headerFormat } - }); - - consumerControls = new ProcessControls({ - appPath: path.join(__dirname, 'consumer'), - agentControls: customAgentControls - }); - producerControls = new ProcessControls({ - appPath: path.join(__dirname, 'producer'), - agentControls: customAgentControls - }); - - await consumerControls.startAndWaitForAgentConnection(); - await producerControls.startAndWaitForAgentConnection(); - }); - - beforeEach(async () => { - await customAgentControls.clearReceivedTraceData(); - }); - - after(async () => { - await customAgentControls.stopAgent(); - await producerControls.stop(); - await consumerControls.stop(); - }); - - it( - `must trace sending and receiving and keep trace continuity (header format ${headerFormat} ` + - 'from agent config)', - async () => { - await producerControls.sendRequest({ - method: 'POST', - path: '/send-messages', - simple: true, - body: JSON.stringify({ - key: 'someKey', - value: 'someMessage' - }), - headers: { - 'Content-Type': 'application/json' - } - }); - - await retry(async () => { - const messages = await getMessages(consumerControls); - checkMessages(messages, { headerFormat }); - const spans = await customAgentControls.getSpans(); - const httpEntry = verifyHttpEntry(spans); - verifyKafkaExits(spans, httpEntry, { headerFormat }); - verifyFollowUpHttpExit(spans, httpEntry); - }); - } - ); - }); - describe('disable trace correlation from agent config', function () { const customAgentControls = new AgentStubControls(); @@ -438,7 +362,7 @@ mochaSuiteFn('tracing/kafkajs', function () { await consumerControls.stop(); }); - const headerFormat = 'correlation-disabled'; + const kafkaCorrelation = 'correlation-disabled'; it( 'must trace sending and receiving but will not keep trace continuity ' + @@ -459,10 +383,10 @@ mochaSuiteFn('tracing/kafkajs', function () { await retry(async () => { const messages = await getMessages(consumerControls); - checkMessages(messages, { headerFormat }); + checkMessages(messages, { kafkaCorrelation }); const spans = await customAgentControls.getSpans(); const httpEntry = verifyHttpEntry(spans); - verifyKafkaExits(spans, httpEntry, { headerFormat }); + verifyKafkaExits(spans, httpEntry, { kafkaCorrelation }); verifyFollowUpHttpExit(spans, httpEntry); }); } @@ -508,7 +432,7 @@ mochaSuiteFn('tracing/kafkajs', function () { it('must not trace when disabled', async () => { const parameters = { - headerFormat: 'tracing-disabled', + kafkaCorrelation: 'tracing-disabled', error: false, useSendBatch: false, useEachBatch: false @@ -553,10 +477,7 @@ mochaSuiteFn('tracing/kafkajs', function () { }); producerControls = new ProcessControls({ appPath: path.join(__dirname, 'producer'), - useGlobalAgent: true, - env: { - INSTANA_KAFKA_HEADER_FORMAT: 'both' - } + useGlobalAgent: true }); await consumerControls.startAndWaitForAgentConnection(); @@ -719,7 +640,7 @@ mochaSuiteFn('tracing/kafkajs', function () { } function verifyKafkaEntries(spans, parentKafkaExit, parameters) { - const { headerFormat, error, useSendBatch, useEachBatch } = parameters; + const { kafkaCorrelation, error, useSendBatch, useEachBatch } = parameters; if (error === 'producer') { return; } @@ -731,7 +652,11 @@ mochaSuiteFn('tracing/kafkajs', function () { span => expect(span.data.kafka.access).to.equal('consume'), span => expect(span.data.kafka.service).to.equal(`${topicPrefix}-1`) ]; - expectationsFirstKafkaEntry = addParentChildExpectation(expectationsFirstKafkaEntry, parentKafkaExit, headerFormat); + expectationsFirstKafkaEntry = addParentChildExpectation( + expectationsFirstKafkaEntry, + parentKafkaExit, + kafkaCorrelation + ); if (error === 'consumer') { expectationsFirstKafkaEntry.push(span => expect(span.ec).to.equal(1)); } else { @@ -764,7 +689,7 @@ mochaSuiteFn('tracing/kafkajs', function () { expectationsSecondKafkaEntry = addParentChildExpectation( expectationsSecondKafkaEntry, parentKafkaExit, - headerFormat + kafkaCorrelation ); if (error === 'consumer') { expectationsSecondKafkaEntry.push(span => expect(span.ec).to.equal(1)); @@ -791,7 +716,7 @@ mochaSuiteFn('tracing/kafkajs', function () { expectationsThirdKafkaEntry = addParentChildExpectation( expectationsThirdKafkaEntry, parentKafkaExit, - headerFormat + kafkaCorrelation ); if (error === 'consumer') { expectationsThirdKafkaEntry.push(span => expect(span.ec).to.equal(1)); @@ -813,8 +738,8 @@ mochaSuiteFn('tracing/kafkajs', function () { } } - function addParentChildExpectation(expectations, parentKafkaExit, headerFormat) { - if (headerFormat !== 'correlation-disabled') { + function addParentChildExpectation(expectations, parentKafkaExit, kafkaCorrelation) { + if (kafkaCorrelation !== 'correlation-disabled') { // With correlation headers enabled (default), Kafka entries will be the child span of a Kafka exit. expectations = expectations.concat([ span => expect(span.t).to.equal(parentKafkaExit.t), diff --git a/packages/collector/test/tracing/messaging/node-rdkafka/test.js b/packages/collector/test/tracing/messaging/node-rdkafka/test.js index d369dd8018..a6cc1db853 100644 --- a/packages/collector/test/tracing/messaging/node-rdkafka/test.js +++ b/packages/collector/test/tracing/messaging/node-rdkafka/test.js @@ -309,10 +309,7 @@ mochaSuiteFn('tracing/messaging/node-rdkafka', function () { before(async () => { producerControls = new ProcessControls({ appPath: path.join(__dirname, 'producer'), - useGlobalAgent: true, - env: { - INSTANA_KAFKA_HEADER_FORMAT: 'string' - } + useGlobalAgent: true }); consumerControls = new ProcessControls({ appPath: path.join(__dirname, 'consumer'), @@ -364,9 +361,7 @@ mochaSuiteFn('tracing/messaging/node-rdkafka', function () { let consumerControls; before(async () => { - await customAgentControls.startAgent({ - kafkaConfig: { headerFormat: 'string' } - }); + await customAgentControls.startAgent(); producerControls = new ProcessControls({ appPath: path.join(__dirname, 'producer'), diff --git a/packages/core/src/tracing/cls.js b/packages/core/src/tracing/cls.js index f62175f5cb..1583307493 100644 --- a/packages/core/src/tracing/cls.js +++ b/packages/core/src/tracing/cls.js @@ -243,8 +243,7 @@ function startSpan(spanName, kind, traceId, parentSpanId, w3cTraceContext) { // If the client code has specified a trace ID/parent ID, use the provided IDs. if (traceId) { - // The incoming trace ID/span ID from an upstream tracer could be shorter than the standard length. Some of our code - // (in particular, the binary Kafka trace correlation header X_INSTANA_C) assumes the standard length. We normalize + // The incoming trace ID/span ID from an upstream tracer could be shorter than the standard length. We normalize // both IDs here by left-padding with 0 characters. // Maintenance note (128-bit-trace-ids): When we switch to 128 bit trace IDs, we need to left-pad the trace ID to 32 diff --git a/packages/core/src/tracing/constants.js b/packages/core/src/tracing/constants.js index 3e795a328d..2624e0dac4 100644 --- a/packages/core/src/tracing/constants.js +++ b/packages/core/src/tracing/constants.js @@ -17,35 +17,20 @@ exports.traceLevelHeaderNameLowerCase = exports.traceLevelHeaderName.toLowerCase exports.syntheticHeaderName = 'X-INSTANA-SYNTHETIC'; exports.syntheticHeaderNameLowerCase = exports.syntheticHeaderName.toLowerCase(); -// legacy kafka trace correlation (binary values) -exports.kafkaLegacyTraceContextHeaderName = 'X_INSTANA_C'; -exports.kafkaLegacyTraceLevelHeaderName = 'X_INSTANA_L'; -exports.kafkaLegacyTraceLevelValueSuppressed = Buffer.from([0]); -exports.kafkaLegacyTraceLevelValueInherit = Buffer.from([1]); - -// New kafka trace correlation (string values). Available as opt-in since 2021-10, and send out together with the legacy -// binary headers by default starting in 2022-10. We will switch over to these headers completely (omitting the legacy -// headers approximately in 2023-10. +// New Kafka trace correlation (string values) was introduced as an opt-in feature in 2021-10. Initially, it was sent +// out along with the legacy binary headers by default starting in 2022-10. However, as of 2024-10, only string headers +// are supported, and the legacy binary headers are no longer supported. + exports.kafkaTraceIdHeaderName = 'X_INSTANA_T'; exports.kafkaSpanIdHeaderName = 'X_INSTANA_S'; exports.kafkaTraceLevelHeaderName = 'X_INSTANA_L_S'; -/** - * @typedef {'binary' | 'string' | 'both'} KafkaTraceCorrelationFormat - */ - -// With the current phase 1 of the Kafka header format migration, 'both', is the default. -// With phase 2 (starting approximately October 2023) it will no longer be configurable and will always use 'string'. -/** @type {KafkaTraceCorrelationFormat} */ -exports.kafkaHeaderFormatDefault = 'both'; exports.kafkaTraceCorrelationDefault = true; exports.allInstanaKafkaHeaders = [ exports.kafkaTraceIdHeaderName, exports.kafkaSpanIdHeaderName, - exports.kafkaTraceLevelHeaderName, - exports.kafkaLegacyTraceContextHeaderName, - exports.kafkaLegacyTraceLevelHeaderName + exports.kafkaTraceLevelHeaderName ]; exports.w3cTraceParent = 'traceparent'; diff --git a/packages/core/src/tracing/index.js b/packages/core/src/tracing/index.js index 6546cc18c5..844b806795 100644 --- a/packages/core/src/tracing/index.js +++ b/packages/core/src/tracing/index.js @@ -123,7 +123,6 @@ if (customInstrumentations.length > 0) { /** * @typedef {Object} KafkaTracingConfig * @property {boolean} [traceCorrelation] - * @property {string} [headerFormat] */ /** @type {Array.} */ diff --git a/packages/core/src/tracing/instrumentation/messaging/kafkaJs.js b/packages/core/src/tracing/instrumentation/messaging/kafkaJs.js index bb3c501878..5c525faf54 100644 --- a/packages/core/src/tracing/instrumentation/messaging/kafkaJs.js +++ b/packages/core/src/tracing/instrumentation/messaging/kafkaJs.js @@ -18,21 +18,17 @@ logger = require('../../../logger').getLogger('tracing/kafkajs', newLogger => { }); let traceCorrelationEnabled = constants.kafkaTraceCorrelationDefault; -let headerFormat = constants.kafkaHeaderFormatDefault; let isActive = false; exports.init = function init(config) { hook.onFileLoad(/\/kafkajs\/src\/producer\/messageProducer\.js/, instrumentProducer); hook.onFileLoad(/\/kafkajs\/src\/consumer\/runner\.js/, instrumentConsumer); - hook.onModuleLoad('kafkajs', logWarningForKafkaHeaderFormat); traceCorrelationEnabled = config.tracing.kafka.traceCorrelation; - headerFormat = config.tracing.kafka.headerFormat; }; exports.updateConfig = function updateConfig(config) { traceCorrelationEnabled = config.tracing.kafka.traceCorrelation; - headerFormat = config.tracing.kafka.headerFormat; }; exports.activate = function activate(extraConfig) { @@ -40,9 +36,6 @@ exports.activate = function activate(extraConfig) { if (extraConfig.tracing.kafka.traceCorrelation != null) { traceCorrelationEnabled = extraConfig.tracing.kafka.traceCorrelation; } - if (typeof extraConfig.tracing.kafka.headerFormat === 'string') { - headerFormat = extraConfig.tracing.kafka.headerFormat; - } } isActive = true; }; @@ -254,19 +247,6 @@ function instrumentedEachMessage(originalEachMessage) { if (message.headers[constants.kafkaTraceLevelHeaderName]) { level = String(message.headers[constants.kafkaTraceLevelHeaderName]); } - // Only fall back to legacy binary trace correlation headers if no new header is present. - if (traceId == null && parentSpanId == null && level == null) { - // The newer string header format has not been found, fall back to legacy binary headers. - if (message.headers[constants.kafkaLegacyTraceContextHeaderName]) { - const traceContextBuffer = message.headers[constants.kafkaLegacyTraceContextHeaderName]; - if (Buffer.isBuffer(traceContextBuffer) && traceContextBuffer.length === 24) { - const traceContext = tracingUtil.readTraceContextFromBuffer(traceContextBuffer); - traceId = traceContext.t; - parentSpanId = traceContext.s; - } - } - level = readTraceLevelBinary(message); - } } removeInstanaHeadersFromMessage(message); @@ -349,27 +329,6 @@ function instrumentedEachBatch(originalEachBatch) { } } - if (traceId == null && parentSpanId == null && level == null) { - // The newer string header format has not been found, fall back to legacy binary headers. - for (let msgIdx = 0; msgIdx < batch.messages.length; msgIdx++) { - if ( - batch.messages[msgIdx].headers && - batch.messages[msgIdx].headers[constants.kafkaLegacyTraceContextHeaderName] - ) { - const traceContextBuffer = batch.messages[msgIdx].headers[constants.kafkaLegacyTraceContextHeaderName]; - if (Buffer.isBuffer(traceContextBuffer) && traceContextBuffer.length === 24) { - const traceContext = tracingUtil.readTraceContextFromBuffer(traceContextBuffer); - traceId = traceContext.t; - parentSpanId = traceContext.s; - } - } - level = readTraceLevelBinary(batch.messages[msgIdx]); - if (traceId != null || parentSpanId != null || level != null) { - break; - } - } - } - for (let msgIdx = 0; msgIdx < batch.messages.length; msgIdx++) { removeInstanaHeadersFromMessage(batch.messages[msgIdx]); } @@ -411,51 +370,13 @@ function isSuppressed(level) { return level === '0'; } -function readTraceLevelBinary(message) { - if (message.headers[constants.kafkaLegacyTraceLevelHeaderName]) { - const traceLevelBuffer = message.headers[constants.kafkaLegacyTraceLevelHeaderName]; - if (Buffer.isBuffer(traceLevelBuffer) && traceLevelBuffer.length >= 1) { - return String(traceLevelBuffer.readInt8()); - } - } - return '1'; -} - function addTraceContextHeaderToAllMessages(messages, span) { if (!traceCorrelationEnabled) { return; } - switch (headerFormat) { - case 'binary': - addLegacyTraceContextHeaderToAllMessages(messages, span); - break; - case 'string': - addTraceIdSpanIdToAllMessages(messages, span); - break; - case 'both': - // fall through (both is the default) - default: - addLegacyTraceContextHeaderToAllMessages(messages, span); - addTraceIdSpanIdToAllMessages(messages, span); - } -} - -function addLegacyTraceContextHeaderToAllMessages(messages, span) { - if (Array.isArray(messages)) { - for (let msgIdx = 0; msgIdx < messages.length; msgIdx++) { - if (messages[msgIdx].headers == null) { - messages[msgIdx].headers = { - [constants.kafkaLegacyTraceContextHeaderName]: tracingUtil.renderTraceContextToBuffer(span), - [constants.kafkaLegacyTraceLevelHeaderName]: constants.kafkaLegacyTraceLevelValueInherit - }; - } else if (messages[msgIdx].headers && typeof messages[msgIdx].headers === 'object') { - messages[msgIdx].headers[constants.kafkaLegacyTraceContextHeaderName] = - tracingUtil.renderTraceContextToBuffer(span); - messages[msgIdx].headers[constants.kafkaLegacyTraceLevelHeaderName] = - constants.kafkaLegacyTraceLevelValueInherit; - } - } - } + // Add trace ID and span ID headers to all Kafka messages for trace correlation. + // 'string' headers are used by default starting from v4. + addTraceIdSpanIdToAllMessages(messages, span); } function addTraceIdSpanIdToAllMessages(messages, span) { @@ -481,34 +402,8 @@ function addTraceLevelSuppressionToAllMessages(messages) { if (!traceCorrelationEnabled) { return; } - switch (headerFormat) { - case 'binary': - addTraceLevelSuppressionToAllMessagesBinary(messages); - break; - case 'string': - addTraceLevelSuppressionToAllMessagesString(messages); - break; - case 'both': - // fall through (both is the default) - default: - addTraceLevelSuppressionToAllMessagesBinary(messages); - addTraceLevelSuppressionToAllMessagesString(messages); - } -} - -function addTraceLevelSuppressionToAllMessagesBinary(messages) { - if (Array.isArray(messages)) { - for (let msgIdx = 0; msgIdx < messages.length; msgIdx++) { - if (messages[msgIdx].headers == null) { - messages[msgIdx].headers = { - [constants.kafkaLegacyTraceLevelHeaderName]: constants.kafkaLegacyTraceLevelValueSuppressed - }; - } else if (messages[msgIdx].headers && typeof messages[msgIdx].headers === 'object') { - messages[msgIdx].headers[constants.kafkaLegacyTraceLevelHeaderName] = - constants.kafkaLegacyTraceLevelValueSuppressed; - } - } - } + // Since v4, only 'string' format is supported by default. + addTraceLevelSuppressionToAllMessagesString(messages); } function addTraceLevelSuppressionToAllMessagesString(messages) { @@ -533,13 +428,3 @@ function removeInstanaHeadersFromMessage(message) { }); } } - -// Note: This function can be removed as soon as we finish the Kafka header migration phase2. -// Might happen in major release v4. -function logWarningForKafkaHeaderFormat() { - logger.warn( - '[Deprecation Warning] The configuration option for specifying the Kafka header format will be removed in the ' + - 'next major release as the format will no longer be configurable and Instana tracers will only send string ' + - 'headers. More details see: https://ibm.biz/kafka-trace-correlation-header.' - ); -} diff --git a/packages/core/src/tracing/instrumentation/messaging/rdkafka.js b/packages/core/src/tracing/instrumentation/messaging/rdkafka.js index 33b033d653..88bcd4e1ad 100644 --- a/packages/core/src/tracing/instrumentation/messaging/rdkafka.js +++ b/packages/core/src/tracing/instrumentation/messaging/rdkafka.js @@ -14,18 +14,12 @@ const shimmer = require('../../shimmer'); const { getFunctionArguments } = require('../../../util/function_arguments'); let traceCorrelationEnabled = constants.kafkaTraceCorrelationDefault; -let logger; -logger = require('../../../logger').getLogger('tracing/rdkafka', newLogger => { - logger = newLogger; -}); - let isActive = false; exports.init = function init(config) { hook.onFileLoad(/\/node-rdkafka\/lib\/producer\.js/, instrumentProducer); hook.onFileLoad(/\/node-rdkafka\/lib\/kafka-consumer-stream\.js/, instrumentConsumerAsStream); hook.onModuleLoad('node-rdkafka', instrumentConsumer); - hook.onModuleLoad('node-rdkafka', logWarningForKafkaHeaderFormat); traceCorrelationEnabled = config.tracing.kafka.traceCorrelation; }; @@ -40,7 +34,6 @@ exports.activate = function activate(extraConfig) { traceCorrelationEnabled = extraConfig.tracing.kafka.traceCorrelation; } } - isActive = true; }; @@ -48,17 +41,6 @@ exports.deactivate = function deactivate() { isActive = false; }; -// Note: This function can be removed as soon as we finish the Kafka header migration phase 2 and remove the ability to -// configure the header format. Might happen in major release v4. -function logWarningForKafkaHeaderFormat() { - logger.warn( - '[Deprecation Warning] The Kafka header format configuration will be removed in the next major release. ' + - 'Instana tracers will only support string headers, as binary headers are not compatible with node-rdkafka. ' + - 'For more information, see the GitHub issue: https://github.com/Blizzard/node-rdkafka/pull/968, and review our ' + - 'official documentation on Kafka header configuration: https://ibm.biz/kafka-trace-correlation-header.' - ); -} - function instrumentProducer(ProducerClass) { shimmer.wrap(ProducerClass.prototype, 'produce', shimProduce); } @@ -312,16 +294,6 @@ function instrumentedConsumerEmit(ctx, originalEmit, originalArgs) { }); } -function readTraceLevelBinary(instanaHeadersAsObject) { - if (instanaHeadersAsObject[constants.kafkaLegacyTraceLevelHeaderName]) { - const traceLevelBuffer = instanaHeadersAsObject[constants.kafkaLegacyTraceLevelHeaderName]; - if (Buffer.isBuffer(traceLevelBuffer) && traceLevelBuffer.length >= 1) { - return String(traceLevelBuffer.readInt8()); - } - } - return '1'; -} - function addTraceContextHeader(headers, span) { if (!traceCorrelationEnabled) { return headers; @@ -369,7 +341,7 @@ function findInstanaHeaderValues(instanaHeadersAsObject) { let parentSpanId; let level; - // CASE: Look for the the newer string header format first. + // Since v4, only 'string' format is supported. if (instanaHeadersAsObject[constants.kafkaTraceIdHeaderName]) { traceId = String(instanaHeadersAsObject[constants.kafkaTraceIdHeaderName]); if (traceId) { @@ -385,21 +357,5 @@ function findInstanaHeaderValues(instanaHeadersAsObject) { level = String(instanaHeadersAsObject[constants.kafkaTraceLevelHeaderName]); } - // CASE: Only fall back to legacy binary trace correlation headers if no new header is present. - if (traceId == null && parentSpanId == null && level == null) { - // The newer string header format has not been found, fall back to legacy binary headers. - if (instanaHeadersAsObject[constants.kafkaLegacyTraceContextHeaderName]) { - const traceContextBuffer = instanaHeadersAsObject[constants.kafkaLegacyTraceContextHeaderName]; - - if (Buffer.isBuffer(traceContextBuffer) && traceContextBuffer.length === 24) { - const traceContext = tracingUtil.readTraceContextFromBuffer(traceContextBuffer); - traceId = traceContext.t; - parentSpanId = traceContext.s; - } - } - - level = readTraceLevelBinary(instanaHeadersAsObject); - } - return { level, traceId, longTraceId, parentSpanId }; } diff --git a/packages/core/src/util/normalizeConfig.js b/packages/core/src/util/normalizeConfig.js index 94734376c9..4106b45882 100644 --- a/packages/core/src/util/normalizeConfig.js +++ b/packages/core/src/util/normalizeConfig.js @@ -36,7 +36,6 @@ const constants = require('../tracing/constants'); /** * @typedef {Object} KafkaTracingOptions * @property {boolean} [traceCorrelation] - * @property {import('../tracing/constants').KafkaTraceCorrelationFormat} [headerFormat] */ /** @@ -81,7 +80,6 @@ const constants = require('../tracing/constants'); /** * @typedef {Object} AgentTracingKafkaConfig * @property {boolean} [traceCorrelation] - * @property {string} [headerFormat] */ /** @type {import('../core').GenericLogger} */ @@ -116,8 +114,7 @@ const defaults = { spanBatchingEnabled: false, disableW3cTraceCorrelation: false, kafka: { - traceCorrelation: true, - headerFormat: constants.kafkaHeaderFormatDefault + traceCorrelation: true } }, secrets: { @@ -126,8 +123,6 @@ const defaults = { } }; -const validKafkaHeaderFormats = ['binary', 'string', 'both']; - const validSecretsMatcherModes = ['equals-ignore-case', 'equals', 'contains-ignore-case', 'contains', 'regex', 'none']; /** @@ -548,33 +543,6 @@ function normalizeTracingKafka(config) { } else { config.tracing.kafka.traceCorrelation = defaults.tracing.kafka.traceCorrelation; } - - // @ts-ignore - config.tracing.kafka.headerFormat = - config.tracing.kafka.headerFormat || process.env.INSTANA_KAFKA_HEADER_FORMAT || defaults.tracing.kafka.headerFormat; - if (typeof config.tracing.kafka.headerFormat !== 'string') { - logger.warn( - `The value of config.tracing.kafka.headerFormat ("${config.tracing.kafka.headerFormat}") is not a string. ` + - `Assuming the default value "${defaults.tracing.kafka.headerFormat}".` - ); - config.tracing.kafka.headerFormat = defaults.tracing.kafka.headerFormat; - return; - } - // @ts-ignore - config.tracing.kafka.headerFormat = config.tracing.kafka.headerFormat.toLowerCase(); - if (validKafkaHeaderFormats.indexOf(config.tracing.kafka.headerFormat) < 0) { - logger.warn( - 'The value of config.tracing.kafka.headerFormat (or the value of INSTANA_KAFKA_HEADER_FORMAT) ' + - `("${config.tracing.kafka.headerFormat}") is not a supported header format. Assuming the default ` + - `value "${defaults.tracing.kafka.headerFormat}".` - ); - config.tracing.kafka.headerFormat = defaults.tracing.kafka.headerFormat; - return; - } - - if (config.tracing.kafka.headerFormat !== defaults.tracing.kafka.headerFormat) { - logger.info(`Kafka trace correlation header format has been set to "${config.tracing.kafka.headerFormat}".`); - } } /** diff --git a/packages/core/test/tracing/index_test.js b/packages/core/test/tracing/index_test.js index f97f291eb0..593d7b7fac 100644 --- a/packages/core/test/tracing/index_test.js +++ b/packages/core/test/tracing/index_test.js @@ -118,8 +118,7 @@ mochaSuiteFn('[UNIT] tracing/index', function () { const extraConfigFromAgent = { tracing: { kafka: { - traceCorrelation: false, - headerFormat: 'string' + traceCorrelation: false } } }; diff --git a/packages/core/test/util/normalizeConfig_test.js b/packages/core/test/util/normalizeConfig_test.js index 40587b3037..2aa1cd1336 100644 --- a/packages/core/test/util/normalizeConfig_test.js +++ b/packages/core/test/util/normalizeConfig_test.js @@ -29,7 +29,6 @@ describe('util.normalizeConfig', () => { delete process.env.INSTANA_DISABLE_SPANBATCHING; delete process.env.INSTANA_DISABLE_W3C_TRACE_CORRELATION; delete process.env.INSTANA_KAFKA_TRACE_CORRELATION; - delete process.env.INSTANA_KAFKA_HEADER_FORMAT; delete process.env.INSTANA_PACKAGE_JSON_PATH; } @@ -337,47 +336,6 @@ describe('util.normalizeConfig', () => { expect(config.tracing.kafka.traceCorrelation).to.be.false; }); - it('should set Kafka header format to binary', () => { - const config = normalizeConfig({ tracing: { kafka: { headerFormat: 'binary' } } }); - expect(config.tracing.kafka.headerFormat).to.equal('binary'); - }); - - it('should set Kafka header format to string', () => { - const config = normalizeConfig({ tracing: { kafka: { headerFormat: 'string' } } }); - expect(config.tracing.kafka.headerFormat).to.equal('string'); - }); - - it('should set Kafka header format to both', () => { - const config = normalizeConfig({ tracing: { kafka: { headerFormat: 'both' } } }); - expect(config.tracing.kafka.headerFormat).to.equal('both'); - }); - - it('should ignore non-string Kafka header format', () => { - const config = normalizeConfig({ tracing: { kafka: { headerFormat: 13 } } }); - // During phase 1 of the migration, 'both' will be the default value. In phase 2, the ability to configure the - // format will be removed and we will only use the 'string' format. - expect(config.tracing.kafka.headerFormat).to.equal('both'); - }); - - it('should ignore invalid Kafka header format', () => { - const config = normalizeConfig({ tracing: { kafka: { headerFormat: 'whatever' } } }); - // During phase 1 of the migration, 'both' will be the default value. In phase 2, the ability to configure the - // format will be removed and we will only use the 'string' format. - expect(config.tracing.kafka.headerFormat).to.equal('both'); - }); - - it('should set Kafka header format to binary via INSTANA_KAFKA_HEADER_FORMAT', () => { - process.env.INSTANA_KAFKA_HEADER_FORMAT = 'binary'; - const config = normalizeConfig(); - expect(config.tracing.kafka.headerFormat).to.equal('binary'); - }); - - it('should set Kafka header format to string via INSTANA_KAFKA_HEADER_FORMAT', () => { - process.env.INSTANA_KAFKA_HEADER_FORMAT = 'string'; - const config = normalizeConfig(); - expect(config.tracing.kafka.headerFormat).to.equal('string'); - }); - it('should disable opentelemetry if config is set', () => { const config = normalizeConfig({ tracing: { useOpentelemetry: false } @@ -404,20 +362,6 @@ describe('util.normalizeConfig', () => { expect(config.tracing.useOpentelemetry).to.equal(true); }); - it('should set Kafka header format to both via INSTANA_KAFKA_HEADER_FORMAT', () => { - process.env.INSTANA_KAFKA_HEADER_FORMAT = 'both'; - const config = normalizeConfig(); - expect(config.tracing.kafka.headerFormat).to.equal('both'); - }); - - it('should ignore invalid Kafka header format in INSTANA_KAFKA_HEADER_FORMAT', () => { - process.env.INSTANA_KAFKA_HEADER_FORMAT = 'whatever'; - const config = normalizeConfig(); - // During phase 1 of the migration, 'both' will be the default value. In phase 2, the ability to configure the - // format will be removed and we will only use the 'string' format. - expect(config.tracing.kafka.headerFormat).to.equal('both'); - }); - it('should accept custom secrets config', () => { const config = normalizeConfig({ secrets: { @@ -528,7 +472,6 @@ describe('util.normalizeConfig', () => { expect(config.tracing.spanBatchingEnabled).to.be.false; expect(config.tracing.disableW3cTraceCorrelation).to.be.false; expect(config.tracing.kafka.traceCorrelation).to.be.true; - expect(config.tracing.kafka.headerFormat).to.equal('both'); expect(config.tracing.useOpentelemetry).to.equal(true); expect(config.secrets).to.be.an('object'); diff --git a/packages/google-cloud-run/test/using_api/test.js b/packages/google-cloud-run/test/using_api/test.js index c230cca5de..529ad85dd6 100644 --- a/packages/google-cloud-run/test/using_api/test.js +++ b/packages/google-cloud-run/test/using_api/test.js @@ -101,22 +101,10 @@ describe('Using the API', function () { function verify(control, response) { expect(response).to.be.an('object'); expect(response.message).to.equal('Hello Cloud Run!'); - - // During phase 1 of the Kafka header migration (October 2022 - October 2023) there will be a debug log about - // ignoring the option 'both' for rdkafka. We do not care about that log message in this test. - - const debug = response.logs.debug.filter(msg => !msg.includes('Ignoring configuration or default value')); - - // As part of the Kafka header migration phase 2, we have added warning logs regarding the removal of the option to - // configure Kafka header formats. This test skips the warning message, and the warning itself will be removed - // in the next major release. - const warn = response.logs.warn.filter(msg => !msg.includes('Kafka header format')); - - expect(debug).to.contain('Sending data to Instana (/serverless/metrics).'); - expect(debug).to.contain('Sent data to Instana (/serverless/metrics).'); - + expect(response.logs.debug).to.contain('Sending data to Instana (/serverless/metrics).'); + expect(response.logs.debug).to.contain('Sent data to Instana (/serverless/metrics).'); expect(response.logs.info).to.be.empty; - expect(warn).to.deep.equal([ + expect(response.logs.warn).to.deep.equal([ 'INSTANA_DISABLE_CA_CHECK is set, which means that the server certificate will not be verified against the ' + 'list of known CAs. This makes your service vulnerable to MITM attacks when connecting to Instana. This ' + 'setting should never be used in production, unless you use our on-premises product and are unable to ' +