diff --git a/src/main/java/com/sohu/jafka/common/ConnectionRefusedException.java b/src/main/java/com/sohu/jafka/common/ConnectionRefusedException.java new file mode 100644 index 0000000..d58c1d9 --- /dev/null +++ b/src/main/java/com/sohu/jafka/common/ConnectionRefusedException.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.sohu.jafka.common; + +public class ConnectionRefusedException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + public ConnectionRefusedException() { + super(); + } + + public ConnectionRefusedException(String message, Throwable cause) { + super(message, cause); + } + + public ConnectionRefusedException(String message) { + super(message); + } + + public ConnectionRefusedException(Throwable cause) { + super(cause); + } + +} diff --git a/src/main/java/com/sohu/jafka/producer/SyncProducer.java b/src/main/java/com/sohu/jafka/producer/SyncProducer.java index d74708b..7fd3e3d 100644 --- a/src/main/java/com/sohu/jafka/producer/SyncProducer.java +++ b/src/main/java/com/sohu/jafka/producer/SyncProducer.java @@ -19,6 +19,7 @@ import com.sohu.jafka.api.MultiProducerRequest; import com.sohu.jafka.api.ProducerRequest; +import com.sohu.jafka.common.ConnectionRefusedException; import com.sohu.jafka.common.annotations.ThreadSafe; import com.sohu.jafka.message.ByteBufferMessageSet; import com.sohu.jafka.mx.SyncProducerStats; @@ -102,7 +103,7 @@ private BlockingChannel connect() { try { blockingChannel.connect(); } catch (IOException ioe) { - throw new RuntimeException(ioe.getMessage(), ioe); + throw new ConnectionRefusedException(host + ":" + port, ioe); } finally { if (!blockingChannel.isConnected()) { disconnect(); diff --git a/src/main/java/com/sohu/jafka/producer/async/AbstractCallbackHandler.java b/src/main/java/com/sohu/jafka/producer/async/AbstractCallbackHandler.java index 9d46141..54dad1e 100644 --- a/src/main/java/com/sohu/jafka/producer/async/AbstractCallbackHandler.java +++ b/src/main/java/com/sohu/jafka/producer/async/AbstractCallbackHandler.java @@ -40,6 +40,10 @@ public List> lastBatchBeforeClose() { return Collections.emptyList(); } + @Override + public void connectionRefused(String broker, List> unsentData) { + } + @Override public void close() { } diff --git a/src/main/java/com/sohu/jafka/producer/async/CallbackHandler.java b/src/main/java/com/sohu/jafka/producer/async/CallbackHandler.java index 8a66b17..52c12b6 100644 --- a/src/main/java/com/sohu/jafka/producer/async/CallbackHandler.java +++ b/src/main/java/com/sohu/jafka/producer/async/CallbackHandler.java @@ -86,6 +86,14 @@ public interface CallbackHandler { */ List> lastBatchBeforeClose(); + /** + * Callback to process when the connection is refused + * + * @param broker the refused broker info + * @param unsentData the batched data that didn't send to the broker + */ + void connectionRefused(String broker, List> unsentData); + /** * Cleans up and shuts down the callback handler */ diff --git a/src/main/java/com/sohu/jafka/producer/async/ProducerSendThread.java b/src/main/java/com/sohu/jafka/producer/async/ProducerSendThread.java index f6b4fb0..a542ad0 100644 --- a/src/main/java/com/sohu/jafka/producer/async/ProducerSendThread.java +++ b/src/main/java/com/sohu/jafka/producer/async/ProducerSendThread.java @@ -167,6 +167,13 @@ private void tryToHandle(List> events) { this.eventHandler.handle(events, underlyingProducer, serializer); } catch (RuntimeException e) { logger.error("Error in handling batch of " + events.size() + " events", e); + List> unsentData = new ArrayList>(events); + while (queue.size() > 0) { + unsentData.add(queue.poll()); + } + if (this.callbackHandler != null) { + this.callbackHandler.connectionRefused(e.getMessage(), unsentData); + } } } }