commit a23462191f9ad492d14f9efc3e915b1f522f543a
tree   24d8bcabd6ee895124f96e06c3081c37a9845f57
parent 15df4b0e52c1a594ed07981e6f2ee1602de7ccbb
Author:     Denny <dennybritz@gmail.com>
AuthorDate: 2012-12-05 10:30:40 -0800
Commit:     Denny <dennybritz@gmail.com>
CommitDate: 2012-12-05 10:30:40 -0800
Adjust Kafka code to work with new streaming changes.
Diffstat (limited to 'streaming/src')
 streaming/src/main/scala/spark/streaming/DStream.scala                 | 4 ++--
 streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala | 5 +++--
 2 files changed, 5 insertions(+), 4 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index 85106b3ad8..792c129be8 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -365,8 +365,8 @@ extends Serializable with Logging {
         }
       }
     }
-    logInfo("Updated checkpoint data for time " + currentTime + ", " + checkpointData.size + " checkpoints, "
-      + "[" + checkpointData.mkString(",") + "]")
+    logInfo("Updated checkpoint data for time " + currentTime + ", " + checkpointData.rdds.size + " checkpoints, "
+      + "[" + checkpointData.rdds.mkString(",") + "]")
   }
 
   /**
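
The hunk above reflects a structural change on DStream: checkpoint state is no longer a bare collection on checkpointData but is held behind an rdds field, so the log line reads its size and contents through checkpointData.rdds. A minimal, self-contained sketch of the shape the new log line assumes (class and field below are illustrative stand-ins, not the actual Spark source):

import scala.collection.mutable.HashMap

// Hypothetical stand-in for the checkpointData holder referenced in the hunk:
// checkpoint entries are kept in an `rdds` map keyed by batch time.
class DStreamCheckpointData {
  val rdds = new HashMap[Long, String]()
}

object CheckpointLogDemo {
  def main(args: Array[String]): Unit = {
    val checkpointData = new DStreamCheckpointData
    checkpointData.rdds(1354699840000L) = "checkpoint/rdd-42"
    // Mirrors the updated logInfo call: size and contents are read through
    // the rdds field rather than off checkpointData directly.
    println("Updated checkpoint data, " + checkpointData.rdds.size +
      " checkpoints, [" + checkpointData.rdds.mkString(",") + "]")
  }
}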
diff --git a/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
index 12e3f49fe9..fe55db6e2c 100644
--- a/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
@@ -4,6 +4,7 @@ import java.util.Properties
 import kafka.message.Message
 import kafka.producer.SyncProducerConfig
 import kafka.producer._
+import spark.SparkContext
 import spark.streaming._
 import spark.streaming.StreamingContext._
 import spark.storage.StorageLevel
@@ -19,9 +20,9 @@ object KafkaWordCount {
     val Array(master, hostname, port, group, topics, numThreads) = args
 
-    val ssc = new StreamingContext(master, "KafkaWordCount")
+    val sc = new SparkContext(master, "KafkaWordCount")
+    val ssc = new StreamingContext(sc, Seconds(2))
     ssc.checkpoint("checkpoint")
-    ssc.setBatchDuration(Seconds(2))
 
     val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
     val lines = ssc.kafkaStream[String](hostname, port.toInt, group, topicpMap)
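
The KafkaWordCount hunks show the API migration this commit adapts to: a StreamingContext is no longer constructed from a master URL with the batch interval set afterwards via setBatchDuration; it now wraps an existing SparkContext and takes the batch duration as a constructor argument. A sketch of the new setup pattern, assuming the spark.streaming API as of this commit (the object name is illustrative):

import spark.SparkContext
import spark.streaming.{Seconds, StreamingContext}

object StreamingSetupSketch {
  def main(args: Array[String]): Unit = {
    val Array(master, hostname, port, group, topics, numThreads) = args

    // Previously: new StreamingContext(master, "KafkaWordCount") followed by
    // ssc.setBatchDuration(Seconds(2)). Now the SparkContext is created first
    // and the batch duration is fixed at construction time.
    val sc = new SparkContext(master, "KafkaWordCount")
    val ssc = new StreamingContext(sc, Seconds(2))
    ssc.checkpoint("checkpoint")
  }
}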