Skip to content
This repository has been archived by the owner on Jun 10, 2022. It is now read-only.

bugfix, issue141,179: lost message when consumer restart #188

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions src/Consumer/Process.php
Original file line number Diff line number Diff line change
Expand Up @@ -644,21 +644,22 @@ public function succFetch(array $result, int $fd): void
continue;
}

$offset = $assign->getConsumerOffset($topic['topicName'], $part['partition']);

if ($offset === null) {
$consumerOffset = $assign->getConsumerOffset($topic['topicName'], $part['partition']);
if ($consumerOffset === null) {
return; // current is rejoin....
}

$commitOffset = $consumerOffset - 1;
foreach ($part['messages'] as $message) {
$this->messages[$topic['topicName']][$part['partition']][] = $message;

$offset = $message['offset'];
$commitOffset = $message['offset'];
}

$consumerOffset = ($part['highwaterMarkOffset'] > $offset) ? ($offset + 1) : $offset;
$consumerOffset = $commitOffset + 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here shouldn't + 1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

becase the returned offset is the offsetManager stored offset commited blow this groupId

Copy link
Author

@chongchaoyu chongchaoyu Feb 5, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@noname007 consumer offset is equal to commit offset + 1, so I add 1 to the commit offset, and consumer will fetch the next message. If you don't add 1 to commit offset, you will fetch the same message committed last time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you said right but the pos is wrong ref #189

Copy link
Contributor

@noname007 noname007 Feb 5, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i use your code but i still lost message, #186 is the condition i meet,but the test case i could not let run as i meet the position......

Copy link
Contributor

@noname007 noname007 Mar 5, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chongchaoyu https://github.com/weiboad/kafka-php/pull/186/files#diff-abf40442ee37f59d8941040a616cf543R121 这个就是浮现问题的脚本,但是放到测试里面,没有想好怎么设置断言。

本地脚本运行方式:
直接抛异常,然后再消费,观察日志就能发现,数据就丢失了一条,从日志里面也可以看到offset 错乱的问题。

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@noname007 I understand that English is not your native language (it's also not mine), but I'd really love if I don't have to translate comments in order to help people...

Copy link
Contributor

@noname007 noname007 Mar 5, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i said nothing.....just say the script run mode in my machine that could cause the problem:

throw a exception after it run ,and run again ,and will find the offset wrong in the log file .

but the test case i write was not run as i thought because i could write the right assert statement.

because i found he is a Chinese man in the QQ group so i select our native lang to talk... sorry for that.

en , did you use QQ international ? i can pull you into the group hhhhha 😆

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because i found he is a Chinese man in the QQ group so i select our native lang to talk... sorry for that.

No worries, it happens 😄

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lcobucci here is my run scripts coditions. i use the crontable to check the process job active or not ,if it is not then pull up by crobtable


$assign->setConsumerOffset($topic['topicName'], $part['partition'], $consumerOffset);
$assign->setCommitOffset($topic['topicName'], $part['partition'], $offset);
$assign->setCommitOffset($topic['topicName'], $part['partition'], $commitOffset);
}
}

Expand Down