From 8ecdd9d47c0eae305e7d2c81ced60879d6080f81 Mon Sep 17 00:00:00 2001
From: Isaac Banner
Date: Mon, 27 Oct 2014 15:58:45 -0400
Subject: [PATCH 01/13] Modified delete workflow to prevent incomplete deletes.

---
 app/common/Util.scala | 24 +++++++++++-------------
 1 file changed, 11 insertions(+), 13 deletions(-)

diff --git a/app/common/Util.scala b/app/common/Util.scala
index 7848ec7..9e5b5b4 100644
--- a/app/common/Util.scala
+++ b/app/common/Util.scala
@@ -150,19 +150,17 @@ object Util {
   }

   def deleteZNode(zNode: ZNode): Future[ZNode] = {
-    val delNode = twitterToScalaFuture(zNode.getData()).flatMap { d =>
-      twitterToScalaFuture(zNode.delete(d.stat.getVersion)).recover {
-        case e: NotEmptyException => {
-          for {
-            children <- getZChildren(zNode, Seq("*"))
-            delChildren <- Future.sequence(children.map(n => deleteZNode(n)))
-          } yield deleteZNode(zNode)
-        }
-        case e: NoNodeException => Future(ZNode)
-      }
-    }
+    val deletePromise: Promise[ZNode] = Promise[ZNode]
+
+    getZChildren(zNode, Seq("*")).map(children =>
+      Future.sequence(children.map(n => deleteZNode(n))).onSuccess({ case children =>
+        val delNode = twitterToScalaFuture(zNode.getData()).flatMap { d =>
+          twitterToScalaFuture(zNode.delete(d.stat.getVersion)) }
+
+        delNode.onComplete(zNode => deletePromise complete zNode)
+      })
+    )

-    //TODO: investigate why actual type is Future[Object]
-    delNode.asInstanceOf[Future[ZNode]]
+    deletePromise.future
   }
 }

From f6c0377486a802e7bca0dac86eb438c253973fce Mon Sep 17 00:00:00 2001
From: Isaac Banner
Date: Tue, 28 Oct 2014 11:29:49 -0400
Subject: [PATCH 02/13] Forgot a failure case

---
 app/common/Util.scala | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/app/common/Util.scala b/app/common/Util.scala
index 9e5b5b4..5c6934b 100644
--- a/app/common/Util.scala
+++ b/app/common/Util.scala
@@ -152,14 +152,22 @@ object Util {
   def deleteZNode(zNode: ZNode): Future[ZNode] = {
     val deletePromise: Promise[ZNode] = Promise[ZNode]

-    getZChildren(zNode, Seq("*")).map(children =>
-      Future.sequence(children.map(n => deleteZNode(n))).onSuccess({ case children =>
+    Logger.debug(s"Attempting to delete ${zNode.path}")
+    getZChildren(zNode, Seq("*")).map({ children =>
+      val sequenceFuture = Future.sequence(children.map(n => deleteZNode(n)))
+
+      sequenceFuture.onSuccess({ case children =>
         val delNode = twitterToScalaFuture(zNode.getData()).flatMap { d =>
-          twitterToScalaFuture(zNode.delete(d.stat.getVersion)) }
+          twitterToScalaFuture(zNode.delete(d.stat.getVersion))
+        }

         delNode.onComplete(zNode => deletePromise complete zNode)
       })
-    )
+
+      sequenceFuture.onFailure({ case t =>
+        deletePromise failure t
+      })
+    })

     deletePromise.future
   }

From cb7097c69f7c85b21292872a531b57337eda0168 Mon Sep 17 00:00:00 2001
From: Isaac Banner
Date: Tue, 28 Oct 2014 11:32:07 -0400
Subject: [PATCH 03/13] Forgot to remove a debug line

---
 app/common/Util.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/app/common/Util.scala b/app/common/Util.scala
index 5c6934b..d5943d2 100644
--- a/app/common/Util.scala
+++ b/app/common/Util.scala
@@ -152,7 +152,6 @@ object Util {
   def deleteZNode(zNode: ZNode): Future[ZNode] = {
     val deletePromise: Promise[ZNode] = Promise[ZNode]

-    Logger.debug(s"Attempting to delete ${zNode.path}")
    getZChildren(zNode, Seq("*")).map({ children =>
      val sequenceFuture = Future.sequence(children.map(n => deleteZNode(n)))

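Taken together, patches 01-03 turn deleteZNode into a depth-first recursive delete: all children are removed before the parent, and a failure anywhere is propagated through the returned promise instead of being dropped. The same control flow can be expressed with flatMap alone, which also explains the Future[Object] oddity the original TODO complained about: the recover branch returned a nested Future rather than a ZNode. A minimal sketch, assuming the getZChildren and twitterToScalaFuture helpers visible in the diffs and Play's default execution context; the method name is hypothetical:

    // Delete children first, then the parent; any failed step fails the result.
    def deleteZNodeRecursive(zNode: ZNode): Future[ZNode] =
      for {
        children <- getZChildren(zNode, Seq("*"))
        _        <- Future.sequence(children.map(deleteZNodeRecursive))
        data     <- twitterToScalaFuture(zNode.getData())
        deleted  <- twitterToScalaFuture(zNode.delete(data.stat.getVersion))
      } yield deleted

Because every step is chained, the explicit onSuccess/onFailure plumbing of patch 02 becomes unnecessary: a failed child delete short-circuits the comprehension.
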
From 8f2c7532391184fb2101069709e2578cf846a3bb Mon Sep 17 00:00:00 2001
From: Isaac Banner
Date: Tue, 9 Dec 2014 09:19:08 -0500
Subject: [PATCH 04/13] Fixes #30

---
 app/common/Util.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/app/common/Util.scala b/app/common/Util.scala
index d5943d2..eefa95f 100644
--- a/app/common/Util.scala
+++ b/app/common/Util.scala
@@ -26,6 +26,7 @@ import models.Zookeeper
 import play.api.libs.concurrent.Execution.Implicits.defaultContext
 import org.apache.zookeeper.KeeperException.{NotEmptyException, NodeExistsException, NoNodeException}
 import okapies.finagle.Kafka
+import okapies.finagle.kafka.Client
 import kafka.api.OffsetRequest

 object Util {
@@ -76,7 +77,7 @@ object Util {
         }
       }

-        client.close()
+        clients.map{ a_client: (String, Client) => a_client._2.close() }
         offset
       })
     } yield partitionsLogSize

From 3f98b14b5fc3fcf599c1f5c11d3b1ac4358ccfdc Mon Sep 17 00:00:00 2001
From: Isaac Banner
Date: Tue, 9 Dec 2014 16:00:49 -0500
Subject: [PATCH 05/13] Closed on the wrong side of the braces.

---
 app/common/Util.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/app/common/Util.scala b/app/common/Util.scala
index eefa95f..2060e67 100644
--- a/app/common/Util.scala
+++ b/app/common/Util.scala
@@ -77,9 +77,10 @@ object Util {
         }
       }

-        clients.map{ a_client: (String, Client) => a_client._2.close() }
         offset
       })
+
+      closeClients <- clients.map{ a_client: (String, Client) => a_client._2.close() }
     } yield partitionsLogSize
   }

From d71cc9e8b5f6264b31d1ec2eeb52d737a2e6ade7 Mon Sep 17 00:00:00 2001
From: Isaac Banner
Date: Tue, 9 Dec 2014 16:08:26 -0500
Subject: [PATCH 06/13] Typo.

---
 app/common/Util.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/app/common/Util.scala b/app/common/Util.scala
index 2060e67..6e3e4bf 100644
--- a/app/common/Util.scala
+++ b/app/common/Util.scala
@@ -80,7 +80,7 @@ object Util {
         offset
       })

-      closeClients <- clients.map{ a_client: (String, Client) => a_client._2.close() }
+      closeClients = clients.map{ a_client: (String, Client) => a_client._2.close() }
     } yield partitionsLogSize
   }
 }

From 1b611818662f473272b10a8b3049620011113f5d Mon Sep 17 00:00:00 2001
From: Isaac Banner
Date: Tue, 16 Dec 2014 16:46:11 -0500
Subject: [PATCH 07/13] Actually fixes #30.

---
 app/common/Util.scala                               |  26 +-
 app/kafka/consumer/LowLevelConsumer.java            | 225 ++++++++++++++++++
 .../async/AsyncLowLevelConsumer.scala               |  31 +++
 3 files changed, 267 insertions(+), 15 deletions(-)
 create mode 100644 app/kafka/consumer/LowLevelConsumer.java
 create mode 100644 app/kafka/consumer/async/AsyncLowLevelConsumer.scala

diff --git a/app/common/Util.scala b/app/common/Util.scala
index 6e3e4bf..58f1e09 100644
--- a/app/common/Util.scala
+++ b/app/common/Util.scala
@@ -16,6 +16,7 @@

 package common

+import kafka.consumer.async.AsyncLowLevelConsumer
 import play.api.Logger

 import scala.concurrent.{Future, Promise}
@@ -63,24 +64,19 @@ object Util {
   }

   def getPartitionsLogSize(topicName: String, partitionLeaders: Seq[String]): Future[Seq[Long]] = {
-    Logger.debug("Getting partition log sizes for topic " + topicName + " from partition leaders " + partitionLeaders.mkString(", "))
-    return for {
-      clients <- Future.sequence(partitionLeaders.map(addr => Future((addr, Kafka.newRichClient(addr)))))
-      partitionsLogSize <- Future.sequence(clients.zipWithIndex.map { tu =>
-        val addr = tu._1._1
-        val client = tu._1._2
-        var offset = Future(0L)
-
-        if (!addr.isEmpty) {
-          offset = twitterToScalaFuture(client.offset(topicName, tu._2, OffsetRequest.LatestTime)).map(_.offsets.head).recover {
-            case e => Logger.warn("Could not connect to partition leader " + addr + ". Error message: " + e.getMessage); 0L
-          }
-        }
-
+    // Logger.debug("Getting partition log sizes for topic " + topicName + " from partition leaders " + partitionLeaders.mkString(", "))
+
+    return for {
+      clients <- Future.sequence(partitionLeaders.zipWithIndex.map {tuple =>
+        val hostAndPort = tuple._1.split(":")
+        val partition = tuple._2
+        AsyncLowLevelConsumer(topicName, partition, hostAndPort(0), hostAndPort(1).toInt)
+      })
+      partitionsLogSize <- Future.sequence(clients.map { client =>
+        val offset = client.offset
         offset
       })
-
-      closeClients = clients.map{ a_client: (String, Client) => a_client._2.close() }
+      closeClients <- Future.sequence(clients.map(client => client.close))
     } yield partitionsLogSize
   }

diff --git a/app/kafka/consumer/LowLevelConsumer.java b/app/kafka/consumer/LowLevelConsumer.java
new file mode 100644
index 0000000..302a997
--- /dev/null
+++ b/app/kafka/consumer/LowLevelConsumer.java
@@ -0,0 +1,225 @@
+/**
+* Copyright (C) 2014 the original author or authors.
+* See the LICENCE.txt file distributed with this work for additional
+* information regarding copyright ownership.
+*
+* Licensed under the Apache License, Version 2.0 (the "License"); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*/
+
+package kafka.consumer;
+
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.ErrorMapping;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.*;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.MessageAndOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+public class LowLevelConsumer {
+    static final Logger log = LoggerFactory.getLogger(LowLevelConsumer.class);
+
+    private SimpleConsumer consumer;
+    private List<String> replicaBrokers = new ArrayList<>();
+    private String leadBroker;
+    private String clientName;
+    private final String topic;
+    private final int partition;
+    private final int port;
+
+    public LowLevelConsumer(String topic, int partition, String seedBroker, int port, boolean findLeader) {
+        this.topic = topic;
+        this.partition = partition;
+        List<String> seedBrokers = new ArrayList<>();
+        seedBrokers.add(seedBroker);
+        this.port = port;
+        replicaBrokers = new ArrayList<>();
+
+        // find the meta data about the topic and partition we are interested in
+        PartitionMetadata metadata = findLeader(seedBrokers, port, topic, partition);
+        if (metadata == null) {
+            System.out.println("Can't find metadata for Topic and Partition. Exiting");
+            return;
+        }
+        if (metadata.leader() == null) {
+            System.out.println("Can't find Leader for Topic and Partition. Exiting");
+            return;
+        }
+
+        clientName = "Client_" + topic + "_" + partition;
+        if (findLeader) {
+            leadBroker = metadata.leader().host();
+            consumer = new SimpleConsumer(leadBroker, port, 1000000, 64 * 1024, clientName);
+        }
+        else {
+            leadBroker = seedBroker;
+            consumer = new SimpleConsumer(leadBroker, port, 1000000, 64 * 1024, clientName);
+        }
+    }
+
+    public long startingOffset() {
+        long offset;
+        try {
+            offset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
+        } catch(Exception e) {
+            e.printStackTrace();
+            return 0L;
+        }
+        return offset;
+    }
+
+    public long endingOffset() {
+        long offset;
+        try {
+            offset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName);
+        } catch(Exception e) {
+            e.printStackTrace();
+            return 0L;
+        }
+        return offset;
+    }
+
+    public Set<String> retrieveData(long offsetToRead, int bytesToRead) {
+        long readOffset = offsetToRead;
+        Set<String> messages = new LinkedHashSet<>();
+        FetchRequest req = new FetchRequestBuilder()
+                .clientId(clientName)
+                .addFetch(topic, partition, readOffset, bytesToRead)
+                .build();
+        FetchResponse fetchResponse = consumer.fetch(req);
+
+        if (fetchResponse.hasError()) {
+            short code = fetchResponse.errorCode(topic, partition);
+            log.error("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
+            return messages;
+//            if (code == ErrorMapping.OffsetOutOfRangeCode()) {
+//                // We asked for an invalid offset. For simple case ask for the last element to reset
+//                readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName);
+//                continue;
+//            }
+//            try {
+//                leadBroker = findNewLeader(leadBroker, topic, partition, port);
+//            } catch (Exception e) {
+//                KafkaClientTestHarness.log.error("Unable to find new lead broker.");
+//                return messages;
+//            }
+//            continue;
+        }
+
+        for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
+            long currentOffset = messageAndOffset.offset();
+            if (currentOffset < readOffset) {
+                log.error("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
+                continue;
+            }
+            readOffset = messageAndOffset.nextOffset();
+            ByteBuffer payload = messageAndOffset.message().payload();
+
+            byte[] bytes = new byte[payload.limit()];
+            payload.get(bytes);
+            try {
+                messages.add(new String(bytes, "UTF-8"));
+            } catch (Exception e) {
+                log.error("Failed to append message.", e);
+            }
+        }
+        return messages;
+    }
+
+    public void closeConsumers() {
+        if(consumer != null) consumer.close();
+    }
+
+    private static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
+        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
+        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
+        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
+        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
+                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
+        OffsetResponse response = consumer.getOffsetsBefore(request);
+
+        if (response.hasError()) {
+            log.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
+            return 0;
+        }
+        long[] offsets = response.offsets(topic, partition);
+        return offsets[0];
+    }
+
+    private String findNewLeader(String oldLeader, String topic, int partition, int port) throws Exception {
+        for (int i = 0; i < 3; i++) {
+            PartitionMetadata metadata = findLeader(replicaBrokers, port, topic, partition);
+            if (metadata != null &&
+                    metadata.leader() != null &&
+                    !(oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0)) {
+
+                // first time through if the leader hasn't changed give ZooKeeper a second to recover
+                // second time, assume the broker did recover before failover, or it was a non-Broker issue
+
+                return metadata.leader().host();
+            }
+
+            try {
+                Thread.sleep(500);
+            } catch (InterruptedException ie) {
+                ie.printStackTrace();
+            }
+        }
+        log.error("Unable to find new leader after Broker failure. Exiting");
+        throw new Exception("Unable to find new leader after Broker failure. Exiting");
+    }
+
+    private PartitionMetadata findLeader(List<String> seedBrokers, int port, String topic, int partition) {
+        PartitionMetadata returnMetaData = null;
+        loop:
+        for (String seed : seedBrokers) {
+            SimpleConsumer tempConsumer = null;
+            try {
+                tempConsumer = new SimpleConsumer(seed, port, 100000, 64 * 1024, "leaderLookup");
+                List<String> topics = Collections.singletonList(topic);
+                TopicMetadataRequest request= new TopicMetadataRequest(topics);
+                kafka.javaapi.TopicMetadataResponse resp = tempConsumer.send(request);
+
+                List<TopicMetadata> metaData = resp.topicsMetadata();
+                for (TopicMetadata item : metaData) {
+                    for (PartitionMetadata part : item.partitionsMetadata()) {
+                        if (part.partitionId() == partition) {
+                            returnMetaData = part;
+                            break loop;
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                log.error("Error communicating with Broker [" + seed + "] to find Leader for [" + topic
+                        + ", " + partition + "]", e);
+            } finally {
+                if (tempConsumer != null)
+                    tempConsumer.close();
+            }
+        }
+        if (returnMetaData != null) {
+            replicaBrokers.clear();
+            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
+                replicaBrokers.add(replica.host());
+            }
+        }
+        return returnMetaData;
+    }
+}
\ No newline at end of file
diff --git a/app/kafka/consumer/async/AsyncLowLevelConsumer.scala b/app/kafka/consumer/async/AsyncLowLevelConsumer.scala
new file mode 100644
index 0000000..2b3fe20
--- /dev/null
+++ b/app/kafka/consumer/async/AsyncLowLevelConsumer.scala
@@ -0,0 +1,31 @@
+package kafka.consumer.async
+
+import kafka.consumer.LowLevelConsumer
+import scala.concurrent.future
+import scala.concurrent.Future
+import play.api.libs.concurrent.Execution.Implicits.defaultContext
+
+/**
+ * Created by isaacbanner on 12/16/14.
+ */
+
+class AsyncLowLevelConsumer(llc: LowLevelConsumer) {
+
+  val consumer: LowLevelConsumer = llc
+
+  def offset: Future[Long] = future {
+    consumer.endingOffset()
+  }
+
+  def close = future {
+    consumer.closeConsumers()
+  }
+
+}
+
+object AsyncLowLevelConsumer {
+  def apply(topic: String, partition: Int, seedBroker: String, port: Int, findLeader: Boolean = true) = future {
+    val llc: LowLevelConsumer = new LowLevelConsumer(topic, partition, seedBroker, port, findLeader)
+    new AsyncLowLevelConsumer(llc)
+  }
+}
From 762a277ca31b7df4ae8b2d3bd88f74ba59e20a14 Mon Sep 17 00:00:00 2001
From: Isaac Banner
Date: Tue, 16 Dec 2014 16:53:54 -0500
Subject: [PATCH 08/13] Added author attributions

---
 app/common/Util.scala                    |  1 +
 app/kafka/consumer/LowLevelConsumer.java | 14 ++---------
 .../async/AsyncLowLevelConsumer.scala    | 24 +++++++++++++++----
 3 files changed, 23 insertions(+), 16 deletions(-)

diff --git a/app/common/Util.scala b/app/common/Util.scala
index 58f1e09..4a94a2a 100644
--- a/app/common/Util.scala
+++ b/app/common/Util.scala
@@ -1,5 +1,6 @@
 /*
  * Copyright 2014 Claude Mamo
+ * Some changes Copyright 2014 Isaac Banner | ibanner56
  *
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not
  * use this file except in compliance with the License. You may obtain a copy of
diff --git a/app/kafka/consumer/LowLevelConsumer.java b/app/kafka/consumer/LowLevelConsumer.java
index 302a997..8b9b695 100644
--- a/app/kafka/consumer/LowLevelConsumer.java
+++ b/app/kafka/consumer/LowLevelConsumer.java
@@ -14,6 +14,8 @@
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
+ *
+ * Author: Andrew Zafft | azafty468
 */

 package kafka.consumer;
@@ -109,18 +111,6 @@ public Set<String> retrieveData(long offsetToRead, int bytesToRead) {
             short code = fetchResponse.errorCode(topic, partition);
             log.error("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
             return messages;
-//            if (code == ErrorMapping.OffsetOutOfRangeCode()) {
-//                // We asked for an invalid offset. For simple case ask for the last element to reset
-//                readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName);
-//                continue;
-//            }
-//            try {
-//                leadBroker = findNewLeader(leadBroker, topic, partition, port);
-//            } catch (Exception e) {
-//                KafkaClientTestHarness.log.error("Unable to find new lead broker.");
-//                return messages;
-//            }
-//            continue;
         }

         for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
diff --git a/app/kafka/consumer/async/AsyncLowLevelConsumer.scala b/app/kafka/consumer/async/AsyncLowLevelConsumer.scala
index 2b3fe20..ac78c26 100644
--- a/app/kafka/consumer/async/AsyncLowLevelConsumer.scala
+++ b/app/kafka/consumer/async/AsyncLowLevelConsumer.scala
@@ -1,3 +1,23 @@
+/**
+ * Copyright (C) 2014 the original author or authors.
+ * See the LICENCE.txt file distributed with this work for additional
+ * information regarding copyright ownership.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ * Author: Isaac Banner | ibanner56
+ */
+
 package kafka.consumer.async

 import kafka.consumer.LowLevelConsumer
@@ -5,10 +25,6 @@ import scala.concurrent.future
 import scala.concurrent.Future
 import play.api.libs.concurrent.Execution.Implicits.defaultContext

-/**
- * Created by isaacbanner on 12/16/14.
- */
-
 class AsyncLowLevelConsumer(llc: LowLevelConsumer) {

   val consumer: LowLevelConsumer = llc

From d40ab8c2c6192842ec976fb789764721931657a4 Mon Sep 17 00:00:00 2001
From: Isaac Banner
Date: Tue, 16 Dec 2014 16:59:04 -0500
Subject: [PATCH 09/13] Removed unused imports

---
 app/common/Util.scala                    | 5 +----
 app/kafka/consumer/LowLevelConsumer.java | 2 --
 2 files changed, 1 insertion(+), 6 deletions(-)

diff --git a/app/common/Util.scala b/app/common/Util.scala
index 4a94a2a..9aa2843 100644
--- a/app/common/Util.scala
+++ b/app/common/Util.scala
@@ -26,10 +26,7 @@ import com.twitter.zk.{ZNode, ZkClient}
 import common.Registry.PropertyConstants
 import models.Zookeeper
 import play.api.libs.concurrent.Execution.Implicits.defaultContext
-import org.apache.zookeeper.KeeperException.{NotEmptyException, NodeExistsException, NoNodeException}
-import okapies.finagle.Kafka
-import okapies.finagle.kafka.Client
-import kafka.api.OffsetRequest
+import org.apache.zookeeper.KeeperException.NoNodeException

 object Util {

diff --git a/app/kafka/consumer/LowLevelConsumer.java b/app/kafka/consumer/LowLevelConsumer.java
index 8b9b695..2d345a7 100644
--- a/app/kafka/consumer/LowLevelConsumer.java
+++ b/app/kafka/consumer/LowLevelConsumer.java
@@ -23,11 +23,9 @@
 import kafka.api.FetchRequest;
 import kafka.api.FetchRequestBuilder;
 import kafka.api.PartitionOffsetRequestInfo;
-import kafka.common.ErrorMapping;
 import kafka.common.TopicAndPartition;
 import kafka.javaapi.*;
 import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
 import kafka.message.MessageAndOffset;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

From 112bbdb516685530f339e09a98a7017d3a35e1b4 Mon Sep 17 00:00:00 2001
From: Isaac Banner
Date: Tue, 16 Dec 2014 17:02:47 -0500
Subject: [PATCH 10/13] future is deprecated, use Future

---
 app/kafka/consumer/async/AsyncLowLevelConsumer.scala | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/app/kafka/consumer/async/AsyncLowLevelConsumer.scala b/app/kafka/consumer/async/AsyncLowLevelConsumer.scala
index ac78c26..963c7d5 100644
--- a/app/kafka/consumer/async/AsyncLowLevelConsumer.scala
+++ b/app/kafka/consumer/async/AsyncLowLevelConsumer.scala
@@ -21,7 +21,6 @@
 package kafka.consumer.async

 import kafka.consumer.LowLevelConsumer
-import scala.concurrent.future
 import scala.concurrent.Future
 import play.api.libs.concurrent.Execution.Implicits.defaultContext

@@ -29,18 +28,18 @@ class AsyncLowLevelConsumer(llc: LowLevelConsumer) {

   val consumer: LowLevelConsumer = llc

-  def offset: Future[Long] = future {
+  def offset: Future[Long] = Future {
     consumer.endingOffset()
   }

-  def close = future {
+  def close = Future {
     consumer.closeConsumers()
   }

 }

 object AsyncLowLevelConsumer {
-  def apply(topic: String, partition: Int, seedBroker: String, port: Int, findLeader: Boolean = true) = future {
+  def apply(topic: String, partition: Int, seedBroker: String, port: Int, findLeader: Boolean = true) = Future {
     val llc: LowLevelConsumer = new LowLevelConsumer(topic, partition, seedBroker, port, findLeader)
     new AsyncLowLevelConsumer(llc)
   }

From da12d4fce36069040f0781d029b21053e2064a94 Mon Sep 17 00:00:00 2001
From: Isaac Banner
Date: Tue, 16 Dec 2014 17:05:19 -0500
Subject: [PATCH 11/13] Removed unnecessary line.

---
 app/common/Util.scala | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/app/common/Util.scala b/app/common/Util.scala
index 9aa2843..b849faa 100644
--- a/app/common/Util.scala
+++ b/app/common/Util.scala
@@ -70,10 +70,7 @@ object Util {
         val partition = tuple._2
         AsyncLowLevelConsumer(topicName, partition, hostAndPort(0), hostAndPort(1).toInt)
       })
-      partitionsLogSize <- Future.sequence(clients.map { client =>
-        val offset = client.offset
-        offset
-      })
+      partitionsLogSize <- Future.sequence(clients.map(client => client.offset))
       closeClients <- Future.sequence(clients.map(client => client.close))
     } yield partitionsLogSize
   }

From 897411cbe5da7b6f0aaa95ca323aab8175a49122 Mon Sep 17 00:00:00 2001
From: Isaac Banner
Date: Tue, 16 Dec 2014 17:06:39 -0500
Subject: [PATCH 12/13] Uncomment a logger statement

---
 app/common/Util.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/app/common/Util.scala b/app/common/Util.scala
index b849faa..18ab7fc 100644
--- a/app/common/Util.scala
+++ b/app/common/Util.scala
@@ -62,7 +62,7 @@ object Util {
   }

   def getPartitionsLogSize(topicName: String, partitionLeaders: Seq[String]): Future[Seq[Long]] = {
-    // Logger.debug("Getting partition log sizes for topic " + topicName + " from partition leaders " + partitionLeaders.mkString(", "))
+    Logger.debug("Getting partition log sizes for topic " + topicName + " from partition leaders " + partitionLeaders.mkString(", "))

     return for {
       clients <- Future.sequence(partitionLeaders.zipWithIndex.map {tuple =>

From b66c56aa1a14b99787659d0429652d6623db00db Mon Sep 17 00:00:00 2001
From: Isaac Banner
Date: Wed, 17 Dec 2014 10:54:45 -0500
Subject: [PATCH 13/13] Sped up LowLevelConsumer, since we don't need to worry about writes.

---
 app/kafka/consumer/LowLevelConsumer.java | 174 +++-----------
 .../async/AsyncLowLevelConsumer.scala    |  17 +-
 2 files changed, 35 insertions(+), 156 deletions(-)

diff --git a/app/kafka/consumer/LowLevelConsumer.java b/app/kafka/consumer/LowLevelConsumer.java
index 2d345a7..2be3a55 100644
--- a/app/kafka/consumer/LowLevelConsumer.java
+++ b/app/kafka/consumer/LowLevelConsumer.java
@@ -1,84 +1,59 @@
 /**
-* Copyright (C) 2014 the original author or authors.
-* See the LICENCE.txt file distributed with this work for additional
-* information regarding copyright ownership.
-*
-* Licensed under the Apache License, Version 2.0 (the "License"); you may not
-* use this file except in compliance with the License. You may obtain a copy of
-* the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-* License for the specific language governing permissions and limitations under
-* the License.
+ * Copyright (C) 2014 the original author or authors and Enernoc Inc.
+ * See the LICENCE.txt file distributed with this work for additional
+ * information regarding copyright ownership.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
  *
- * Author: Andrew Zafft | azafty468
-*/
+ * Authors: Andrew Zafft | azafty468, Isaac Banner | ibanner56
+ */

 package kafka.consumer;

-import kafka.api.FetchRequest;
-import kafka.api.FetchRequestBuilder;
 import kafka.api.PartitionOffsetRequestInfo;
 import kafka.common.TopicAndPartition;
 import kafka.javaapi.*;
 import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.message.MessageAndOffset;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.Map;
+import java.util.HashMap;

 public class LowLevelConsumer {
     static final Logger log = LoggerFactory.getLogger(LowLevelConsumer.class);

     private SimpleConsumer consumer;
-    private List<String> replicaBrokers = new ArrayList<>();
     private String leadBroker;
     private String clientName;
     private final String topic;
     private final int partition;
     private final int port;

-    public LowLevelConsumer(String topic, int partition, String seedBroker, int port, boolean findLeader) {
+    public LowLevelConsumer(String topic, int partition, String seedBroker, int port) {
         this.topic = topic;
         this.partition = partition;
-        List<String> seedBrokers = new ArrayList<>();
-        seedBrokers.add(seedBroker);
+        this.leadBroker = seedBroker;
         this.port = port;
-        replicaBrokers = new ArrayList<>();
-
-        // find the meta data about the topic and partition we are interested in
-        PartitionMetadata metadata = findLeader(seedBrokers, port, topic, partition);
-        if (metadata == null) {
-            System.out.println("Can't find metadata for Topic and Partition. Exiting");
-            return;
-        }
-        if (metadata.leader() == null) {
-            System.out.println("Can't find Leader for Topic and Partition. Exiting");
-            return;
-        }

         clientName = "Client_" + topic + "_" + partition;
-        if (findLeader) {
-            leadBroker = metadata.leader().host();
-            consumer = new SimpleConsumer(leadBroker, port, 1000000, 64 * 1024, clientName);
-        }
-        else {
-            leadBroker = seedBroker;
-            consumer = new SimpleConsumer(leadBroker, port, 1000000, 64 * 1024, clientName);
-        }
+
+        consumer = new SimpleConsumer(leadBroker, port, 1000000, 64 * 1024, clientName);
     }

     public long startingOffset() {
         long offset;
         try {
             offset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
-        } catch(Exception e) {
+        } catch (Exception e) {
             e.printStackTrace();
             return 0L;
         }
@@ -89,50 +64,15 @@ public long endingOffset() {
         long offset;
         try {
             offset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName);
-        } catch(Exception e) {
+        } catch (Exception e) {
             e.printStackTrace();
             return 0L;
         }
         return offset;
     }

-    public Set<String> retrieveData(long offsetToRead, int bytesToRead) {
-        long readOffset = offsetToRead;
-        Set<String> messages = new LinkedHashSet<>();
-        FetchRequest req = new FetchRequestBuilder()
-                .clientId(clientName)
-                .addFetch(topic, partition, readOffset, bytesToRead)
-                .build();
-        FetchResponse fetchResponse = consumer.fetch(req);
-
-        if (fetchResponse.hasError()) {
-            short code = fetchResponse.errorCode(topic, partition);
-            log.error("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
-            return messages;
-        }
-
-        for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
-            long currentOffset = messageAndOffset.offset();
-            if (currentOffset < readOffset) {
-                log.error("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
-                continue;
-            }
-            readOffset = messageAndOffset.nextOffset();
-            ByteBuffer payload = messageAndOffset.message().payload();
-
-            byte[] bytes = new byte[payload.limit()];
-            payload.get(bytes);
-            try {
-                messages.add(new String(bytes, "UTF-8"));
-            } catch (Exception e) {
-                log.error("Failed to append message.", e);
-            }
-        }
-        return messages;
-    }
-
     public void closeConsumers() {
-        if(consumer != null) consumer.close();
+        if (consumer != null) consumer.close();
     }

@@ -150,64 +90,4 @@ private static long getLastOffset(SimpleConsumer consumer, String topic, int par
         long[] offsets = response.offsets(topic, partition);
         return offsets[0];
     }
-
-    private String findNewLeader(String oldLeader, String topic, int partition, int port) throws Exception {
-        for (int i = 0; i < 3; i++) {
-            PartitionMetadata metadata = findLeader(replicaBrokers, port, topic, partition);
-            if (metadata != null &&
-                    metadata.leader() != null &&
-                    !(oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0)) {
-
-                // first time through if the leader hasn't changed give ZooKeeper a second to recover
-                // second time, assume the broker did recover before failover, or it was a non-Broker issue
-
-                return metadata.leader().host();
-            }
-
-            try {
-                Thread.sleep(500);
-            } catch (InterruptedException ie) {
-                ie.printStackTrace();
-            }
-        }
-        log.error("Unable to find new leader after Broker failure. Exiting");
-        throw new Exception("Unable to find new leader after Broker failure. Exiting");
-    }
-
-    private PartitionMetadata findLeader(List<String> seedBrokers, int port, String topic, int partition) {
-        PartitionMetadata returnMetaData = null;
-        loop:
-        for (String seed : seedBrokers) {
-            SimpleConsumer tempConsumer = null;
-            try {
-                tempConsumer = new SimpleConsumer(seed, port, 100000, 64 * 1024, "leaderLookup");
-                List<String> topics = Collections.singletonList(topic);
-                TopicMetadataRequest request= new TopicMetadataRequest(topics);
-                kafka.javaapi.TopicMetadataResponse resp = tempConsumer.send(request);
-
-                List<TopicMetadata> metaData = resp.topicsMetadata();
-                for (TopicMetadata item : metaData) {
-                    for (PartitionMetadata part : item.partitionsMetadata()) {
-                        if (part.partitionId() == partition) {
-                            returnMetaData = part;
-                            break loop;
-                        }
-                    }
-                }
-            } catch (Exception e) {
-                log.error("Error communicating with Broker [" + seed + "] to find Leader for [" + topic
-                        + ", " + partition + "]", e);
-            } finally {
-                if (tempConsumer != null)
-                    tempConsumer.close();
-            }
-        }
-        if (returnMetaData != null) {
-            replicaBrokers.clear();
-            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
-                replicaBrokers.add(replica.host());
-            }
-        }
-        return returnMetaData;
-    }
-}
\ No newline at end of file
+}
diff --git a/app/kafka/consumer/async/AsyncLowLevelConsumer.scala b/app/kafka/consumer/async/AsyncLowLevelConsumer.scala
index 963c7d5..599d2fd 100644
--- a/app/kafka/consumer/async/AsyncLowLevelConsumer.scala
+++ b/app/kafka/consumer/async/AsyncLowLevelConsumer.scala
@@ -1,5 +1,5 @@
 /**
- * Copyright (C) 2014 the original author or authors.
+ * Copyright (C) 2014 the original author or authors and Enernoc Inc.
  * See the LICENCE.txt file distributed with this work for additional
  * information regarding copyright ownership.
  *
@@ -21,26 +21,25 @@
 package kafka.consumer.async

 import kafka.consumer.LowLevelConsumer
+import scala.concurrent.future
 import scala.concurrent.Future
 import play.api.libs.concurrent.Execution.Implicits.defaultContext

-class AsyncLowLevelConsumer(llc: LowLevelConsumer) {
-
-  val consumer: LowLevelConsumer = llc
-
-  def offset: Future[Long] = Future {
+class AsyncLowLevelConsumer(consumer: LowLevelConsumer) {
+
+  def offset: Future[Long] = future {
     consumer.endingOffset()
   }

-  def close = Future {
+  def close = future {
     consumer.closeConsumers()
   }

 }

 object AsyncLowLevelConsumer {
-  def apply(topic: String, partition: Int, seedBroker: String, port: Int, findLeader: Boolean = true) = Future {
-    val llc: LowLevelConsumer = new LowLevelConsumer(topic, partition, seedBroker, port, findLeader)
+  def apply(topic: String, partition: Int, seedBroker: String, port: Int) = future {
+    val llc: LowLevelConsumer = new LowLevelConsumer(topic, partition, seedBroker, port)
    new AsyncLowLevelConsumer(llc)
  }
-}
+}
\ No newline at end of file