aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--streaming/src/main/scala/spark/streaming/DStream.scala4
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala5
2 files changed, 5 insertions, 4 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 85106b3ad8..792c129be8 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -365,8 +365,8 @@ extends Serializable with Logging {
}
}
}
- logInfo("Updated checkpoint data for time " + currentTime + ", " + checkpointData.size + " checkpoints, "
- + "[" + checkpointData.mkString(",") + "]")
+ logInfo("Updated checkpoint data for time " + currentTime + ", " + checkpointData.rdds.size + " checkpoints, "
+ + "[" + checkpointData.rdds.mkString(",") + "]")
}
/**
diff --git a/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
index 12e3f49fe9..fe55db6e2c 100644
--- a/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
@@ -4,6 +4,7 @@ import java.util.Properties
import kafka.message.Message
import kafka.producer.SyncProducerConfig
import kafka.producer._
+import spark.SparkContext
import spark.streaming._
import spark.streaming.StreamingContext._
import spark.storage.StorageLevel
@@ -19,9 +20,9 @@ object KafkaWordCount {
val Array(master, hostname, port, group, topics, numThreads) = args
- val ssc = new StreamingContext(master, "KafkaWordCount")
+ val sc = new SparkContext(master, "KafkaWordCount")
+ val ssc = new StreamingContext(sc, Seconds(2))
ssc.checkpoint("checkpoint")
- ssc.setBatchDuration(Seconds(2))
val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
val lines = ssc.kafkaStream[String](hostname, port.toInt, group, topicpMap)