diff options
author | seanm <sean.mcnamara@webtrends.com> | 2013-03-14 23:32:52 -0600 |
---|---|---|
committer | seanm <sean.mcnamara@webtrends.com> | 2013-03-15 00:10:13 -0600 |
commit | 33fa1e7e4aca4d9e0edf65d2b768b569305fd044 (patch) | |
tree | 3c3b2c54cc8c778fbcc45052d679d865ad3b90e1 /streaming | |
parent | d06928321194b11e082986cd2bb2737d9bc3b698 (diff) | |
download | spark-33fa1e7e4aca4d9e0edf65d2b768b569305fd044.tar.gz spark-33fa1e7e4aca4d9e0edf65d2b768b569305fd044.tar.bz2 spark-33fa1e7e4aca4d9e0edf65d2b768b569305fd044.zip |
removing dependency on ZookeeperConsumerConnector + purging last relic of kafka reliability that never solidified (ie- setOffsets)
Diffstat (limited to 'streaming')
3 files changed, 6 insertions, 59 deletions
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index 4e1732adf5..bb7f216ca7 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -204,8 +204,6 @@ class StreamingContext private ( * @param groupId The group id for this consumer. * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed * in its own thread. - * @param initialOffsets Optional initial offsets for each of the partitions to consume. - * By default the value is pulled from zookeper. * @param storageLevel Storage level to use for storing the received objects * (default: StorageLevel.MEMORY_AND_DISK_SER_2) */ @@ -213,11 +211,10 @@ class StreamingContext private ( zkQuorum: String, groupId: String, topics: Map[String, Int], - initialOffsets: Map[KafkaPartitionKey, Long] = Map[KafkaPartitionKey, Long](), storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2 ): DStream[T] = { val kafkaParams = Map[String, String]("zk.connect" -> zkQuorum, "groupid" -> groupId, "zk.connectiontimeout.ms" -> "10000"); - kafkaStream[T](kafkaParams, topics, initialOffsets, storageLevel) + kafkaStream[T](kafkaParams, topics, storageLevel) } /** @@ -225,16 +222,14 @@ class StreamingContext private ( * @param kafkaParams Map of kafka configuration paramaters. See: http://kafka.apache.org/configuration.html * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed * in its own thread. - * @param initialOffsets Optional initial offsets for each of the partitions to consume. * @param storageLevel Storage level to use for storing the received objects */ def kafkaStream[T: ClassManifest]( kafkaParams: Map[String, String], topics: Map[String, Int], - initialOffsets: Map[KafkaPartitionKey, Long], storageLevel: StorageLevel ): DStream[T] = { - val inputStream = new KafkaInputDStream[T](this, kafkaParams, topics, initialOffsets, storageLevel) + val inputStream = new KafkaInputDStream[T](this, kafkaParams, topics, storageLevel) registerInputStream(inputStream) inputStream } diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala index f3b40b5b88..2373f4824a 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala @@ -84,39 +84,12 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @param groupId The group id for this consumer. * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed * in its own thread. - * @param initialOffsets Optional initial offsets for each of the partitions to consume. - * By default the value is pulled from zookeper. - */ - def kafkaStream[T]( - zkQuorum: String, - groupId: String, - topics: JMap[String, JInt], - initialOffsets: JMap[KafkaPartitionKey, JLong]) - : JavaDStream[T] = { - implicit val cmt: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] - ssc.kafkaStream[T]( - zkQuorum, - groupId, - Map(topics.mapValues(_.intValue()).toSeq: _*), - Map(initialOffsets.mapValues(_.longValue()).toSeq: _*)) - } - - /** - * Create an input stream that pulls messages form a Kafka Broker. - * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..). - * @param groupId The group id for this consumer. - * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed - * in its own thread. - * @param initialOffsets Optional initial offsets for each of the partitions to consume. - * By default the value is pulled from zookeper. * @param storageLevel RDD storage level. Defaults to memory-only */ def kafkaStream[T]( zkQuorum: String, groupId: String, topics: JMap[String, JInt], - initialOffsets: JMap[KafkaPartitionKey, JLong], storageLevel: StorageLevel) : JavaDStream[T] = { implicit val cmt: ClassManifest[T] = @@ -125,7 +98,6 @@ class JavaStreamingContext(val ssc: StreamingContext) { zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), - Map(initialOffsets.mapValues(_.longValue()).toSeq: _*), storageLevel) } diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala index d674b6ee87..c6da1a7f70 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala @@ -19,17 +19,12 @@ import scala.collection.mutable.HashMap import scala.collection.JavaConversions._ -// Key for a specific Kafka Partition: (broker, topic, group, part) -case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int) - /** * Input stream that pulls messages from a Kafka Broker. * * @param kafkaParams Map of kafka configuration paramaters. See: http://kafka.apache.org/configuration.html * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed * in its own thread. - * @param initialOffsets Optional initial offsets for each of the partitions to consume. - * By default the value is pulled from zookeper. * @param storageLevel RDD storage level. */ private[streaming] @@ -37,26 +32,25 @@ class KafkaInputDStream[T: ClassManifest]( @transient ssc_ : StreamingContext, kafkaParams: Map[String, String], topics: Map[String, Int], - initialOffsets: Map[KafkaPartitionKey, Long], storageLevel: StorageLevel ) extends NetworkInputDStream[T](ssc_ ) with Logging { def getReceiver(): NetworkReceiver[T] = { - new KafkaReceiver(kafkaParams, topics, initialOffsets, storageLevel) + new KafkaReceiver(kafkaParams, topics, storageLevel) .asInstanceOf[NetworkReceiver[T]] } } private[streaming] class KafkaReceiver(kafkaParams: Map[String, String], - topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long], + topics: Map[String, Int], storageLevel: StorageLevel) extends NetworkReceiver[Any] { // Handles pushing data into the BlockManager lazy protected val blockGenerator = new BlockGenerator(storageLevel) // Connection to Kafka - var consumerConnector : ZookeeperConsumerConnector = null + var consumerConnector : ConsumerConnector = null def onStop() { blockGenerator.stop() @@ -70,7 +64,6 @@ class KafkaReceiver(kafkaParams: Map[String, String], val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _)) logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("groupid")) - logInfo("Initial offsets: " + initialOffsets.toString) // Kafka connection properties val props = new Properties() @@ -79,7 +72,7 @@ class KafkaReceiver(kafkaParams: Map[String, String], // Create the connection to the cluster logInfo("Connecting to Zookeper: " + kafkaParams("zk.connect")) val consumerConfig = new ConsumerConfig(props) - consumerConnector = Consumer.create(consumerConfig).asInstanceOf[ZookeeperConsumerConnector] + consumerConnector = Consumer.create(consumerConfig) logInfo("Connected to " + kafkaParams("zk.connect")) // When autooffset.reset is 'smallest', it is our responsibility to try and whack the @@ -88,9 +81,6 @@ class KafkaReceiver(kafkaParams: Map[String, String], tryZookeeperConsumerGroupCleanup(kafkaParams("zk.connect"), kafkaParams("groupid")) } - // If specified, set the topic offset - setOffsets(initialOffsets) - // Create Threads for each Topic/Message Stream we are listening val topicMessageStreams = consumerConnector.createMessageStreams(topics, new StringDecoder()) @@ -100,16 +90,6 @@ class KafkaReceiver(kafkaParams: Map[String, String], } } - // Overwrites the offets in Zookeper. - private def setOffsets(offsets: Map[KafkaPartitionKey, Long]) { - offsets.foreach { case(key, offset) => - val topicDirs = new ZKGroupTopicDirs(key.groupId, key.topic) - val partitionName = key.brokerId + "-" + key.partId - updatePersistentPath(consumerConnector.zkClient, - topicDirs.consumerOffsetDir + "/" + partitionName, offset.toString) - } - } - // Handles Kafka Messages private class MessageHandler(stream: KafkaStream[String]) extends Runnable { def run() { |