author     jerryshao <saisai.shao@intel.com>  2013-10-12 15:02:57 +0800
committer  jerryshao <saisai.shao@intel.com>  2013-10-12 20:00:42 +0800
commit     c23cd72b4bbcbf5f615636095c69e9a2e39bfbdd (patch)
tree       14a083addc11bef9eeac371c8f383dc0e1c439d5 /streaming/src
parent     dca80094d317363e1e0d7e32bc7dfd99faf943cf (diff)
Upgrade Kafka 0.7.2 to Kafka 0.8.0-beta1 for Spark Streaming
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala              | 20
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala | 33
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala     | 61
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java                    | 16
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala             |  8
5 files changed, 88 insertions(+), 50 deletions(-)
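At a glance, the user-facing change: kafkaStream now yields (key, message) pairs instead of bare message strings, and the consumer properties use the Kafka 0.8 names. A minimal driver sketch of the before/after; the host, group, and topic values are placeholders, not taken from the patch:

    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val ssc = new StreamingContext("local[2]", "KafkaUpgradeDemo", Seconds(2))

    // Before this commit (Kafka 0.7.2): a DStream[String] of message payloads.
    //   val lines = ssc.kafkaStream("zkhost:2181", "my-group", Map("events" -> 1))

    // After this commit (Kafka 0.8.0-beta1): a DStream[(String, String)]
    // of (key, message) pairs.
    val pairs = ssc.kafkaStream("zkhost:2181", "my-group", Map("events" -> 1))
    pairs.map { case (_, message) => message }.print()

    ssc.start()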
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 878725c705..dc60046805 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -252,10 +252,14 @@ class StreamingContext private (
groupId: String,
topics: Map[String, Int],
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
- ): DStream[String] = {
+ ): DStream[(String, String)] = {
val kafkaParams = Map[String, String](
- "zk.connect" -> zkQuorum, "groupid" -> groupId, "zk.connectiontimeout.ms" -> "10000")
- kafkaStream[String, kafka.serializer.StringDecoder](kafkaParams, topics, storageLevel)
+ "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
+ "zookeeper.connection.timeout.ms" -> "10000")
+ kafkaStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder](
+ kafkaParams,
+ topics,
+ storageLevel)
}
/**
@@ -266,12 +270,16 @@ class StreamingContext private (
* in its own thread.
* @param storageLevel Storage level to use for storing the received objects
*/
- def kafkaStream[T: ClassManifest, D <: kafka.serializer.Decoder[_]: Manifest](
+ def kafkaStream[
+ K: ClassManifest,
+ V: ClassManifest,
+ U <: kafka.serializer.Decoder[_]: Manifest,
+ T <: kafka.serializer.Decoder[_]: Manifest](
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
- ): DStream[T] = {
- val inputStream = new KafkaInputDStream[T, D](this, kafkaParams, topics, storageLevel)
+ ): DStream[(K, V)] = {
+ val inputStream = new KafkaInputDStream[K, V, U, T](this, kafkaParams, topics, storageLevel)
registerInputStream(inputStream)
inputStream
}
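Under the new four-type-parameter signature above, a caller supplying explicit Kafka configuration would look roughly like this (a sketch assuming an in-scope StreamingContext named ssc; the connection strings and topic map are placeholders):

    import kafka.serializer.StringDecoder
    import org.apache.spark.storage.StorageLevel

    val kafkaParams = Map(
      "zookeeper.connect" -> "zkhost:2181",          // was "zk.connect" in Kafka 0.7
      "group.id" -> "my-group",                      // was "groupid"
      "zookeeper.connection.timeout.ms" -> "10000")  // was "zk.connectiontimeout.ms"

    val stream = ssc.kafkaStream[String, String, StringDecoder, StringDecoder](
      kafkaParams, Map("events" -> 1), StorageLevel.MEMORY_ONLY_SER_2)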
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 54ba3e6025..6423b916b0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -141,7 +141,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
zkQuorum: String,
groupId: String,
topics: JMap[String, JInt])
- : JavaDStream[String] = {
+ : JavaPairDStream[String, String] = {
implicit val cmt: ClassManifest[String] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[String]]
ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
@@ -162,7 +162,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
groupId: String,
topics: JMap[String, JInt],
storageLevel: StorageLevel)
- : JavaDStream[String] = {
+ : JavaPairDStream[String, String] = {
implicit val cmt: ClassManifest[String] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[String]]
ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
@@ -171,25 +171,34 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/**
* Create an input stream that pulls messages from a Kafka Broker.
- * @param typeClass Type of RDD
- * @param decoderClass Type of kafka decoder
+ * @param keyTypeClass Key type of RDD
+ * @param valueTypeClass Value type of RDD
+ * @param keyDecoderClass Type of kafka key decoder
+ * @param valueDecoderClass Type of kafka value decoder
* @param kafkaParams Map of kafka configuration parameters.
* See: http://kafka.apache.org/configuration.html
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel RDD storage level. Defaults to memory-only
*/
- def kafkaStream[T, D <: kafka.serializer.Decoder[_]](
- typeClass: Class[T],
- decoderClass: Class[D],
+ def kafkaStream[K, V, U <: kafka.serializer.Decoder[_], T <: kafka.serializer.Decoder[_]](
+ keyTypeClass: Class[K],
+ valueTypeClass: Class[V],
+ keyDecoderClass: Class[U],
+ valueDecoderClass: Class[T],
kafkaParams: JMap[String, String],
topics: JMap[String, JInt],
storageLevel: StorageLevel)
- : JavaDStream[T] = {
- implicit val cmt: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
- implicit val cmd: Manifest[D] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[D]]
- ssc.kafkaStream[T, D](
+ : JavaPairDStream[K, V] = {
+ implicit val keyCmt: ClassManifest[K] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
+ implicit val valueCmt: ClassManifest[V] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+
+ implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]]
+ implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]]
+
+ ssc.kafkaStream[K, V, U, T](
kafkaParams.toMap,
Map(topics.mapValues(_.intValue()).toSeq: _*),
storageLevel)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
index 51e913675d..a5de5e1fb5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
@@ -19,22 +19,18 @@ package org.apache.spark.streaming.dstream
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Time, DStreamCheckpointData, StreamingContext}
+import org.apache.spark.streaming.StreamingContext
import java.util.Properties
import java.util.concurrent.Executors
import kafka.consumer._
-import kafka.message.{Message, MessageSet, MessageAndMetadata}
import kafka.serializer.Decoder
-import kafka.utils.{Utils, ZKGroupTopicDirs}
-import kafka.utils.ZkUtils._
+import kafka.utils.VerifiableProperties
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient._
import scala.collection.Map
-import scala.collection.mutable.HashMap
-import scala.collection.JavaConversions._
/**
@@ -46,25 +42,32 @@ import scala.collection.JavaConversions._
* @param storageLevel RDD storage level.
*/
private[streaming]
-class KafkaInputDStream[T: ClassManifest, D <: Decoder[_]: Manifest](
+class KafkaInputDStream[
+ K: ClassManifest,
+ V: ClassManifest,
+ U <: Decoder[_]: Manifest,
+ T <: Decoder[_]: Manifest](
@transient ssc_ : StreamingContext,
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
- ) extends NetworkInputDStream[T](ssc_ ) with Logging {
+ ) extends NetworkInputDStream[(K, V)](ssc_) with Logging {
-
- def getReceiver(): NetworkReceiver[T] = {
- new KafkaReceiver[T, D](kafkaParams, topics, storageLevel)
- .asInstanceOf[NetworkReceiver[T]]
+ def getReceiver(): NetworkReceiver[(K, V)] = {
+ new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
+ .asInstanceOf[NetworkReceiver[(K, V)]]
}
}
private[streaming]
-class KafkaReceiver[T: ClassManifest, D <: Decoder[_]: Manifest](
- kafkaParams: Map[String, String],
- topics: Map[String, Int],
- storageLevel: StorageLevel
+class KafkaReceiver[
+ K: ClassManifest,
+ V: ClassManifest,
+ U <: Decoder[_]: Manifest,
+ T <: Decoder[_]: Manifest](
+ kafkaParams: Map[String, String],
+ topics: Map[String, Int],
+ storageLevel: StorageLevel
) extends NetworkReceiver[Any] {
// Handles pushing data into the BlockManager
@@ -83,27 +86,34 @@ class KafkaReceiver[T: ClassManifest, D <: Decoder[_]: Manifest](
// In case we are using multiple Threads to handle Kafka Messages
val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
- logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("groupid"))
+ logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))
// Kafka connection properties
val props = new Properties()
kafkaParams.foreach(param => props.put(param._1, param._2))
// Create the connection to the cluster
- logInfo("Connecting to Zookeper: " + kafkaParams("zk.connect"))
+ logInfo("Connecting to Zookeper: " + kafkaParams("zookeeper.connect"))
val consumerConfig = new ConsumerConfig(props)
consumerConnector = Consumer.create(consumerConfig)
- logInfo("Connected to " + kafkaParams("zk.connect"))
+ logInfo("Connected to " + kafkaParams("zookeeper.connect"))
// When autooffset.reset is defined, it is our responsibility to try and whack the
// consumer group zk node.
- if (kafkaParams.contains("autooffset.reset")) {
- tryZookeeperConsumerGroupCleanup(kafkaParams("zk.connect"), kafkaParams("groupid"))
+ if (kafkaParams.contains("auto.offset.reset")) {
+ tryZookeeperConsumerGroupCleanup(kafkaParams("zookeeper.connect"), kafkaParams("group.id"))
}
// Create Threads for each Topic/Message Stream we are listening to
- val decoder = manifest[D].erasure.newInstance.asInstanceOf[Decoder[T]]
- val topicMessageStreams = consumerConnector.createMessageStreams(topics, decoder)
+ val keyDecoder = manifest[U].erasure.getConstructor(classOf[VerifiableProperties])
+ .newInstance(consumerConfig.props)
+ .asInstanceOf[Decoder[K]]
+ val valueDecoder = manifest[T].erasure.getConstructor(classOf[VerifiableProperties])
+ .newInstance(consumerConfig.props)
+ .asInstanceOf[Decoder[V]]
+
+ val topicMessageStreams = consumerConnector.createMessageStreams(
+ topics, keyDecoder, valueDecoder)
// Start the messages handler for each partition
topicMessageStreams.values.foreach { streams =>
@@ -112,11 +122,12 @@ class KafkaReceiver[T: ClassManifest, D <: Decoder[_]: Manifest](
}
// Handles Kafka Messages
- private class MessageHandler[T: ClassManifest](stream: KafkaStream[T]) extends Runnable {
+ private class MessageHandler[K: ClassManifest, V: ClassManifest](stream: KafkaStream[K, V])
+ extends Runnable {
def run() {
logInfo("Starting MessageHandler.")
for (msgAndMetadata <- stream) {
- blockGenerator += msgAndMetadata.message
+ blockGenerator += (msgAndMetadata.key, msgAndMetadata.message)
}
}
}
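The receiver change worth calling out: Kafka 0.8 decoders are constructed from a VerifiableProperties, so the old no-arg newInstance call can no longer build them. Below, the reflection pattern from the hunk above, isolated as a standalone sketch; the helper name makeDecoder and the property values are ours, not the patch's:

    import java.util.Properties
    import kafka.serializer.{Decoder, StringDecoder}
    import kafka.utils.VerifiableProperties

    // Instantiate a decoder class D through its VerifiableProperties
    // constructor, mirroring what KafkaReceiver now does for both the
    // key and value decoders.
    def makeDecoder[D <: Decoder[_]: Manifest](props: VerifiableProperties): D =
      manifest[D].erasure
        .getConstructor(classOf[VerifiableProperties])
        .newInstance(props)
        .asInstanceOf[D]

    val props = new Properties()
    props.put("zookeeper.connect", "zkhost:2181")  // placeholder
    props.put("group.id", "my-group")              // placeholder

    val keyDecoder = makeDecoder[StringDecoder](new VerifiableProperties(props))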
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index c0d729ff87..dc01f1e8aa 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -1220,14 +1220,20 @@ public class JavaAPISuite implements Serializable {
@Test
public void testKafkaStream() {
HashMap<String, Integer> topics = Maps.newHashMap();
- JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics);
- JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics,
+ JavaPairDStream<String, String> test1 = ssc.kafkaStream("localhost:12345", "group", topics);
+ JavaPairDStream<String, String> test2 = ssc.kafkaStream("localhost:12345", "group", topics,
StorageLevel.MEMORY_AND_DISK());
HashMap<String, String> kafkaParams = Maps.newHashMap();
- kafkaParams.put("zk.connect","localhost:12345");
- kafkaParams.put("groupid","consumer-group");
- JavaDStream test3 = ssc.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics,
+ kafkaParams.put("zookeeper.connect","localhost:12345");
+ kafkaParams.put("group.id","consumer-group");
+ JavaPairDStream<String, String> test3 = ssc.kafkaStream(
+ String.class,
+ String.class,
+ StringDecoder.class,
+ StringDecoder.class,
+ kafkaParams,
+ topics,
StorageLevel.MEMORY_AND_DISK());
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 42e3e51e3f..c29b75ece6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -268,8 +268,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK)
// Test specifying decoder
- val kafkaParams = Map("zk.connect"->"localhost:12345","groupid"->"consumer-group")
- val test3 = ssc.kafkaStream[String, kafka.serializer.StringDecoder](kafkaParams, topics, StorageLevel.MEMORY_AND_DISK)
+ val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group")
+ val test3 = ssc.kafkaStream[
+ String,
+ String,
+ kafka.serializer.StringDecoder,
+ kafka.serializer.StringDecoder](kafkaParams, topics, StorageLevel.MEMORY_AND_DISK)
}
}
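For quick reference, the consumer property renames this commit applies consistently across the sources and tests above (0.7 name on the left, 0.8 name on the right):

    // Kafka consumer property renames, 0.7 -> 0.8, as seen in the hunks above.
    val renamedProperties = Map(
      "zk.connect"              -> "zookeeper.connect",
      "groupid"                 -> "group.id",
      "zk.connectiontimeout.ms" -> "zookeeper.connection.timeout.ms",
      "autooffset.reset"        -> "auto.offset.reset")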