diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala index 6215e35d38ea6..0ce6471023943 100644 --- a/core/src/main/scala/kafka/tools/StorageTool.scala +++ b/core/src/main/scala/kafka/tools/StorageTool.scala @@ -124,6 +124,10 @@ object StorageTool extends Logging { case None => Option(config.originals.get(ReplicationConfigs.INTER_BROKER_PROTOCOL_VERSION_CONFIG)). foreach(v => formatter.setReleaseVersion(MetadataVersion.fromVersionString(v.toString))) } + Option(namespace.getList[String]("feature")).foreach( + featureNamesAndLevels(_).foreach { + kv => formatter.setFeatureLevel(kv._1, kv._2) + }) Option(namespace.getString("initial_controllers")). foreach(v => formatter.setInitialControllers(DynamicVoters.parse(v))) if (namespace.getBoolean("standalone")) { @@ -311,4 +315,28 @@ object StorageTool extends Logging { } } } + + def parseNameAndLevel(input: String): (String, java.lang.Short) = { + val equalsIndex = input.indexOf("=") + if (equalsIndex < 0) + throw new RuntimeException("Can't parse feature=level string " + input + ": equals sign not found.") + val name = input.substring(0, equalsIndex).trim + val levelString = input.substring(equalsIndex + 1).trim + try { + (name, levelString.toShort) + } catch { + case _: Throwable => + throw new RuntimeException("Can't parse feature=level string " + input + ": " + "unable to parse " + levelString + " as a short.") + } + } + + def featureNamesAndLevels(features: java.util.List[String]): Map[String, java.lang.Short] = { + val scalaFeatures = new mutable.ArrayBuffer[String] + features.forEach(scalaFeatures += _) + scalaFeatures.map { (feature: String) => + // Ensure the feature exists + val nameAndLevel = parseNameAndLevel(feature) + (nameAndLevel._1, nameAndLevel._2) + }.toMap + } } diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala index 42ee07c50eac1..139d752c77dd2 100644 --- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala +++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala @@ -329,6 +329,31 @@ Found problem: "Failed to find content in output: " + stream.toString()) } + @Test + def testFormatWithInvalidFeature(): Unit = { + val availableDirs = Seq(TestUtils.tempDir()) + val properties = new Properties() + properties.putAll(defaultStaticQuorumProperties) + properties.setProperty("log.dirs", availableDirs.mkString(",")) + assertEquals("Unsupported feature: non.existent.feature. Supported features are: " + + "group.version, kraft.version, transaction.version", + assertThrows(classOf[FormatterException], () => + runFormatCommand(new ByteArrayOutputStream(), properties, + Seq("--feature", "non.existent.feature=20"))).getMessage) + } + + @Test + def testFormatWithInvalidKRaftVersionLevel(): Unit = { + val availableDirs = Seq(TestUtils.tempDir()) + val properties = new Properties() + properties.putAll(defaultDynamicQuorumProperties) + properties.setProperty("log.dirs", availableDirs.mkString(",")) + assertEquals("No feature:kraft.version with feature level 999", + assertThrows(classOf[IllegalArgumentException], () => + runFormatCommand(new ByteArrayOutputStream(), properties, + Seq("--feature", "kraft.version=999", "--standalone"))).getMessage) + } + @Test def testFormatWithReleaseVersionAndKRaftVersion(): Unit = { val availableDirs = Seq(TestUtils.tempDir()) @@ -532,4 +557,25 @@ Found problem: "SCRAM is only supported in metadata.version 3.5-IV2 or later.", assertThrows(classOf[FormatterException], () => runFormatCommand(stream, properties, arguments.toSeq)).getMessage) } + + @Test + def testParseNameAndLevel(): Unit = { + assertEquals(("foo.bar", 56.toShort), StorageTool.parseNameAndLevel("foo.bar=56")) + } + + @Test + def testParseNameAndLevelWithNoEquals(): Unit = { + assertEquals("Can't parse feature=level string kraft.version5: equals sign not found.", + assertThrows(classOf[RuntimeException], + () => StorageTool.parseNameAndLevel("kraft.version5")). + getMessage) + } + + @Test + def testParseNameAndLevelWithNoNumber(): Unit = { + assertEquals("Can't parse feature=level string kraft.version=foo: unable to parse foo as a short.", + assertThrows(classOf[RuntimeException], + () => StorageTool.parseNameAndLevel("kraft.version=foo")). + getMessage) + } }