diff options
author | seanm <sean.mcnamara@webtrends.com> | 2013-03-15 23:36:52 -0600 |
---|---|---|
committer | seanm <sean.mcnamara@webtrends.com> | 2013-03-15 23:36:52 -0600 |
commit | d61978d0abad30a148680c8a63df33e40e469525 (patch) | |
tree | 86582d05d7bc4a2d7f7f65b414b80dba0ead78b3 /streaming | |
parent | 33fa1e7e4aca4d9e0edf65d2b768b569305fd044 (diff) | |
download | spark-d61978d0abad30a148680c8a63df33e40e469525.tar.gz spark-d61978d0abad30a148680c8a63df33e40e469525.tar.bz2 spark-d61978d0abad30a148680c8a63df33e40e469525.zip |
keeping JavaStreamingContext in sync with StreamingContext + adding comments for better clarity
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala | 7 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala | 6 |
2 files changed, 7 insertions, 6 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala index 2373f4824a..7a8864614c 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala @@ -80,6 +80,7 @@ class JavaStreamingContext(val ssc: StreamingContext) { /** * Create an input stream that pulls messages from a Kafka Broker. + * @param kafkaParams Map of kafka configuration parameters. See: http://kafka.apache.org/configuration.html * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..). * @param groupId The group id for this consumer. * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed @@ -87,16 +88,14 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @param storageLevel RDD storage level. Defaults to memory-only */ def kafkaStream[T]( - zkQuorum: String, - groupId: String, + kafkaParams: JMap[String, String], topics: JMap[String, JInt], storageLevel: StorageLevel) : JavaDStream[T] = { implicit val cmt: ClassManifest[T] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] ssc.kafkaStream[T]( - zkQuorum, - groupId, + kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) } diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala index c6da1a7f70..85693808d1 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala @@ -100,8 +100,10 @@ class KafkaReceiver(kafkaParams: Map[String, String], } } - // Handles cleanup of consumer group znode. Lifted with love from Kafka's - // ConsumerConsole.scala tryCleanupZookeeper() + // Delete consumer group from zookeeper. 
This effectively resets the group so we can consume from the beginning again. + // The kafka high level consumer doesn't expose setting offsets currently; this is a trick copied from Kafka's + // ConsoleConsumer. See code related to 'autooffset.reset' when it is set to 'smallest': + // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) { try { val dir = "/consumers/" + groupId |