From f46e2fcc4a249b2649bbf1af6bae3a3c8acf717f Mon Sep 17 00:00:00 2001 From: Justin Date: Thu, 4 Nov 2021 00:56:46 -0700 Subject: [PATCH] Change the location of the configuration to let clusters delete topics. (#42) Co-authored-by: Justin Casol --- .../orion-server-deployment/conf/kafka-server.yaml | 1 - docs/Kafka/README.md | 13 ++++++++++--- .../operator/kafka/BrokersetTopicOperator.java | 13 ++++++------- .../sensor/kafka/KafkaClusterInfoSensor.java | 1 - .../pinterest/orion/core/kafka/KafkaCluster.java | 7 ------- 5 files changed, 16 insertions(+), 19 deletions(-) diff --git a/deployments/orion-server-deployment/conf/kafka-server.yaml b/deployments/orion-server-deployment/conf/kafka-server.yaml index f40b4927..f1e12187 100644 --- a/deployments/orion-server-deployment/conf/kafka-server.yaml +++ b/deployments/orion-server-deployment/conf/kafka-server.yaml @@ -58,7 +58,6 @@ clusterConfigs: configuration: serversetPath: /var/serverset/discovery.testkafka.prod clusterInfoDir: /opt/orion/configs/kafka/clusters/testkafka - enableTopicDeletion: false customApiFactoryClasses: - com.pinterest.orion.server.api.kafka.KafkaApiFactory plugins: diff --git a/docs/Kafka/README.md b/docs/Kafka/README.md index 51369d8a..694937df 100644 --- a/docs/Kafka/README.md +++ b/docs/Kafka/README.md @@ -18,9 +18,16 @@ Description: Creates a Kafka Topic with supplied attributes Class: `com.pinterest.orion.core.actions.kafka.AssignmentDeleteKafkaTopicAction` Description: Delete the specified kafka topic if it includes the field `"delete": true` -AND the server description includes `enableTopicDeletion: false` in the configuration of -the specific cluster that has topics you want to delete. - +AND the server description includes +``` +plugins: + operatorConfigs: + - key: brokersetTopicOperator + enabled: true + configuration: + enableTopicDeletion: true +``` +Under the individual cluster config sections **Partition Reassignment** diff --git a/orion-server/src/main/java/com/pinterest/orion/core/automation/operator/kafka/BrokersetTopicOperator.java b/orion-server/src/main/java/com/pinterest/orion/core/automation/operator/kafka/BrokersetTopicOperator.java index 792589d7..50be5e17 100644 --- a/orion-server/src/main/java/com/pinterest/orion/core/automation/operator/kafka/BrokersetTopicOperator.java +++ b/orion-server/src/main/java/com/pinterest/orion/core/automation/operator/kafka/BrokersetTopicOperator.java @@ -66,10 +66,13 @@ public class BrokersetTopicOperator extends KafkaOperator { private static final String CONF_STEP_SIZE_KEY = "stepSize"; private static final String CONF_REPLICATION_FACTOR = "replicationFactor"; + public static final String ATTR_TOPIC_DELETION_ENABLED = "enableTopicDeletion"; + private int stepSize = 3; private int zookeeperCheckTimeoutSeconds = 5; private static final String CONF_MAX_NUM_STALE_SENSOR_INTERVALS_KEY = "maxNumStaleSensorIntervals"; private long maxNumStaleIntervals = 2; // default 2 times + private boolean enableTopicDeletion = false; @Override public void initialize(Map config) throws PluginConfigurationException { @@ -79,6 +82,9 @@ public void initialize(Map config) throws PluginConfigurationExc maxNumStaleIntervals = Integer .parseInt(config.get(CONF_MAX_NUM_STALE_SENSOR_INTERVALS_KEY).toString()); } + if (config.containsKey(ATTR_TOPIC_DELETION_ENABLED)) { + enableTopicDeletion = Boolean.parseBoolean(config.get(ATTR_TOPIC_DELETION_ENABLED).toString()); + } } @Override @@ -92,13 +98,6 @@ public void operate(KafkaCluster cluster) throws Exception { return; } - boolean enableTopicDeletion; - if (cluster.containsAttribute(KafkaClusterInfoSensor.ATTR_TOPIC_DELETION_ENABLED)) { - enableTopicDeletion = cluster.getAttribute(KafkaClusterInfoSensor.ATTR_TOPIC_DELETION_ENABLED).getValue(); - } else { - enableTopicDeletion = false; - } - Set sensorSet = new HashSet<>(); Attribute brokersetMapAttr = cluster.getAttribute(KafkaClusterInfoSensor.ATTR_BROKERSET_KEY); diff --git a/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/kafka/KafkaClusterInfoSensor.java b/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/kafka/KafkaClusterInfoSensor.java index cb546ca2..fe648706 100644 --- a/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/kafka/KafkaClusterInfoSensor.java +++ b/orion-server/src/main/java/com/pinterest/orion/core/automation/sensor/kafka/KafkaClusterInfoSensor.java @@ -38,7 +38,6 @@ public class KafkaClusterInfoSensor extends KafkaSensor { public static final String ATTR_BROKERSET_KEY = "brokerset"; public static final String CONF_CLUSTER_INFO_DIR_KEY = "clusterInfoDir"; public static final String ATTR_TOPIC_ASSIGNMENTS_KEY = "topicAssignment"; - public static final String ATTR_TOPIC_DELETION_ENABLED = "enableTopicDeletion"; @Override public String getName() { diff --git a/orion-server/src/main/java/com/pinterest/orion/core/kafka/KafkaCluster.java b/orion-server/src/main/java/com/pinterest/orion/core/kafka/KafkaCluster.java index 266043eb..5aa828a2 100644 --- a/orion-server/src/main/java/com/pinterest/orion/core/kafka/KafkaCluster.java +++ b/orion-server/src/main/java/com/pinterest/orion/core/kafka/KafkaCluster.java @@ -56,7 +56,6 @@ import com.pinterest.orion.core.automation.operator.Operator; import com.pinterest.orion.core.automation.sensor.Sensor; import com.pinterest.orion.core.automation.sensor.kafka.KafkaTopicSensor; -import com.pinterest.orion.core.automation.sensor.kafka.KafkaClusterInfoSensor; public class KafkaCluster extends Cluster { @@ -124,12 +123,6 @@ public int getClusterDefaultReplicationFactor() { @Override public void bootstrapClusterInfo(Map config) { props = new Properties(); - if (config.containsKey(KafkaClusterInfoSensor.ATTR_TOPIC_DELETION_ENABLED)) { - setAttribute( - KafkaClusterInfoSensor.ATTR_TOPIC_DELETION_ENABLED, - config.get(KafkaClusterInfoSensor.ATTR_TOPIC_DELETION_ENABLED) - ); - } } public void addBrokerset(Brokerset brokerset) {