diff options
author | Denny <dennybritz@gmail.com> | 2012-12-05 10:30:40 -0800 |
---|---|---|
committer | Denny <dennybritz@gmail.com> | 2012-12-05 10:30:40 -0800 |
commit | a23462191f9ad492d14f9efc3e915b1f522f543a (patch) | |
tree | 24d8bcabd6ee895124f96e06c3081c37a9845f57 /streaming | |
parent | 15df4b0e52c1a594ed07981e6f2ee1602de7ccbb (diff) | |
download | spark-a23462191f9ad492d14f9efc3e915b1f522f543a.tar.gz spark-a23462191f9ad492d14f9efc3e915b1f522f543a.tar.bz2 spark-a23462191f9ad492d14f9efc3e915b1f522f543a.zip |
Adjust Kafka code to work with new streaming changes.
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/DStream.scala | 4 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala | 5 |
2 files changed, 5 insertions, 4 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index 85106b3ad8..792c129be8 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -365,8 +365,8 @@ extends Serializable with Logging { } } } - logInfo("Updated checkpoint data for time " + currentTime + ", " + checkpointData.size + " checkpoints, " - + "[" + checkpointData.mkString(",") + "]") + logInfo("Updated checkpoint data for time " + currentTime + ", " + checkpointData.rdds.size + " checkpoints, " + + "[" + checkpointData.rdds.mkString(",") + "]") } /** diff --git a/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala index 12e3f49fe9..fe55db6e2c 100644 --- a/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala +++ b/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala @@ -4,6 +4,7 @@ import java.util.Properties import kafka.message.Message import kafka.producer.SyncProducerConfig import kafka.producer._ +import spark.SparkContext import spark.streaming._ import spark.streaming.StreamingContext._ import spark.storage.StorageLevel @@ -19,9 +20,9 @@ object KafkaWordCount { val Array(master, hostname, port, group, topics, numThreads) = args - val ssc = new StreamingContext(master, "KafkaWordCount") + val sc = new SparkContext(master, "KafkaWordCount") + val ssc = new StreamingContext(sc, Seconds(2)) ssc.checkpoint("checkpoint") - ssc.setBatchDuration(Seconds(2)) val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap val lines = ssc.kafkaStream[String](hostname, port.toInt, group, topicpMap) |