Skip to content

Commit

Permalink
Change the location of the configuration to let clusters delete topic…
Browse files Browse the repository at this point in the history
…s. (pinterest#42)

Co-authored-by: Justin Casol <[email protected]>
  • Loading branch information
NavalOrange and Justin Casol authored Nov 4, 2021
1 parent 805fc21 commit f46e2fc
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 19 deletions.
1 change: 0 additions & 1 deletion deployments/orion-server-deployment/conf/kafka-server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ clusterConfigs:
configuration:
serversetPath: /var/serverset/discovery.testkafka.prod
clusterInfoDir: /opt/orion/configs/kafka/clusters/testkafka
enableTopicDeletion: false
customApiFactoryClasses:
- com.pinterest.orion.server.api.kafka.KafkaApiFactory
plugins:
Expand Down
13 changes: 10 additions & 3 deletions docs/Kafka/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,16 @@ Description: Creates a Kafka Topic with supplied attributes
Class: `com.pinterest.orion.core.actions.kafka.AssignmentDeleteKafkaTopicAction`

Description: Delete the specified kafka topic if it includes the field `"delete": true`
AND the server description includes `enableTopicDeletion: false` in the configuration of
the specific cluster that has topics you want to delete.

AND the server description includes
```
plugins:
operatorConfigs:
- key: brokersetTopicOperator
enabled: true
configuration:
enableTopicDeletion: true
```
Under the individual cluster config sections

**Partition Reassignment**

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,13 @@ public class BrokersetTopicOperator extends KafkaOperator {

private static final String CONF_STEP_SIZE_KEY = "stepSize";
private static final String CONF_REPLICATION_FACTOR = "replicationFactor";
public static final String ATTR_TOPIC_DELETION_ENABLED = "enableTopicDeletion";

private int stepSize = 3;
private int zookeeperCheckTimeoutSeconds = 5;
private static final String CONF_MAX_NUM_STALE_SENSOR_INTERVALS_KEY = "maxNumStaleSensorIntervals";
private long maxNumStaleIntervals = 2; // default 2 times
private boolean enableTopicDeletion = false;

@Override
public void initialize(Map<String, Object> config) throws PluginConfigurationException {
Expand All @@ -79,6 +82,9 @@ public void initialize(Map<String, Object> config) throws PluginConfigurationExc
maxNumStaleIntervals = Integer
.parseInt(config.get(CONF_MAX_NUM_STALE_SENSOR_INTERVALS_KEY).toString());
}
if (config.containsKey(ATTR_TOPIC_DELETION_ENABLED)) {
enableTopicDeletion = Boolean.parseBoolean(config.get(ATTR_TOPIC_DELETION_ENABLED).toString());
}
}

@Override
Expand All @@ -92,13 +98,6 @@ public void operate(KafkaCluster cluster) throws Exception {
return;
}

boolean enableTopicDeletion;
if (cluster.containsAttribute(KafkaClusterInfoSensor.ATTR_TOPIC_DELETION_ENABLED)) {
enableTopicDeletion = cluster.getAttribute(KafkaClusterInfoSensor.ATTR_TOPIC_DELETION_ENABLED).getValue();
} else {
enableTopicDeletion = false;
}

Set<String> sensorSet = new HashSet<>();

Attribute brokersetMapAttr = cluster.getAttribute(KafkaClusterInfoSensor.ATTR_BROKERSET_KEY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public class KafkaClusterInfoSensor extends KafkaSensor {
public static final String ATTR_BROKERSET_KEY = "brokerset";
public static final String CONF_CLUSTER_INFO_DIR_KEY = "clusterInfoDir";
public static final String ATTR_TOPIC_ASSIGNMENTS_KEY = "topicAssignment";
public static final String ATTR_TOPIC_DELETION_ENABLED = "enableTopicDeletion";

@Override
public String getName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import com.pinterest.orion.core.automation.operator.Operator;
import com.pinterest.orion.core.automation.sensor.Sensor;
import com.pinterest.orion.core.automation.sensor.kafka.KafkaTopicSensor;
import com.pinterest.orion.core.automation.sensor.kafka.KafkaClusterInfoSensor;


public class KafkaCluster extends Cluster {
Expand Down Expand Up @@ -124,12 +123,6 @@ public int getClusterDefaultReplicationFactor() {
@Override
public void bootstrapClusterInfo(Map<String, Object> config) {
props = new Properties();
if (config.containsKey(KafkaClusterInfoSensor.ATTR_TOPIC_DELETION_ENABLED)) {
setAttribute(
KafkaClusterInfoSensor.ATTR_TOPIC_DELETION_ENABLED,
config.get(KafkaClusterInfoSensor.ATTR_TOPIC_DELETION_ENABLED)
);
}
}

public void addBrokerset(Brokerset brokerset) {
Expand Down

0 comments on commit f46e2fc

Please sign in to comment.