Segfault when disconnecting right after consuming #1057

Open
abarisain opened this issue Nov 29, 2023 · 3 comments

@abarisain

Environment Information

  • OS: macOS 14, Apple M2 Pro
  • Node version: 20 (reproducible on 18 too)
  • NPM version: 10.2.3
  • C++ toolchain: llvm
  • node-rdkafka version: 2.8.0

Steps to Reproduce

Hello,

I'm writing automated tests with Playwright and need to access Kafka from them. This means connecting to and disconnecting from multiple brokers many times during the life of a process.

My consumer wrapper returns a promise that resolves once a message has been consumed on any partition. Before resolving, I disconnect from the brokers, as the connection will not be reused.

When I consume multiple topics from multiple brokers at once and then disconnect, I get crashes:

PID 48415 received SIGSEGV for address: 0x0
0   segfault-handler.node               0x00000001084a0d48 _ZL16segfault_handleriP9__siginfoPv + 296
1   libsystem_platform.dylib            0x0000000181b9fa24 _sigtramp + 56
2   node-librdkafka.node                0x000000010850da0c _ZNK3Nan8Callback5Call_EPN2v87IsolateENS1_5LocalINS1_6ObjectEEEiPNS4_INS1_5ValueEEEPNS_13AsyncResourceE + 64
3   node-librdkafka.node                0x000000010850a05c _ZNK3Nan8Callback4CallEiPN2v85LocalINS1_5ValueEEE + 192
4   node-librdkafka.node                0x0000000108523ee8 _ZN9NodeKafka7Workers24KafkaConsumerConsumeLoop21HandleMessageCallbackEPN7RdKafka7MessageENS2_9ErrorCodeE + 700
5   node-librdkafka.node                0x000000010852679c _ZN9NodeKafka7Workers13MessageWorker11WorkMessageEv + 112
6   node                                0x000000010353c5b4 uv__async_io + 268
7   node                                0x000000010354e68c uv__io_poll + 1020
8   node                                0x000000010353cb78 uv_run + 476
9   node                                0x0000000102a9d754 _ZN4node21SpinEventLoopInternalEPNS_11EnvironmentE + 256
10  node                                0x0000000102bac8d8 _ZN4node16NodeMainInstance3RunEPNS_8ExitCodeEPNS_11EnvironmentE + 164
11  node                                0x0000000102bac674 _ZN4node16NodeMainInstance3RunEv + 124
12  node                                0x0000000102b37030 _ZN4node5StartEiPPc + 776
13  dyld                                0x00000001817f50e0 start + 2360
fish: Job 1, 'node index.js' terminated by signal SIGSEGV (Address boundary error)

(I can't seem to get unmangled symbols, but I'd be happy to help.)

It's a tough one to reproduce, as it only seems to happen if you have both resolve and consumer.disconnect() in the consumer.on("message") callback. Moving things around seems to fix the issue, but unfortunately I can't organize my code any other way, nor can I ask the people who write tests to be careful about their call sequence. It really looks like a race condition.
Pausing consumption or unassigning topics doesn't seem to help either.

I tried waiting for the "disconnected" event before resolving the promise to make sure, but that did not work either. On top of that, the consumer sometimes seems to fail to disconnect and just hangs. Pausing consumption didn't help either.
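
Roughly, this is what that attempt looked like (a sketch of the message callback from the repro below, with resolve moved into the "disconnected" handler; it still crashed or hung):

    consumer.consume((err, msg) => {
      console.debug("Got kafka message callback. Err:", err, "message:", msg);
      // Attempted workaround: only resolve once the consumer reports "disconnected".
      consumer.on("disconnected", () => {
        resolve(consumer);
      });
      consumer.disconnect();
    });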

The only workaround I found is to tolerate a memory leak by going into node-rdkafka/src/kafka-consumer.cc and commenting this out:

    // stop the consume loop
    consumeLoop->Close();

    // cleanup the async worker
    consumeLoop->WorkComplete();
    consumeLoop->Destroy();

Not ideal, but test processes don't stay up for long so it's alright.

I have not yet tried to reproduce this on a Linux machine (I will only be able to test on ARM Linux) or on brokers that are not part of our development environment.

I did manage to isolate a small repro case outside of Playwright. Pay no attention to most of startConsuming, which is only there to assign myself all partitions of a topic. You might need to give it a couple of attempts, as it will sometimes just work.

package.json:

{
  "name": "repro_kafka",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "author": "",
  "license": "ISC",
  "dependencies": {
    "node-rdkafka": "^2.18.0",
    "segfault-handler": "^1.3.0"
  }
}

index.js:

const SegfaultHandler = require("segfault-handler");
SegfaultHandler.registerHandler("crash.log");

const RDKafka = require("node-rdkafka");

const KAFKA_PORT = 9092;

function startConsuming(brokers, topic) {
  console.log("Starting to consume ", topic);
  const consumer = new RDKafka.KafkaConsumer(
    {
      "group.id": "public-api-tests",
      "metadata.broker.list": brokers.join(","),
      "enable.auto.commit": false,
      "socket.keepalive.enable": true,
      "queued.min.messages": 1,
    },
    {}
  );

  consumer.connect();

  return new Promise((resolve, reject) => {
    consumer.on("ready", () => {
      console.log(`Consumer ready for topic '${topic}'`);

      consumer.getMetadata(
        { topic: topic, allTopics: false },
        async (err, metadata) => {
          console.debug("Topic metadata", metadata);

          if (err) {
            reject(`Could not fetch metadata for topic '${topic}': ${err}`);
            return;
          }
          const partitions = metadata.topics
            .filter((t) => t.name === topic)[0]
            .partitions.map((partition) => {
              return { topic: topic, partition: partition.id };
            });

          if (partitions.length == 0) {
            reject(`Found 0 partitions to assign.`);
            return;
          }
          console.debug(
            `Assigning partitions [${partitions
              .map((a) => a.partition)
              .join(",")}]`
          );

          // Query for all of the offsets of all partitions
          const allOffsetQueries = partitions.map((a) => {
            return new Promise((resolve, reject) => {
              consumer.queryWatermarkOffsets(
                a.topic,
                a.partition,
                2000,
                (err, offsets) => {
                  if (err) {
                    reject(err);
                  } else {
                    console.debug(
                      "Assigning to partition",
                      a.partition,
                      "with offset",
                      offsets.highOffset
                    );
                    resolve({
                      topic: a.topic,
                      partition: a.partition,
                      offset: offsets.highOffset,
                    });
                  }
                }
              );
            });
          });

          const assignmentsWithOffsets = await Promise.all(allOffsetQueries);
          consumer.assign(assignmentsWithOffsets);
          
          consumer.on("disconnected", () => {
            console.log("Disconnected", topic);
          });

          consumer.consume((err, msg) => {
            console.debug(
              "Got kafka message callback. Err:",
              err,
              "message:",
              msg
            );
            consumer.disconnect();
            resolve(consumer);
            
          });

          console.debug("Kafka ready");
          
        }
      );
    });
  });
}

async function waitFor(milliseconds) {
  return new Promise((resolve) => {
    setTimeout(() => {
      resolve();
    }, milliseconds);
  });
}

async function produceMessage() {
    // Here I call an API that produces a message in both topics at the same time
}

async function test() {
  // cluster1 and cluster2 are broker addresses from our development environment (not shown here)
  const firstConsumer = startConsuming([cluster1], "topic1");
  const secondConsumer = startConsuming([cluster2], "topic2");

  // Wait a bit for consumers to be connected. In real code this is handled much better
  await waitFor(1000);

  await produceMessage();

  await firstConsumer;
  await secondConsumer;
}

test();
@constantind

constantind commented Jun 6, 2024

The issue could be linked to Node.js; see this pull request: nodejs/node#48943

@abarisain
Author

Nice, thanks. I'll check out Node 21.

@constantind

constantind commented Jun 11, 2024

Actually, Node 21 doesn't have nan support yet. On Linux, in my case, this is called by uv__async_io: https://github.com/Blizzard/node-rdkafka/blob/master/src/workers.h#L156, then WorkMessage calls HandleMessageCallback, and HandleMessageCallback enters callback->Call, but the callback is a v8::PersistentBase whose value is nullptr; nan.h line 1810 then carries the null pointer into v8-local-handle.h, and it crashes. The race in libuv should be fixed in Node 20+, which ships libuv 1.45.0+ (atomic load), while Node 18 seems to be on an earlier version (busy-wait spin); either way it is not a Node issue. It is caused by the worker->WorkComplete() added to kafka-consumer.cc/NodeDisconnect in 2.16.1, and can be reproduced with a double disconnect, a pause followed by a disconnect, or a connect/disconnect at the same time.

Another note here: if getMetadata fails, it will call disconnect on its own here: https://github.com/Blizzard/node-rdkafka/blob/master/lib/client.js#L165, so if you call disconnect as well it will cause a double disconnect. It could also be GC on the JavaScript side, since the v8::Persistent can be garbage collected if the callback passed in goes out of scope.
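
Until that is fixed, one user-side mitigation is to make sure disconnect() is only ever called once per consumer, which avoids the double-disconnect path described above. A sketch (the __disconnecting flag below is a hypothetical addition, not part of node-rdkafka):

    // Sketch: wrap disconnect so it can only run once per consumer instance.
    // "__disconnecting" is a hypothetical flag set here, not a library property.
    function disconnectOnce(consumer) {
      if (consumer.__disconnecting) {
        return;
      }
      consumer.__disconnecting = true;
      consumer.disconnect();
    }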

The fix would be to check (callback && !callback->IsEmpty()) here: https://github.com/Blizzard/node-rdkafka/blob/master/src/workers.cc#L770, as that code can still run after worker->WorkComplete().

CC: @GaryWilber , @iradul
