author    seanm <sean.mcnamara@webtrends.com>    2013-05-10 17:34:28 -0600
committer seanm <sean.mcnamara@webtrends.com>    2013-05-10 17:34:28 -0600
commit    f25282def5826fab6caabff28c82c57a7f3fdcb8 (patch)
tree      a35530c4fa845eed356026cc35a373bc8922530c /streaming
parent    3632980b1b61dbb9ab9a3ab3d92fb415cb7173b9 (diff)
fixing kafkaStream Java API and adding test
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala  10
-rw-r--r--  streaming/src/test/java/spark/streaming/JavaAPISuite.java  6
2 files changed, 13 insertions(+), 3 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index 4ad2bdf8a8..b35d9032f1 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -99,18 +99,22 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/**
* Create an input stream that pulls messages from a Kafka broker.
+ * @param typeClass Type of the stream's elements
+ * @param decoderClass Type of the Kafka decoder
* @param kafkaParams Map of Kafka configuration parameters. See: http://kafka.apache.org/configuration.html
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param storageLevel RDD storage level. Defaults to memory-only
*/
- def kafkaStream[T, D <: kafka.serializer.Decoder[_]: Manifest](
+ def kafkaStream[T, D <: kafka.serializer.Decoder[_]](
+ typeClass: Class[T],
+ decoderClass: Class[D],
kafkaParams: JMap[String, String],
topics: JMap[String, JInt],
storageLevel: StorageLevel)
: JavaDStream[T] = {
- implicit val cmt: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cmt: ClassManifest[T] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cmd: Manifest[D] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[D]]
ssc.kafkaStream[T, D](
kafkaParams.toMap,
Map(topics.mapValues(_.intValue()).toSeq: _*),
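The two implicit vals are the heart of the fix: Java callers cannot supply Scala Manifest evidence, so the overload manufactures it by casting the AnyRef manifests, while the Class arguments exist only so javac can pin down T and D at the call site. A minimal standalone sketch of the same pattern (the object and method names here are illustrative, not part of the patch):

    // Sketch: bridging a Manifest-requiring Scala API for Java callers.
    // The Class arguments are never inspected at runtime; they only fix
    // T and D during Java type inference. The unchecked casts are safe
    // here because the element type is erased at runtime anyway.
    object ManifestBridge {
      def fromJava[T, D](typeClass: Class[T], decoderClass: Class[D]): Unit = {
        implicit val cmt: ClassManifest[T] =
          implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
        implicit val cmd: Manifest[D] =
          implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[D]]
        scalaApi[T, D]() // forwards to the API that demands the manifests
      }
      // Stand-in for a Scala method with Manifest context bounds.
      private def scalaApi[T: ClassManifest, D: Manifest](): Unit = ()
    }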
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 350d0888a3..e5fdbe1b7a 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -1206,6 +1206,12 @@ public class JavaAPISuite implements Serializable {
JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics);
JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics,
StorageLevel.MEMORY_AND_DISK());
+
+ HashMap<String, String> kafkaParams = Maps.newHashMap();
+ kafkaParams.put("zk.connect","localhost:12345");
+ kafkaParams.put("groupid","consumer-group");
+ JavaDStream test3 = ssc.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics,
+ StorageLevel.MEMORY_AND_DISK());
}
@Test
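For reference, this is how the new overload would be called from a standalone Java application rather than the test suite; a minimal sketch, assuming Spark 0.7-era APIs and placeholder values for the master URL, ZooKeeper address, group id, and topic name:

    import java.util.HashMap;
    import java.util.Map;

    import kafka.serializer.StringDecoder;

    import spark.storage.StorageLevel;
    import spark.streaming.Duration;
    import spark.streaming.api.java.JavaDStream;
    import spark.streaming.api.java.JavaStreamingContext;

    public class KafkaStreamExample {
      public static void main(String[] args) {
        // One-second batches; master URL and app name are placeholders.
        JavaStreamingContext ssc =
            new JavaStreamingContext("local[2]", "KafkaStreamExample", new Duration(1000));

        // 0.7-era Kafka consumer properties, matching the test above.
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("zk.connect", "localhost:2181");
        kafkaParams.put("groupid", "example-consumer-group");

        // topic name -> number of consumer threads for that topic.
        Map<String, Integer> topics = new HashMap<String, Integer>();
        topics.put("events", 1);

        // The Class arguments replace the Manifests a Scala caller would
        // supply implicitly: the stream carries Strings decoded by Kafka's
        // StringDecoder.
        JavaDStream<String> lines = ssc.kafkaStream(
            String.class, StringDecoder.class, kafkaParams, topics,
            StorageLevel.MEMORY_AND_DISK());

        lines.print();
        ssc.start();
      }
    }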