Diffstat (limited to 'streaming/src')
8 files changed, 136 insertions, 102 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index e1be5ef51c..9be7926a4a 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -441,7 +441,12 @@ abstract class DStream[T: ClassManifest] (
    * Return a new DStream in which each RDD has a single element generated by counting each RDD
    * of this DStream.
    */
-  def count(): DStream[Long] = this.map(_ => 1L).reduce(_ + _)
+  def count(): DStream[Long] = {
+    this.map(_ => (null, 1L))
+        .transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1)))
+        .reduceByKey(_ + _)
+        .map(_._2)
+  }
 
   /**
    * Return a new DStream in which each RDD contains the counts of each distinct value in
@@ -457,7 +462,7 @@ abstract class DStream[T: ClassManifest] (
    * this DStream will be registered as an output stream and therefore materialized.
    */
   def foreach(foreachFunc: RDD[T] => Unit) {
-    foreach((r: RDD[T], t: Time) => foreachFunc(r))
+    this.foreach((r: RDD[T], t: Time) => foreachFunc(r))
   }
 
   /**
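The new count() seeds every batch with a (null, 0L) record before reducing, so an interval with no data still yields a 0 instead of producing no output at all (see the matching BasicOperationsSuite change below). A minimal sketch of the same idea at the plain RDD level — the local master, object name and collect() are illustrative only, not part of the patch:

    import spark.SparkContext
    import spark.SparkContext._   // brings reduceByKey into scope for pair RDDs

    object CountSeedSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "count-seed-sketch")

        // An "empty batch": the old count() reduced this with _ + _ and emitted nothing.
        val emptyBatch = sc.makeRDD(Seq.empty[Int], 1)

        // New approach: map to (null, 1L), union in a seeded (null, 0L),
        // then reduceByKey, so there is always one record per interval to emit.
        val counted = emptyBatch.map(_ => (null, 1L))
          .union(sc.makeRDD(Seq((null, 0L)), 1))
          .reduceByKey(_ + _)
          .map(_._2)

        println(counted.collect().toSeq)   // Seq(0) rather than an empty result
        sc.stop()
      }
    }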
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index b8b60aab43..f2c4073f22 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
 import org.apache.hadoop.fs.Path
 import twitter4j.Status
 
+
 /**
  * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
  * information (such as, cluster URL and job name) to internally create a SparkContext, it provides
@@ -186,10 +187,11 @@ class StreamingContext private (
    * should be same.
    */
   def actorStream[T: ClassManifest](
-      props: Props,
-      name: String,
-      storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
-      supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = {
+    props: Props,
+    name: String,
+    storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
+    supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
+  ): DStream[T] = {
     networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
   }
 
@@ -197,9 +199,10 @@ class StreamingContext private (
    * Create an input stream that receives messages pushed by a zeromq publisher.
    * @param publisherUrl Url of remote zeromq publisher
    * @param subscribe topic to subscribe to
-   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
-   *                       of byte thus it needs the converter(which might be deserializer of bytes)
-   *                       to translate from sequence of sequence of bytes, where sequence refer to a frame
+   * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic
+   *                       and each frame has sequence of byte thus it needs the converter
+   *                       (which might be deserializer of bytes) to translate from sequence
+   *                       of sequence of bytes, where sequence refer to a frame
    *                       and sub sequence refer to its payload.
    * @param storageLevel RDD storage level. Defaults to memory-only.
    */
@@ -215,24 +218,39 @@ class StreamingContext private (
   }
 
   /**
-   * Create an input stream that pulls messages form a Kafka Broker.
+   * Create an input stream that pulls messages from a Kafka Broker.
    * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
    * @param groupId The group id for this consumer.
    * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
-   *               in its own thread.
-   * @param initialOffsets Optional initial offsets for each of the partitions to consume.
-   *                       By default the value is pulled from zookeper.
+   *               in its own thread.
    * @param storageLevel  Storage level to use for storing the received objects
    *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
    */
-  def kafkaStream[T: ClassManifest](
+  def kafkaStream(
      zkQuorum: String,
      groupId: String,
      topics: Map[String, Int],
-     initialOffsets: Map[KafkaPartitionKey, Long] = Map[KafkaPartitionKey, Long](),
      storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
+    ): DStream[String] = {
+    val kafkaParams = Map[String, String](
+      "zk.connect" -> zkQuorum, "groupid" -> groupId, "zk.connectiontimeout.ms" -> "10000")
+    kafkaStream[String, kafka.serializer.StringDecoder](kafkaParams, topics, storageLevel)
+  }
+
+  /**
+   * Create an input stream that pulls messages from a Kafka Broker.
+   * @param kafkaParams Map of kafka configuration paramaters.
+   *                    See: http://kafka.apache.org/configuration.html
+   * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+   *               in its own thread.
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def kafkaStream[T: ClassManifest, D <: kafka.serializer.Decoder[_]: Manifest](
+    kafkaParams: Map[String, String],
+    topics: Map[String, Int],
+    storageLevel: StorageLevel
    ): DStream[T] = {
-    val inputStream = new KafkaInputDStream[T](this, zkQuorum, groupId, topics, initialOffsets, storageLevel)
+    val inputStream = new KafkaInputDStream[T, D](this, kafkaParams, topics, storageLevel)
     registerInputStream(inputStream)
     inputStream
   }
@@ -397,7 +415,8 @@ class StreamingContext private (
    * it will process either one or all of the RDDs returned by the queue.
    * @param queue      Queue of RDDs
    * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
-   * @param defaultRDD Default RDD is returned by the DStream when the queue is empty. Set as null if no RDD should be returned when empty
+   * @param defaultRDD Default RDD is returned by the DStream when the queue is empty.
+   *                   Set as null if no RDD should be returned when empty
    * @tparam T         Type of objects in the RDD
    */
   def queueStream[T: ClassManifest](
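For reference, the two overloads that replace the old initialOffsets-based API might be used as below. This is a hedged sketch: the master URL, Zookeeper address, group and topic names are placeholders, and "autooffset.reset" is the optional Kafka 0.7.2 consumer property handled in KafkaInputDStream.scala further down.

    import spark.storage.StorageLevel
    import spark.streaming.{Seconds, StreamingContext}
    import spark.streaming.StreamingContext._   // pair-DStream ops such as reduceByKey

    val ssc = new StreamingContext("local[2]", "KafkaWordCount", Seconds(2))

    // Simple form: payloads are Strings and a StringDecoder is chosen for you.
    val lines = ssc.kafkaStream("zk-host:2181", "my-consumer-group", Map("my-topic" -> 1))

    // Explicit form: pass raw Kafka consumer properties and pick the decoder type.
    val kafkaParams = Map(
      "zk.connect" -> "zk-host:2181",
      "groupid" -> "my-consumer-group",
      "autooffset.reset" -> "smallest")   // optional: replay from the earliest offset
    val lines2 = ssc.kafkaStream[String, kafka.serializer.StringDecoder](
      kafkaParams, Map("my-topic" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)

    lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).print()
    ssc.start()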
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index 3d149a742c..4259d4891c 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -121,14 +121,15 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
    *               in its own thread.
    */
-  def kafkaStream[T](
+  def kafkaStream(
     zkQuorum: String,
     groupId: String,
     topics: JMap[String, JInt])
-  : JavaDStream[T] = {
-    implicit val cmt: ClassManifest[T] =
-      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
-    ssc.kafkaStream[T](zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
+  : JavaDStream[String] = {
+    implicit val cmt: ClassManifest[String] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[String]]
+    ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
+      StorageLevel.MEMORY_ONLY_SER_2)
   }
 
   /**
@@ -136,49 +137,45 @@ class JavaStreamingContext(val ssc: StreamingContext) {
    * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
    * @param groupId The group id for this consumer.
    * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
-   *               in its own thread.
-   * @param initialOffsets Optional initial offsets for each of the partitions to consume.
-   *                       By default the value is pulled from zookeper.
+   *               in its own thread.
+   * @param storageLevel RDD storage level. Defaults to memory-only
+   *
    */
-  def kafkaStream[T](
+  def kafkaStream(
     zkQuorum: String,
     groupId: String,
     topics: JMap[String, JInt],
-    initialOffsets: JMap[KafkaPartitionKey, JLong])
-  : JavaDStream[T] = {
-    implicit val cmt: ClassManifest[T] =
-      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
-    ssc.kafkaStream[T](
-      zkQuorum,
-      groupId,
-      Map(topics.mapValues(_.intValue()).toSeq: _*),
-      Map(initialOffsets.mapValues(_.longValue()).toSeq: _*))
+    storageLevel: StorageLevel)
+  : JavaDStream[String] = {
+    implicit val cmt: ClassManifest[String] =
+      implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[String]]
+    ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
+      storageLevel)
   }
 
   /**
    * Create an input stream that pulls messages form a Kafka Broker.
-   * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
-   * @param groupId The group id for this consumer.
+   * @param typeClass Type of RDD
+   * @param decoderClass Type of kafka decoder
+   * @param kafkaParams Map of kafka configuration paramaters.
+   *                    See: http://kafka.apache.org/configuration.html
    * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
    *               in its own thread.
-   * @param initialOffsets Optional initial offsets for each of the partitions to consume.
-   *                       By default the value is pulled from zookeper.
    * @param storageLevel RDD storage level. Defaults to memory-only
   */
-  def kafkaStream[T](
-    zkQuorum: String,
-    groupId: String,
+  def kafkaStream[T, D <: kafka.serializer.Decoder[_]](
+    typeClass: Class[T],
+    decoderClass: Class[D],
+    kafkaParams: JMap[String, String],
     topics: JMap[String, JInt],
-    initialOffsets: JMap[KafkaPartitionKey, JLong],
     storageLevel: StorageLevel)
   : JavaDStream[T] = {
     implicit val cmt: ClassManifest[T] =
       implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
-    ssc.kafkaStream[T](
-      zkQuorum,
-      groupId,
+    implicit val cmd: Manifest[D] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[D]]
+    ssc.kafkaStream[T, D](
+      kafkaParams.toMap,
       Map(topics.mapValues(_.intValue()).toSeq: _*),
-      Map(initialOffsets.mapValues(_.longValue()).toSeq: _*),
       storageLevel)
   }
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index ddd9becf32..55d2957be4 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -9,58 +9,51 @@ import java.util.concurrent.Executors
 
 import kafka.consumer._
 import kafka.message.{Message, MessageSet, MessageAndMetadata}
-import kafka.serializer.StringDecoder
+import kafka.serializer.Decoder
 import kafka.utils.{Utils, ZKGroupTopicDirs}
 import kafka.utils.ZkUtils._
+import kafka.utils.ZKStringSerializer
+import org.I0Itec.zkclient._
 
 import scala.collection.Map
 import scala.collection.mutable.HashMap
 import scala.collection.JavaConversions._
 
-// Key for a specific Kafka Partition: (broker, topic, group, part)
-case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int)
-
 /**
  * Input stream that pulls messages from a Kafka Broker.
  *
- * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
- * @param groupId The group id for this consumer.
+ * @param kafkaParams Map of kafka configuration paramaters. See: http://kafka.apache.org/configuration.html
  * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
  *               in its own thread.
- * @param initialOffsets Optional initial offsets for each of the partitions to consume.
- *                       By default the value is pulled from zookeper.
  * @param storageLevel RDD storage level.
 */
private[streaming]
-class KafkaInputDStream[T: ClassManifest](
+class KafkaInputDStream[T: ClassManifest, D <: Decoder[_]: Manifest](
    @transient ssc_ : StreamingContext,
-    zkQuorum: String,
-    groupId: String,
+    kafkaParams: Map[String, String],
    topics: Map[String, Int],
-    initialOffsets: Map[KafkaPartitionKey, Long],
    storageLevel: StorageLevel
  ) extends NetworkInputDStream[T](ssc_ ) with Logging {
 
  def getReceiver(): NetworkReceiver[T] = {
-    new KafkaReceiver(zkQuorum, groupId, topics, initialOffsets, storageLevel)
+    new KafkaReceiver[T, D](kafkaParams, topics, storageLevel)
        .asInstanceOf[NetworkReceiver[T]]
  }
}
 
private[streaming]
-class KafkaReceiver(zkQuorum: String, groupId: String,
-  topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long],
-  storageLevel: StorageLevel) extends NetworkReceiver[Any] {
-
-  // Timeout for establishing a connection to Zookeper in ms.
-  val ZK_TIMEOUT = 10000
+class KafkaReceiver[T: ClassManifest, D <: Decoder[_]: Manifest](
+  kafkaParams: Map[String, String],
+  topics: Map[String, Int],
+  storageLevel: StorageLevel
+  ) extends NetworkReceiver[Any] {
 
  // Handles pushing data into the BlockManager
  lazy protected val blockGenerator = new BlockGenerator(storageLevel)
  // Connection to Kafka
-  var consumerConnector : ZookeeperConsumerConnector = null
+  var consumerConnector : ConsumerConnector = null
 
  def onStop() {
    blockGenerator.stop()
@@ -73,54 +66,59 @@ class KafkaReceiver(zkQuorum: String, groupId: String,
    // In case we are using multiple Threads to handle Kafka Messages
    val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
 
-    logInfo("Starting Kafka Consumer Stream with group: " + groupId)
-    logInfo("Initial offsets: " + initialOffsets.toString)
+    logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("groupid"))
 
-    // Zookeper connection properties
+    // Kafka connection properties
    val props = new Properties()
-    props.put("zk.connect", zkQuorum)
-    props.put("zk.connectiontimeout.ms", ZK_TIMEOUT.toString)
-    props.put("groupid", groupId)
+    kafkaParams.foreach(param => props.put(param._1, param._2))
 
    // Create the connection to the cluster
-    logInfo("Connecting to Zookeper: " + zkQuorum)
+    logInfo("Connecting to Zookeper: " + kafkaParams("zk.connect"))
    val consumerConfig = new ConsumerConfig(props)
-    consumerConnector = Consumer.create(consumerConfig).asInstanceOf[ZookeeperConsumerConnector]
-    logInfo("Connected to " + zkQuorum)
+    consumerConnector = Consumer.create(consumerConfig)
+    logInfo("Connected to " + kafkaParams("zk.connect"))
 
-    // If specified, set the topic offset
-    setOffsets(initialOffsets)
+    // When autooffset.reset is defined, it is our responsibility to try and whack the
+    // consumer group zk node.
+    if (kafkaParams.contains("autooffset.reset")) {
+      tryZookeeperConsumerGroupCleanup(kafkaParams("zk.connect"), kafkaParams("groupid"))
+    }
 
    // Create Threads for each Topic/Message Stream we are listening
-    val topicMessageStreams = consumerConnector.createMessageStreams(topics, new StringDecoder())
+    val decoder = manifest[D].erasure.newInstance.asInstanceOf[Decoder[T]]
+    val topicMessageStreams = consumerConnector.createMessageStreams(topics, decoder)
 
    // Start the messages handler for each partition
    topicMessageStreams.values.foreach { streams =>
      streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
    }
-
-  }
-
-  // Overwrites the offets in Zookeper.
-  private def setOffsets(offsets: Map[KafkaPartitionKey, Long]) {
-    offsets.foreach { case(key, offset) =>
-      val topicDirs = new ZKGroupTopicDirs(key.groupId, key.topic)
-      val partitionName = key.brokerId + "-" + key.partId
-      updatePersistentPath(consumerConnector.zkClient,
-        topicDirs.consumerOffsetDir + "/" + partitionName, offset.toString)
-    }
  }
 
  // Handles Kafka Messages
-  private class MessageHandler(stream: KafkaStream[String]) extends Runnable {
+  private class MessageHandler[T: ClassManifest](stream: KafkaStream[T]) extends Runnable {
    def run() {
      logInfo("Starting MessageHandler.")
-      stream.takeWhile { msgAndMetadata =>
+      for (msgAndMetadata <- stream) {
        blockGenerator += msgAndMetadata.message
-        // Keep on handling messages
-
-        true
      }
    }
  }
+
+  // It is our responsibility to delete the consumer group when specifying autooffset.reset. This is because
+  // Kafka 0.7.2 only honors this param when the group is not in zookeeper.
+  //
+  // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied from Kafkas'
+  // ConsoleConsumer. See code related to 'autooffset.reset' when it is set to 'smallest'/'largest':
+  // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+  private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
+    try {
+      val dir = "/consumers/" + groupId
+      logInfo("Cleaning up temporary zookeeper data under " + dir + ".")
+      val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
+      zk.deleteRecursive(dir)
+      zk.close()
+    } catch {
+      case _ => // swallow
+    }
+  }
}
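The receiver now instantiates the decoder reflectively from the D type parameter instead of hard-coding StringDecoder. A standalone sketch of that manifest trick, assuming the decoder class has a no-argument constructor (kafka.serializer.StringDecoder in Kafka 0.7.2 does):

    import kafka.serializer.{Decoder, StringDecoder}

    // Build a Decoder[T] from the type parameter D, mirroring what KafkaReceiver.onStart does.
    def makeDecoder[T, D <: Decoder[_]: Manifest]: Decoder[T] =
      manifest[D].erasure.newInstance.asInstanceOf[Decoder[T]]

    val decoder: Decoder[String] = makeDecoder[String, StringDecoder]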
diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index 7385474963..26805e9621 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -198,7 +198,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Logging {
     case class Block(id: String, iterator: Iterator[T], metadata: Any = null)
 
     val clock = new SystemClock()
-    val blockInterval = 200L
+    val blockInterval = System.getProperty("spark.streaming.blockInterval", "200").toLong
     val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
     val blockStorageLevel = storageLevel
     val blocksForPushing = new ArrayBlockingQueue[Block](1000)
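The 200 ms block interval is no longer hard-coded, so it can be tuned through a system property without recompiling. A brief sketch — the property name comes from the diff, the 100 ms value is just an example, and it must be set before the receivers start:

    // Pack received data into 100 ms blocks instead of the default 200 ms.
    System.setProperty("spark.streaming.blockInterval", "100")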
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 3bed500f73..e5fdbe1b7a 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -4,6 +4,7 @@ import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.Files;
+import kafka.serializer.StringDecoder;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.junit.After;
 import org.junit.Assert;
@@ -23,7 +24,6 @@ import spark.streaming.api.java.JavaPairDStream;
 import spark.streaming.api.java.JavaStreamingContext;
 import spark.streaming.JavaTestUtils;
 import spark.streaming.JavaCheckpointTestUtils;
-import spark.streaming.dstream.KafkaPartitionKey;
 import spark.streaming.InputStreamsSuite;
 
 import java.io.*;
@@ -1203,10 +1203,14 @@ public class JavaAPISuite implements Serializable {
   @Test
   public void testKafkaStream() {
     HashMap<String, Integer> topics = Maps.newHashMap();
-    HashMap<KafkaPartitionKey, Long> offsets = Maps.newHashMap();
     JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics);
-    JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics, offsets);
-    JavaDStream test3 = ssc.kafkaStream("localhost:12345", "group", topics, offsets,
+    JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics,
+      StorageLevel.MEMORY_AND_DISK());
+
+    HashMap<String, String> kafkaParams = Maps.newHashMap();
+    kafkaParams.put("zk.connect","localhost:12345");
+    kafkaParams.put("groupid","consumer-group");
+    JavaDStream test3 = ssc.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics,
       StorageLevel.MEMORY_AND_DISK());
   }
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index e7352deb81..565089a853 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -93,9 +93,9 @@ class BasicOperationsSuite extends TestSuiteBase {
   test("count") {
     testOperation(
-      Seq(1 to 1, 1 to 2, 1 to 3, 1 to 4),
+      Seq(Seq(), 1 to 1, 1 to 2, 1 to 3, 1 to 4),
       (s: DStream[Int]) => s.count(),
-      Seq(Seq(1L), Seq(2L), Seq(3L), Seq(4L))
+      Seq(Seq(0L), Seq(1L), Seq(2L), Seq(3L), Seq(4L))
     )
   }
 
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index 0acb6db6f2..b024fc9dcc 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -243,6 +243,17 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
       assert(output(i) === expectedOutput(i))
     }
   }
+
+  test("kafka input stream") {
+    val ssc = new StreamingContext(master, framework, batchDuration)
+    val topics = Map("my-topic" -> 1)
+    val test1 = ssc.kafkaStream("localhost:12345", "group", topics)
+    val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK)
+
+    // Test specifying decoder
+    val kafkaParams = Map("zk.connect"->"localhost:12345","groupid"->"consumer-group")
+    val test3 = ssc.kafkaStream[String, kafka.serializer.StringDecoder](kafkaParams, topics, StorageLevel.MEMORY_AND_DISK)
+  }
 }