
Race condition between ReplicaFetcherThread and RemotePartitionMetadataStore. #33

Closed
HenryCaiHaiying opened this issue Jun 27, 2023 · 10 comments
Labels: backport upstream, bug, Stale, tiered storage


  • Set up a cluster with 2 Kafka nodes. Create a topic with a replication factor of 2. Shut down broker 2.
  • Produce some data to the topic and wait until the local segment files have been deleted.
  • Bring back broker 2. When broker 2 tries to fetch data from segment 1 and handles the offset having moved to remote storage, the following exception is hit because the idToRemoteLogMetadataCache in RemotePartitionMetadataStore hasn't been populated yet:

[2023-06-26 22:11:04,980] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error building remote log auxiliary state for topic4-0 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: No resource found for partition: IWXjwq-vSsS8cu9DkGj6EQ:topic4-0
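
For orientation, a minimal sketch of the lookup that throws here; the class and exception names come from the error above, but the field and method shapes are assumptions for illustration, not the verbatim upstream code:

// Sketch of RemotePartitionMetadataStore's per-partition cache lookup (assumed shape).
private final Map<TopicIdPartition, RemoteLogMetadataCache> idToRemoteLogMetadataCache =
        new ConcurrentHashMap<>();

private RemoteLogMetadataCache getRemoteLogMetadataCache(TopicIdPartition partition)
        throws RemoteResourceNotFoundException {
    RemoteLogMetadataCache cache = idToRemoteLogMetadataCache.get(partition);
    if (cache == null) {
        // The map is populated asynchronously after the partition is assigned, so a
        // ReplicaFetcherThread that asks first lands here.
        throw new RemoteResourceNotFoundException("No resource found for partition: " + partition);
    }
    return cache;
}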

HenryCaiHaiying commented Jun 27, 2023

Stack trace of the error:

[2023-06-26 22:11:04,980] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error building remote log auxiliary state for topic4-0 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: No resource found for partition: IWXjwq-vSsS8cu9DkGj6EQ:topic4-0
	at org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.getRemoteLogMetadataCache(RemotePartitionMetadataStore.java:152)
	at org.apache.kafka.server.log.remote.metadata.storage.RemotePartitionMetadataStore.remoteLogSegmentMetadata(RemotePartitionMetadataStore.java:164)
	at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.remoteLogSegmentMetadata(TopicBasedRemoteLogMetadataManager.java:211)
	at kafka.log.remote.RemoteLogManager.fetchRemoteLogSegmentMetadata(RemoteLogManager.scala:790)
	at kafka.server.ReplicaFetcherThread.$anonfun$buildRemoteLogAuxState$2(ReplicaFetcherThread.scala:192)
	at kafka.server.ReplicaFetcherThread.$anonfun$buildRemoteLogAuxState$2$adapted(ReplicaFetcherThread.scala:188)
	at scala.Option.foreach(Option.scala:437)
	at kafka.server.ReplicaFetcherThread.$anonfun$buildRemoteLogAuxState$1(ReplicaFetcherThread.scala:188)
	at kafka.server.ReplicaFetcherThread.$anonfun$buildRemoteLogAuxState$1$adapted(ReplicaFetcherThread.scala:186)
	at scala.Option.foreach(Option.scala:437)
	at kafka.server.ReplicaFetcherThread.buildRemoteLogAuxState(ReplicaFetcherThread.scala:186)
	at kafka.server.AbstractFetcherThread.$anonfun$fetchOffsetAndBuildRemoteLogAuxState$2(AbstractFetcherThread.scala:734)
	at kafka.server.AbstractFetcherThread.fetchOffsetAndApplyFun(AbstractFetcherThread.scala:707)
	at kafka.server.AbstractFetcherThread.fetchOffsetAndBuildRemoteLogAuxState(AbstractFetcherThread.scala:733)
	at kafka.server.AbstractFetcherThread.handleOffsetMovedToTieredStorage(AbstractFetcherThread.scala:748)
	at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:393)
	at scala.Option.foreach(Option.scala:437)
	at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:329)
	at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:328)
	at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
	at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359)
	at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355)
	at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309)
	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:328)
	at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:128)
	at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:127)
	at scala.Option.foreach(Option.scala:437)
	at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:127)
	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:108)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)

HenryCaiHaiying commented Jun 27, 2023

[2023-06-26 22:11:04,779] INFO Initializing the resources. (org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager)

[2023-06-26 22:11:04,935] INFO Received leadership notifications with leader partitions [] and follower partitions [bgi8NmhaSHKQnf0nuK7Eaw:topic3-0, IWXjwq-vSsS8cu9DkGj6EQ:topic4-0] (org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager)

[2023-06-26 22:11:04,978] INFO Initialized resources successfully. (org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager)

[2023-06-26 22:11:04,979] INFO Assign partitions: [bgi8NmhaSHKQnf0nuK7Eaw:topic3-0, IWXjwq-vSsS8cu9DkGj6EQ:topic4-0] (org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager)

[2023-06-26 22:11:04,980] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error building remote log auxiliary state for topic4-0 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException: No resource found for partition: IWXjwq-vSsS8cu9DkGj6EQ:topic4-0

[2023-06-26 22:11:05,025] INFO Received leadership notifications with leader partitions [27Fe4HEbQ4KUKGO6wupDqg:topic2-0] and follower partitions [] (org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager)

[2023-06-26 22:11:05,025] INFO Assign partitions: [27Fe4HEbQ4KUKGO6wupDqg:topic2-0] (org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager)
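
Note the timestamps above: partitions are assigned at 22:11:04,979 and the fetch error fires at 22:11:04,980, one millisecond later. A hedged sketch of why assignment alone does not guarantee the cache is ready (onPartitionLeadershipChanges is the real RemoteLogMetadataManager callback; the consumer-manager details are assumptions for illustration):

// Sketch: assignment only schedules catch-up work on the metadata consumer thread.
public void onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions,
                                         Set<TopicIdPartition> followerPartitions) {
    Set<TopicIdPartition> assigned = new HashSet<>(leaderPartitions);
    assigned.addAll(followerPartitions);
    // Logs "Assign partitions: [...]", then a consumer task replays
    // __remote_log_metadata on its own thread. Until it catches up,
    // RemotePartitionMetadataStore has no entry for these partitions and
    // concurrent fetcher lookups fail as above.
    consumerManager.addAssignmentsForPartitions(assigned);
}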

@ivanyu added the bug and tiered storage labels on Jun 29, 2023
mdedetrich commented:
Thanks for reporting this. I am back from holidays so I am going to start looking into this.

@mdedetrich self-assigned this on Jul 3, 2023
jeqo commented Jul 10, 2023

Testing with upstream + retention PR, I can reproduce a similar issue:

[2023-07-10 12:32:37,873] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error building remote log auxiliary state for t3-0 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.server.log.remote.storage.RemoteStorageException: Couldn't build the state from remote store for partition: t3-0, currentLeaderEpoch: 1, leaderLocalLogStartOffset: 54522, leaderLogStartOffset: 0, epoch: 1as the previous remote log segment metadata was not found
        at kafka.server.ReplicaFetcherTierStateMachine.buildRemoteLogAuxState(ReplicaFetcherTierStateMachine.java:252)

and the second broker is not able to replicate.

Steps to reproduce:

Create a topic with RF=2:

make rf=2 topic=t3 create_topic_ts_by_size
docker exec -e KAFKA_OPTS= kafka-ts \
        kafka-topics \
        --bootstrap-server kafka:29092 \
        --create \
        --config remote.storage.enable=true \
        --config segment.bytes=1000000  \
        --config retention.bytes=20000000  \
        --config local.retention.bytes=5000000  \
        --partitions 1 \
        --replication-factor 2 \
        --topic t3
Created topic t3.

Stop second broker:

dc stop kafka1
[+] Stopping 1/1
 ✔ Container demo-kafka1-1  Stopped                                                                           0.9s

Write to topic:

make topic=t3 fill_topic
docker exec -e KAFKA_OPTS= kafka-ts \
        kafka-producer-perf-test --producer-props bootstrap.servers=kafka:29092 \
        --topic t3 \
        --num-records 60000 \
        --record-size 1024 \
        --throughput 1000
5002 records sent, 1000.4 records/sec (0.98 MB/sec), 1.1 ms avg latency, 216.0 ms max latency.
5002 records sent, 1000.4 records/sec (0.98 MB/sec), 0.4 ms avg latency, 20.0 ms max latency.
5000 records sent, 1000.0 records/sec (0.98 MB/sec), 0.9 ms avg latency, 67.0 ms max latency.
5001 records sent, 1000.0 records/sec (0.98 MB/sec), 0.5 ms avg latency, 21.0 ms max latency.
5001 records sent, 1000.0 records/sec (0.98 MB/sec), 1.1 ms avg latency, 80.0 ms max latency.
5000 records sent, 1000.0 records/sec (0.98 MB/sec), 0.4 ms avg latency, 18.0 ms max latency.
5001 records sent, 1000.2 records/sec (0.98 MB/sec), 0.6 ms avg latency, 39.0 ms max latency.
5001 records sent, 1000.2 records/sec (0.98 MB/sec), 0.4 ms avg latency, 18.0 ms max latency.
5001 records sent, 1000.2 records/sec (0.98 MB/sec), 2.0 ms avg latency, 125.0 ms max latency.
5002 records sent, 1000.0 records/sec (0.98 MB/sec), 0.4 ms avg latency, 19.0 ms max latency.
5002 records sent, 1000.2 records/sec (0.98 MB/sec), 1.2 ms avg latency, 87.0 ms max latency.
60000 records sent, 999.900010 records/sec (0.98 MB/sec), 0.77 ms avg latency, 216.00 ms max latency, 0 ms 50th, 1 ms 95th, 16 ms 99th, 79 ms 99.9th.

Check directories:

Broker 0:

make topic=t3 kafka_container=kafka-ts show_local_data | grep '.log$'
-rw-r--r--. 1 appuser appuser 976K Jul 10 12:22 00000000000000054522.log
-rw-r--r--. 1 appuser appuser 975K Jul 10 12:22 00000000000000055464.log
-rw-r--r--. 1 appuser appuser 976K Jul 10 12:23 00000000000000056403.log
-rw-r--r--. 1 appuser appuser 976K Jul 10 12:23 00000000000000057343.log
-rw-r--r--. 1 appuser appuser 976K Jul 10 12:23 00000000000000058283.log
-rw-r--r--. 1 appuser appuser 807K Jul 10 12:23 00000000000000059223.log

Broker 1:

make topic=t3 kafka_container=demo-kafka1-1 show_local_data | grep '.log$'
Error response from daemon: Container 00cbed84d7a8a46e0f6610e76c5d7dedd4b57a3016549aec6a73d1bdc8e15452 is not running

Then start Broker 1 and observe the logs and directory:

[2023-07-10 12:32:37,873] ERROR [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error building remote log auxiliary state for t3-0 (kafka.server.ReplicaFetcherThread)
org.apache.kafka.server.log.remote.storage.RemoteStorageException: Couldn't build the state from remote store for partition: t3-0, currentLeaderEpoch: 1, leaderLocalLogStartOffset: 54522, leaderLogStartOffset: 0, epoch: 1as the previous remote log segment metadata was not found
        at kafka.server.ReplicaFetcherTierStateMachine.buildRemoteLogAuxState(ReplicaFetcherTierStateMachine.java:252)
        at kafka.server.ReplicaFetcherTierStateMachine.start(ReplicaFetcherTierStateMachine.java:102)
        at kafka.server.AbstractFetcherThread.handleOffsetsMovedToTieredStorage(AbstractFetcherThread.scala:761)
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:412)
        at scala.Option.foreach(Option.scala:437)
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:332)
        at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:331)
        at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
        at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:407)
        at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:403)
        at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:321)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:331)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
        at scala.Option.foreach(Option.scala:437)
        at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
        at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
        at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:130)
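
For context, a simplified sketch of the check this stack points at (ReplicaFetcherTierStateMachine.buildRemoteLogAuxState); the look-back at leaderLocalLogStartOffset - 1 matches the "previous remote log segment metadata" wording in the message, but the exact upstream code is abbreviated here and the helper is hypothetical:

// Sketch: the fetcher needs the segment that ends just before the leader's local log start.
long previousOffset = leaderLocalLogStartOffset - 1;
int targetEpoch = epochForOffset(previousOffset);  // hypothetical helper for brevity
RemoteLogSegmentMetadata previousSegment =
        rlm.fetchRemoteLogSegmentMetadata(partition, targetEpoch, previousOffset)
           .orElseThrow(() -> new RemoteStorageException(
                   "Couldn't build the state from remote store for partition: " + partition
                           + " as the previous remote log segment metadata was not found"));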

jeqo commented Jul 11, 2023

Update: there seems to be a misconfiguration in my environment (upstream) that led to this issue.

The user topic was configured with RF=2, but the internal topics were not, and TBRLMM was complaining:

kafka-ts  | [2023-07-11 08:20:58,884] ERROR [Controller id=0 epoch=1] Controller 0 epoch 1 failed to change state for partition __remote_log_metadata-19 from OfflinePartition to OnlinePartition (state.change.logger)
kafka-ts  | [2023-07-11 08:20:58,884] ERROR [Controller id=0 epoch=1] Controller 0 epoch 1 failed to change state for partition __remote_log_metadata-15 from OfflinePartition to OnlinePartition (state.change.logger)
kafka-ts  | [2023-07-11 08:20:58,884] ERROR [Controller id=0 epoch=1] Controller 0 epoch 1 failed to change state for partition __remote_log_metadata-29 from OfflinePartition to OnlinePartition (state.change.logger)
kafka-ts  | [2023-07-11 08:20:58,884] ERROR [Controller id=0 epoch=1] Controller 0 epoch 1 failed to change state for partition __remote_log_metadata-9 from OfflinePartition to OnlinePartition (state.change.logger)
kafka-ts  | [2023-07-11 08:20:58,884] ERROR [Controller id=0 epoch=1] Controller 0 epoch 1 failed to change state for partition __remote_log_metadata-45 from OfflinePartition to OnlinePartition (state.change.logger)
kafka-ts  | [2023-07-11 08:20:58,887] ERROR [Controller id=0 epoch=1] Controller 0 epoch 1 failed to change state for partition __remote_log_metadata-35 from OfflinePartition to OnlinePartition (state.change.logger)
kafka-ts  | [2023-07-11 08:20:58,888] ERROR [Controller id=0 epoch=1] Controller 0 epoch 1 failed to change state for partition __remote_log_metadata-17 from OfflinePartition to OnlinePartition (state.change.logger)
kafka-ts  | [2023-07-11 08:20:58,888] ERROR [Controller id=0 epoch=1] Controller 0 epoch 1 failed to change state for partition __remote_log_metadata-31 from OfflinePartition to OnlinePartition (state.change.logger)
kafka-ts  | [2023-07-11 08:20:58,888] ERROR [Controller id=0 epoch=1] Controller 0 epoch 1 failed to change state for partition __remote_log_metadata-13 from OfflinePartition to OnlinePartition (state.change.logger)
kafka-ts  | [2023-07-11 08:20:58,888] ERROR [Controller id=0 epoch=1] Controller 0 epoch 1 failed to change state for partition __remote_log_metadata-25 from OfflinePartition to OnlinePartition (state.change.logger)
kafka-ts  | [2023-07-11 08:20:58,888] ERROR [Controller id=0 epoch=1] Controller 0 epoch 1 failed to change state for partition __remote_log_metadata-47 from OfflinePartition to OnlinePartition (state.change.logger)
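
For reference, a hedged sketch of the broker settings involved; the property name comes from TopicBasedRemoteLogMetadataManagerConfig and assumes the default rlmm.config. prefix:

# server.properties (illustrative values, not a verified minimal config)
remote.log.storage.system.enable=true
# Forwarded to TopicBasedRemoteLogMetadataManager via the rlmm.config. prefix;
# controls the RF used when __remote_log_metadata is auto-created:
rlmm.config.remote.log.metadata.topic.replication.factor=2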

By applying this fix (Aiven-Open/tiered-storage-for-apache-kafka@2a36ac0) I haven't been able to reproduce this issue, and replication started to work as expected:

Screencast.from.2023-07-11.11-42-48.webm

There are still more scenarios to test around replication in general, but I wanted to note here that at least this scenario is not reproducible with upstream. @HenryCaiHaiying could you confirm whether the internal topics (__remote_log_metadata specifically) have the proper replication factor in your environment?

jeqo commented Jul 17, 2023

Please ignore my latest comments. The race condition seems to be possible in upstream as well -- following up in this issue: https://issues.apache.org/jira/browse/KAFKA-15181

mdedetrich commented:
Upstream PR at apache#14012

@mdedetrich added the backport upstream label on Jul 20, 2023
HenryCaiHaiying commented:
I was able to reproduce a similar ERROR:

org.apache.kafka.server.log.remote.storage.RemoteStorageException: Couldn't build the state from remote store for partition: t3-0, currentLeaderEpoch: 1, leaderLocalLogStartOffset: 54522, leaderLogStartOffset: 0, epoch: 1as the previous remote log segment metadata was not found

Under this test setup: Kafka node-3 died, we launched a replacement broker (node-4) and reassigned the partitions from node-3 to node-4. On startup, the node-4 broker prints the above ERROR repeatedly. In my test setup, the topic has a short retention.ms of only a few hours (for both local and remote); because there was no new incoming data on the topic, all remote segments were deleted. When PrimaryConsumerTask handles a DELETE_SEGMENT_FINISHED metadata event, it removes the idToSegmentMetadata entry for the given segment in RemoteLogMetadataCache. This caused the lookup to fail at the following call site in ReplicaFetcherThread (which delegates to TopicBasedRemoteLogMetadataManager):

  override protected def buildRemoteLogAuxState(partition: TopicPartition,
                                                currentLeaderEpoch: Int,
                                                leaderLocalLogStartOffset: Long,
                                                leaderLogStartOffset: Long): Unit = {
    replicaMgr.localLog(partition).foreach { log =>
      if (log.remoteLogEnabled()) {
        replicaMgr.remoteLogManager.foreach { rlm =>
          var rlsMetadata: Optional[RemoteLogSegmentMetadata] = Optional.empty()
          val epoch = log.leaderEpochCache.flatMap(cache => cache.epochForOffset(leaderLocalLogStartOffset))
          if (epoch.isDefined) {
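            // This lookup comes back empty when the segment's entry has already
            // been removed from RemoteLogMetadataCache, as described above: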
            rlsMetadata = rlm.fetchRemoteLogSegmentMetadata(partition, epoch.get, leaderLocalLogStartOffset)
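
A hedged sketch of the removal path described above; the class and state names follow the description in this comment, but the method shape is an assumption for illustration:

// Inside RemoteLogMetadataCache (illustrative):
public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate update) {
    if (update.state() == RemoteLogSegmentState.DELETE_SEGMENT_FINISHED) {
        // Once the last segment's entry is dropped here, the
        // fetchRemoteLogSegmentMetadata call above finds nothing for
        // leaderLocalLogStartOffset, and buildRemoteLogAuxState fails.
        idToSegmentMetadata.remove(update.remoteLogSegmentId());
    }
}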

I'm not sure about the best way to fix this; maybe keep remote log segment retention really long? Or always keep the last segment in RemoteLogMetadataCache?

mdedetrich commented:
New upstream bug is at apache#14127

@github-actions bot added the Stale label on Nov 7, 2023
@github-actions bot closed this as not planned on Sep 25, 2024