author     seanm <sean.mcnamara@webtrends.com>   2013-03-14 23:32:52 -0600
committer  seanm <sean.mcnamara@webtrends.com>   2013-03-15 00:10:13 -0600
commit     33fa1e7e4aca4d9e0edf65d2b768b569305fd044 (patch)
tree       3c3b2c54cc8c778fbcc45052d679d865ad3b90e1 /streaming/src/main
parent     d06928321194b11e082986cd2bb2737d9bc3b698 (diff)
removing dependency on ZookeeperConsumerConnector + purging last relic of kafka reliability that never solidified (ie- setOffsets)
Diffstat (limited to 'streaming/src/main')
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala               |  9
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala  | 28
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala      | 28
3 files changed, 6 insertions(+), 59 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index 4e1732adf5..bb7f216ca7 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -204,8 +204,6 @@ class StreamingContext private (
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
- * @param initialOffsets Optional initial offsets for each of the partitions to consume.
- * By default the value is pulled from zookeper.
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
@@ -213,11 +211,10 @@ class StreamingContext private (
zkQuorum: String,
groupId: String,
topics: Map[String, Int],
- initialOffsets: Map[KafkaPartitionKey, Long] = Map[KafkaPartitionKey, Long](),
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
): DStream[T] = {
val kafkaParams = Map[String, String]("zk.connect" -> zkQuorum, "groupid" -> groupId, "zk.connectiontimeout.ms" -> "10000");
- kafkaStream[T](kafkaParams, topics, initialOffsets, storageLevel)
+ kafkaStream[T](kafkaParams, topics, storageLevel)
}
/**
@@ -225,16 +222,14 @@ class StreamingContext private (
* @param kafkaParams Map of kafka configuration paramaters. See: http://kafka.apache.org/configuration.html
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
- * @param initialOffsets Optional initial offsets for each of the partitions to consume.
* @param storageLevel Storage level to use for storing the received objects
*/
def kafkaStream[T: ClassManifest](
kafkaParams: Map[String, String],
topics: Map[String, Int],
- initialOffsets: Map[KafkaPartitionKey, Long],
storageLevel: StorageLevel
): DStream[T] = {
- val inputStream = new KafkaInputDStream[T](this, kafkaParams, topics, initialOffsets, storageLevel)
+ val inputStream = new KafkaInputDStream[T](this, kafkaParams, topics, storageLevel)
registerInputStream(inputStream)
inputStream
}
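
For orientation, here is what the change looks like from a caller's side: the initialOffsets parameter simply drops out of both kafkaStream overloads. The sketch below is illustrative only and is not code from this commit; it assumes an already-constructed StreamingContext named ssc, the 0.7-era package layout (spark.streaming, spark.storage), and made-up host, group, and topic names.

import spark.storage.StorageLevel
import spark.streaming.{DStream, StreamingContext}

// Hypothetical wiring helper, purely to show the two surviving overloads.
def buildStreams(ssc: StreamingContext): (DStream[String], DStream[String]) = {
  // Overload 1: zkQuorum/groupId form; storageLevel keeps its default.
  val byQuorum = ssc.kafkaStream[String](
    "zkhost1:2181,zkhost2:2181",   // zkQuorum
    "my-consumer-group",           // groupId
    Map("events" -> 2))            // topic -> threads; no initialOffsets argument anymore

  // Overload 2: raw kafkaParams form; storageLevel is now the third parameter.
  val byParams = ssc.kafkaStream[String](
    Map("zk.connect" -> "zkhost1:2181", "groupid" -> "my-consumer-group"),
    Map("events" -> 2),
    StorageLevel.MEMORY_ONLY_SER_2)

  (byQuorum, byParams)
}
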
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index f3b40b5b88..2373f4824a 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -84,39 +84,12 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
- * @param initialOffsets Optional initial offsets for each of the partitions to consume.
- * By default the value is pulled from zookeper.
- */
- def kafkaStream[T](
- zkQuorum: String,
- groupId: String,
- topics: JMap[String, JInt],
- initialOffsets: JMap[KafkaPartitionKey, JLong])
- : JavaDStream[T] = {
- implicit val cmt: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
- ssc.kafkaStream[T](
- zkQuorum,
- groupId,
- Map(topics.mapValues(_.intValue()).toSeq: _*),
- Map(initialOffsets.mapValues(_.longValue()).toSeq: _*))
- }
-
- /**
- * Create an input stream that pulls messages form a Kafka Broker.
- * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
- * @param groupId The group id for this consumer.
- * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param initialOffsets Optional initial offsets for each of the partitions to consume.
- * By default the value is pulled from zookeper.
* @param storageLevel RDD storage level. Defaults to memory-only
*/
def kafkaStream[T](
zkQuorum: String,
groupId: String,
topics: JMap[String, JInt],
- initialOffsets: JMap[KafkaPartitionKey, JLong],
storageLevel: StorageLevel)
: JavaDStream[T] = {
implicit val cmt: ClassManifest[T] =
@@ -125,7 +98,6 @@ class JavaStreamingContext(val ssc: StreamingContext) {
zkQuorum,
groupId,
Map(topics.mapValues(_.intValue()).toSeq: _*),
- Map(initialOffsets.mapValues(_.longValue()).toSeq: _*),
storageLevel)
}
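
The remaining Java-facing overload still converts the incoming JMap[String, JInt] into an immutable Scala Map before delegating, using the Map(... .toSeq: _*) idiom visible above. Below is a standalone sketch of that conversion, not part of this commit, written against the 2.9/2.10-era collections API this code uses; the topic names and counts are placeholders.

import java.lang.{Integer => JInt}
import java.util.{HashMap => JHashMap, Map => JMap}
import scala.collection.JavaConversions._

val topics: JMap[String, JInt] = new JHashMap[String, JInt]()
topics.put("events", 2)   // hypothetical topic -> consumer thread count
topics.put("clicks", 1)

// JavaConversions wraps the JMap as a mutable Scala Map, mapValues unboxes
// java.lang.Integer to Int, and Map(... .toSeq: _*) copies the entries into
// the immutable Map[String, Int] that the Scala API expects.
val scalaTopics: Map[String, Int] = Map(topics.mapValues(_.intValue()).toSeq: _*)
println(scalaTopics)
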
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index d674b6ee87..c6da1a7f70 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -19,17 +19,12 @@ import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
-// Key for a specific Kafka Partition: (broker, topic, group, part)
-case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int)
-
/**
* Input stream that pulls messages from a Kafka Broker.
*
* @param kafkaParams Map of kafka configuration paramaters. See: http://kafka.apache.org/configuration.html
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
- * @param initialOffsets Optional initial offsets for each of the partitions to consume.
- * By default the value is pulled from zookeper.
* @param storageLevel RDD storage level.
*/
private[streaming]
@@ -37,26 +32,25 @@ class KafkaInputDStream[T: ClassManifest](
@transient ssc_ : StreamingContext,
kafkaParams: Map[String, String],
topics: Map[String, Int],
- initialOffsets: Map[KafkaPartitionKey, Long],
storageLevel: StorageLevel
) extends NetworkInputDStream[T](ssc_ ) with Logging {
def getReceiver(): NetworkReceiver[T] = {
- new KafkaReceiver(kafkaParams, topics, initialOffsets, storageLevel)
+ new KafkaReceiver(kafkaParams, topics, storageLevel)
.asInstanceOf[NetworkReceiver[T]]
}
}
private[streaming]
class KafkaReceiver(kafkaParams: Map[String, String],
- topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long],
+ topics: Map[String, Int],
storageLevel: StorageLevel) extends NetworkReceiver[Any] {
// Handles pushing data into the BlockManager
lazy protected val blockGenerator = new BlockGenerator(storageLevel)
// Connection to Kafka
- var consumerConnector : ZookeeperConsumerConnector = null
+ var consumerConnector : ConsumerConnector = null
def onStop() {
blockGenerator.stop()
@@ -70,7 +64,6 @@ class KafkaReceiver(kafkaParams: Map[String, String],
val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("groupid"))
- logInfo("Initial offsets: " + initialOffsets.toString)
// Kafka connection properties
val props = new Properties()
@@ -79,7 +72,7 @@ class KafkaReceiver(kafkaParams: Map[String, String],
// Create the connection to the cluster
logInfo("Connecting to Zookeper: " + kafkaParams("zk.connect"))
val consumerConfig = new ConsumerConfig(props)
- consumerConnector = Consumer.create(consumerConfig).asInstanceOf[ZookeeperConsumerConnector]
+ consumerConnector = Consumer.create(consumerConfig)
logInfo("Connected to " + kafkaParams("zk.connect"))
// When autooffset.reset is 'smallest', it is our responsibility to try and whack the
@@ -88,9 +81,6 @@ class KafkaReceiver(kafkaParams: Map[String, String],
tryZookeeperConsumerGroupCleanup(kafkaParams("zk.connect"), kafkaParams("groupid"))
}
- // If specified, set the topic offset
- setOffsets(initialOffsets)
-
// Create Threads for each Topic/Message Stream we are listening
val topicMessageStreams = consumerConnector.createMessageStreams(topics, new StringDecoder())
@@ -100,16 +90,6 @@ class KafkaReceiver(kafkaParams: Map[String, String],
}
}
- // Overwrites the offets in Zookeper.
- private def setOffsets(offsets: Map[KafkaPartitionKey, Long]) {
- offsets.foreach { case(key, offset) =>
- val topicDirs = new ZKGroupTopicDirs(key.groupId, key.topic)
- val partitionName = key.brokerId + "-" + key.partId
- updatePersistentPath(consumerConnector.zkClient,
- topicDirs.consumerOffsetDir + "/" + partitionName, offset.toString)
- }
- }
-
// Handles Kafka Messages
private class MessageHandler(stream: KafkaStream[String]) extends Runnable {
def run() {
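
The receiver-side effect of the commit is that consumerConnector is now typed as the generic kafka.consumer.ConsumerConnector interface; the concrete ZookeeperConsumerConnector, and the cast after Consumer.create, were only needed so setOffsets could reach its zkClient. Below is a standalone sketch of the same connection pattern, not code from this commit; it assumes the Kafka 0.7.x high-level consumer API this file builds against, and the host, group, and topic names are placeholders.

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector}
import kafka.serializer.StringDecoder

val props = new Properties()
props.put("zk.connect", "zkhost1:2181")          // same keys the receiver copies from kafkaParams
props.put("groupid", "my-consumer-group")
props.put("zk.connectiontimeout.ms", "10000")

// Consumer.create already returns the interface type, so no downcast to
// ZookeeperConsumerConnector is required once setOffsets is gone.
val connector: ConsumerConnector = Consumer.create(new ConsumerConfig(props))

// One stream per requested thread, keyed by topic, as in KafkaReceiver.onStart.
val topicMessageStreams = connector.createMessageStreams(Map("events" -> 1), new StringDecoder())

// Release the Zookeeper and broker connections when done.
connector.shutdown()

Typing the field against the interface is what lets the receiver drop its dependency on Zookeeper internals entirely, which is the point of removing setOffsets.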