aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-02-07 13:59:18 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-02-07 13:59:18 -0800
commitd55e3aa467ab7d406739255bd8dc3dfc60f3cb16 (patch)
treeb2f270f0d47815a99717d9e4322a360a432e0b74
parentc6b2f765d3f62393818db44a41ab0d438f68cea6 (diff)
downloadspark-d55e3aa467ab7d406739255bd8dc3dfc60f3cb16.tar.gz
spark-d55e3aa467ab7d406739255bd8dc3dfc60f3cb16.tar.bz2
spark-d55e3aa467ab7d406739255bd8dc3dfc60f3cb16.zip
Updated JavaStreamingContext with updated kafkaStream API.
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala26
1 files changed, 9 insertions, 17 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index f82e6a37cc..70d6bd2b1b 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -45,27 +45,24 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/**
* Create an input stream that pulls messages form a Kafka Broker.
- * @param hostname Zookeper hostname.
- * @param port Zookeper port.
+ * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
*/
def kafkaStream[T](
- hostname: String,
- port: Int,
+ zkQuorum: String,
groupId: String,
topics: JMap[String, JInt])
: JavaDStream[T] = {
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
- ssc.kafkaStream[T](hostname, port, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
+ ssc.kafkaStream[T](zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
}
/**
* Create an input stream that pulls messages form a Kafka Broker.
- * @param hostname Zookeper hostname.
- * @param port Zookeper port.
+ * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
@@ -73,8 +70,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* By default the value is pulled from zookeper.
*/
def kafkaStream[T](
- hostname: String,
- port: Int,
+ zkQuorum: String,
groupId: String,
topics: JMap[String, JInt],
initialOffsets: JMap[KafkaPartitionKey, JLong])
@@ -82,8 +78,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
ssc.kafkaStream[T](
- hostname,
- port,
+ zkQuorum,
groupId,
Map(topics.mapValues(_.intValue()).toSeq: _*),
Map(initialOffsets.mapValues(_.longValue()).toSeq: _*))
@@ -91,8 +86,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/**
* Create an input stream that pulls messages form a Kafka Broker.
- * @param hostname Zookeper hostname.
- * @param port Zookeper port.
+ * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
@@ -101,8 +95,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param storageLevel RDD storage level. Defaults to memory-only
*/
def kafkaStream[T](
- hostname: String,
- port: Int,
+ zkQuorum: String,
groupId: String,
topics: JMap[String, JInt],
initialOffsets: JMap[KafkaPartitionKey, JLong],
@@ -111,8 +104,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
ssc.kafkaStream[T](
- hostname,
- port,
+ zkQuorum,
groupId,
Map(topics.mapValues(_.intValue()).toSeq: _*),
Map(initialOffsets.mapValues(_.longValue()).toSeq: _*),