|
25 | 25 | package org.sourcelab.kafka.webview.ui.controller.api;
|
26 | 26 |
|
27 | 27 | import org.sourcelab.kafka.webview.ui.controller.BaseController;
|
| 28 | +import org.sourcelab.kafka.webview.ui.controller.api.exceptions.ApiException; |
| 29 | +import org.sourcelab.kafka.webview.ui.controller.api.exceptions.NotFoundApiException; |
| 30 | +import org.sourcelab.kafka.webview.ui.controller.api.requests.ConsumeRequest; |
| 31 | +import org.sourcelab.kafka.webview.ui.controller.api.requests.CreateTopicRequest; |
| 32 | +import org.sourcelab.kafka.webview.ui.controller.api.requests.ModifyTopicConfigRequest; |
| 33 | +import org.sourcelab.kafka.webview.ui.controller.api.responses.ResultResponse; |
28 | 34 | import org.sourcelab.kafka.webview.ui.manager.kafka.KafkaOperations;
|
29 | 35 | import org.sourcelab.kafka.webview.ui.manager.kafka.KafkaOperationsFactory;
|
30 | 36 | import org.sourcelab.kafka.webview.ui.manager.kafka.SessionIdentifier;
|
@@ -330,6 +336,37 @@ public ResultResponse createTopic(@PathVariable final Long id, @RequestBody fina
|
330 | 336 | }
|
331 | 337 | }
|
332 | 338 |
|
| 339 | + /** |
| 340 | + * POST Modify a topic's configuration on cluster. |
| 341 | + * This should require ADMIN role. |
| 342 | + */ |
| 343 | + @ResponseBody |
| 344 | + @RequestMapping(path = "/cluster/{id}/modify/topic", method = RequestMethod.POST, produces = "application/json") |
| 345 | + public List<ConfigItem> modifyTopicConfig( |
| 346 | + @PathVariable final Long id, |
| 347 | + @RequestBody final ModifyTopicConfigRequest modifyTopicConfigRequest |
| 348 | + ) { |
| 349 | + // Retrieve cluster |
| 350 | + final Cluster cluster = retrieveClusterById(id); |
| 351 | + |
| 352 | + final String name = modifyTopicConfigRequest.getTopic(); |
| 353 | + if (name == null || name.trim().isEmpty()) { |
| 354 | + throw new ApiException("ModifyTopic", "Invalid topic name"); |
| 355 | + } |
| 356 | + |
| 357 | + final Map<String, String> configEntries = modifyTopicConfigRequest.getConfig(); |
| 358 | + if (configEntries == null || configEntries.isEmpty()) { |
| 359 | + throw new ApiException("ModifyTopic", "Invalid configuration defined"); |
| 360 | + } |
| 361 | + |
| 362 | + // Create new Operational Client |
| 363 | + try (final KafkaOperations operations = createOperationsClient(cluster)) { |
| 364 | + return operations.alterTopicConfig(name, configEntries).getConfigEntries(); |
| 365 | + } catch (final Exception e) { |
| 366 | + throw new ApiException("ModifyTopic", e); |
| 367 | + } |
| 368 | + } |
| 369 | + |
333 | 370 | /**
|
334 | 371 | * GET Nodes within a cluster.
|
335 | 372 | */
|
|
0 commit comments