Skip to content

Commit a5382a3

Browse files
bbejeckguozhangwang
authored andcommitted
KAFKA-2902: streaming config use get base consumer configs.
Changes made for using getBaseConsumerConfigs from StreamingConfig.getConsumerConfigs. Author: bbejeck <[email protected]> Author: Bill Bejeck <[email protected]> Reviewers: Guozhang Wang Closes apache#596 from bbejeck/KAFKA-2902-StreamingConfig-use-getBaseConsumerConfigs
1 parent 9fb1e25 commit a5382a3

File tree

2 files changed

+71
-1
lines changed

2 files changed

+71
-1
lines changed

streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ public StreamingConfig(Map<?, ?> props) {
222222
}
223223

224224
public Map<String, Object> getConsumerConfigs(StreamThread streamThread) {
225-
Map<String, Object> props = getRestoreConsumerConfigs();
225+
Map<String, Object> props = getBaseConsumerConfigs();
226226
props.put(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(StreamingConfig.NUM_STANDBY_REPLICAS_CONFIG));
227227
props.put(StreamingConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
228228
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, KafkaStreamingPartitionAssignor.class.getName());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
* <p>
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
* <p>
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kafka.streams;
19+
20+
import org.apache.kafka.common.serialization.IntegerDeserializer;
21+
import org.apache.kafka.common.serialization.IntegerSerializer;
22+
import org.apache.kafka.common.serialization.StringDeserializer;
23+
import org.apache.kafka.common.serialization.StringSerializer;
24+
import org.apache.kafka.streams.examples.WallclockTimestampExtractor;
25+
import org.apache.kafka.streams.processor.internals.StreamThread;
26+
import org.junit.Before;
27+
import org.junit.Test;
28+
29+
import java.util.Map;
30+
import java.util.Properties;
31+
import static org.junit.Assert.assertEquals;
32+
import static org.junit.Assert.assertNull;
33+
34+
35+
36+
public class StreamingConfigTest {
37+
38+
private Properties props = new Properties();
39+
private StreamingConfig streamingConfig;
40+
private StreamThread streamThreadPlaceHolder = null;
41+
42+
43+
@Before
44+
public void setUp() {
45+
props.put(StreamingConfig.CLIENT_ID_CONFIG, "Example-Processor-Job");
46+
props.put("group.id", "test-consumer-group");
47+
props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
48+
props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
49+
props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
50+
props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
51+
props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
52+
props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
53+
streamingConfig = new StreamingConfig(props);
54+
}
55+
56+
57+
58+
@Test
59+
public void testGetConsumerConfigs() throws Exception {
60+
Map<String, Object> returnedProps = streamingConfig.getConsumerConfigs(streamThreadPlaceHolder);
61+
assertEquals(returnedProps.get("group.id"), "test-consumer-group");
62+
63+
}
64+
65+
@Test
66+
public void testGetRestoreConsumerConfigs() throws Exception {
67+
Map<String, Object> returnedProps = streamingConfig.getRestoreConsumerConfigs();
68+
assertNull(returnedProps.get("group.id"));
69+
}
70+
}

0 commit comments

Comments
 (0)