-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala      |  33
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala             |   4
-rw-r--r--  external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala  | 282
-rw-r--r--  external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java     |  44
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala       | 216
-rw-r--r--  external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala | 140
-rw-r--r--  project/MimaExcludes.scala                                                                  |   4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala           |  55
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala   |   8
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala                     |   8
10 files changed, 651 insertions, 143 deletions
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index 28ac5929df..4d26b640e8 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -17,13 +17,12 @@
package org.apache.spark.streaming.kafka
+import java.util.Properties
+
import scala.collection.Map
import scala.reflect.{classTag, ClassTag}
-import java.util.Properties
-import java.util.concurrent.Executors
-
-import kafka.consumer._
+import kafka.consumer.{KafkaStream, Consumer, ConsumerConfig, ConsumerConnector}
import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties
@@ -32,6 +31,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.util.Utils
/**
* Input stream that pulls messages from a Kafka Broker.
@@ -51,12 +51,16 @@ class KafkaInputDStream[
@transient ssc_ : StreamingContext,
kafkaParams: Map[String, String],
topics: Map[String, Int],
+ useReliableReceiver: Boolean,
storageLevel: StorageLevel
) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
def getReceiver(): Receiver[(K, V)] = {
- new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
- .asInstanceOf[Receiver[(K, V)]]
+ if (!useReliableReceiver) {
+ new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
+ } else {
+ new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
+ }
}
}
@@ -69,14 +73,15 @@ class KafkaReceiver[
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
- ) extends Receiver[Any](storageLevel) with Logging {
+ ) extends Receiver[(K, V)](storageLevel) with Logging {
// Connection to Kafka
- var consumerConnector : ConsumerConnector = null
+ var consumerConnector: ConsumerConnector = null
def onStop() {
if (consumerConnector != null) {
consumerConnector.shutdown()
+ consumerConnector = null
}
}
@@ -102,11 +107,11 @@ class KafkaReceiver[
.newInstance(consumerConfig.props)
.asInstanceOf[Decoder[V]]
- // Create Threads for each Topic/Message Stream we are listening
+ // Create threads for each topic/message stream we are listening to
val topicMessageStreams = consumerConnector.createMessageStreams(
topics, keyDecoder, valueDecoder)
- val executorPool = Executors.newFixedThreadPool(topics.values.sum)
+ val executorPool = Utils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")
try {
// Start the messages handler for each partition
topicMessageStreams.values.foreach { streams =>
@@ -117,13 +122,15 @@ class KafkaReceiver[
}
}
- // Handles Kafka Messages
- private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V])
+ // Handles Kafka messages
+ private class MessageHandler(stream: KafkaStream[K, V])
extends Runnable {
def run() {
logInfo("Starting MessageHandler.")
try {
- for (msgAndMetadata <- stream) {
+ val streamIterator = stream.iterator()
+ while (streamIterator.hasNext()) {
+ val msgAndMetadata = streamIterator.next()
store((msgAndMetadata.key, msgAndMetadata.message))
}
} catch {
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
index ec812e1ef3..b4ac929e0c 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
@@ -70,7 +70,8 @@ object KafkaUtils {
topics: Map[String, Int],
storageLevel: StorageLevel
): ReceiverInputDStream[(K, V)] = {
- new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
+ val walEnabled = ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)
+ new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
}
/**
@@ -99,7 +100,6 @@ object KafkaUtils {
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel RDD storage level.
- *
*/
def createStream(
jssc: JavaStreamingContext,
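
The walEnabled flag above is the only switch between the plain and the reliable receiver. As a rough usage sketch (the topic name, group id, Zookeeper address, and checkpoint path below are placeholders, not part of this patch), an application opts in like this:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ReliableKafkaExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[4]")  // a receiver needs more than one core
      .setAppName("ReliableKafkaExample")
      // With this flag on, KafkaUtils.createStream builds a ReliableKafkaReceiver.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(2))
    // The write-ahead log lives under the checkpoint directory, so one must be set.
    ssc.checkpoint("/tmp/reliable-kafka-checkpoint")

    val stream = KafkaUtils.createStream(
      ssc, "localhost:2181", "reliable-example-group", Map("my-topic" -> 1))
    stream.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
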
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
new file mode 100644
index 0000000000..be734b8027
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.util.Properties
+import java.util.concurrent.{ThreadPoolExecutor, ConcurrentHashMap}
+
+import scala.collection.{Map, mutable}
+import scala.reflect.{ClassTag, classTag}
+
+import kafka.common.TopicAndPartition
+import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
+import kafka.message.MessageAndMetadata
+import kafka.serializer.Decoder
+import kafka.utils.{VerifiableProperties, ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
+import org.I0Itec.zkclient.ZkClient
+
+import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.storage.{StorageLevel, StreamBlockId}
+import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver}
+import org.apache.spark.util.Utils
+
+/**
+ * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss.
+ * It is turned off by default and will be enabled when
+ * spark.streaming.receiver.writeAheadLog.enable is true. The difference compared to KafkaReceiver
+ * is that this receiver manages topic-partition/offset itself and updates the offset information
+ * after data is reliably stored in the write-ahead log. Offsets are only updated after data has
+ * been reliably stored, so the potential data loss problem of KafkaReceiver is eliminated.
+ *
+ * Note: ReliableKafkaReceiver sets auto.commit.enable to false to turn off the automatic offset
+ * commit mechanism of the Kafka consumer, so setting this configuration manually within
+ * kafkaParams will not take effect.
+ */
+private[streaming]
+class ReliableKafkaReceiver[
+ K: ClassTag,
+ V: ClassTag,
+ U <: Decoder[_]: ClassTag,
+ T <: Decoder[_]: ClassTag](
+ kafkaParams: Map[String, String],
+ topics: Map[String, Int],
+ storageLevel: StorageLevel)
+ extends Receiver[(K, V)](storageLevel) with Logging {
+
+ private val groupId = kafkaParams("group.id")
+ private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
+ private def conf = SparkEnv.get.conf
+
+ /** High level consumer to connect to Kafka. */
+ private var consumerConnector: ConsumerConnector = null
+
+ /** zkClient to connect to Zookeeper to commit the offsets. */
+ private var zkClient: ZkClient = null
+
+ /**
+ * A HashMap to manage the offset for each topic/partition. This HashMap is only accessed
+ * inside synchronized blocks, so a mutable HashMap does not cause concurrency issues.
+ */
+ private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null
+
+ /** A concurrent HashMap to store the stream block id and related offset snapshot. */
+ private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null
+
+ /**
+ * Manage the BlockGenerator in the receiver itself to better coordinate block storage and
+ * offset commits.
+ */
+ private var blockGenerator: BlockGenerator = null
+
+ /** Thread pool running the handlers for receiving messages from multiple topics and partitions. */
+ private var messageHandlerThreadPool: ThreadPoolExecutor = null
+
+ override def onStart(): Unit = {
+ logInfo(s"Starting Kafka Consumer Stream with group: $groupId")
+
+ // Initialize the topic-partition / offset hash map.
+ topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]
+
+ // Initialize the stream block id / offset snapshot hash map.
+ blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]()
+
+ // Initialize the block generator for storing Kafka message.
+ blockGenerator = new BlockGenerator(new GeneratedBlockHandler, streamId, conf)
+
+ if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
+ logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
+ "otherwise we will manually set it to false to turn off auto offset commit in Kafka")
+ }
+
+ val props = new Properties()
+ kafkaParams.foreach(param => props.put(param._1, param._2))
+ // Manually set "auto.commit.enable" to "false" even if the user explicitly set it to true;
+ // this property must be false to turn off the auto commit mechanism in Kafka.
+ props.setProperty(AUTO_OFFSET_COMMIT, "false")
+
+ val consumerConfig = new ConsumerConfig(props)
+
+ assert(!consumerConfig.autoCommitEnable)
+
+ logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}")
+ consumerConnector = Consumer.create(consumerConfig)
+ logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}")
+
+ zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
+ consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
+
+ messageHandlerThreadPool = Utils.newDaemonFixedThreadPool(
+ topics.values.sum, "KafkaMessageHandler")
+
+ blockGenerator.start()
+
+ val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
+ .newInstance(consumerConfig.props)
+ .asInstanceOf[Decoder[K]]
+
+ val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
+ .newInstance(consumerConfig.props)
+ .asInstanceOf[Decoder[V]]
+
+ val topicMessageStreams = consumerConnector.createMessageStreams(
+ topics, keyDecoder, valueDecoder)
+
+ topicMessageStreams.values.foreach { streams =>
+ streams.foreach { stream =>
+ messageHandlerThreadPool.submit(new MessageHandler(stream))
+ }
+ }
+ }
+
+ override def onStop(): Unit = {
+ if (messageHandlerThreadPool != null) {
+ messageHandlerThreadPool.shutdown()
+ messageHandlerThreadPool = null
+ }
+
+ if (consumerConnector != null) {
+ consumerConnector.shutdown()
+ consumerConnector = null
+ }
+
+ if (zkClient != null) {
+ zkClient.close()
+ zkClient = null
+ }
+
+ if (blockGenerator != null) {
+ blockGenerator.stop()
+ blockGenerator = null
+ }
+
+ if (topicPartitionOffsetMap != null) {
+ topicPartitionOffsetMap.clear()
+ topicPartitionOffsetMap = null
+ }
+
+ if (blockOffsetMap != null) {
+ blockOffsetMap.clear()
+ blockOffsetMap = null
+ }
+ }
+
+ /** Store a Kafka message and the associated metadata as a tuple. */
+ private def storeMessageAndMetadata(
+ msgAndMetadata: MessageAndMetadata[K, V]): Unit = {
+ val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
+ val data = (msgAndMetadata.key, msgAndMetadata.message)
+ val metadata = (topicAndPartition, msgAndMetadata.offset)
+ blockGenerator.addDataWithCallback(data, metadata)
+ }
+
+ /** Update stored offset */
+ private def updateOffset(topicAndPartition: TopicAndPartition, offset: Long): Unit = {
+ topicPartitionOffsetMap.put(topicAndPartition, offset)
+ }
+
+ /**
+ * Remember the current offsets for each topic and partition. This is called when a block is
+ * generated.
+ */
+ private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
+ // Get a snapshot of current offset map and store with related block id.
+ val offsetSnapshot = topicPartitionOffsetMap.toMap
+ blockOffsetMap.put(blockId, offsetSnapshot)
+ topicPartitionOffsetMap.clear()
+ }
+
+ /** Store the ready-to-be-stored block and commit the related offsets to zookeeper. */
+ private def storeBlockAndCommitOffset(
+ blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
+ store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
+ Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
+ blockOffsetMap.remove(blockId)
+ }
+
+ /**
+ * Commit the offsets of Kafka topics/partitions. The commit mechanism follows Kafka 0.8.x's
+ * metadata schema in Zookeeper.
+ */
+ private def commitOffset(offsetMap: Map[TopicAndPartition, Long]): Unit = {
+ if (zkClient == null) {
+ val thrown = new IllegalStateException("Zookeeper client is unexpectedly null")
+ stop("Zookeeper client is not initialized before commit offsets to ZK", thrown)
+ return
+ }
+
+ for ((topicAndPart, offset) <- offsetMap) {
+ try {
+ val topicDirs = new ZKGroupTopicDirs(groupId, topicAndPart.topic)
+ val zkPath = s"${topicDirs.consumerOffsetDir}/${topicAndPart.partition}"
+
+ ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString)
+ } catch {
+ case e: Exception =>
+ logWarning(s"Exception during commit offset $offset for topic" +
+ s"${topicAndPart.topic}, partition ${topicAndPart.partition}", e)
+ }
+
+ logInfo(s"Committed offset $offset for topic ${topicAndPart.topic}, " +
+ s"partition ${topicAndPart.partition}")
+ }
+ }
+
+ /** Class to handle received Kafka message. */
+ private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable {
+ override def run(): Unit = {
+ while (!isStopped) {
+ try {
+ val streamIterator = stream.iterator()
+ while (streamIterator.hasNext) {
+ storeMessageAndMetadata(streamIterator.next)
+ }
+ } catch {
+ case e: Exception =>
+ logError("Error handling message", e)
+ }
+ }
+ }
+ }
+
+ /** Class to handle blocks generated by the block generator. */
+ private final class GeneratedBlockHandler extends BlockGeneratorListener {
+
+ def onAddData(data: Any, metadata: Any): Unit = {
+ // Update the offset of the data that was added to the generator
+ if (metadata != null) {
+ val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)]
+ updateOffset(topicAndPartition, offset)
+ }
+ }
+
+ def onGenerateBlock(blockId: StreamBlockId): Unit = {
+ // Remember the offsets of topics/partitions when a block has been generated
+ rememberBlockOffsets(blockId)
+ }
+
+ def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
+ // Store the block and commit its offsets
+ storeBlockAndCommitOffset(blockId, arrayBuffer)
+ }
+
+ def onError(message: String, throwable: Throwable): Unit = {
+ reportError(message, throwable)
+ }
+ }
+}
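
commitOffset above writes to the same Zookeeper layout used by Kafka 0.8's high-level consumer, /consumers/<group>/offsets/<topic>/<partition>, which is also what the new test suite's getCommitOffset reads back. A small illustrative sketch of inspecting a committed offset with the same helpers (the connection settings, group id, and topic are placeholders):

import kafka.utils.{ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
import org.I0Itec.zkclient.ZkClient

object CommittedOffsetInspector {
  def main(args: Array[String]): Unit = {
    // Session/connection timeouts mirror the defaults used in the test suite.
    val zkClient = new ZkClient("localhost:2181", 6000, 6000, ZKStringSerializer)
    try {
      val topicDirs = new ZKGroupTopicDirs("reliable-example-group", "my-topic")
      // Resolves to /consumers/reliable-example-group/offsets/my-topic/0
      val zkPath = s"${topicDirs.consumerOffsetDir}/0"
      val committed = ZkUtils.readDataMaybeNull(zkClient, zkPath)._1.map(_.toLong)
      println(s"Committed offset for partition 0: ${committed.getOrElse("none")}")
    } finally {
      zkClient.close()
    }
  }
}
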
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index efb0099c7c..6e1abf3f38 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -20,7 +20,10 @@ package org.apache.spark.streaming.kafka;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
+import java.util.Random;
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.Duration;
import scala.Predef;
import scala.Tuple2;
import scala.collection.JavaConverters;
@@ -32,8 +35,6 @@ import kafka.serializer.StringDecoder;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.streaming.Duration;
-import org.apache.spark.streaming.LocalJavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
@@ -42,25 +43,27 @@ import org.junit.Test;
import org.junit.After;
import org.junit.Before;
-public class JavaKafkaStreamSuite extends LocalJavaStreamingContext implements Serializable {
- private transient KafkaStreamSuite testSuite = new KafkaStreamSuite();
+public class JavaKafkaStreamSuite implements Serializable {
+ private transient JavaStreamingContext ssc = null;
+ private transient Random random = new Random();
+ private transient KafkaStreamSuiteBase suiteBase = null;
@Before
- @Override
public void setUp() {
- testSuite.beforeFunction();
+ suiteBase = new KafkaStreamSuiteBase() { };
+ suiteBase.setupKafka();
System.clearProperty("spark.driver.port");
- //System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock");
- ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ SparkConf sparkConf = new SparkConf()
+ .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
+ ssc = new JavaStreamingContext(sparkConf, new Duration(500));
}
@After
- @Override
public void tearDown() {
ssc.stop();
ssc = null;
System.clearProperty("spark.driver.port");
- testSuite.afterFunction();
+ suiteBase.tearDownKafka();
}
@Test
@@ -74,15 +77,15 @@ public class JavaKafkaStreamSuite extends LocalJavaStreamingContext implements S
sent.put("b", 3);
sent.put("c", 10);
- testSuite.createTopic(topic);
+ suiteBase.createTopic(topic);
HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
- testSuite.produceAndSendMessage(topic,
- JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
- Predef.<Tuple2<String, Object>>conforms()));
+ suiteBase.produceAndSendMessage(topic,
+ JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
+ Predef.<Tuple2<String, Object>>conforms()));
HashMap<String, String> kafkaParams = new HashMap<String, String>();
- kafkaParams.put("zookeeper.connect", testSuite.zkHost() + ":" + testSuite.zkPort());
- kafkaParams.put("group.id", "test-consumer-" + KafkaTestUtils.random().nextInt(10000));
+ kafkaParams.put("zookeeper.connect", suiteBase.zkAddress());
+ kafkaParams.put("group.id", "test-consumer-" + random.nextInt(10000));
kafkaParams.put("auto.offset.reset", "smallest");
JavaPairDStream<String, String> stream = KafkaUtils.createStream(ssc,
@@ -124,11 +127,16 @@ public class JavaKafkaStreamSuite extends LocalJavaStreamingContext implements S
);
ssc.start();
- ssc.awaitTermination(3000);
-
+ long startTime = System.currentTimeMillis();
+ boolean sizeMatches = false;
+ while (!sizeMatches && System.currentTimeMillis() - startTime < 20000) {
+ sizeMatches = sent.size() == result.size();
+ Thread.sleep(200);
+ }
Assert.assertEquals(sent.size(), result.size());
for (String k : sent.keySet()) {
Assert.assertEquals(sent.get(k).intValue(), result.get(k).intValue());
}
+ ssc.stop();
}
}
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
index 6943326eb7..b19c053ebf 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
@@ -19,51 +19,57 @@ package org.apache.spark.streaming.kafka
import java.io.File
import java.net.InetSocketAddress
-import java.util.{Properties, Random}
+import java.util.Properties
import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.Random
import kafka.admin.CreateTopicCommand
import kafka.common.{KafkaException, TopicAndPartition}
-import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
-import kafka.utils.ZKStringSerializer
+import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import kafka.serializer.{StringDecoder, StringEncoder}
import kafka.server.{KafkaConfig, KafkaServer}
-
+import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient
+import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
+import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.concurrent.Eventually
-import org.apache.zookeeper.server.ZooKeeperServer
-import org.apache.zookeeper.server.NIOServerCnxnFactory
-
-import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
+import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.util.Utils
-class KafkaStreamSuite extends TestSuiteBase {
- import KafkaTestUtils._
-
- val zkHost = "localhost"
- var zkPort: Int = 0
- val zkConnectionTimeout = 6000
- val zkSessionTimeout = 6000
-
- protected var brokerPort = 9092
- protected var brokerConf: KafkaConfig = _
- protected var zookeeper: EmbeddedZookeeper = _
- protected var zkClient: ZkClient = _
- protected var server: KafkaServer = _
- protected var producer: Producer[String, String] = _
-
- override def useManualClock = false
-
- override def beforeFunction() {
+/**
+ * This is an abstract base class for Kafka test suites. It has the functionality to set up
+ * and tear down local Kafka servers, and to push data using Kafka producers.
+ */
+abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Logging {
+
+ var zkAddress: String = _
+ var zkClient: ZkClient = _
+
+ private val zkHost = "localhost"
+ private val zkConnectionTimeout = 6000
+ private val zkSessionTimeout = 6000
+ private var zookeeper: EmbeddedZookeeper = _
+ private var zkPort: Int = 0
+ private var brokerPort = 9092
+ private var brokerConf: KafkaConfig = _
+ private var server: KafkaServer = _
+ private var producer: Producer[String, String] = _
+
+ def setupKafka() {
// Zookeeper server startup
zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
// Get the actual zookeeper binding port
zkPort = zookeeper.actualPort
+ zkAddress = s"$zkHost:$zkPort"
logInfo("==================== 0 ====================")
- zkClient = new ZkClient(s"$zkHost:$zkPort", zkSessionTimeout, zkConnectionTimeout,
+ zkClient = new ZkClient(zkAddress, zkSessionTimeout, zkConnectionTimeout,
ZKStringSerializer)
logInfo("==================== 1 ====================")
@@ -71,7 +77,7 @@ class KafkaStreamSuite extends TestSuiteBase {
var bindSuccess: Boolean = false
while(!bindSuccess) {
try {
- val brokerProps = getBrokerConfig(brokerPort, s"$zkHost:$zkPort")
+ val brokerProps = getBrokerConfig()
brokerConf = new KafkaConfig(brokerProps)
server = new KafkaServer(brokerConf)
logInfo("==================== 2 ====================")
@@ -89,53 +95,30 @@ class KafkaStreamSuite extends TestSuiteBase {
Thread.sleep(2000)
logInfo("==================== 4 ====================")
- super.beforeFunction()
}
- override def afterFunction() {
- producer.close()
- server.shutdown()
- brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
-
- zkClient.close()
- zookeeper.shutdown()
-
- super.afterFunction()
- }
-
- test("Kafka input stream") {
- val ssc = new StreamingContext(master, framework, batchDuration)
- val topic = "topic1"
- val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
- createTopic(topic)
- produceAndSendMessage(topic, sent)
+ def tearDownKafka() {
+ if (producer != null) {
+ producer.close()
+ producer = null
+ }
- val kafkaParams = Map("zookeeper.connect" -> s"$zkHost:$zkPort",
- "group.id" -> s"test-consumer-${random.nextInt(10000)}",
- "auto.offset.reset" -> "smallest")
+ if (server != null) {
+ server.shutdown()
+ server = null
+ }
- val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
- ssc,
- kafkaParams,
- Map(topic -> 1),
- StorageLevel.MEMORY_ONLY)
- val result = new mutable.HashMap[String, Long]()
- stream.map { case (k, v) => v }
- .countByValue()
- .foreachRDD { r =>
- val ret = r.collect()
- ret.toMap.foreach { kv =>
- val count = result.getOrElseUpdate(kv._1, 0) + kv._2
- result.put(kv._1, count)
- }
- }
- ssc.start()
- ssc.awaitTermination(3000)
+ brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
- assert(sent.size === result.size)
- sent.keys.foreach { k => assert(sent(k) === result(k).toInt) }
+ if (zkClient != null) {
+ zkClient.close()
+ zkClient = null
+ }
- ssc.stop()
+ if (zookeeper != null) {
+ zookeeper.shutdown()
+ zookeeper = null
+ }
}
private def createTestMessage(topic: String, sent: Map[String, Int])
@@ -150,58 +133,43 @@ class KafkaStreamSuite extends TestSuiteBase {
CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
logInfo("==================== 5 ====================")
// wait until metadata is propagated
- waitUntilMetadataIsPropagated(Seq(server), topic, 0, 1000)
+ waitUntilMetadataIsPropagated(topic, 0)
}
def produceAndSendMessage(topic: String, sent: Map[String, Int]) {
- val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
- producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
+ producer = new Producer[String, String](new ProducerConfig(getProducerConfig()))
producer.send(createTestMessage(topic, sent): _*)
+ producer.close()
logInfo("==================== 6 ====================")
}
-}
-
-object KafkaTestUtils {
- val random = new Random()
- def getBrokerConfig(port: Int, zkConnect: String): Properties = {
+ private def getBrokerConfig(): Properties = {
val props = new Properties()
props.put("broker.id", "0")
props.put("host.name", "localhost")
- props.put("port", port.toString)
+ props.put("port", brokerPort.toString)
props.put("log.dir", Utils.createTempDir().getAbsolutePath)
- props.put("zookeeper.connect", zkConnect)
+ props.put("zookeeper.connect", zkAddress)
props.put("log.flush.interval.messages", "1")
props.put("replica.socket.timeout.ms", "1500")
props
}
- def getProducerConfig(brokerList: String): Properties = {
+ private def getProducerConfig(): Properties = {
+ val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
val props = new Properties()
- props.put("metadata.broker.list", brokerList)
+ props.put("metadata.broker.list", brokerAddr)
props.put("serializer.class", classOf[StringEncoder].getName)
props
}
- def waitUntilTrue(condition: () => Boolean, waitTime: Long): Boolean = {
- val startTime = System.currentTimeMillis()
- while (true) {
- if (condition())
- return true
- if (System.currentTimeMillis() > startTime + waitTime)
- return false
- Thread.sleep(waitTime.min(100L))
+ private def waitUntilMetadataIsPropagated(topic: String, partition: Int) {
+ eventually(timeout(1000 milliseconds), interval(100 milliseconds)) {
+ assert(
+ server.apis.leaderCache.keySet.contains(TopicAndPartition(topic, partition)),
+ s"Partition [$topic, $partition] metadata not propagated after timeout"
+ )
}
- // Should never go to here
- throw new RuntimeException("unexpected error")
- }
-
- def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int,
- timeout: Long) {
- assert(waitUntilTrue(() =>
- servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(
- TopicAndPartition(topic, partition))), timeout),
- s"Partition [$topic, $partition] metadata not propagated after timeout")
}
class EmbeddedZookeeper(val zkConnect: String) {
@@ -227,3 +195,53 @@ object KafkaTestUtils {
}
}
}
+
+
+class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+ var ssc: StreamingContext = _
+
+ before {
+ setupKafka()
+ }
+
+ after {
+ if (ssc != null) {
+ ssc.stop()
+ ssc = null
+ }
+ tearDownKafka()
+ }
+
+ test("Kafka input stream") {
+ val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
+ ssc = new StreamingContext(sparkConf, Milliseconds(500))
+ val topic = "topic1"
+ val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
+ createTopic(topic)
+ produceAndSendMessage(topic, sent)
+
+ val kafkaParams = Map("zookeeper.connect" -> zkAddress,
+ "group.id" -> s"test-consumer-${Random.nextInt(10000)}",
+ "auto.offset.reset" -> "smallest")
+
+ val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
+ val result = new mutable.HashMap[String, Long]()
+ stream.map(_._2).countByValue().foreachRDD { r =>
+ val ret = r.collect()
+ ret.toMap.foreach { kv =>
+ val count = result.getOrElseUpdate(kv._1, 0) + kv._2
+ result.put(kv._1, count)
+ }
+ }
+ ssc.start()
+ eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+ assert(sent.size === result.size)
+ sent.keys.foreach { k =>
+ assert(sent(k) === result(k).toInt)
+ }
+ }
+ ssc.stop()
+ }
+}
+
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
new file mode 100644
index 0000000000..64ccc92c81
--- /dev/null
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+
+import java.io.File
+
+import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.language.postfixOps
+import scala.util.Random
+
+import com.google.common.io.Files
+import kafka.serializer.StringDecoder
+import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
+import org.apache.commons.io.FileUtils
+import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+
+class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
+
+ val sparkConf = new SparkConf()
+ .setMaster("local[4]")
+ .setAppName(this.getClass.getSimpleName)
+ .set("spark.streaming.receiver.writeAheadLog.enable", "true")
+ val data = Map("a" -> 10, "b" -> 10, "c" -> 10)
+
+
+ var groupId: String = _
+ var kafkaParams: Map[String, String] = _
+ var ssc: StreamingContext = _
+ var tempDirectory: File = null
+
+ before {
+ setupKafka()
+ groupId = s"test-consumer-${Random.nextInt(10000)}"
+ kafkaParams = Map(
+ "zookeeper.connect" -> zkAddress,
+ "group.id" -> groupId,
+ "auto.offset.reset" -> "smallest"
+ )
+
+ ssc = new StreamingContext(sparkConf, Milliseconds(500))
+ tempDirectory = Files.createTempDir()
+ ssc.checkpoint(tempDirectory.getAbsolutePath)
+ }
+
+ after {
+ if (ssc != null) {
+ ssc.stop()
+ }
+ if (tempDirectory != null && tempDirectory.exists()) {
+ FileUtils.deleteDirectory(tempDirectory)
+ tempDirectory = null
+ }
+ tearDownKafka()
+ }
+
+
+ test("Reliable Kafka input stream with single topic") {
+ var topic = "test-topic"
+ createTopic(topic)
+ produceAndSendMessage(topic, data)
+
+ // Verify that no offset has been committed for this group/topic/partition before starting.
+ assert(getCommitOffset(groupId, topic, 0) === None)
+
+ val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
+ val result = new mutable.HashMap[String, Long]()
+ stream.map { case (k, v) => v }.foreachRDD { r =>
+ val ret = r.collect()
+ ret.foreach { v =>
+ val count = result.getOrElseUpdate(v, 0) + 1
+ result.put(v, count)
+ }
+ }
+ ssc.start()
+ eventually(timeout(20000 milliseconds), interval(200 milliseconds)) {
+ // A basic process verification for ReliableKafkaReceiver.
+ // Verify that the number of received messages equals the number of sent messages.
+ assert(data.size === result.size)
+ // Verify that each received count matches the corresponding sent count.
+ data.keys.foreach { k => assert(data(k) === result(k).toInt) }
+ // Verify that the committed offset equals the total number of messages sent.
+ assert(getCommitOffset(groupId, topic, 0) === Some(29L))
+ }
+ ssc.stop()
+ }
+
+ test("Reliable Kafka input stream with multiple topics") {
+ val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
+ topics.foreach { case (t, _) =>
+ createTopic(t)
+ produceAndSendMessage(t, data)
+ }
+
+ // Before starting, verify that no offsets have been committed for any group/topic/partition.
+ topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === None) }
+
+ // Consume all the data sent to the broker; the reliable receiver will commit the offsets internally.
+ val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+ ssc, kafkaParams, topics, StorageLevel.MEMORY_ONLY)
+ stream.foreachRDD(_ => Unit)
+ ssc.start()
+ eventually(timeout(20000 milliseconds), interval(100 milliseconds)) {
+ // Verify that the committed offset for each group/topic equals the expected value.
+ topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === Some(29L)) }
+ }
+ ssc.stop()
+ }
+
+
+ /** Getting partition offset from Zookeeper. */
+ private def getCommitOffset(groupId: String, topic: String, partition: Int): Option[Long] = {
+ assert(zkClient != null, "Zookeeper client is not initialized")
+ val topicDirs = new ZKGroupTopicDirs(groupId, topic)
+ val zkPath = s"${topicDirs.consumerOffsetDir}/$partition"
+ ZkUtils.readDataMaybeNull(zkClient, zkPath)._1.map(_.toLong)
+ }
+}
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index a94d09be3b..8a2a865867 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -85,6 +85,10 @@ object MimaExcludes {
"org.apache.hadoop.mapred.SparkHadoopMapRedUtil"),
ProblemFilters.exclude[MissingTypesProblem](
"org.apache.spark.rdd.PairRDDFunctions")
+ ) ++ Seq(
+ // SPARK-4062
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.streaming.kafka.KafkaReceiver#MessageHandler.this")
)
case v if v.startsWith("1.1") =>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
index 0316b6862f..55765dc906 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala
@@ -27,9 +27,38 @@ import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
/** Listener object for BlockGenerator events */
private[streaming] trait BlockGeneratorListener {
- /** Called when a new block needs to be pushed */
+ /**
+ * Called after a data item is added into the BlockGenerator. The data addition and this
+ * callback are synchronized with the block generation and its associated callback,
+ * so block generation waits for the active data addition+callback to complete. This is useful
+ * for updating metadata on successful buffering of a data item, specifically metadata
+ * that will be useful when a block is generated. Any long blocking operation in this callback
+ * will hurt the throughput.
+ */
+ def onAddData(data: Any, metadata: Any)
+
+ /**
+ * Called when a new block of data is generated by the block generator. The block generation
+ * and this callback are synchronized with the data addition and its associated callback, so
+ * the data addition waits for the block generation+callback to complete. This is useful
+ * for updating metadata when a block has been generated, specifically metadata that will
+ * be useful when the block has been successfully stored. Any long blocking operation in this
+ * callback will hurt the throughput.
+ */
+ def onGenerateBlock(blockId: StreamBlockId)
+
+ /**
+ * Called when a new block is ready to be pushed. Callers are supposed to store the block into
+ * Spark in this method. Internally this is called from a single
+ * thread that is not synchronized with any other callback, so it is okay to perform long
+ * blocking operations in this callback.
+ */
def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_])
- /** Called when an error has occurred in BlockGenerator */
+
+ /**
+ * Called when an error has occurred in the BlockGenerator. Can be called from many places,
+ * so it is better not to do any long blocking operation in this callback.
+ */
def onError(message: String, throwable: Throwable)
}
@@ -80,9 +109,20 @@ private[streaming] class BlockGenerator(
* Push a single data item into the buffer. All received data items
* will be periodically pushed into BlockManager.
*/
- def += (data: Any): Unit = synchronized {
+ def addData(data: Any): Unit = synchronized {
+ waitToPush()
+ currentBuffer += data
+ }
+
+ /**
+ * Push a single data item into the buffer. After buffering the data, the
+ * `BlockGeneratorListener.onAddData` callback will be called. All received data items
+ * will be periodically pushed into BlockManager.
+ */
+ def addDataWithCallback(data: Any, metadata: Any) = synchronized {
waitToPush()
currentBuffer += data
+ listener.onAddData(data, metadata)
}
/** Change the buffer to which single records are added to. */
@@ -93,14 +133,15 @@ private[streaming] class BlockGenerator(
if (newBlockBuffer.size > 0) {
val blockId = StreamBlockId(receiverId, time - blockInterval)
val newBlock = new Block(blockId, newBlockBuffer)
+ listener.onGenerateBlock(blockId)
blocksForPushing.put(newBlock) // put is blocking when queue is full
logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
}
} catch {
case ie: InterruptedException =>
logInfo("Block updating timer thread was interrupted")
- case t: Throwable =>
- reportError("Error in block updating thread", t)
+ case e: Exception =>
+ reportError("Error in block updating thread", e)
}
}
@@ -126,8 +167,8 @@ private[streaming] class BlockGenerator(
} catch {
case ie: InterruptedException =>
logInfo("Block pushing thread was interrupted")
- case t: Throwable =>
- reportError("Error in block pushing thread", t)
+ case e: Exception =>
+ reportError("Error in block pushing thread", e)
}
}
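
The expanded listener contract above is what lets the reliable receiver bind offsets to blocks: onAddData and onGenerateBlock run under the generator's lock, while onPushBlock runs on the separate pushing thread. Below is a minimal sketch of a listener following that pattern, counting records per block instead of tracking Kafka offsets; the class name and counting logic are illustrative only, and because BlockGeneratorListener is private[streaming], such a class has to live under org.apache.spark.streaming:

package org.apache.spark.streaming.receiver

import scala.collection.mutable

import org.apache.spark.storage.StreamBlockId

private[streaming] class CountingBlockListener extends BlockGeneratorListener {
  // Updated only in onAddData/onGenerateBlock, which the generator serializes.
  private var recordsSinceLastBlock = 0L
  // Shared with the pushing thread, so guard it explicitly.
  private val perBlockCounts = new mutable.HashMap[StreamBlockId, Long]

  def onAddData(data: Any, metadata: Any): Unit = {
    // Keep this light: it runs while the generator holds its lock.
    recordsSinceLastBlock += 1
  }

  def onGenerateBlock(blockId: StreamBlockId): Unit = {
    // Snapshot per-block state; later onAddData calls belong to the next block.
    perBlockCounts.synchronized { perBlockCounts.put(blockId, recordsSinceLastBlock) }
    recordsSinceLastBlock = 0L
  }

  def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
    // Runs on the pushing thread; long/blocking work (storing the block) is fine here.
    val counted = perBlockCounts.synchronized { perBlockCounts.remove(blockId).getOrElse(0L) }
    println(s"Block $blockId ready: counted $counted records, buffer holds ${arrayBuffer.size}")
  }

  def onError(message: String, throwable: Throwable): Unit = {
    System.err.println(s"BlockGenerator error: $message")
    throwable.printStackTrace()
  }
}
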
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 5360412330..3b1233e86c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -27,10 +27,10 @@ import akka.actor.{Actor, Props}
import akka.pattern.ask
import com.google.common.base.Throwables
import org.apache.hadoop.conf.Configuration
+
import org.apache.spark.{Logging, SparkEnv, SparkException}
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.scheduler._
-import org.apache.spark.streaming.util.WriteAheadLogFileSegment
import org.apache.spark.util.{AkkaUtils, Utils}
/**
@@ -99,6 +99,10 @@ private[streaming] class ReceiverSupervisorImpl(
/** Divides received data records into data blocks for pushing in BlockManager. */
private val blockGenerator = new BlockGenerator(new BlockGeneratorListener {
+ def onAddData(data: Any, metadata: Any): Unit = { }
+
+ def onGenerateBlock(blockId: StreamBlockId): Unit = { }
+
def onError(message: String, throwable: Throwable) {
reportError(message, throwable)
}
@@ -110,7 +114,7 @@ private[streaming] class ReceiverSupervisorImpl(
/** Push a single record of received data into block generator. */
def pushSingle(data: Any) {
- blockGenerator += (data)
+ blockGenerator.addData(data)
}
/** Store an ArrayBuffer of received data as a data block into Spark's memory. */
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index 0f6a9489db..e26c0c6859 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -138,7 +138,7 @@ class ReceiverSuite extends FunSuite with Timeouts {
blockGenerator.start()
var count = 0
while(System.currentTimeMillis - startTime < waitTime) {
- blockGenerator += count
+ blockGenerator.addData(count)
generatedData += count
count += 1
Thread.sleep(10)
@@ -168,7 +168,7 @@ class ReceiverSuite extends FunSuite with Timeouts {
blockGenerator.start()
var count = 0
while(System.currentTimeMillis - startTime < waitTime) {
- blockGenerator += count
+ blockGenerator.addData(count)
generatedData += count
count += 1
Thread.sleep(1)
@@ -299,6 +299,10 @@ class ReceiverSuite extends FunSuite with Timeouts {
val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]]
val errors = new ArrayBuffer[Throwable]
+ def onAddData(data: Any, metadata: Any) { }
+
+ def onGenerateBlock(blockId: StreamBlockId) { }
+
def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
val bufferOfInts = arrayBuffer.map(_.asInstanceOf[Int])
arrayBuffers += bufferOfInts