path: root/external/kafka/src/test
author      jerryshao <saisai.shao@intel.com>              2014-11-14 14:33:37 -0800
committer   Tathagata Das <tathagata.das1565@gmail.com>    2014-11-14 14:33:37 -0800
commit      5930f64bf0d2516304b21bd49eac361a54caabdd (patch)
tree        4fc481c652e4c553e8c6ae4a3f87ed329908c73c /external/kafka/src/test
parent      0cbdb01e1c817e71c4f80de05c4e5bb11510b368 (diff)
[SPARK-4062][Streaming] Add ReliableKafkaReceiver in Spark Streaming Kafka connector

Add ReliableKafkaReceiver in the Kafka connector to prevent data loss when the WAL in Spark Streaming is enabled. Details and the design doc can be seen in [SPARK-4062](https://issues.apache.org/jira/browse/SPARK-4062).

Author: jerryshao <saisai.shao@intel.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Saisai Shao <saisai.shao@intel.com>

Closes #2991 from jerryshao/kafka-refactor and squashes the following commits:

5461f1c [Saisai Shao] Merge pull request #8 from tdas/kafka-refactor3
eae4ad6 [Tathagata Das] Refactored KafkaStreamSuiteBase to eliminate KafkaTestUtils and made the Java suite more robust.
fab14c7 [Tathagata Das] Minor update.
149948b [Tathagata Das] Fixed mistake
14630aa [Tathagata Das] Minor updates.
d9a452c [Tathagata Das] Minor updates.
ec2e95e [Tathagata Das] Removed the receiver's locks and essentially reverted to Saisai's original design.
2a20a01 [jerryshao] Address some comments
9f636b3 [Saisai Shao] Merge pull request #5 from tdas/kafka-refactor
b2b2f84 [Tathagata Das] Refactored Kafka receiver logic and Kafka testsuites
e501b3c [jerryshao] Add Mima excludes
b798535 [jerryshao] Fix the missed issue
e5e21c1 [jerryshao] Change to while loop
ea873e4 [jerryshao] Further address the comments
98f3d07 [jerryshao] Fix comment style
4854ee9 [jerryshao] Address all the comments
96c7a1d [jerryshao] Update the ReliableKafkaReceiver unit test
8135d31 [jerryshao] Fix flaky test
a949741 [jerryshao] Address the comments
16bfe78 [jerryshao] Change the ordering of imports
0894aef [jerryshao] Add some comments
77c3e50 [jerryshao] Code refactor and add some unit tests
dd9aeeb [jerryshao] Initial commit for reliable Kafka receiver
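For orientation, the ReliableKafkaStreamSuite added below exercises the new receiver through the ordinary KafkaUtils.createStream API with the receiver write-ahead log enabled, which is the switch that selects ReliableKafkaReceiver over the default receiver. A minimal, self-contained sketch of that usage pattern follows; the ZooKeeper address, checkpoint path, topic and application name are illustrative placeholders, not values taken from this patch.

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ReliableKafkaSketch {
  def main(args: Array[String]): Unit = {
    // Enabling the receiver WAL is what makes the Kafka input stream use
    // ReliableKafkaReceiver, so received blocks are logged before offsets are committed.
    val sparkConf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("ReliableKafkaSketch")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(sparkConf, Milliseconds(500))
    // The write-ahead log is kept under the checkpoint directory.
    ssc.checkpoint("/tmp/reliable-kafka-checkpoint") // placeholder path

    val kafkaParams = Map(
      "zookeeper.connect" -> "localhost:2181", // placeholder address
      "group.id" -> "reliable-kafka-sketch",
      "auto.offset.reset" -> "smallest")

    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Map("topic1" -> 1), StorageLevel.MEMORY_AND_DISK_SER)

    // Count the received values per batch; real job logic would go here.
    stream.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}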
Diffstat (limited to 'external/kafka/src/test')
-rw-r--r--  external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java       |  44
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala         | 216
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala | 140
3 files changed, 283 insertions(+), 117 deletions(-)
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index efb0099c7c..6e1abf3f38 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -20,7 +20,10 @@ package org.apache.spark.streaming.kafka;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
+import java.util.Random;
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.Duration;
import scala.Predef;
import scala.Tuple2;
import scala.collection.JavaConverters;
@@ -32,8 +35,6 @@ import kafka.serializer.StringDecoder;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.LocalJavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
@@ -42,25 +43,27 @@ import org.junit.Test;
import org.junit.After;
import org.junit.Before;
-public class JavaKafkaStreamSuite extends LocalJavaStreamingContext implements Serializable {
- private transient KafkaStreamSuite testSuite = new KafkaStreamSuite();
+public class JavaKafkaStreamSuite implements Serializable {
+ private transient JavaStreamingContext ssc = null;
+ private transient Random random = new Random();
+ private transient KafkaStreamSuiteBase suiteBase = null;
@Before
- @Override
public void setUp() {
- testSuite.beforeFunction();
+ suiteBase = new KafkaStreamSuiteBase() { };
+ suiteBase.setupKafka();
System.clearProperty("spark.driver.port");
- //System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock");
- ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ SparkConf sparkConf = new SparkConf()
+ .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
+ ssc = new JavaStreamingContext(sparkConf, new Duration(500));
}
@After
- @Override
public void tearDown() {
ssc.stop();
ssc = null;
System.clearProperty("spark.driver.port");
- testSuite.afterFunction();
+ suiteBase.tearDownKafka();
}
@Test
@@ -74,15 +77,15 @@ public class JavaKafkaStreamSuite extends LocalJavaStreamingContext implements S
sent.put("b", 3);
sent.put("c", 10);
- testSuite.createTopic(topic);
+ suiteBase.createTopic(topic);
HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
- testSuite.produceAndSendMessage(topic,
- JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
- Predef.<Tuple2<String, Object>>conforms()));
+ suiteBase.produceAndSendMessage(topic,
+ JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
+ Predef.<Tuple2<String, Object>>conforms()));
HashMap<String, String> kafkaParams = new HashMap<String, String>();
- kafkaParams.put("zookeeper.connect", testSuite.zkHost() + ":" + testSuite.zkPort());
- kafkaParams.put("group.id", "test-consumer-" + KafkaTestUtils.random().nextInt(10000));
+ kafkaParams.put("zookeeper.connect", suiteBase.zkAddress());
+ kafkaParams.put("group.id", "test-consumer-" + random.nextInt(10000));
kafkaParams.put("auto.offset.reset", "smallest");
JavaPairDStream<String, String> stream = KafkaUtils.createStream(ssc,
@@ -124,11 +127,16 @@ public class JavaKafkaStreamSuite extends LocalJavaStreamingContext implements S
);
ssc.start();
- ssc.awaitTermination(3000);
-
+ long startTime = System.currentTimeMillis();
+ boolean sizeMatches = false;
+ while (!sizeMatches && System.currentTimeMillis() - startTime < 20000) {
+ sizeMatches = sent.size() == result.size();
+ Thread.sleep(200);
+ }
Assert.assertEquals(sent.size(), result.size());
for (String k : sent.keySet()) {
Assert.assertEquals(sent.get(k).intValue(), result.get(k).intValue());
}
+ ssc.stop();
}
}
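One detail worth calling out above: the Java suite now waits for results with a hand-rolled polling loop bounded at 20 seconds, whereas the Scala suites later in this patch express the same wait with ScalaTest's Eventually, which is not convenient to call from Java. A minimal, hypothetical Scala suite showing that eventually-based pattern (the class name, test name and data are invented for the sketch):

import scala.collection.mutable
import scala.concurrent.duration._
import scala.language.postfixOps

import org.scalatest.FunSuite
import org.scalatest.concurrent.Eventually

class PollingSketchSuite extends FunSuite with Eventually {
  test("wait for asynchronously collected results") {
    val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
    val result = new mutable.HashMap[String, Long]()

    // A streaming job would normally fill `result` asynchronously; it is filled
    // inline here only so the sketch is self-contained and runnable.
    sent.foreach { case (k, v) => result.put(k, v.toLong) }

    // Retry the assertions until they hold or 20 seconds elapse, probing every
    // 200 milliseconds, the same bounds the Java while-loop enforces by hand.
    eventually(timeout(20000 milliseconds), interval(200 milliseconds)) {
      assert(sent.size === result.size)
      sent.keys.foreach { k => assert(sent(k) === result(k).toInt) }
    }
  }
}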
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index 6943326eb7..b19c053ebf 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -19,51 +19,57 @@ package org.apache.spark.streaming.kafka
import java.io.File
import java.net.InetSocketAddress
-import java.util.{Properties, Random}
+import java.util.Properties
import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.Random
import kafka.admin.CreateTopicCommand
import kafka.common.{KafkaException, TopicAndPartition}
-import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
-import kafka.utils.ZKStringSerializer
+import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.serializer.{StringDecoder, StringEncoder}
import kafka.server.{KafkaConfig, KafkaServer}
-
+import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient
+import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually
-import org.apache.zookeeper.server.ZooKeeperServer
-import org.apache.zookeeper.server.NIOServerCnxnFactory
-
-import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.util.Utils
-class KafkaStreamSuite extends TestSuiteBase {
- import KafkaTestUtils._
-
- val zkHost = "localhost"
- var zkPort: Int = 0
- val zkConnectionTimeout = 6000
- val zkSessionTimeout = 6000
-
- protected var brokerPort = 9092
- protected var brokerConf: KafkaConfig = _
- protected var zookeeper: EmbeddedZookeeper = _
- protected var zkClient: ZkClient = _
- protected var server: KafkaServer = _
- protected var producer: Producer[String, String] = _
-
- override def useManualClock = false
-
- override def beforeFunction() {
+/**
+ * Abstract base class for Kafka test suites. It provides the functionality to set up and
+ * tear down a local embedded Kafka/ZooKeeper server, and to push data to it with a Kafka producer.
+ */
+abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Logging {
+
+ var zkAddress: String = _
+ var zkClient: ZkClient = _
+
+ private val zkHost = "localhost"
+ private val zkConnectionTimeout = 6000
+ private val zkSessionTimeout = 6000
+ private var zookeeper: EmbeddedZookeeper = _
+ private var zkPort: Int = 0
+ private var brokerPort = 9092
+ private var brokerConf: KafkaConfig = _
+ private var server: KafkaServer = _
+ private var producer: Producer[String, String] = _
+
+ def setupKafka() {
// Zookeeper server startup
zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
// Get the actual zookeeper binding port
zkPort = zookeeper.actualPort
+ zkAddress = s"$zkHost:$zkPort"
logInfo("==================== 0 ====================")
- zkClient = new ZkClient(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout,
+ zkClient = new ZkClient(zkAddress, zkSessionTimeout, zkConnectionTimeout,
ZKStringSerializer)
logInfo("==================== 1 ====================")
@@ -71,7 +77,7 @@ class KafkaStreamSuite extends TestSuiteBase {
var bindSuccess: Boolean = false
while(!bindSuccess) {
try {
- val brokerProps = getBrokerConfig(brokerPort, s"$zkHost:$zkPort")
+ val brokerProps = getBrokerConfig()
brokerConf = new KafkaConfig(brokerProps)
server = new KafkaServer(brokerConf)
logInfo("==================== 2 ====================")
@@ -89,53 +95,30 @@ class KafkaStreamSuite extends TestSuiteBase {
Thread.sleep(2000)
logInfo("==================== 4 ====================")
- super.beforeFunction()
}
- override def afterFunction() {
- producer.close()
- server.shutdown()
- brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
-
- zkClient.close()
- zookeeper.shutdown()
-
- super.afterFunction()
- }
-
- test("Kafka input stream") {
- val ssc = new StreamingContext(master, framework, batchDuration)
- val topic = "topic1"
- val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
- createTopic(topic)
- produceAndSendMessage(topic, sent)
+ def tearDownKafka() {
+ if (producer != null) {
+ producer.close()
+ producer = null
+ }
- val kafkaParams = Map("zookeeper.connect" -> s"$zkHost:$zkPort",
- "group.id" -> s"test-consumer-${random.nextInt(10000)}",
- "auto.offset.reset" -> "smallest")
+ if (server != null) {
+ server.shutdown()
+ server = null
+ }
- val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
- ssc,
- kafkaParams,
- Map(topic -> 1),
- StorageLevel.MEMORY_ONLY)
- val result = new mutable.HashMap[String, Long]()
- stream.map { case (k, v) => v }
- .countByValue()
- .foreachRDD { r =>
- val ret = r.collect()
- ret.toMap.foreach { kv =>
- val count = result.getOrElseUpdate(kv._1, 0) + kv._2
- result.put(kv._1, count)
- }
- }
- ssc.start()
- ssc.awaitTermination(3000)
+ brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
- assert(sent.size === result.size)
- sent.keys.foreach { k => assert(sent(k) === result(k).toInt) }
+ if (zkClient != null) {
+ zkClient.close()
+ zkClient = null
+ }
- ssc.stop()
+ if (zookeeper != null) {
+ zookeeper.shutdown()
+ zookeeper = null
+ }
}
private def createTestMessage(topic: String, sent: Map[String, Int])
@@ -150,58 +133,43 @@ class KafkaStreamSuite extends TestSuiteBase {
CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
logInfo("==================== 5 ====================")
// wait until metadata is propagated
- waitUntilMetadataIsPropagated(Seq(server), topic, 0, 1000)
+ waitUntilMetadataIsPropagated(topic, 0)
}
def produceAndSendMessage(topic: String, sent: Map[String, Int]) {
- val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
- producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
+ producer = new Producer[String, String](new ProducerConfig(getProducerConfig()))
producer.send(createTestMessage(topic, sent): _*)
+ producer.close()
logInfo("==================== 6 ====================")
}
-}
-
-object KafkaTestUtils {
- val random = new Random()
- def getBrokerConfig(port: Int, zkConnect: String): Properties = {
+ private def getBrokerConfig(): Properties = {
val props = new Properties()
props.put("broker.id", "0")
props.put("host.name", "localhost")
- props.put("port", port.toString)
+ props.put("port", brokerPort.toString)
props.put("log.dir", Utils.createTempDir().getAbsolutePath)
- props.put("zookeeper.connect", zkConnect)
+ props.put("zookeeper.connect", zkAddress)
props.put("log.flush.interval.messages", "1")
props.put("replica.socket.timeout.ms", "1500")
props
}
- def getProducerConfig(brokerList: String): Properties = {
+ private def getProducerConfig(): Properties = {
+ val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
val props = new Properties()
- props.put("metadata.broker.list", brokerList)
+ props.put("metadata.broker.list", brokerAddr)
props.put("serializer.class", classOf[StringEncoder].getName)
props
}
- def waitUntilTrue(condition: () => Boolean, waitTime: Long): Boolean = {
- val startTime = System.currentTimeMillis()
- while (true) {
- if (condition())
- return true
- if (System.currentTimeMillis() > startTime + waitTime)
- return false
- Thread.sleep(waitTime.min(100L))
+ private def waitUntilMetadataIsPropagated(topic: String, partition: Int) {
+ eventually(timeout(1000 milliseconds), interval(100 milliseconds)) {
+ assert(
+ server.apis.leaderCache.keySet.contains(TopicAndPartition(topic, partition)),
+ s"Partition [$topic, $partition] metadata not propagated after timeout"
+ )
}
- // Should never go to here
- throw new RuntimeException("unexpected error")
- }
-
- def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int,
- timeout: Long) {
- assert(waitUntilTrue(() =>
- servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(
- TopicAndPartition(topic, partition))), timeout),
- s"Partition [$topic, $partition] metadata not propagated after timeout")
}
class EmbeddedZookeeper(val zkConnect: String) {
@@ -227,3 +195,53 @@ object KafkaTestUtils {
}
}
}
+
+
+class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+ var ssc: StreamingContext = _
+
+ before {
+ setupKafka()
+ }
+
+ after {
+ if (ssc != null) {
+ ssc.stop()
+ ssc = null
+ }
+ tearDownKafka()
+ }
+
+ test("Kafka input stream") {
+ val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
+ ssc = new StreamingContext(sparkConf, Milliseconds(500))
+ val topic = "topic1"
+ val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
+ createTopic(topic)
+ produceAndSendMessage(topic, sent)
+
+ val kafkaParams = Map("zookeeper.connect" -> zkAddress,
+ "group.id" -> s"test-consumer-${Random.nextInt(10000)}",
+ "auto.offset.reset" -> "smallest")
+
+ val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
+ val result = new mutable.HashMap[String, Long]()
+ stream.map(_._2).countByValue().foreachRDD { r =>
+ val ret = r.collect()
+ ret.toMap.foreach { kv =>
+ val count = result.getOrElseUpdate(kv._1, 0) + kv._2
+ result.put(kv._1, count)
+ }
+ }
+ ssc.start()
+ eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+ assert(sent.size === result.size)
+ sent.keys.foreach { k =>
+ assert(sent(k) === result(k).toInt)
+ }
+ }
+ ssc.stop()
+ }
+}
+
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
new file mode 100644
index 0000000000..64ccc92c81
--- /dev/null
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+
+import java.io.File
+
+import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.Random
+
+import com.google.common.io.Files
+import kafka.serializer.StringDecoder
+import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
+import org.apache.commons.io.FileUtils
+import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+
+class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
+
+ val sparkConf = new SparkConf()
+ .setMaster("local[4]")
+ .setAppName(this.getClass.getSimpleName)
+ .set("spark.streaming.receiver.writeAheadLog.enable", "true")
+ val data = Map("a" -> 10, "b" -> 10, "c" -> 10)
+
+
+ var groupId: String = _
+ var kafkaParams: Map[String, String] = _
+ var ssc: StreamingContext = _
+ var tempDirectory: File = null
+
+ before {
+ setupKafka()
+ groupId = s"test-consumer-${Random.nextInt(10000)}"
+ kafkaParams = Map(
+ "zookeeper.connect" -> zkAddress,
+ "group.id" -> groupId,
+ "auto.offset.reset" -> "smallest"
+ )
+
+ ssc = new StreamingContext(sparkConf, Milliseconds(500))
+ tempDirectory = Files.createTempDir()
+ ssc.checkpoint(tempDirectory.getAbsolutePath)
+ }
+
+ after {
+ if (ssc != null) {
+ ssc.stop()
+ }
+ if (tempDirectory != null && tempDirectory.exists()) {
+ FileUtils.deleteDirectory(tempDirectory)
+ tempDirectory = null
+ }
+ tearDownKafka()
+ }
+
+
+ test("Reliable Kafka input stream with single topic") {
+ var topic = "test-topic"
+ createTopic(topic)
+ produceAndSendMessage(topic, data)
+
+ // Verify that no offset has been committed yet for this group/topic/partition before starting.
+ assert(getCommitOffset(groupId, topic, 0) === None)
+
+ val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
+ val result = new mutable.HashMap[String, Long]()
+ stream.map { case (k, v) => v }.foreachRDD { r =>
+ val ret = r.collect()
+ ret.foreach { v =>
+ val count = result.getOrElseUpdate(v, 0) + 1
+ result.put(v, count)
+ }
+ }
+ ssc.start()
+ eventually(timeout(20000 milliseconds), interval(200 milliseconds)) {
+ // A basic end-to-end verification of ReliableKafkaReceiver.
+ // Verify that the number of received messages equals the number of sent messages.
+ assert(data.size === result.size)
+ // Verify that the received count for each message value matches what was sent.
+ data.keys.foreach { k => assert(data(k) === result(k).toInt) }
+ // Verify that the committed offset matches the total number of messages sent.
+ assert(getCommitOffset(groupId, topic, 0) === Some(29L))
+ }
+ ssc.stop()
+ }
+
+ test("Reliable Kafka input stream with multiple topics") {
+ val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
+ topics.foreach { case (t, _) =>
+ createTopic(t)
+ produceAndSendMessage(t, data)
+ }
+
+ // Before starting, verify that no offsets have been committed for any group/topic/partition.
+ topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === None) }
+
+ // Consume all the data sent to the broker, which will eventually commit the offsets internally.
+ val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, topics, StorageLevel.MEMORY_ONLY)
+ stream.foreachRDD(_ => Unit)
+ ssc.start()
+ eventually(timeout(20000 milliseconds), interval(100 milliseconds)) {
+ // Verify that the committed offset for each group/topic equals the expected value.
+ topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === Some(29L)) }
+ }
+ ssc.stop()
+ }
+
+
+ /** Get the committed offset for a group/topic/partition from ZooKeeper. */
+ private def getCommitOffset(groupId: String, topic: String, partition: Int): Option[Long] = {
+ assert(zkClient != null, "Zookeeper client is not initialized")
+ val topicDirs = new ZKGroupTopicDirs(groupId, topic)
+ val zkPath = s"${topicDirs.consumerOffsetDir}/$partition"
+ ZkUtils.readDataMaybeNull(zkClient, zkPath)._1.map(_.toLong)
+ }
+}