Skip to content

Commit 99dd5e6

Browse files
authored
feat: add more streams acls and option for describe topic acls (#36)
1 parent 1b0163d commit 99dd5e6

26 files changed

+1175
-31
lines changed

src/main/java/com/devshawn/kafka/gitops/MainCommand.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
import java.util.concurrent.Callable;
1313

1414
@Command(name = "kafka-gitops",
15-
version = "0.2.8",
15+
version = "0.2.9-SNAPSHOT",
1616
exitCodeOnInvalidInput = 0,
1717
subcommands = {
1818
AccountCommand.class,

src/main/java/com/devshawn/kafka/gitops/StateManager.java

+14-8
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.devshawn.kafka.gitops.config.KafkaGitopsConfigLoader;
66
import com.devshawn.kafka.gitops.config.ManagerConfig;
77
import com.devshawn.kafka.gitops.domain.confluent.ServiceAccount;
8+
import com.devshawn.kafka.gitops.domain.options.GetAclOptions;
89
import com.devshawn.kafka.gitops.domain.plan.DesiredPlan;
910
import com.devshawn.kafka.gitops.domain.state.AclDetails;
1011
import com.devshawn.kafka.gitops.domain.state.CustomAclDetails;
@@ -53,6 +54,8 @@ public class StateManager {
5354
private PlanManager planManager;
5455
private ApplyManager applyManager;
5556

57+
private boolean describeAclEnabled = false;
58+
5659
public StateManager(ManagerConfig managerConfig, ParserService parserService) {
5760
initializeLogger(managerConfig.isVerboseRequested());
5861
this.managerConfig = managerConfig;
@@ -69,6 +72,7 @@ public DesiredStateFile getAndValidateStateFile() {
6972
DesiredStateFile desiredStateFile = parserService.parseStateFile();
7073
validateTopics(desiredStateFile);
7174
validateCustomAcls(desiredStateFile);
75+
this.describeAclEnabled = StateUtil.isDescribeTopicAclEnabled(desiredStateFile);
7276
return desiredStateFile;
7377
}
7478

@@ -107,12 +111,11 @@ public void createServiceAccounts() {
107111
AtomicInteger count = new AtomicInteger();
108112
if (isConfluentCloudEnabled(desiredStateFile)) {
109113
desiredStateFile.getServices().forEach((name, service) -> {
110-
createServiceAccount(name, serviceAccounts, count);
114+
createServiceAccount(name, serviceAccounts, count, false);
111115
});
112116

113117
desiredStateFile.getUsers().forEach((name, user) -> {
114-
String serviceAccountName = String.format("user-%s", name);
115-
createServiceAccount(serviceAccountName, serviceAccounts, count);
118+
createServiceAccount(name, serviceAccounts, count, true);
116119
});
117120
} else {
118121
throw new ConfluentCloudException("Confluent Cloud must be enabled in the state file to use this command.");
@@ -123,9 +126,9 @@ public void createServiceAccounts() {
123126
}
124127
}
125128

126-
private void createServiceAccount(String name, List<ServiceAccount> serviceAccounts, AtomicInteger count) {
129+
private void createServiceAccount(String name, List<ServiceAccount> serviceAccounts, AtomicInteger count, boolean isUser) {
127130
if (serviceAccounts.stream().noneMatch(it -> it.getName().equals(name))) {
128-
confluentCloudService.createServiceAccount(name);
131+
confluentCloudService.createServiceAccount(name, isUser);
129132
LogUtil.printSimpleSuccess(String.format("Successfully created service account: %s", name));
130133
count.getAndIncrement();
131134
}
@@ -169,7 +172,7 @@ private void generateConfluentCloudServiceAcls(DesiredState.Builder desiredState
169172
Optional<ServiceAccount> serviceAccount = serviceAccounts.stream().filter(it -> it.getName().equals(name)).findFirst();
170173
String serviceAccountId = serviceAccount.orElseThrow(() -> new ServiceAccountNotFoundException(name)).getId();
171174

172-
service.getAcls(name).forEach(aclDetails -> {
175+
service.getAcls(buildGetAclOptions(name)).forEach(aclDetails -> {
173176
aclDetails.setPrincipal(String.format("User:%s", serviceAccountId));
174177
desiredState.putAcls(String.format("%s-%s", name, index.getAndSet(index.get() + 1)), aclDetails.build());
175178
});
@@ -213,7 +216,7 @@ private void generateConfluentCloudUserAcls(DesiredState.Builder desiredState, D
213216
private void generateServiceAcls(DesiredState.Builder desiredState, DesiredStateFile desiredStateFile) {
214217
desiredStateFile.getServices().forEach((name, service) -> {
215218
AtomicReference<Integer> index = new AtomicReference<>(0);
216-
service.getAcls(name).forEach(aclDetails -> {
219+
service.getAcls(buildGetAclOptions(name)).forEach(aclDetails -> {
217220
desiredState.putAcls(String.format("%s-%s", name, index.getAndSet(index.get() + 1)), buildAclDetails(name, aclDetails));
218221
});
219222

@@ -274,6 +277,10 @@ private List<String> getPrefixedTopicsToIgnore(DesiredStateFile desiredStateFile
274277
return topics;
275278
}
276279

280+
private GetAclOptions buildGetAclOptions(String serviceName) {
281+
return new GetAclOptions.Builder().setServiceName(serviceName).setDescribeAclEnabled(describeAclEnabled).build();
282+
}
283+
277284
private void validateCustomAcls(DesiredStateFile desiredStateFile) {
278285
desiredStateFile.getCustomServiceAcls().forEach((service, details) -> {
279286
try {
@@ -307,7 +314,6 @@ private void validateTopics(DesiredStateFile desiredStateFile) {
307314
throw new ValidationException("The default replication factor must be a positive integer.");
308315
}
309316
}
310-
311317
}
312318

313319
private boolean isConfluentCloudEnabled(DesiredStateFile desiredStateFile) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.devshawn.kafka.gitops.domain.options;
2+
3+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
4+
import org.inferred.freebuilder.FreeBuilder;
5+
6+
@FreeBuilder
7+
@JsonDeserialize(builder = GetAclOptions.Builder.class)
8+
public interface GetAclOptions {
9+
10+
String getServiceName();
11+
12+
Boolean getDescribeAclEnabled();
13+
14+
class Builder extends GetAclOptions_Builder {
15+
}
16+
}

src/main/java/com/devshawn/kafka/gitops/domain/state/AbstractService.java

+26
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,19 @@ public AclDetails.Builder generateWriteACL(String topic, Optional<String> princi
3030
return builder;
3131
}
3232

33+
public AclDetails.Builder generateDescribeAcl(String topic, Optional<String> principal) {
34+
AclDetails.Builder builder = new AclDetails.Builder()
35+
.setHost("*")
36+
.setName(topic)
37+
.setOperation("DESCRIBE")
38+
.setPermission("ALLOW")
39+
.setPattern("LITERAL")
40+
.setType("TOPIC");
41+
42+
principal.ifPresent(builder::setPrincipal);
43+
return builder;
44+
}
45+
3346
public AclDetails.Builder generatePrefixedTopicACL(String topic, Optional<String> principal, String operation) {
3447
AclDetails.Builder builder = new AclDetails.Builder()
3548
.setHost("*")
@@ -55,4 +68,17 @@ public AclDetails.Builder generateConsumerGroupAcl(String consumerGroupId, Optio
5568
principal.ifPresent(builder::setPrincipal);
5669
return builder;
5770
}
71+
72+
public AclDetails.Builder generateClusterAcl(Optional<String> principal, String operation) {
73+
AclDetails.Builder builder = new AclDetails.Builder()
74+
.setHost("*")
75+
.setName("kafka-cluster")
76+
.setOperation(operation)
77+
.setPermission("ALLOW")
78+
.setPattern("LITERAL")
79+
.setType("CLUSTER");
80+
81+
principal.ifPresent(builder::setPrincipal);
82+
return builder;
83+
}
5884
}

src/main/java/com/devshawn/kafka/gitops/domain/state/ServiceDetails.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.devshawn.kafka.gitops.domain.state;
22

3+
import com.devshawn.kafka.gitops.domain.options.GetAclOptions;
34
import com.devshawn.kafka.gitops.domain.state.service.ApplicationService;
45
import com.devshawn.kafka.gitops.domain.state.service.KafkaConnectService;
56
import com.devshawn.kafka.gitops.domain.state.service.KafkaStreamsService;
@@ -18,7 +19,7 @@ public abstract class ServiceDetails extends AbstractService {
1819

1920
public String type;
2021

21-
public List<AclDetails.Builder> getAcls(String serviceName) {
22+
public List<AclDetails.Builder> getAcls(GetAclOptions options) {
2223
throw new UnsupportedOperationException("Method getAcls is not implemented.");
2324
}
2425
}

src/main/java/com/devshawn/kafka/gitops/domain/state/service/ApplicationService.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package com.devshawn.kafka.gitops.domain.state.service;
22

3+
import com.devshawn.kafka.gitops.domain.options.GetAclOptions;
34
import com.devshawn.kafka.gitops.domain.state.AclDetails;
45
import com.devshawn.kafka.gitops.domain.state.ServiceDetails;
6+
import com.devshawn.kafka.gitops.util.HelperUtil;
57
import com.fasterxml.jackson.annotation.JsonProperty;
68
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
79
import org.inferred.freebuilder.FreeBuilder;
@@ -24,12 +26,18 @@ public abstract class ApplicationService extends ServiceDetails {
2426
public abstract List<String> getConsumes();
2527

2628
@Override
27-
public List<AclDetails.Builder> getAcls(String serviceName) {
29+
public List<AclDetails.Builder> getAcls(GetAclOptions options) {
2830
List<AclDetails.Builder> acls = new ArrayList<>();
2931
getProduces().forEach(topic -> acls.add(generateWriteACL(topic, getPrincipal())));
3032
getConsumes().forEach(topic -> acls.add(generateReadAcl(topic, getPrincipal())));
33+
34+
if (options.getDescribeAclEnabled()) {
35+
List<String> allTopics = HelperUtil.uniqueCombine(getConsumes(), getProduces());
36+
allTopics.forEach(topic -> acls.add(generateDescribeAcl(topic, getPrincipal())));
37+
}
38+
3139
if (!getConsumes().isEmpty()) {
32-
String groupId = getGroupId().isPresent() ? getGroupId().get() : serviceName;
40+
String groupId = getGroupId().isPresent() ? getGroupId().get() : options.getServiceName();
3341
acls.add(generateConsumerGroupAcl(groupId, getPrincipal(), "READ"));
3442
}
3543
return acls;

src/main/java/com/devshawn/kafka/gitops/domain/state/service/KafkaConnectService.java

+12-8
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.devshawn.kafka.gitops.domain.state.service;
22

33

4+
import com.devshawn.kafka.gitops.domain.options.GetAclOptions;
45
import com.devshawn.kafka.gitops.domain.state.AclDetails;
56
import com.devshawn.kafka.gitops.domain.state.ServiceDetails;
67
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -29,18 +30,21 @@ public abstract class KafkaConnectService extends ServiceDetails {
2930
public abstract Map<String, KafkaConnectorDetails> getConnectors();
3031

3132
@Override
32-
public List<AclDetails.Builder> getAcls(String serviceName) {
33+
public List<AclDetails.Builder> getAcls(GetAclOptions options) {
3334
List<AclDetails.Builder> acls = new ArrayList<>();
3435
getProduces().forEach(topic -> acls.add(generateWriteACL(topic, getPrincipal())));
35-
acls.addAll(getConnectWorkerAcls(serviceName));
36+
if (options.getDescribeAclEnabled()) {
37+
getProduces().forEach(topic -> acls.add(generateDescribeAcl(topic, getPrincipal())));
38+
}
39+
acls.addAll(getConnectWorkerAcls(options));
3640
return acls;
3741
}
3842

39-
private List<AclDetails.Builder> getConnectWorkerAcls(String serviceName) {
40-
String groupId = getGroupId().isPresent() ? getGroupId().get() : serviceName;
41-
String configTopic = getConfigTopic(serviceName);
42-
String offsetTopic = getOffsetTopic(serviceName);
43-
String statusTopic = getStatusTopic(serviceName);
43+
private List<AclDetails.Builder> getConnectWorkerAcls(GetAclOptions options) {
44+
String groupId = getGroupId().isPresent() ? getGroupId().get() : options.getServiceName();
45+
String configTopic = getConfigTopic(options.getServiceName());
46+
String offsetTopic = getOffsetTopic(options.getServiceName());
47+
String statusTopic = getStatusTopic(options.getServiceName());
4448

4549
List<AclDetails.Builder> acls = new ArrayList<>();
4650
acls.add(generateReadAcl(configTopic, getPrincipal()));
@@ -50,7 +54,7 @@ private List<AclDetails.Builder> getConnectWorkerAcls(String serviceName) {
5054
acls.add(generateWriteACL(offsetTopic, getPrincipal()));
5155
acls.add(generateWriteACL(statusTopic, getPrincipal()));
5256
acls.add(generateConsumerGroupAcl(groupId, getPrincipal(), "READ"));
53-
getConnectors().forEach((connectorName, connector) -> acls.addAll(connector.getAcls(connectorName, getPrincipal())));
57+
getConnectors().forEach((connectorName, connector) -> acls.addAll(connector.getAcls(connectorName, getPrincipal(), options)));
5458
return acls;
5559
}
5660

src/main/java/com/devshawn/kafka/gitops/domain/state/service/KafkaConnectorDetails.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package com.devshawn.kafka.gitops.domain.state.service;
22

3+
import com.devshawn.kafka.gitops.domain.options.GetAclOptions;
34
import com.devshawn.kafka.gitops.domain.state.AbstractService;
45
import com.devshawn.kafka.gitops.domain.state.AclDetails;
6+
import com.devshawn.kafka.gitops.util.HelperUtil;
57
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
68
import org.inferred.freebuilder.FreeBuilder;
79

@@ -17,10 +19,16 @@ public abstract class KafkaConnectorDetails extends AbstractService {
1719

1820
public abstract List<String> getConsumes();
1921

20-
public List<AclDetails.Builder> getAcls(String connectorName, Optional<String> principal) {
22+
public List<AclDetails.Builder> getAcls(String connectorName, Optional<String> principal, GetAclOptions options) {
2123
List<AclDetails.Builder> acls = new ArrayList<>();
2224
getProduces().forEach(topic -> acls.add(generateWriteACL(topic, principal)));
2325
getConsumes().forEach(topic -> acls.add(generateReadAcl(topic, principal)));
26+
27+
if (options.getDescribeAclEnabled()) {
28+
List<String> allTopics = HelperUtil.uniqueCombine(getConsumes(), getProduces());
29+
allTopics.forEach(topic -> acls.add(generateDescribeAcl(topic, principal)));
30+
}
31+
2432
if (!getConsumes().isEmpty()) {
2533
acls.add(generateConsumerGroupAcl(String.format("connect-%s", connectorName), principal, "READ"));
2634
}

src/main/java/com/devshawn/kafka/gitops/domain/state/service/KafkaStreamsService.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package com.devshawn.kafka.gitops.domain.state.service;
22

3+
import com.devshawn.kafka.gitops.domain.options.GetAclOptions;
34
import com.devshawn.kafka.gitops.domain.state.AclDetails;
45
import com.devshawn.kafka.gitops.domain.state.ServiceDetails;
6+
import com.devshawn.kafka.gitops.util.HelperUtil;
57
import com.fasterxml.jackson.annotation.JsonProperty;
68
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
79
import org.inferred.freebuilder.FreeBuilder;
@@ -24,11 +26,15 @@ public abstract class KafkaStreamsService extends ServiceDetails {
2426
public abstract List<String> getConsumes();
2527

2628
@Override
27-
public List<AclDetails.Builder> getAcls(String serviceName) {
29+
public List<AclDetails.Builder> getAcls(GetAclOptions options) {
2830
List<AclDetails.Builder> acls = new ArrayList<>();
2931
getProduces().forEach(topic -> acls.add(generateWriteACL(topic, getPrincipal())));
3032
getConsumes().forEach(topic -> acls.add(generateReadAcl(topic, getPrincipal())));
31-
acls.addAll(getInternalAcls(serviceName));
33+
if (options.getDescribeAclEnabled()) {
34+
List<String> allTopics = HelperUtil.uniqueCombine(getConsumes(), getProduces());
35+
allTopics.forEach(topic -> acls.add(generateDescribeAcl(topic, getPrincipal())));
36+
}
37+
acls.addAll(getInternalAcls(options.getServiceName()));
3238
return acls;
3339
}
3440

@@ -45,6 +51,8 @@ private List<AclDetails.Builder> getInternalAcls(String serviceName) {
4551
acls.add(generatePrefixedTopicACL(applicationId, getPrincipal(), "DESCRIBE_CONFIGS"));
4652
acls.add(generateConsumerGroupAcl(applicationId, getPrincipal(), "READ"));
4753
acls.add(generateConsumerGroupAcl(applicationId, getPrincipal(), "DESCRIBE"));
54+
acls.add(generateConsumerGroupAcl(applicationId, getPrincipal(), "DELETE"));
55+
acls.add(generateClusterAcl(getPrincipal(), "DESCRIBE_CONFIGS"));
4856
return acls;
4957
}
5058

src/main/java/com/devshawn/kafka/gitops/domain/state/settings/Settings.java

+2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ public interface Settings {
1313

1414
Optional<SettingsTopics> getTopics();
1515

16+
Optional<SettingsServices> getServices();
17+
1618
Optional<SettingsFiles> getFiles();
1719

1820
class Builder extends Settings_Builder {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.devshawn.kafka.gitops.domain.state.settings;
2+
3+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
4+
import org.inferred.freebuilder.FreeBuilder;
5+
6+
import java.util.Optional;
7+
8+
@FreeBuilder
9+
@JsonDeserialize(builder = SettingsServices.Builder.class)
10+
public interface SettingsServices {
11+
12+
Optional<SettingsServicesAcls> getAcls();
13+
14+
class Builder extends SettingsServices_Builder {
15+
}
16+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.devshawn.kafka.gitops.domain.state.settings;
2+
3+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
4+
import org.inferred.freebuilder.FreeBuilder;
5+
6+
import java.util.Optional;
7+
8+
@FreeBuilder
9+
@JsonDeserialize(builder = SettingsServicesAcls.Builder.class)
10+
public interface SettingsServicesAcls {
11+
12+
Optional<Boolean> getDescribeTopicEnabled();
13+
14+
class Builder extends SettingsServicesAcls_Builder {
15+
}
16+
}

src/main/java/com/devshawn/kafka/gitops/service/ConfluentCloudService.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,12 @@ public List<ServiceAccount> getServiceAccounts() {
3030
}
3131
}
3232

33-
public ServiceAccount createServiceAccount(String name) {
33+
public ServiceAccount createServiceAccount(String name, boolean isUser) {
3434
log.info("Creating service account {} in Confluent Cloud via ccloud tool.", name);
3535
try {
36-
String description = String.format("Service account: %s", name);
37-
String result = execCmd(new String[]{"ccloud", "service-account", "create", name, "--description", description, "-o", "json"});
36+
String serviceName = isUser ? String.format("user-%s", name) : name;
37+
String description = isUser ? String.format("User: %s", name) : String.format("Service account: %s", name);
38+
String result = execCmd(new String[]{"ccloud", "service-account", "create", serviceName, "--description", description, "-o", "json"});
3839
return objectMapper.readValue(result, ServiceAccount.class);
3940
} catch (IOException ex) {
4041
throw new ConfluentCloudException(String.format("There was an error creating Confluent Cloud service account: %s.", name));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package com.devshawn.kafka.gitops.util;
2+
3+
import java.util.ArrayList;
4+
import java.util.LinkedHashSet;
5+
import java.util.List;
6+
import java.util.Set;
7+
8+
public class HelperUtil {
9+
10+
public static List<String> uniqueCombine(List<String> listOne, List<String> listTwo) {
11+
Set<String> set = new LinkedHashSet<>(listOne);
12+
set.addAll(listTwo);
13+
return new ArrayList<>(set);
14+
}
15+
}

src/main/java/com/devshawn/kafka/gitops/util/StateUtil.java

+7
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,11 @@ public static Optional<Integer> fetchReplication(DesiredStateFile desiredStateFi
1313
}
1414
return Optional.empty();
1515
}
16+
17+
public static boolean isDescribeTopicAclEnabled(DesiredStateFile desiredStateFile) {
18+
return desiredStateFile.getSettings().isPresent() && desiredStateFile.getSettings().get().getServices().isPresent()
19+
&& desiredStateFile.getSettings().get().getServices().get().getAcls().isPresent()
20+
&& desiredStateFile.getSettings().get().getServices().get().getAcls().get().getDescribeTopicEnabled().isPresent()
21+
&& desiredStateFile.getSettings().get().getServices().get().getAcls().get().getDescribeTopicEnabled().get();
22+
}
1623
}

0 commit comments

Comments
 (0)