diff --git a/app/controllers/Logkafka.scala b/app/controllers/Logkafka.scala index dbecd01a..abc97478 100644 --- a/app/controllers/Logkafka.scala +++ b/app/controllers/Logkafka.scala @@ -129,6 +129,8 @@ class Logkafka (val cc: ControllerComponents, val kafkaManagerContext: KafkaMana LogkafkaNewConfigs.configMaps(Kafka_3_1_1).map{case(k,v) => LKConfig(k,Some(v))}.toList) val kafka_3_2_0_Default = CreateLogkafka("","", LogkafkaNewConfigs.configMaps(Kafka_3_2_0).map{case(k,v) => LKConfig(k,Some(v))}.toList) + val kafka_3_3_0_Default = CreateLogkafka("","", + LogkafkaNewConfigs.configMaps(Kafka_3_3_0).map{case(k,v) => LKConfig(k,Some(v))}.toList) val defaultCreateForm = Form( mapping( @@ -205,6 +207,7 @@ class Logkafka (val cc: ControllerComponents, val kafkaManagerContext: KafkaMana case Kafka_3_1_0 => (defaultCreateForm.fill(kafka_3_1_0_Default), clusterContext) case Kafka_3_1_1 => (defaultCreateForm.fill(kafka_3_1_1_Default), clusterContext) case Kafka_3_2_0 => (defaultCreateForm.fill(kafka_3_2_0_Default), clusterContext) + case Kafka_3_3_0 => (defaultCreateForm.fill(kafka_3_3_0_Default), clusterContext) } } } @@ -325,6 +328,7 @@ class Logkafka (val cc: ControllerComponents, val kafkaManagerContext: KafkaMana case Kafka_3_1_0 => LogkafkaNewConfigs.configNames(Kafka_3_1_0).map(n => (n,LKConfig(n,None))).toMap case Kafka_3_1_1 => LogkafkaNewConfigs.configNames(Kafka_3_1_1).map(n => (n,LKConfig(n,None))).toMap case Kafka_3_2_0 => LogkafkaNewConfigs.configNames(Kafka_3_2_0).map(n => (n,LKConfig(n,None))).toMap + case Kafka_3_3_0 => LogkafkaNewConfigs.configNames(Kafka_3_3_0).map(n => (n,LKConfig(n,None))).toMap } val identityOption = li.identityMap.get(log_path) if (identityOption.isDefined) { diff --git a/app/controllers/Topic.scala b/app/controllers/Topic.scala index dae0abbb..0ce42d55 100644 --- a/app/controllers/Topic.scala +++ b/app/controllers/Topic.scala @@ -83,6 +83,7 @@ class Topic (val cc: ControllerComponents, val kafkaManagerContext: KafkaManager val kafka_3_1_0_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_3_1_0).map{ case (n, h) => TConfig(n,None,Option(h))}.toList) val kafka_3_1_1_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_3_1_1).map{ case (n, h) => TConfig(n,None,Option(h))}.toList) val kafka_3_2_0_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_3_2_0).map{ case (n, h) => TConfig(n,None,Option(h))}.toList) + val kafka_3_3_0_Default = CreateTopic("",1,1,TopicConfigs.configNamesAndDoc(Kafka_3_3_0).map{ case (n, h) => TConfig(n,None,Option(h))}.toList) val defaultCreateForm = Form( mapping( @@ -201,6 +202,7 @@ class Topic (val cc: ControllerComponents, val kafkaManagerContext: KafkaManager case Kafka_3_1_0 => (defaultCreateForm.fill(kafka_3_1_0_Default), clusterContext) case Kafka_3_1_1 => (defaultCreateForm.fill(kafka_3_1_1_Default), clusterContext) case Kafka_3_2_0 => (defaultCreateForm.fill(kafka_3_2_0_Default), clusterContext) + case Kafka_3_3_0 => (defaultCreateForm.fill(kafka_3_3_0_Default), clusterContext) } } } @@ -467,6 +469,7 @@ class Topic (val cc: ControllerComponents, val kafkaManagerContext: KafkaManager case Kafka_3_1_0 => TopicConfigs.configNamesAndDoc(Kafka_3_1_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) } case Kafka_3_1_1 => TopicConfigs.configNamesAndDoc(Kafka_3_1_1).map { case (n, h) => (n,TConfig(n,None, Option(h))) } case Kafka_3_2_0 => TopicConfigs.configNamesAndDoc(Kafka_3_2_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) } + case Kafka_3_3_0 => TopicConfigs.configNamesAndDoc(Kafka_3_3_0).map { case (n, h) => (n,TConfig(n,None, Option(h))) } } val updatedConfigMap = ti.config.toMap val updatedConfigList = defaultConfigs.map { diff --git a/app/kafka/manager/actor/cluster/KafkaStateActor.scala b/app/kafka/manager/actor/cluster/KafkaStateActor.scala index 4a240c40..ac7de4c7 100644 --- a/app/kafka/manager/actor/cluster/KafkaStateActor.scala +++ b/app/kafka/manager/actor/cluster/KafkaStateActor.scala @@ -186,7 +186,7 @@ class KafkaAdminClient(context: => ActorContext, adminClientActorPath: ActorPath object KafkaManagedOffsetCache { - val supportedVersions: Set[KafkaVersion] = Set(Kafka_0_8_2_0, Kafka_0_8_2_1, Kafka_0_8_2_2, Kafka_0_9_0_0, Kafka_0_9_0_1, Kafka_0_10_0_0, Kafka_0_10_0_1, Kafka_0_10_1_0, Kafka_0_10_1_1, Kafka_0_10_2_0, Kafka_0_10_2_1, Kafka_0_11_0_0, Kafka_0_11_0_2, Kafka_1_0_0, Kafka_1_0_1, Kafka_1_1_0, Kafka_1_1_1, Kafka_2_0_0, Kafka_2_1_0, Kafka_2_1_1, Kafka_2_2_0, Kafka_2_2_1, Kafka_2_2_2, Kafka_2_3_0, Kafka_2_2_1, Kafka_2_4_0, Kafka_2_4_1, Kafka_2_5_0, Kafka_2_5_1, Kafka_2_6_0, Kafka_2_7_0, Kafka_2_8_0, Kafka_2_8_1, Kafka_3_0_0, Kafka_3_1_0, Kafka_3_1_1, Kafka_3_2_0) + val supportedVersions: Set[KafkaVersion] = Set(Kafka_0_8_2_0, Kafka_0_8_2_1, Kafka_0_8_2_2, Kafka_0_9_0_0, Kafka_0_9_0_1, Kafka_0_10_0_0, Kafka_0_10_0_1, Kafka_0_10_1_0, Kafka_0_10_1_1, Kafka_0_10_2_0, Kafka_0_10_2_1, Kafka_0_11_0_0, Kafka_0_11_0_2, Kafka_1_0_0, Kafka_1_0_1, Kafka_1_1_0, Kafka_1_1_1, Kafka_2_0_0, Kafka_2_1_0, Kafka_2_1_1, Kafka_2_2_0, Kafka_2_2_1, Kafka_2_2_2, Kafka_2_3_0, Kafka_2_2_1, Kafka_2_4_0, Kafka_2_4_1, Kafka_2_5_0, Kafka_2_5_1, Kafka_2_6_0, Kafka_2_7_0, Kafka_2_8_0, Kafka_2_8_1, Kafka_3_0_0, Kafka_3_1_0, Kafka_3_1_1, Kafka_3_2_0, Kafka_3_3_0) val ConsumerOffsetTopic = "__consumer_offsets" def isSupported(version: KafkaVersion) : Boolean = { diff --git a/app/kafka/manager/model/model.scala b/app/kafka/manager/model/model.scala index 01cb2c85..99a0a80d 100644 --- a/app/kafka/manager/model/model.scala +++ b/app/kafka/manager/model/model.scala @@ -164,6 +164,10 @@ case object Kafka_3_2_0 extends KafkaVersion { override def toString = "3.2.0" } +case object Kafka_3_3_0 extends KafkaVersion { + override def toString = "3.3.0" +} + object KafkaVersion { val supportedVersions: Map[String,KafkaVersion] = Map( "0.8.1.1" -> Kafka_0_8_1_1, @@ -204,7 +208,8 @@ object KafkaVersion { "3.0.0" -> Kafka_3_0_0, "3.1.0" -> Kafka_3_1_0, "3.1.1" -> Kafka_3_1_1, - "3.2.0" -> Kafka_3_2_0 + "3.2.0" -> Kafka_3_2_0, + "3.3.0" -> Kafka_3_3_0 ) val formSelectList : IndexedSeq[(String,String)] = supportedVersions.toIndexedSeq.filterNot(_._1.contains("beta")).map(t => (t._1,t._2.toString)).sortWith((a, b) => sortVersion(a._1, b._1)) diff --git a/app/kafka/manager/utils/LogkafkaNewConfigs.scala b/app/kafka/manager/utils/LogkafkaNewConfigs.scala index 75e1efb6..7f09b90c 100644 --- a/app/kafka/manager/utils/LogkafkaNewConfigs.scala +++ b/app/kafka/manager/utils/LogkafkaNewConfigs.scala @@ -55,7 +55,8 @@ object LogkafkaNewConfigs { Kafka_3_0_0 -> logkafka82.LogConfig, Kafka_3_1_0 -> logkafka82.LogConfig, Kafka_3_1_1 -> logkafka82.LogConfig, - Kafka_3_2_0 -> logkafka82.LogConfig + Kafka_3_2_0 -> logkafka82.LogConfig, + Kafka_3_3_0 -> logkafka82.LogConfig ) def configNames(version: KafkaVersion) : Set[String] = { diff --git a/app/kafka/manager/utils/TopicConfigs.scala b/app/kafka/manager/utils/TopicConfigs.scala index 54739ec7..4c575836 100644 --- a/app/kafka/manager/utils/TopicConfigs.scala +++ b/app/kafka/manager/utils/TopicConfigs.scala @@ -58,7 +58,8 @@ object TopicConfigs { Kafka_3_0_0 -> two40.LogConfig, Kafka_3_1_0 -> two40.LogConfig, Kafka_3_1_1 -> two40.LogConfig, - Kafka_3_2_0 -> two40.LogConfig + Kafka_3_2_0 -> two40.LogConfig, + Kafka_3_3_0 -> two40.LogConfig ) def configNames(version: KafkaVersion): Seq[String] = { diff --git a/build.sbt b/build.sbt index 0ec18d26..b30e62d9 100644 --- a/build.sbt +++ b/build.sbt @@ -5,7 +5,7 @@ name := """cmak""" /* For packaging purposes, -SNAPSHOT MUST contain a digit */ -version := "3.0.0.7" +version := "3.0.0.8" scalaVersion := "2.12.10" diff --git a/test/kafka/manager/TestKafkaManagerActor.scala b/test/kafka/manager/TestKafkaManagerActor.scala index 3afc0adb..92fd2481 100644 --- a/test/kafka/manager/TestKafkaManagerActor.scala +++ b/test/kafka/manager/TestKafkaManagerActor.scala @@ -69,7 +69,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { } test("add cluster") { - val cc = ClusterConfig("dev","3.2.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) + val cc = ClusterConfig("dev","3.3.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) withKafkaManagerActor(KMAddCluster(cc)) { result: KMCommandResult => result.result.get Thread.sleep(1000) @@ -80,7 +80,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { } test("update cluster zkhost") { - val cc2 = ClusterConfig("dev","3.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) + val cc2 = ClusterConfig("dev","3.3.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult => result.result.get Thread.sleep(3000) @@ -112,7 +112,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { } test("update cluster version") { - val cc2 = ClusterConfig("dev","3.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) + val cc2 = ClusterConfig("dev","3.3.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult => result.result.get Thread.sleep(3000) @@ -139,7 +139,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { println(result) result.msg.contains("dev") } - val cc2 = ClusterConfig("dev","3.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) + val cc2 = ClusterConfig("dev","3.3.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) withKafkaManagerActor(KMAddCluster(cc2)) { result: KMCommandResult => result.result.get Thread.sleep(1000) @@ -156,7 +156,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { } test("update cluster logkafka enabled") { - val cc2 = ClusterConfig("dev","3.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) + val cc2 = ClusterConfig("dev","3.3.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None) withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult => result.result.get Thread.sleep(3000) @@ -168,7 +168,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { test("update cluster tuning") { val newTuning = getClusterTuning(3, 101, 11, 10000, 10000, 1) - val cc2 = ClusterConfig("dev","3.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, + val cc2 = ClusterConfig("dev","3.3.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(newTuning), securityProtocol="PLAINTEXT", saslMechanism = None, jaasConfig = None ) withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult => @@ -185,7 +185,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest { } test("update cluster security protocol") { - val cc2 = ClusterConfig("dev","3.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="SASL_PLAINTEXT", saslMechanism = Option("PLAIN"), jaasConfig = Option("blah")) + val cc2 = ClusterConfig("dev","3.3.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="SASL_PLAINTEXT", saslMechanism = Option("PLAIN"), jaasConfig = Option("blah")) withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult => result.result.get Thread.sleep(3000) diff --git a/test/kafka/manager/model/KafkaVersionTest.scala b/test/kafka/manager/model/KafkaVersionTest.scala index beab9330..14d743cb 100644 --- a/test/kafka/manager/model/KafkaVersionTest.scala +++ b/test/kafka/manager/model/KafkaVersionTest.scala @@ -50,7 +50,8 @@ class KafkaVersionTest extends FunSuite { "3.0.0" -> Kafka_3_0_0, "3.1.0" -> Kafka_3_1_0, "3.1.1" -> Kafka_3_1_1, - "3.2.0" -> Kafka_3_2_0 + "3.2.0" -> Kafka_3_2_0, + "3.3.0" -> Kafka_3_3_0 ) test("apply method: supported version.") { @@ -107,7 +108,8 @@ class KafkaVersionTest extends FunSuite { ("3.0.0","3.0.0"), ("3.1.0","3.1.0"), ("3.1.1","3.1.1"), - ("3.2.0","3.2.0") + ("3.2.0","3.2.0"), + ("3.3.0","3.3.0") ) assertResult(expected)(KafkaVersion.formSelectList) } diff --git a/test/kafka/manager/utils/TestClusterConfig.scala b/test/kafka/manager/utils/TestClusterConfig.scala index 1f5c80bf..63acaebd 100644 --- a/test/kafka/manager/utils/TestClusterConfig.scala +++ b/test/kafka/manager/utils/TestClusterConfig.scala @@ -356,4 +356,12 @@ class TestClusterConfig extends FunSuite with Matchers { assert(deserialize.isSuccess === true) assert(cc == deserialize.get) } + + test("serialize and deserialize 3.3.0") { + val cc = ClusterConfig("qa", "3.3.0", "localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = None, securityProtocol = "SASL_PLAINTEXT", saslMechanism = Option("PLAIN"), jaasConfig = Option("blah")) + val serialize: String = ClusterConfig.serialize(cc) + val deserialize = ClusterConfig.deserialize(serialize) + assert(deserialize.isSuccess === true) + assert(cc == deserialize.get) + } }