aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-06-25 09:16:57 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-06-25 09:16:57 -0700
commitf5e32ed13a7820fdc38f56dd9a19c160800fd652 (patch)
tree01f9f42f30a4aa2f73cb200c89a71254bf74d80e
parent78ffe164b33c6b11a2e511442605acd2f795a1b5 (diff)
parentc89af0a7f9eebce22dfe2bb4d8b1676ec7f760f6 (diff)
downloadspark-f5e32ed13a7820fdc38f56dd9a19c160800fd652.tar.gz
spark-f5e32ed13a7820fdc38f56dd9a19c160800fd652.tar.bz2
spark-f5e32ed13a7820fdc38f56dd9a19c160800fd652.zip
Merge pull request #661 from mesos/streaming
Kafka fixes and DStream.count fix for master
-rw-r--r--.gitignore1
-rw-r--r--docs/configuration.md7
-rw-r--r--examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala2
-rw-r--r--project/SparkBuild.scala3
-rw-r--r--streaming/src/main/scala/spark/streaming/DStream.scala9
-rw-r--r--streaming/src/main/scala/spark/streaming/StreamingContext.scala49
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala57
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala94
-rw-r--r--streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala2
-rw-r--r--streaming/src/test/java/spark/streaming/JavaAPISuite.java12
-rw-r--r--streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala4
-rw-r--r--streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala11
12 files changed, 148 insertions, 103 deletions
diff --git a/.gitignore b/.gitignore
index b87fc1ee79..ae39c52b11 100644
--- a/.gitignore
+++ b/.gitignore
@@ -36,4 +36,5 @@ streaming-tests.log
dependency-reduced-pom.xml
.ensime
.ensime_lucene
+checkpoint
derby.log
diff --git a/docs/configuration.md b/docs/configuration.md
index 17fdbf04d1..2de512f896 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -260,6 +260,13 @@ Apart from these, the following properties are also available, and may be useful
applications). Note that any RDD that persists in memory for more than this duration will be cleared as well.
</td>
</tr>
+<tr>
+ <td>spark.streaming.blockInterval</td>
+ <td>200</td>
+ <td>
+ Duration (milliseconds) of how long to batch new objects coming from network receivers.
+ </td>
+</tr>
</table>
diff --git a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
index c3a9e491ba..9202e65e09 100644
--- a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
@@ -37,7 +37,7 @@ object KafkaWordCount {
ssc.checkpoint("checkpoint")
val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
- val lines = ssc.kafkaStream[String](zkQuorum, group, topicpMap)
+ val lines = ssc.kafkaStream(zkQuorum, group, topicpMap)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
wordCounts.print()
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index faf6e2ae8e..484f97d992 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -222,6 +222,9 @@ object SparkBuild extends Build {
def streamingSettings = sharedSettings ++ Seq(
name := "spark-streaming",
+ resolvers ++= Seq(
+ "Akka Repository" at "http://repo.akka.io/releases/"
+ ),
libraryDependencies ++= Seq(
"org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty),
"com.github.sgroschupf" % "zkclient" % "0.1" excludeAll(excludeNetty),
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index e1be5ef51c..9be7926a4a 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -441,7 +441,12 @@ abstract class DStream[T: ClassManifest] (
* Return a new DStream in which each RDD has a single element generated by counting each RDD
* of this DStream.
*/
- def count(): DStream[Long] = this.map(_ => 1L).reduce(_ + _)
+ def count(): DStream[Long] = {
+ this.map(_ => (null, 1L))
+ .transform(_.union(context.sparkContext.makeRDD(Seq((null, 0L)), 1)))
+ .reduceByKey(_ + _)
+ .map(_._2)
+ }
/**
* Return a new DStream in which each RDD contains the counts of each distinct value in
@@ -457,7 +462,7 @@ abstract class DStream[T: ClassManifest] (
* this DStream will be registered as an output stream and therefore materialized.
*/
def foreach(foreachFunc: RDD[T] => Unit) {
- foreach((r: RDD[T], t: Time) => foreachFunc(r))
+ this.foreach((r: RDD[T], t: Time) => foreachFunc(r))
}
/**
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index b8b60aab43..f2c4073f22 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.fs.Path
import twitter4j.Status
+
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
* information (such as, cluster URL and job name) to internally create a SparkContext, it provides
@@ -186,10 +187,11 @@ class StreamingContext private (
* should be same.
*/
def actorStream[T: ClassManifest](
- props: Props,
- name: String,
- storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
- supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy): DStream[T] = {
+ props: Props,
+ name: String,
+ storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
+ supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
+ ): DStream[T] = {
networkStream(new ActorReceiver[T](props, name, storageLevel, supervisorStrategy))
}
@@ -197,9 +199,10 @@ class StreamingContext private (
* Create an input stream that receives messages pushed by a zeromq publisher.
* @param publisherUrl Url of remote zeromq publisher
* @param subscribe topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
- * of byte thus it needs the converter(which might be deserializer of bytes)
- * to translate from sequence of sequence of bytes, where sequence refer to a frame
+ * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic
+ * and each frame has sequence of byte thus it needs the converter
+ * (which might be deserializer of bytes) to translate from sequence
+ * of sequence of bytes, where sequence refer to a frame
* and sub sequence refer to its payload.
* @param storageLevel RDD storage level. Defaults to memory-only.
*/
@@ -215,24 +218,39 @@ class StreamingContext private (
}
/**
- * Create an input stream that pulls messages form a Kafka Broker.
+ * Create an input stream that pulls messages from a Kafka Broker.
* @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param initialOffsets Optional initial offsets for each of the partitions to consume.
- * By default the value is pulled from zookeper.
+ * in its own thread.
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
- def kafkaStream[T: ClassManifest](
+ def kafkaStream(
zkQuorum: String,
groupId: String,
topics: Map[String, Int],
- initialOffsets: Map[KafkaPartitionKey, Long] = Map[KafkaPartitionKey, Long](),
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
+ ): DStream[String] = {
+ val kafkaParams = Map[String, String](
+ "zk.connect" -> zkQuorum, "groupid" -> groupId, "zk.connectiontimeout.ms" -> "10000")
+ kafkaStream[String, kafka.serializer.StringDecoder](kafkaParams, topics, storageLevel)
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kafka Broker.
+ * @param kafkaParams Map of kafka configuration paramaters.
+ * See: http://kafka.apache.org/configuration.html
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def kafkaStream[T: ClassManifest, D <: kafka.serializer.Decoder[_]: Manifest](
+ kafkaParams: Map[String, String],
+ topics: Map[String, Int],
+ storageLevel: StorageLevel
): DStream[T] = {
- val inputStream = new KafkaInputDStream[T](this, zkQuorum, groupId, topics, initialOffsets, storageLevel)
+ val inputStream = new KafkaInputDStream[T, D](this, kafkaParams, topics, storageLevel)
registerInputStream(inputStream)
inputStream
}
@@ -397,7 +415,8 @@ class StreamingContext private (
* it will process either one or all of the RDDs returned by the queue.
* @param queue Queue of RDDs
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
- * @param defaultRDD Default RDD is returned by the DStream when the queue is empty. Set as null if no RDD should be returned when empty
+ * @param defaultRDD Default RDD is returned by the DStream when the queue is empty.
+ * Set as null if no RDD should be returned when empty
* @tparam T Type of objects in the RDD
*/
def queueStream[T: ClassManifest](
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index 3d149a742c..4259d4891c 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -121,14 +121,15 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
*/
- def kafkaStream[T](
+ def kafkaStream(
zkQuorum: String,
groupId: String,
topics: JMap[String, JInt])
- : JavaDStream[T] = {
- implicit val cmt: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
- ssc.kafkaStream[T](zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
+ : JavaDStream[String] = {
+ implicit val cmt: ClassManifest[String] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[String]]
+ ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
+ StorageLevel.MEMORY_ONLY_SER_2)
}
/**
@@ -136,49 +137,45 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
- * in its own thread.
- * @param initialOffsets Optional initial offsets for each of the partitions to consume.
- * By default the value is pulled from zookeper.
+ * in its own thread.
+ * @param storageLevel RDD storage level. Defaults to memory-only
+ *
*/
- def kafkaStream[T](
+ def kafkaStream(
zkQuorum: String,
groupId: String,
topics: JMap[String, JInt],
- initialOffsets: JMap[KafkaPartitionKey, JLong])
- : JavaDStream[T] = {
- implicit val cmt: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
- ssc.kafkaStream[T](
- zkQuorum,
- groupId,
- Map(topics.mapValues(_.intValue()).toSeq: _*),
- Map(initialOffsets.mapValues(_.longValue()).toSeq: _*))
+ storageLevel: StorageLevel)
+ : JavaDStream[String] = {
+ implicit val cmt: ClassManifest[String] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[String]]
+ ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
+ storageLevel)
}
/**
* Create an input stream that pulls messages form a Kafka Broker.
- * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
- * @param groupId The group id for this consumer.
+ * @param typeClass Type of RDD
+ * @param decoderClass Type of kafka decoder
+ * @param kafkaParams Map of kafka configuration paramaters.
+ * See: http://kafka.apache.org/configuration.html
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
- * @param initialOffsets Optional initial offsets for each of the partitions to consume.
- * By default the value is pulled from zookeper.
* @param storageLevel RDD storage level. Defaults to memory-only
*/
- def kafkaStream[T](
- zkQuorum: String,
- groupId: String,
+ def kafkaStream[T, D <: kafka.serializer.Decoder[_]](
+ typeClass: Class[T],
+ decoderClass: Class[D],
+ kafkaParams: JMap[String, String],
topics: JMap[String, JInt],
- initialOffsets: JMap[KafkaPartitionKey, JLong],
storageLevel: StorageLevel)
: JavaDStream[T] = {
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
- ssc.kafkaStream[T](
- zkQuorum,
- groupId,
+ implicit val cmd: Manifest[D] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[D]]
+ ssc.kafkaStream[T, D](
+ kafkaParams.toMap,
Map(topics.mapValues(_.intValue()).toSeq: _*),
- Map(initialOffsets.mapValues(_.longValue()).toSeq: _*),
storageLevel)
}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index ddd9becf32..55d2957be4 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -9,58 +9,51 @@ import java.util.concurrent.Executors
import kafka.consumer._
import kafka.message.{Message, MessageSet, MessageAndMetadata}
-import kafka.serializer.StringDecoder
+import kafka.serializer.Decoder
import kafka.utils.{Utils, ZKGroupTopicDirs}
import kafka.utils.ZkUtils._
+import kafka.utils.ZKStringSerializer
+import org.I0Itec.zkclient._
import scala.collection.Map
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
-// Key for a specific Kafka Partition: (broker, topic, group, part)
-case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int)
-
/**
* Input stream that pulls messages from a Kafka Broker.
*
- * @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
- * @param groupId The group id for this consumer.
+ * @param kafkaParams Map of kafka configuration paramaters. See: http://kafka.apache.org/configuration.html
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
- * @param initialOffsets Optional initial offsets for each of the partitions to consume.
- * By default the value is pulled from zookeper.
* @param storageLevel RDD storage level.
*/
private[streaming]
-class KafkaInputDStream[T: ClassManifest](
+class KafkaInputDStream[T: ClassManifest, D <: Decoder[_]: Manifest](
@transient ssc_ : StreamingContext,
- zkQuorum: String,
- groupId: String,
+ kafkaParams: Map[String, String],
topics: Map[String, Int],
- initialOffsets: Map[KafkaPartitionKey, Long],
storageLevel: StorageLevel
) extends NetworkInputDStream[T](ssc_ ) with Logging {
def getReceiver(): NetworkReceiver[T] = {
- new KafkaReceiver(zkQuorum, groupId, topics, initialOffsets, storageLevel)
+ new KafkaReceiver[T, D](kafkaParams, topics, storageLevel)
.asInstanceOf[NetworkReceiver[T]]
}
}
private[streaming]
-class KafkaReceiver(zkQuorum: String, groupId: String,
- topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long],
- storageLevel: StorageLevel) extends NetworkReceiver[Any] {
-
- // Timeout for establishing a connection to Zookeper in ms.
- val ZK_TIMEOUT = 10000
+class KafkaReceiver[T: ClassManifest, D <: Decoder[_]: Manifest](
+ kafkaParams: Map[String, String],
+ topics: Map[String, Int],
+ storageLevel: StorageLevel
+ ) extends NetworkReceiver[Any] {
// Handles pushing data into the BlockManager
lazy protected val blockGenerator = new BlockGenerator(storageLevel)
// Connection to Kafka
- var consumerConnector : ZookeeperConsumerConnector = null
+ var consumerConnector : ConsumerConnector = null
def onStop() {
blockGenerator.stop()
@@ -73,54 +66,59 @@ class KafkaReceiver(zkQuorum: String, groupId: String,
// In case we are using multiple Threads to handle Kafka Messages
val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
- logInfo("Starting Kafka Consumer Stream with group: " + groupId)
- logInfo("Initial offsets: " + initialOffsets.toString)
+ logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("groupid"))
- // Zookeper connection properties
+ // Kafka connection properties
val props = new Properties()
- props.put("zk.connect", zkQuorum)
- props.put("zk.connectiontimeout.ms", ZK_TIMEOUT.toString)
- props.put("groupid", groupId)
+ kafkaParams.foreach(param => props.put(param._1, param._2))
// Create the connection to the cluster
- logInfo("Connecting to Zookeper: " + zkQuorum)
+ logInfo("Connecting to Zookeper: " + kafkaParams("zk.connect"))
val consumerConfig = new ConsumerConfig(props)
- consumerConnector = Consumer.create(consumerConfig).asInstanceOf[ZookeeperConsumerConnector]
- logInfo("Connected to " + zkQuorum)
+ consumerConnector = Consumer.create(consumerConfig)
+ logInfo("Connected to " + kafkaParams("zk.connect"))
- // If specified, set the topic offset
- setOffsets(initialOffsets)
+ // When autooffset.reset is defined, it is our responsibility to try and whack the
+ // consumer group zk node.
+ if (kafkaParams.contains("autooffset.reset")) {
+ tryZookeeperConsumerGroupCleanup(kafkaParams("zk.connect"), kafkaParams("groupid"))
+ }
// Create Threads for each Topic/Message Stream we are listening
- val topicMessageStreams = consumerConnector.createMessageStreams(topics, new StringDecoder())
+ val decoder = manifest[D].erasure.newInstance.asInstanceOf[Decoder[T]]
+ val topicMessageStreams = consumerConnector.createMessageStreams(topics, decoder)
// Start the messages handler for each partition
topicMessageStreams.values.foreach { streams =>
streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
}
-
- }
-
- // Overwrites the offets in Zookeper.
- private def setOffsets(offsets: Map[KafkaPartitionKey, Long]) {
- offsets.foreach { case(key, offset) =>
- val topicDirs = new ZKGroupTopicDirs(key.groupId, key.topic)
- val partitionName = key.brokerId + "-" + key.partId
- updatePersistentPath(consumerConnector.zkClient,
- topicDirs.consumerOffsetDir + "/" + partitionName, offset.toString)
- }
}
// Handles Kafka Messages
- private class MessageHandler(stream: KafkaStream[String]) extends Runnable {
+ private class MessageHandler[T: ClassManifest](stream: KafkaStream[T]) extends Runnable {
def run() {
logInfo("Starting MessageHandler.")
- stream.takeWhile { msgAndMetadata =>
+ for (msgAndMetadata <- stream) {
blockGenerator += msgAndMetadata.message
- // Keep on handling messages
-
- true
}
}
}
+
+ // It is our responsibility to delete the consumer group when specifying autooffset.reset. This is because
+ // Kafka 0.7.2 only honors this param when the group is not in zookeeper.
+ //
+ // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied from Kafkas'
+ // ConsoleConsumer. See code related to 'autooffset.reset' when it is set to 'smallest'/'largest':
+ // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+ private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
+ try {
+ val dir = "/consumers/" + groupId
+ logInfo("Cleaning up temporary zookeeper data under " + dir + ".")
+ val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
+ zk.deleteRecursive(dir)
+ zk.close()
+ } catch {
+ case _ => // swallow
+ }
+ }
}
diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index 7385474963..26805e9621 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -198,7 +198,7 @@ abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Log
case class Block(id: String, iterator: Iterator[T], metadata: Any = null)
val clock = new SystemClock()
- val blockInterval = 200L
+ val blockInterval = System.getProperty("spark.streaming.blockInterval", "200").toLong
val blockIntervalTimer = new RecurringTimer(clock, blockInterval, updateCurrentBuffer)
val blockStorageLevel = storageLevel
val blocksForPushing = new ArrayBlockingQueue[Block](1000)
diff --git a/streaming/src/test/java/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index 3bed500f73..e5fdbe1b7a 100644
--- a/streaming/src/test/java/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -4,6 +4,7 @@ import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
+import kafka.serializer.StringDecoder;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.junit.After;
import org.junit.Assert;
@@ -23,7 +24,6 @@ import spark.streaming.api.java.JavaPairDStream;
import spark.streaming.api.java.JavaStreamingContext;
import spark.streaming.JavaTestUtils;
import spark.streaming.JavaCheckpointTestUtils;
-import spark.streaming.dstream.KafkaPartitionKey;
import spark.streaming.InputStreamsSuite;
import java.io.*;
@@ -1203,10 +1203,14 @@ public class JavaAPISuite implements Serializable {
@Test
public void testKafkaStream() {
HashMap<String, Integer> topics = Maps.newHashMap();
- HashMap<KafkaPartitionKey, Long> offsets = Maps.newHashMap();
JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics);
- JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics, offsets);
- JavaDStream test3 = ssc.kafkaStream("localhost:12345", "group", topics, offsets,
+ JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics,
+ StorageLevel.MEMORY_AND_DISK());
+
+ HashMap<String, String> kafkaParams = Maps.newHashMap();
+ kafkaParams.put("zk.connect","localhost:12345");
+ kafkaParams.put("groupid","consumer-group");
+ JavaDStream test3 = ssc.kafkaStream(String.class, StringDecoder.class, kafkaParams, topics,
StorageLevel.MEMORY_AND_DISK());
}
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index e7352deb81..565089a853 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -93,9 +93,9 @@ class BasicOperationsSuite extends TestSuiteBase {
test("count") {
testOperation(
- Seq(1 to 1, 1 to 2, 1 to 3, 1 to 4),
+ Seq(Seq(), 1 to 1, 1 to 2, 1 to 3, 1 to 4),
(s: DStream[Int]) => s.count(),
- Seq(Seq(1L), Seq(2L), Seq(3L), Seq(4L))
+ Seq(Seq(0L), Seq(1L), Seq(2L), Seq(3L), Seq(4L))
)
}
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index 0acb6db6f2..b024fc9dcc 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -243,6 +243,17 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
assert(output(i) === expectedOutput(i))
}
}
+
+ test("kafka input stream") {
+ val ssc = new StreamingContext(master, framework, batchDuration)
+ val topics = Map("my-topic" -> 1)
+ val test1 = ssc.kafkaStream("localhost:12345", "group", topics)
+ val test2 = ssc.kafkaStream("localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK)
+
+ // Test specifying decoder
+ val kafkaParams = Map("zk.connect"->"localhost:12345","groupid"->"consumer-group")
+ val test3 = ssc.kafkaStream[String, kafka.serializer.StringDecoder](kafkaParams, topics, StorageLevel.MEMORY_AND_DISK)
+ }
}