author    seanm <sean.mcnamara@webtrends.com>  2013-03-15 23:36:52 -0600
committer seanm <sean.mcnamara@webtrends.com>  2013-03-15 23:36:52 -0600
commit    d61978d0abad30a148680c8a63df33e40e469525 (patch)
tree      86582d05d7bc4a2d7f7f65b414b80dba0ead78b3 /streaming
parent    33fa1e7e4aca4d9e0edf65d2b768b569305fd044 (diff)
keeping JavaStreamingContext in sync with StreamingContext + adding comments for better clarity
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala  7
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala      6
2 files changed, 7 insertions, 6 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index 2373f4824a..7a8864614c 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -80,6 +80,7 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/**
 * Create an input stream that pulls messages from a Kafka Broker.
+ * @param kafkaParams Map of Kafka configuration parameters. See: http://kafka.apache.org/configuration.html
 * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
@@ -87,16 +88,14 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param storageLevel RDD storage level. Defaults to memory-only
*/
def kafkaStream[T](
- zkQuorum: String,
- groupId: String,
+ kafkaParams: JMap[String, String],
topics: JMap[String, JInt],
storageLevel: StorageLevel)
: JavaDStream[T] = {
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
ssc.kafkaStream[T](
- zkQuorum,
- groupId,
+ kafkaParams.toMap,
Map(topics.mapValues(_.intValue()).toSeq: _*),
storageLevel)
}
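For illustration, a minimal caller-side sketch of the new signature. This is hedged: jssc stands in for an existing JavaStreamingContext instance, the quorum address, group id, and topic name are placeholders, and the config keys "zk.connect" and "groupid" are assumptions based on Kafka 0.7-era consumer configuration rather than anything stated in this commit.

    // Hypothetical usage sketch: the zkQuorum/groupId pair now travels
    // inside kafkaParams instead of being separate arguments.
    val kafkaParams = new java.util.HashMap[String, String]()
    kafkaParams.put("zk.connect", "localhost:2181")      // placeholder ZooKeeper quorum
    kafkaParams.put("groupid", "example-consumer-group") // placeholder consumer group id

    val topics = new java.util.HashMap[String, java.lang.Integer]()
    topics.put("example-topic", 1)                       // topic -> number of partitions to consume

    val stream: JavaDStream[String] =
      jssc.kafkaStream[String](kafkaParams, topics, StorageLevel.MEMORY_ONLY)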
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index c6da1a7f70..85693808d1 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -100,8 +100,10 @@ class KafkaReceiver(kafkaParams: Map[String, String],
}
}
- // Handles cleanup of consumer group znode. Lifted with love from Kafka's
- // ConsumerConsole.scala tryCleanupZookeeper()
+ // Delete the consumer group from ZooKeeper. This effectively resets the group so we can consume from the beginning again.
+ // The Kafka high-level consumer doesn't currently expose a way to set offsets; this is a trick copied from Kafka's
+ // ConsoleConsumer. See the code related to 'autooffset.reset' when it is set to 'smallest':
+ // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
try {
val dir = "/consumers/" + groupId
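The hunk is truncated here. For context, a minimal sketch of how such a cleanup typically completes, modeled on the ConsoleConsumer code linked above; the ZkClient timeout values and the catch-all error handling are assumptions, not necessarily the committed code.

      // Sketch only: recursively delete the group's znode via the I0Itec
      // ZkClient API (the client Kafka 0.7's ConsoleConsumer uses), and never
      // let a cleanup failure abort stream startup.
      val zk = new ZkClient(zkUrl, 30 * 1000, 30 * 1000, ZKStringSerializer)
      zk.deleteRecursive(dir)
      zk.close()
    } catch {
      case _ => // swallow all errors; the cleanup is best-effort
    }
  }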