author     seanm <sean.mcnamara@webtrends.com>  2013-03-24 13:40:19 -0600
committer  seanm <sean.mcnamara@webtrends.com>  2013-04-16 17:17:16 -0600
commit     7e56e99573b4cf161293e648aeb159375c9c0fcb (patch)
tree       9fa6d9f686cb39b3bb9a97196818079a71a38b31 /streaming
parent     8ac9efba5a435443be9abf8ebbe867806d42c9db (diff)
Surfacing decoders on KafkaInputDStream
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala               11
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala  33
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala      17
-rw-r--r--  streaming/src/test/java/spark/streaming/JavaAPISuite.java                      6
4 files changed, 43 insertions(+), 24 deletions(-)
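
The effect of the change, in one place: the Kafka receiver's decoder is now selected through a type parameter rather than being hard-coded to StringDecoder. A minimal sketch of a call site against the new Scala API, assuming a local ZooKeeper at localhost:2181, a hypothetical topic "events", and one consumer thread:

    import spark.storage.StorageLevel
    import spark.streaming.{Seconds, StreamingContext}
    import kafka.serializer.StringDecoder

    val ssc = new StreamingContext("local[2]", "KafkaDecoderExample", Seconds(1))

    // Same parameter map the zkQuorum convenience overload builds internally.
    val kafkaParams = Map(
      "zk.connect" -> "localhost:2181",
      "groupid" -> "example-group",
      "zk.connectiontimeout.ms" -> "10000")
    val topics = Map("events" -> 1)

    // T is the message type, D the Decoder that produces it; here the stock
    // StringDecoder, but any Decoder with a no-arg constructor can be supplied.
    val lines = ssc.kafkaStream[String, StringDecoder](
      kafkaParams, topics, StorageLevel.MEMORY_ONLY_SER_2)
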
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index bb7f216ca7..2c6326943d 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path
import java.util.UUID
import twitter4j.Status
+
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
* information (such as, cluster URL and job name) to internally create a SparkContext, it provides
@@ -207,14 +208,14 @@ class StreamingContext private (
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
- def kafkaStream[T: ClassManifest](
+ def kafkaStream(
zkQuorum: String,
groupId: String,
topics: Map[String, Int],
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
- ): DStream[T] = {
+ ): DStream[String] = {
val kafkaParams = Map[String, String]("zk.connect" -> zkQuorum, "groupid" -> groupId, "zk.connectiontimeout.ms" -> "10000");
- kafkaStream[T](kafkaParams, topics, storageLevel)
+ kafkaStream[String, kafka.serializer.StringDecoder](kafkaParams, topics, storageLevel)
}
/**
@@ -224,12 +225,12 @@ class StreamingContext private (
* in its own thread.
* @param storageLevel Storage level to use for storing the received objects
*/
- def kafkaStream[T: ClassManifest](
+ def kafkaStream[T: ClassManifest, D <: kafka.serializer.Decoder[_]: Manifest](
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
): DStream[T] = {
- val inputStream = new KafkaInputDStream[T](this, kafkaParams, topics, storageLevel)
+ val inputStream = new KafkaInputDStream[T, D](this, kafkaParams, topics, storageLevel)
registerInputStream(inputStream)
inputStream
}
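
The new D type parameter is what lets callers substitute their own decoder instead of always receiving Strings. A sketch of a hypothetical custom decoder, assuming the Kafka 0.7 Decoder trait (toEvent(message: Message): T) that this tree builds against; the Event type and class names are illustrative only:

    import kafka.message.Message
    import kafka.serializer.Decoder

    // Hypothetical message type and decoder, for illustration only.
    case class Event(bytes: Array[Byte])

    class EventDecoder extends Decoder[Event] {
      def toEvent(message: Message): Event = {
        // Copy the payload out of the message's ByteBuffer.
        val buf = message.payload
        val arr = new Array[Byte](buf.remaining)
        buf.get(arr)
        Event(arr)
      }
    }

    // The stream is then typed to the decoder's output:
    //   val events = ssc.kafkaStream[Event, EventDecoder](kafkaParams, topics, storageLevel)
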
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index 7a8864614c..13427873ff 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -68,33 +68,50 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
*/
- def kafkaStream[T](
+ def kafkaStream(
zkQuorum: String,
groupId: String,
topics: JMap[String, JInt])
- : JavaDStream[T] = {
- implicit val cmt: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
- ssc.kafkaStream[T](zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
+ : JavaDStream[String] = {
+ implicit val cmt: ClassManifest[String] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[String]]
+ ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), StorageLevel.MEMORY_ONLY_SER_2)
}
/**
* Create an input stream that pulls messages form a Kafka Broker.
- * @param kafkaParams Map of kafka configuration paramaters. See: http://kafka.apache.org/configuration.html
* @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param storageLevel RDD storage level. Defaults to memory-only
+ */
+ def kafkaStream(
+ zkQuorum: String,
+ groupId: String,
+ topics: JMap[String, JInt],
+ storageLevel: StorageLevel)
+ : JavaDStream[String] = {
+ implicit val cmt: ClassManifest[String] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[String]]
+ ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kafka Broker.
+ * @param kafkaParams Map of kafka configuration parameters. See: http://kafka.apache.org/configuration.html
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel RDD storage level. Defaults to memory-only
*/
- def kafkaStream[T](
+ def kafkaStream[T, D <: kafka.serializer.Decoder[_]: Manifest](
kafkaParams: JMap[String, String],
topics: JMap[String, JInt],
storageLevel: StorageLevel)
: JavaDStream[T] = {
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
- ssc.kafkaStream[T](
+ ssc.kafkaStream[T, D](
kafkaParams.toMap,
Map(topics.mapValues(_.intValue()).toSeq: _*),
storageLevel)
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index 17a5be3420..7bd53fb6dd 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -9,7 +9,7 @@ import java.util.concurrent.Executors
import kafka.consumer._
import kafka.message.{Message, MessageSet, MessageAndMetadata}
-import kafka.serializer.StringDecoder
+import kafka.serializer.Decoder
import kafka.utils.{Utils, ZKGroupTopicDirs}
import kafka.utils.ZkUtils._
import kafka.utils.ZKStringSerializer
@@ -28,7 +28,7 @@ import scala.collection.JavaConversions._
* @param storageLevel RDD storage level.
*/
private[streaming]
-class KafkaInputDStream[T: ClassManifest](
+class KafkaInputDStream[T: ClassManifest, D <: Decoder[_]: Manifest](
@transient ssc_ : StreamingContext,
kafkaParams: Map[String, String],
topics: Map[String, Int],
@@ -37,15 +37,17 @@ class KafkaInputDStream[T: ClassManifest](
def getReceiver(): NetworkReceiver[T] = {
- new KafkaReceiver(kafkaParams, topics, storageLevel)
+ new KafkaReceiver[T, D](kafkaParams, topics, storageLevel)
.asInstanceOf[NetworkReceiver[T]]
}
}
private[streaming]
-class KafkaReceiver(kafkaParams: Map[String, String],
+class KafkaReceiver[T: ClassManifest, D <: Decoder[_]: Manifest](
+ kafkaParams: Map[String, String],
topics: Map[String, Int],
- storageLevel: StorageLevel) extends NetworkReceiver[Any] {
+ storageLevel: StorageLevel
+ ) extends NetworkReceiver[Any] {
// Handles pushing data into the BlockManager
lazy protected val blockGenerator = new BlockGenerator(storageLevel)
@@ -82,7 +84,8 @@ class KafkaReceiver(kafkaParams: Map[String, String],
}
// Create Threads for each Topic/Message Stream we are listening
- val topicMessageStreams = consumerConnector.createMessageStreams(topics, new StringDecoder())
+ val decoder = manifest[D].erasure.newInstance.asInstanceOf[Decoder[T]]
+ val topicMessageStreams = consumerConnector.createMessageStreams(topics, decoder)
// Start the messages handler for each partition
topicMessageStreams.values.foreach { streams =>
@@ -91,7 +94,7 @@ class KafkaReceiver(kafkaParams: Map[String, String],
}
// Handles Kafka Messages
- private class MessageHandler(stream: KafkaStream[String]) extends Runnable {
+ private class MessageHandler[T: ClassManifest](stream: KafkaStream[T]) extends Runnable {
def run() {
logInfo("Starting MessageHandler.")
for (msgAndMetadata <- stream) {
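
Note that the receiver now builds the decoder reflectively from the manifest of D, so the decoder class must expose a public no-argument constructor. A standalone sketch of that construction (helper name is hypothetical), matching the line added above:

    import kafka.serializer.{Decoder, StringDecoder}

    // Given a Manifest for D, instantiate the decoder class and cast it to
    // the message type T expected by the stream.
    def newDecoder[T, D <: Decoder[_]: Manifest]: Decoder[T] =
      manifest[D].erasure.newInstance.asInstanceOf[Decoder[T]]

    val d: Decoder[String] = newDecoder[String, StringDecoder]
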
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 3bed500f73..61e4c0a207 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -23,7 +23,6 @@ import spark.streaming.api.java.JavaPairDStream;
import spark.streaming.api.java.JavaStreamingContext;
import spark.streaming.JavaTestUtils;
import spark.streaming.JavaCheckpointTestUtils;
-import spark.streaming.dstream.KafkaPartitionKey;
import spark.streaming.InputStreamsSuite;
import java.io.*;
@@ -1203,10 +1202,9 @@ public class JavaAPISuite implements Serializable {
@Test
public void testKafkaStream() {
HashMap<String, Integer> topics = Maps.newHashMap();
- HashMap<KafkaPartitionKey, Long> offsets = Maps.newHashMap();
JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics);
- JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics, offsets);
- JavaDStream test3 = ssc.kafkaStream("localhost:12345", "group", topics, offsets,
+ JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics);
+ JavaDStream test3 = ssc.kafkaStream("localhost:12345", "group", topics,
StorageLevel.MEMORY_AND_DISK());
}