Change TopicBasedRemoteLogMetadataManager.ensureInitializedAndNotClosed so that it blocks if not initialised #12

Open
mdedetrich opened this issue Apr 13, 2023 · 4 comments
Labels: enhancement, Stale, tiered storage


mdedetrich commented Apr 13, 2023

After discussion with @gharris1727 and investigation into the current implementation of tiered storage and the existing problems with the TieredStorageTestHarness, we came to the conclusion that the current design of TopicBasedRemoteLogMetadataManager has a flaw when it comes to initialisation.

For various legitimate reasons, the TopicBasedRemoteLogMetadataManager.configure method is asynchronous. While this is acceptable, what is problematic is that other methods that need to be implemented in this interface call TopicBasedRemoteLogMetadataManager.ensureInitializedAndNotClosed as a check, which throws an exception if the manager is not yet initialized.

Rather than TopicBasedRemoteLogMetadataManager.ensureInitializedAndNotClosed throwing an IllegalStateException, we should instead block the method until initialisation has occurred (e.g. via the use of a CountDownLatch; see the sketch at the end of this comment). Note that throwing an IllegalStateException if TopicBasedRemoteLogMetadataManager is currently being closed is legitimate; it is specifically initialisation that is the issue.

This issue is inadvertently causing TS tests (via the TieredStorageTestHarness) to fail in the https://github.com/aiven/kafka/tree/3.3-2022-10-06-tiered-storage branch, because the test harness runs before initialisation has occurred.

Note that upstream also has this issue (see https://github.com/apache/kafka/blob/34d56dc8d00bd27955eb9bb6ac01d5ae7f134dbd/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java#L492-L501).
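
For illustration, here is a minimal sketch of the proposed behaviour, not the actual upstream code: it assumes a hypothetical initializationLatch field that the asynchronous initialization counts down when it finishes (via a hypothetical markInitialized helper), a closing flag analogous to the existing one, and an illustrative timeout. Names and the class wrapper are purely for the example.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative stand-in for the relevant parts of TopicBasedRemoteLogMetadataManager.
public class BlockingInitializationSketch {

    // Hypothetical latch counted down once the asynchronous initialization
    // started from configure() has completed.
    private final CountDownLatch initializationLatch = new CountDownLatch(1);
    // Analogous to the existing closing flag; being closed should remain a hard error.
    private final AtomicBoolean closing = new AtomicBoolean(false);
    // Illustrative upper bound so a broken initialization cannot block callers forever.
    private final long initializationWaitMs = 120_000L;

    // Called by the initialization thread when it finishes successfully.
    void markInitialized() {
        initializationLatch.countDown();
    }

    // Proposed shape of ensureInitializedAndNotClosed: block until initialization
    // completes instead of throwing IllegalStateException straight away.
    void ensureInitializedAndNotClosed() {
        if (closing.get()) {
            throw new IllegalStateException("This instance is already closed");
        }
        try {
            if (!initializationLatch.await(initializationWaitMs, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException(
                    "Initialization did not complete within " + initializationWaitMs + " ms");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for initialization", e);
        }
        if (closing.get()) {
            throw new IllegalStateException("This instance was closed while waiting for initialization");
        }
    }
}

Whether the wait should be bounded by a timeout (as in the sketch) or indefinite is a design choice to settle in the actual change; the key point is that callers block on initialisation rather than failing immediately.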

mdedetrich added the tiered storage label on Apr 13, 2023

jeqo commented Apr 17, 2023

@mdedetrich could we add some stack traces to validate how related the failures are?
In my case it's failing on consumption:

org.opentest4j.AssertionFailedError: Could not consume 3 records of topicA-1 from offset 0 in 60000 ms. 0 message(s) consumed:

	at app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:39)
	at app//org.junit.jupiter.api.Assertions.fail(Assertions.java:134)
	at app//kafka.utils.TestUtils$.pollRecordsUntilTrue(TestUtils.scala:1061)
	at app//kafka.tiered.storage.TieredStorageTestContext.consume(TieredStorageTestContext.scala:151)
	at app//kafka.tiered.storage.ProduceAction.doExecute(TieredStorageTestSpec.scala:282)
	at app//kafka.tiered.storage.TieredStorageTestAction.execute(TieredStorageTestSpec.scala:110)
	at app//kafka.tiered.storage.TieredStorageTestAction.execute$(TieredStorageTestSpec.scala:108)
	at app//kafka.tiered.storage.ProduceAction.execute(TieredStorageTestSpec.scala:216)
	at app//kafka.tiered.storage.TieredStorageTestHarness.$anonfun$executeTieredStorageTest$3(TieredStorageTestHarness.scala:125)
	at app//kafka.tiered.storage.TieredStorageTestHarness.$anonfun$executeTieredStorageTest$3$adapted(TieredStorageTestHarness.scala:125)

My impression is that TopicBasedRemoteLogMetadataManager may not be the root cause of the issues on 3.3, but rather the fetching logic, as the tests are able to get to the consumption part and only then fail.

I have tried adding the following wait logic to TieredStorageTestHarness and it still fails:

abstract class TieredStorageTestHarness extends IntegrationTestHarness {
//...
  @BeforeEach
  override def setUp(testInfo: TestInfo): Unit = {
    super.setUp(testInfo)
    contextOpt = Some(new TieredStorageTestContext(zkClient, servers, producerConfig, consumerConfig, securityProtocol))
    waitForRLMMInitialization(servers)
  }
//...
  // Polls each broker's TopicBasedRemoteLogMetadataManager until every instance
  // reports that it has finished initialising.
  def waitForRLMMInitialization(brokers: mutable.Buffer[KafkaServer]): Unit = {
    while (true) {
      println("Waiting for RLMM to initialize")
      val ready = brokers
        .map(_.remoteLogManager.remoteLogMetadataManager.asInstanceOf[TopicBasedRemoteLogMetadataManager])
        .forall(_.isInitialized)
      if (ready) return
      Thread.sleep(5000)
    }
  }

//...
}

mdedetrich (Author) commented:

So there are multiple problems at hand here. One of them is the asynchronous initialization (which is what this GitHub issue is about), and the other is the one you are referring to, i.e. some logic bug in the fetch/consumer path.

@jeqo Do you want to open another GitHub issue specifically for the consumer/fetch problem, which you can then focus on?


jeqo commented Apr 17, 2023

Sure! #15

mdedetrich (Author) commented:

Upstream PR created at apache#13689

github-actions bot added the Stale label on Aug 7, 2023