diff options
author | seanm <sean.mcnamara@webtrends.com> | 2013-05-10 17:34:28 -0600 |
---|---|---|
committer | seanm <sean.mcnamara@webtrends.com> | 2013-05-10 17:34:28 -0600 |
commit | f25282def5826fab6caabff28c82c57a7f3fdcb8 (patch) | |
tree | a35530c4fa845eed356026cc35a373bc8922530c | |
parent | 3632980b1b61dbb9ab9a3ab3d92fb415cb7173b9 (diff) | |
download | spark-f25282def5826fab6caabff28c82c57a7f3fdcb8.tar.gz spark-f25282def5826fab6caabff28c82c57a7f3fdcb8.tar.bz2 spark-f25282def5826fab6caabff28c82c57a7f3fdcb8.zip |
fixing kafkaStream Java API and adding test
-rw-r--r-- | streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala | 10 | ||||
-rw-r--r-- | streaming/src/test/java/spark/streaming/JavaAPISuite.java | 6 |
2 files changed, 13 insertions, 3 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala index 4ad2bdf8a8..b35d9032f1 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala @@ -99,18 +99,22 @@ class JavaStreamingContext(val ssc: StreamingContext) { /** * Create an input stream that pulls messages form a Kafka Broker. + * @param typeClass Type of RDD + * @param decoderClass Type of kafka decoder * @param kafkaParams Map of kafka configuration paramaters. See: http://kafka.apache.org/configuration.html * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed * in its own thread. * @param storageLevel RDD storage level. Defaults to memory-only */ - def kafkaStream[T, D <: kafka.serializer.Decoder[_]: Manifest]( + def kafkaStream[T, D <: kafka.serializer.Decoder[_]]( + typeClass: Class[T], + decoderClass: Class[D], kafkaParams: JMap[String, String], topics: JMap[String, JInt], storageLevel: StorageLevel) : JavaDStream[T] = { - implicit val cmt: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cmt: ClassManifest[T] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cmd: Manifest[D] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[D]] ssc.kafkaStream[T, D]( kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java index 350d0888a3..e5fdbe1b7a 100644 --- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java @@ -1206,6 +1206,12 @@ public class JavaAPISuite implements Serializable { JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics); JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK()); + + HashMap<String, String> kafkaParams = Maps.newHashMap(); + kafkaParams.put("zk.connect","localhost:12345"); + kafkaParams.put("groupid","consumer-group"); + JavaDStream test3 = ssc.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics, + StorageLevel.MEMORY_AND_DISK()); } @Test |