aboutsummaryrefslogtreecommitdiff
path: root/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
diff options
context:
space:
mode:
Diffstat (limited to 'examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala')
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala2
1 files changed, 1 insertions, 1 deletions
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
index d51e6e9418..8c5d0bd568 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
@@ -82,7 +82,7 @@ object RecoverableNetworkWordCount {
val lines = ssc.socketTextStream(ip, port)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
- wordCounts.foreach((rdd: RDD[(String, Int)], time: Time) => {
+ wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
val counts = "Counts at time " + time + " " + rdd.collect().mkString("[", ", ", "]")
println(counts)
println("Appending to " + outputFile.getAbsolutePath)