aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--examples/pom.xml28
-rw-r--r--examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java98
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala28
-rw-r--r--project/SparkBuild.scala9
-rw-r--r--streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jarbin1358063 -> 0 bytes
-rw-r--r--streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.md51
-rw-r--r--streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.sha11
-rw-r--r--streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom9
-rw-r--r--streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.md51
-rw-r--r--streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.sha11
-rw-r--r--streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml12
-rw-r--r--streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.md51
-rw-r--r--streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.sha11
-rw-r--r--streaming/pom.xml32
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala20
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala33
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala61
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java16
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala8
19 files changed, 250 insertions, 110 deletions
diff --git a/examples/pom.xml b/examples/pom.xml
index b8c020a321..b97e6af288 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -32,13 +32,20 @@
<url>http://spark.incubator.apache.org/</url>
<repositories>
- <!-- A repository in the local filesystem for the Kafka JAR, which we modified for Scala 2.9 -->
<repository>
- <id>lib</id>
- <url>file://${project.basedir}/lib</url>
+ <id>apache-repo</id>
+ <name>Apache Repository</name>
+ <url>https://repository.apache.org/content/repositories/releases</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
</repository>
</repositories>
+
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -81,9 +88,18 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka</artifactId>
- <version>0.7.2-spark</version> <!-- Comes from our in-project repository -->
- <scope>provided</scope>
+ <artifactId>kafka_2.9.2</artifactId>
+ <version>0.8.0-beta1</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
new file mode 100644
index 0000000000..9a8e4209ed
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.examples;
+
+import java.util.Map;
+import java.util.HashMap;
+
+import com.google.common.collect.Lists;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import scala.Tuple2;
+
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ * Usage: JavaKafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>
+ * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ * <zkQuorum> is a list of one or more zookeeper servers that make quorum
+ * <group> is the name of kafka consumer group
+ * <topics> is a list of one or more kafka topics to consume from
+ * <numThreads> is the number of threads the kafka consumer should use
+ *
+ * Example:
+ * `./run-example org.apache.spark.streaming.examples.JavaKafkaWordCount local[2] zoo01,zoo02,
+ * zoo03 my-consumer-group topic1,topic2 1`
+ */
+
+public class JavaKafkaWordCount {
+ public static void main(String[] args) {
+ if (args.length < 5) {
+ System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>");
+ System.exit(1);
+ }
+
+ // Create the context with a 1 second batch size
+ JavaStreamingContext ssc = new JavaStreamingContext(args[0], "NetworkWordCount",
+ new Duration(2000), System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+
+ int numThreads = Integer.parseInt(args[4]);
+ Map<String, Integer> topicMap = new HashMap<String, Integer>();
+ String[] topics = args[3].split(",");
+ for (String topic: topics) {
+ topicMap.put(topic, numThreads);
+ }
+
+ JavaPairDStream<String, String> messages = ssc.kafkaStream(args[1], args[2], topicMap);
+
+ JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
+ @Override
+ public String call(Tuple2<String, String> tuple2) throws Exception {
+ return tuple2._2();
+ }
+ });
+
+ JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+ @Override
+ public Iterable<String> call(String x) {
+ return Lists.newArrayList(x.split(" "));
+ }
+ });
+
+ JavaPairDStream<String, Integer> wordCounts = words.map(
+ new PairFunction<String, String, Integer>() {
+ @Override
+ public Tuple2<String, Integer> call(String s) throws Exception {
+ return new Tuple2<String, Integer>(s, 1);
+ }
+ }).reduceByKey(new Function2<Integer, Integer, Integer>() {
+ @Override
+ public Integer call(Integer i1, Integer i2) throws Exception {
+ return i1 + i2;
+ }
+ });
+
+ wordCounts.print();
+ ssc.start();
+ }
+}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
index 12f939d5a7..570ba4c81a 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
@@ -18,13 +18,11 @@
package org.apache.spark.streaming.examples
import java.util.Properties
-import kafka.message.Message
-import kafka.producer.SyncProducerConfig
+
import kafka.producer._
-import org.apache.spark.SparkContext
+
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.util.RawTextHelper._
/**
@@ -54,9 +52,10 @@ object KafkaWordCount {
ssc.checkpoint("checkpoint")
val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
- val lines = ssc.kafkaStream(zkQuorum, group, topicpMap)
+ val lines = ssc.kafkaStream(zkQuorum, group, topicpMap).map(_._2)
val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
+ val wordCounts = words.map(x => (x, 1l))
+ .reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
wordCounts.print()
ssc.start()
@@ -68,15 +67,16 @@ object KafkaWordCountProducer {
def main(args: Array[String]) {
if (args.length < 2) {
- System.err.println("Usage: KafkaWordCountProducer <zkQuorum> <topic> <messagesPerSec> <wordsPerMessage>")
+ System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
+ "<messagesPerSec> <wordsPerMessage>")
System.exit(1)
}
- val Array(zkQuorum, topic, messagesPerSec, wordsPerMessage) = args
+ val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
// Zookeper connection properties
val props = new Properties()
- props.put("zk.connect", zkQuorum)
+ props.put("metadata.broker.list", brokers)
props.put("serializer.class", "kafka.serializer.StringEncoder")
val config = new ProducerConfig(props)
@@ -85,11 +85,13 @@ object KafkaWordCountProducer {
// Send some messages
while(true) {
val messages = (1 to messagesPerSec.toInt).map { messageNum =>
- (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString).mkString(" ")
+ val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
+ .mkString(" ")
+
+ new KeyedMessage[String, String](topic, str)
}.toArray
- println(messages.mkString(","))
- val data = new ProducerData[String, String](topic, messages)
- producer.send(data)
+
+ producer.send(messages: _*)
Thread.sleep(100)
}
}
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 973f1e2f11..b14970942b 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -273,13 +273,16 @@ object SparkBuild extends Build {
def streamingSettings = sharedSettings ++ Seq(
name := "spark-streaming",
resolvers ++= Seq(
- "Akka Repository" at "http://repo.akka.io/releases/"
+ "Akka Repository" at "http://repo.akka.io/releases/",
+ "Apache repo" at "https://repository.apache.org/content/repositories/releases"
),
libraryDependencies ++= Seq(
"org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty, excludeSnappy),
- "com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty),
"org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),
- "com.typesafe.akka" % "akka-zeromq" % "2.0.5" excludeAll(excludeNetty)
+ "com.typesafe.akka" % "akka-zeromq" % "2.0.5" excludeAll(excludeNetty),
+ "org.apache.kafka" % "kafka_2.9.2" % "0.8.0-beta1"
+ exclude("com.sun.jdmk", "jmxtools")
+ exclude("com.sun.jmx", "jmxri")
)
)
diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar
deleted file mode 100644
index 65f79925a4..0000000000
--- a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar
+++ /dev/null
Binary files differ
diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.md5 b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.md5
deleted file mode 100644
index 29f45f4adb..0000000000
--- a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.md5
+++ /dev/null
@@ -1 +0,0 @@
-18876b8bc2e4cef28b6d191aa49d963f \ No newline at end of file
diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.sha1 b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.sha1
deleted file mode 100644
index e3bd62bac0..0000000000
--- a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.jar.sha1
+++ /dev/null
@@ -1 +0,0 @@
-06b27270ffa52250a2c08703b397c99127b72060 \ No newline at end of file
diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom
deleted file mode 100644
index 082d35726a..0000000000
--- a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom
+++ /dev/null
@@ -1,9 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
- <modelVersion>4.0.0</modelVersion>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka</artifactId>
- <version>0.7.2-spark</version>
- <description>POM was created from install:install-file</description>
-</project>
diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.md5 b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.md5
deleted file mode 100644
index 92c4132b5b..0000000000
--- a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.md5
+++ /dev/null
@@ -1 +0,0 @@
-7bc4322266e6032bdf9ef6eebdd8097d \ No newline at end of file
diff --git a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.sha1 b/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.sha1
deleted file mode 100644
index 8a1d8a097a..0000000000
--- a/streaming/lib/org/apache/kafka/kafka/0.7.2-spark/kafka-0.7.2-spark.pom.sha1
+++ /dev/null
@@ -1 +0,0 @@
-d0f79e8eff0db43ca7bcf7dce2c8cd2972685c9d \ No newline at end of file
diff --git a/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml
deleted file mode 100644
index 720cd51c2f..0000000000
--- a/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml
+++ /dev/null
@@ -1,12 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<metadata>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka</artifactId>
- <versioning>
- <release>0.7.2-spark</release>
- <versions>
- <version>0.7.2-spark</version>
- </versions>
- <lastUpdated>20130121015225</lastUpdated>
- </versioning>
-</metadata>
diff --git a/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.md5 b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.md5
deleted file mode 100644
index a4ce5dc9e8..0000000000
--- a/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.md5
+++ /dev/null
@@ -1 +0,0 @@
-e2b9c7c5f6370dd1d21a0aae5e8dcd77 \ No newline at end of file
diff --git a/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.sha1 b/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.sha1
deleted file mode 100644
index b869eaf2a6..0000000000
--- a/streaming/lib/org/apache/kafka/kafka/maven-metadata-local.xml.sha1
+++ /dev/null
@@ -1 +0,0 @@
-2a4341da936b6c07a09383d17ffb185ac558ee91 \ No newline at end of file
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 3b25fb49fb..14c043175d 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -32,10 +32,16 @@
<url>http://spark.incubator.apache.org/</url>
<repositories>
- <!-- A repository in the local filesystem for the Kafka JAR, which we modified for Scala 2.9 -->
<repository>
- <id>lib</id>
- <url>file://${project.basedir}/lib</url>
+ <id>apache-repo</id>
+ <name>Apache Repository</name>
+ <url>https://repository.apache.org/content/repositories/releases</url>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
</repository>
</repositories>
@@ -56,9 +62,18 @@
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
- <artifactId>kafka</artifactId>
- <version>0.7.2-spark</version> <!-- Comes from our in-project repository -->
- <scope>provided</scope>
+ <artifactId>kafka_2.9.2</artifactId>
+ <version>0.8.0-beta1</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.sun.jmx</groupId>
+ <artifactId>jmxri</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.sun.jdmk</groupId>
+ <artifactId>jmxtools</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
@@ -72,11 +87,6 @@
</exclusions>
</dependency>
<dependency>
- <groupId>com.github.sgroschupf</groupId>
- <artifactId>zkclient</artifactId>
- <version>0.1</version>
- </dependency>
- <dependency>
<groupId>org.twitter4j</groupId>
<artifactId>twitter4j-stream</artifactId>
<version>3.0.3</version>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 878725c705..dc60046805 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -252,10 +252,14 @@ class StreamingContext private (
groupId: String,
topics: Map[String, Int],
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
- ): DStream[String] = {
+ ): DStream[(String, String)] = {
val kafkaParams = Map[String, String](
- "zk.connect" -> zkQuorum, "groupid" -> groupId, "zk.connectiontimeout.ms" -> "10000")
- kafkaStream[String, kafka.serializer.StringDecoder](kafkaParams, topics, storageLevel)
+ "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
+ "zookeeper.connection.timeout.ms" -> "10000")
+ kafkaStream[String, String, kafka.serializer.StringDecoder, kafka.serializer.StringDecoder](
+ kafkaParams,
+ topics,
+ storageLevel)
}
/**
@@ -266,12 +270,16 @@ class StreamingContext private (
* in its own thread.
* @param storageLevel Storage level to use for storing the received objects
*/
- def kafkaStream[T: ClassManifest, D <: kafka.serializer.Decoder[_]: Manifest](
+ def kafkaStream[
+ K: ClassManifest,
+ V: ClassManifest,
+ U <: kafka.serializer.Decoder[_]: Manifest,
+ T <: kafka.serializer.Decoder[_]: Manifest](
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
- ): DStream[T] = {
- val inputStream = new KafkaInputDStream[T, D](this, kafkaParams, topics, storageLevel)
+ ): DStream[(K, V)] = {
+ val inputStream = new KafkaInputDStream[K, V, U, T](this, kafkaParams, topics, storageLevel)
registerInputStream(inputStream)
inputStream
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 54ba3e6025..6423b916b0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -141,7 +141,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
zkQuorum: String,
groupId: String,
topics: JMap[String, JInt])
- : JavaDStream[String] = {
+ : JavaPairDStream[String, String] = {
implicit val cmt: ClassManifest[String] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[String]]
ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
@@ -162,7 +162,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
groupId: String,
topics: JMap[String, JInt],
storageLevel: StorageLevel)
- : JavaDStream[String] = {
+ : JavaPairDStream[String, String] = {
implicit val cmt: ClassManifest[String] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[String]]
ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
@@ -171,25 +171,34 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/**
* Create an input stream that pulls messages form a Kafka Broker.
- * @param typeClass Type of RDD
- * @param decoderClass Type of kafka decoder
+ * @param keyTypeClass Key type of RDD
+ * @param valueTypeClass value type of RDD
+ * @param keyDecoderClass Type of kafka key decoder
+ * @param valueDecoderClass Type of kafka value decoder
* @param kafkaParams Map of kafka configuration paramaters.
* See: http://kafka.apache.org/configuration.html
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel RDD storage level. Defaults to memory-only
*/
- def kafkaStream[T, D <: kafka.serializer.Decoder[_]](
- typeClass: Class[T],
- decoderClass: Class[D],
+ def kafkaStream[K, V, U <: kafka.serializer.Decoder[_], T <: kafka.serializer.Decoder[_]](
+ keyTypeClass: Class[K],
+ valueTypeClass: Class[V],
+ keyDecoderClass: Class[U],
+ valueDecoderClass: Class[T],
kafkaParams: JMap[String, String],
topics: JMap[String, JInt],
storageLevel: StorageLevel)
- : JavaDStream[T] = {
- implicit val cmt: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
- implicit val cmd: Manifest[D] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[D]]
- ssc.kafkaStream[T, D](
+ : JavaPairDStream[K, V] = {
+ implicit val keyCmt: ClassManifest[K] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
+ implicit val valueCmt: ClassManifest[V] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+
+ implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]]
+ implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]]
+
+ ssc.kafkaStream[K, V, U, T](
kafkaParams.toMap,
Map(topics.mapValues(_.intValue()).toSeq: _*),
storageLevel)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
index 51e913675d..a5de5e1fb5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KafkaInputDStream.scala
@@ -19,22 +19,18 @@ package org.apache.spark.streaming.dstream
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Time, DStreamCheckpointData, StreamingContext}
+import org.apache.spark.streaming.StreamingContext
import java.util.Properties
import java.util.concurrent.Executors
import kafka.consumer._
-import kafka.message.{Message, MessageSet, MessageAndMetadata}
import kafka.serializer.Decoder
-import kafka.utils.{Utils, ZKGroupTopicDirs}
-import kafka.utils.ZkUtils._
+import kafka.utils.VerifiableProperties
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient._
import scala.collection.Map
-import scala.collection.mutable.HashMap
-import scala.collection.JavaConversions._
/**
@@ -46,25 +42,32 @@ import scala.collection.JavaConversions._
* @param storageLevel RDD storage level.
*/
private[streaming]
-class KafkaInputDStream[T: ClassManifest, D <: Decoder[_]: Manifest](
+class KafkaInputDStream[
+ K: ClassManifest,
+ V: ClassManifest,
+ U <: Decoder[_]: Manifest,
+ T <: Decoder[_]: Manifest](
@transient ssc_ : StreamingContext,
kafkaParams: Map[String, String],
topics: Map[String, Int],
storageLevel: StorageLevel
- ) extends NetworkInputDStream[T](ssc_ ) with Logging {
+ ) extends NetworkInputDStream[(K, V)](ssc_) with Logging {
-
- def getReceiver(): NetworkReceiver[T] = {
- new KafkaReceiver[T, D](kafkaParams, topics, storageLevel)
- .asInstanceOf[NetworkReceiver[T]]
+ def getReceiver(): NetworkReceiver[(K, V)] = {
+ new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
+ .asInstanceOf[NetworkReceiver[(K, V)]]
}
}
private[streaming]
-class KafkaReceiver[T: ClassManifest, D <: Decoder[_]: Manifest](
- kafkaParams: Map[String, String],
- topics: Map[String, Int],
- storageLevel: StorageLevel
+class KafkaReceiver[
+ K: ClassManifest,
+ V: ClassManifest,
+ U <: Decoder[_]: Manifest,
+ T <: Decoder[_]: Manifest](
+ kafkaParams: Map[String, String],
+ topics: Map[String, Int],
+ storageLevel: StorageLevel
) extends NetworkReceiver[Any] {
// Handles pushing data into the BlockManager
@@ -83,27 +86,34 @@ class KafkaReceiver[T: ClassManifest, D <: Decoder[_]: Manifest](
// In case we are using multiple Threads to handle Kafka Messages
val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
- logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("groupid"))
+ logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))
// Kafka connection properties
val props = new Properties()
kafkaParams.foreach(param => props.put(param._1, param._2))
// Create the connection to the cluster
- logInfo("Connecting to Zookeper: " + kafkaParams("zk.connect"))
+ logInfo("Connecting to Zookeper: " + kafkaParams("zookeeper.connect"))
val consumerConfig = new ConsumerConfig(props)
consumerConnector = Consumer.create(consumerConfig)
- logInfo("Connected to " + kafkaParams("zk.connect"))
+ logInfo("Connected to " + kafkaParams("zookeeper.connect"))
// When autooffset.reset is defined, it is our responsibility to try and whack the
// consumer group zk node.
- if (kafkaParams.contains("autooffset.reset")) {
- tryZookeeperConsumerGroupCleanup(kafkaParams("zk.connect"), kafkaParams("groupid"))
+ if (kafkaParams.contains("auto.offset.reset")) {
+ tryZookeeperConsumerGroupCleanup(kafkaParams("zookeeper.connect"), kafkaParams("group.id"))
}
// Create Threads for each Topic/Message Stream we are listening
- val decoder = manifest[D].erasure.newInstance.asInstanceOf[Decoder[T]]
- val topicMessageStreams = consumerConnector.createMessageStreams(topics, decoder)
+ val keyDecoder = manifest[U].erasure.getConstructor(classOf[VerifiableProperties])
+ .newInstance(consumerConfig.props)
+ .asInstanceOf[Decoder[K]]
+ val valueDecoder = manifest[T].erasure.getConstructor(classOf[VerifiableProperties])
+ .newInstance(consumerConfig.props)
+ .asInstanceOf[Decoder[V]]
+
+ val topicMessageStreams = consumerConnector.createMessageStreams(
+ topics, keyDecoder, valueDecoder)
// Start the messages handler for each partition
topicMessageStreams.values.foreach { streams =>
@@ -112,11 +122,12 @@ class KafkaReceiver[T: ClassManifest, D <: Decoder[_]: Manifest](
}
// Handles Kafka Messages
- private class MessageHandler[T: ClassManifest](stream: KafkaStream[T]) extends Runnable {
+ private class MessageHandler[K: ClassManifest, V: ClassManifest](stream: KafkaStream[K, V])
+ extends Runnable {
def run() {
logInfo("Starting MessageHandler.")
for (msgAndMetadata <- stream) {
- blockGenerator += msgAndMetadata.message
+ blockGenerator += (msgAndMetadata.key, msgAndMetadata.message)
}
}
}
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index c0d729ff87..dc01f1e8aa 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -1220,14 +1220,20 @@ public class JavaAPISuite implements Serializable {
@Test
public void testKafkaStream() {
HashMap<String, Integer> topics = Maps.newHashMap();
- JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics);
- JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics,
+ JavaPairDStream<String, String> test1 = ssc.kafkaStream("localhost:12345", "group", topics);
+ JavaPairDStream<String, String> test2 = ssc.kafkaStream("localhost:12345", "group", topics,
StorageLevel.MEMORY_AND_DISK());
HashMap<String, String> kafkaParams = Maps.newHashMap();
- kafkaParams.put("zk.connect","localhost:12345");
- kafkaParams.put("groupid","consumer-group");
- JavaDStream test3 = ssc.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics,
+ kafkaParams.put("zookeeper.connect","localhost:12345");
+ kafkaParams.put("group.id","consumer-group");
+ JavaPairDStream<String, String> test3 = ssc.kafkaStream(
+ String.class,
+ String.class,
+ StringDecoder.class,
+ StringDecoder.class,
+ kafkaParams,
+ topics,
StorageLevel.MEMORY_AND_DISK());
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 42e3e51e3f..c29b75ece6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -268,8 +268,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK)
// Test specifying decoder
- val kafkaParams = Map("zk.connect"->"localhost:12345","groupid"->"consumer-group")
- val test3 = ssc.kafkaStream[String, kafka.serializer.StringDecoder](kafkaParams, topics, StorageLevel.MEMORY_AND_DISK)
+ val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group")
+ val test3 = ssc.kafkaStream[
+ String,
+ String,
+ kafka.serializer.StringDecoder,
+ kafka.serializer.StringDecoder](kafkaParams, topics, StorageLevel.MEMORY_AND_DISK)
}
}