65
65
import java .util .HashSet ;
66
66
import java .util .List ;
67
67
import java .util .Map ;
68
+ import java .util .Optional ;
68
69
import java .util .Set ;
69
70
70
71
/**
@@ -97,25 +98,11 @@ public KafkaResults consume(
97
98
@ PathVariable final Long id ,
98
99
@ RequestBody final ConsumeRequest consumeRequest ) {
99
100
100
- // How many results per partition to consume
101
- // Null means use whatever is configured in the view.
102
- final Integer resultsPerPartition = consumeRequest .getResultsPerPartition ();
103
-
104
- // Comma separated list of partitions to consume from.
105
- // Null or empty string means use whatever is configured in the View.
106
- final String partitions = consumeRequest .getPartitions ();
107
-
108
101
// Action describes what to consume 'next', 'prev', 'head', 'tail'
109
102
final String action = consumeRequest .getAction ();
110
103
111
- // Any custom configured filters
112
- final List <ConsumeRequest .Filter > requestFilters = consumeRequest .getFilters ();
113
-
114
104
// Retrieve the view definition
115
- final View view = viewRepository .findOne (id );
116
- if (view == null ) {
117
- throw new NotFoundApiException ("Consume" , "Unable to find view" );
118
- }
105
+ final View view = retrieveViewById (id );
119
106
120
107
// Override settings
121
108
final ViewCustomizer viewCustomizer = new ViewCustomizer (view , consumeRequest );
@@ -149,11 +136,8 @@ public KafkaResults consume(
149
136
@ ResponseBody
150
137
@ RequestMapping (path = "/consumer/view/{id}/offsets" , method = RequestMethod .POST , produces = "application/json" )
151
138
public ConsumerState setConsumerOffsets (@ PathVariable final Long id , @ RequestBody final Map <Integer , Long > partitionOffsetMap ) {
152
- // Retrieve the view definition
153
- final View view = viewRepository .findOne (id );
154
- if (view == null ) {
155
- throw new NotFoundApiException ("Offsets" , "Unable to find view" );
156
- }
139
+ // Retrieve View
140
+ final View view = retrieveViewById (id );
157
141
158
142
// Create consumer
159
143
try (final WebKafkaConsumer webKafkaConsumer = setup (view , new HashSet <>())) {
@@ -169,11 +153,8 @@ public ConsumerState setConsumerOffsets(@PathVariable final Long id, @RequestBo
169
153
@ ResponseBody
170
154
@ RequestMapping (path = "/consumer/view/{id}/timestamp/{timestamp}" , method = RequestMethod .POST , produces = "application/json" )
171
155
public ConsumerState setConsumerOffsetsByTimestamp (@ PathVariable final Long id , @ PathVariable final Long timestamp ) {
172
- // Retrieve the view definition
173
- final View view = viewRepository .findOne (id );
174
- if (view == null ) {
175
- throw new NotFoundApiException ("OffsetsByTimestamp" , "Unable to find view" );
176
- }
156
+ // Retrieve View
157
+ final View view = retrieveViewById (id );
177
158
178
159
// Create consumer
179
160
try (final WebKafkaConsumer webKafkaConsumer = setup (view , new HashSet <>())) {
@@ -190,10 +171,7 @@ public ConsumerState setConsumerOffsetsByTimestamp(@PathVariable final Long id,
190
171
@ RequestMapping (path = "/view/{id}/partitions" , method = RequestMethod .GET , produces = "application/json" )
191
172
public Collection <Integer > getPartitionsForView (@ PathVariable final Long id ) {
192
173
// Retrieve View
193
- final View view = viewRepository .findOne (id );
194
- if (view == null ) {
195
- throw new NotFoundApiException ("Partitions" , "Unable to find view" );
196
- }
174
+ final View view = retrieveViewById (id );
197
175
198
176
// If the view has defined partitions, we'll return them
199
177
if (!view .getPartitionsAsSet ().isEmpty ()) {
@@ -221,10 +199,7 @@ public Collection<Integer> getPartitionsForView(@PathVariable final Long id) {
221
199
@ RequestMapping (path = "/cluster/{id}/topics/list" , method = RequestMethod .GET , produces = "application/json" )
222
200
public List <TopicListing > getTopics (@ PathVariable final Long id ) {
223
201
// Retrieve cluster
224
- final Cluster cluster = clusterRepository .findOne (id );
225
- if (cluster == null ) {
226
- throw new NotFoundApiException ("Topics" , "Unable to find cluster" );
227
- }
202
+ final Cluster cluster = retrieveClusterById (id );
228
203
229
204
// Create new Operational Client
230
205
try (final KafkaOperations operations = createOperationsClient (cluster )) {
@@ -242,10 +217,7 @@ public List<TopicListing> getTopics(@PathVariable final Long id) {
242
217
@ RequestMapping (path = "/cluster/{id}/topic/{topic}/details" , method = RequestMethod .GET , produces = "application/json" )
243
218
public TopicDetails getTopicDetails (@ PathVariable final Long id , @ PathVariable final String topic ) {
244
219
// Retrieve cluster
245
- final Cluster cluster = clusterRepository .findOne (id );
246
- if (cluster == null ) {
247
- throw new NotFoundApiException ("TopicDetails" , "Unable to find cluster" );
248
- }
220
+ final Cluster cluster = retrieveClusterById (id );
249
221
250
222
// Create new Operational Client
251
223
try (final KafkaOperations operations = createOperationsClient (cluster )) {
@@ -262,10 +234,7 @@ public TopicDetails getTopicDetails(@PathVariable final Long id, @PathVariable f
262
234
@ RequestMapping (path = "/cluster/{id}/topic/{topic}/config" , method = RequestMethod .GET , produces = "application/json" )
263
235
public List <ConfigItem > getTopicConfig (@ PathVariable final Long id , @ PathVariable final String topic ) {
264
236
// Retrieve cluster
265
- final Cluster cluster = clusterRepository .findOne (id );
266
- if (cluster == null ) {
267
- throw new NotFoundApiException ("TopicConfig" , "Unable to find cluster" );
268
- }
237
+ final Cluster cluster = retrieveClusterById (id );
269
238
270
239
// Create new Operational Client
271
240
try (final KafkaOperations operations = createOperationsClient (cluster )) {
@@ -282,10 +251,7 @@ public List<ConfigItem> getTopicConfig(@PathVariable final Long id, @PathVariabl
282
251
@ RequestMapping (path = "/cluster/{id}/broker/{brokerId}/config" , method = RequestMethod .GET , produces = "application/json" )
283
252
public List <ConfigItem > getBrokerConfig (@ PathVariable final Long id , @ PathVariable final String brokerId ) {
284
253
// Retrieve cluster
285
- final Cluster cluster = clusterRepository .findOne (id );
286
- if (cluster == null ) {
287
- throw new NotFoundApiException ("TopicConfig" , "Unable to find cluster" );
288
- }
254
+ final Cluster cluster = retrieveClusterById (id );
289
255
290
256
// Create new Operational Client
291
257
try (final KafkaOperations operations = createOperationsClient (cluster )) {
@@ -302,10 +268,7 @@ public List<ConfigItem> getBrokerConfig(@PathVariable final Long id, @PathVariab
302
268
@ RequestMapping (path = "/cluster/{id}/topics/details" , method = RequestMethod .GET , produces = "application/json" )
303
269
public Collection <TopicDetails > getAllTopicsDetails (@ PathVariable final Long id ) {
304
270
// Retrieve cluster
305
- final Cluster cluster = clusterRepository .findOne (id );
306
- if (cluster == null ) {
307
- throw new NotFoundApiException ("TopicDetails" , "Unable to find cluster" );
308
- }
271
+ final Cluster cluster = retrieveClusterById (id );
309
272
310
273
// Create new Operational Client
311
274
try (final KafkaOperations operations = createOperationsClient (cluster )) {
@@ -329,10 +292,7 @@ public Collection<TopicDetails> getAllTopicsDetails(@PathVariable final Long id)
329
292
@ RequestMapping (path = "/cluster/{id}/nodes" , method = RequestMethod .GET , produces = "application/json" )
330
293
public List <NodeDetails > getClusterNodes (@ PathVariable final Long id ) {
331
294
// Retrieve cluster
332
- final Cluster cluster = clusterRepository .findOne (id );
333
- if (cluster == null ) {
334
- throw new NotFoundApiException ("ClusterNodes" , "Unable to find cluster" );
335
- }
295
+ final Cluster cluster = retrieveClusterById (id );
336
296
337
297
try (final KafkaOperations operations = createOperationsClient (cluster )) {
338
298
final NodeList nodes = operations .getClusterNodes ();
@@ -349,10 +309,7 @@ public List<NodeDetails> getClusterNodes(@PathVariable final Long id) {
349
309
@ RequestMapping (path = "/filter/{id}/options" , method = RequestMethod .GET , produces = "application/json" )
350
310
public String [] getFilterOptions (@ PathVariable final Long id ) {
351
311
// Retrieve Filter
352
- final Filter filter = filterRepository .findOne (id );
353
- if (filter == null ) {
354
- throw new NotFoundApiException ("FilterOptions" , "Unable to find filter" );
355
- }
312
+ final Filter filter = retrieveFilterById (id );
356
313
final String [] options = filter .getOptions ().split ("," );
357
314
358
315
return options ;
@@ -391,4 +348,54 @@ private WebKafkaConsumer setup(final View view, final Collection<FilterDefinitio
391
348
public void addAttributes (final Model model ) {
392
349
// Do nothing.
393
350
}
351
+
352
+ /**
353
+ * Helper method to retrieve a cluster by its Id. If its not found it will throw the appropriate
354
+ * NotFoundApiException exception.
355
+ *
356
+ * @param id id of cluster to retrieve
357
+ * @return the cluster entity.
358
+ * @throws NotFoundApiException if not found.
359
+ */
360
+ private Cluster retrieveClusterById (final Long id ) throws NotFoundApiException {
361
+ final Optional <Cluster > clusterOptional = clusterRepository .findById (id );
362
+ if (!clusterOptional .isPresent ()) {
363
+ throw new NotFoundApiException ("TopicConfig" , "Unable to find cluster" );
364
+ }
365
+ return clusterOptional .get ();
366
+ }
367
+
368
+ /**
369
+ * Helper method to retrieve a view by its Id. If its not found it will throw the appropriate
370
+ * NotFoundApiException exception.
371
+ *
372
+ * @param id id of view to retrieve
373
+ * @return the view entity.
374
+ * @throws NotFoundApiException if not found.
375
+ */
376
+ private View retrieveViewById (final Long id ) throws NotFoundApiException {
377
+ // Retrieve View
378
+ final Optional <View > viewOptional = viewRepository .findById (id );
379
+ if (!viewOptional .isPresent ()) {
380
+ throw new NotFoundApiException ("Partitions" , "Unable to find view" );
381
+ }
382
+ return viewOptional .get ();
383
+ }
384
+
385
+ /**
386
+ * Helper method to retrieve a filter by its Id. If its not found it will throw the appropriate
387
+ * NotFoundApiException exception.
388
+ *
389
+ * @param id id of filter to retrieve
390
+ * @return the filter entity.
391
+ * @throws NotFoundApiException if not found.
392
+ */
393
+ private Filter retrieveFilterById (final Long id ) throws NotFoundApiException {
394
+ // Retrieve Filter
395
+ final Optional <Filter > filterOptional = filterRepository .findById (id );
396
+ if (!filterOptional .isPresent ()) {
397
+ throw new NotFoundApiException ("FilterOptions" , "Unable to find filter" );
398
+ }
399
+ return filterOptional .get ();
400
+ }
394
401
}
0 commit comments