diff options
author | comcmipi <pitonak@fns.uniba.sk> | 2014-11-10 12:33:48 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-11-10 12:33:48 -0800 |
commit | 0340c56a921d4eb4bc9058e25e926721f8df594c (patch) | |
tree | 1419670946947aecdd778e16347723f81a863d9c /examples/src/main/scala | |
parent | 3a02d416cd82a7a942fd6ff4a0e05ff070eb218a (diff) | |
download | spark-0340c56a921d4eb4bc9058e25e926721f8df594c.tar.gz spark-0340c56a921d4eb4bc9058e25e926721f8df594c.tar.bz2 spark-0340c56a921d4eb4bc9058e25e926721f8df594c.zip |
Update RecoverableNetworkWordCount.scala
Trying this example, I missed the moment when the checkpoint was iniciated
Author: comcmipi <pitonak@fns.uniba.sk>
Closes #2735 from comcmipi/patch-1 and squashes the following commits:
b6d8001 [comcmipi] Update RecoverableNetworkWordCount.scala
96fe274 [comcmipi] Update RecoverableNetworkWordCount.scala
Diffstat (limited to 'examples/src/main/scala')
-rw-r--r-- | examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala | 5 |
1 files changed, 3 insertions, 2 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala index eb48db85d3..19427e629f 100644 --- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala @@ -56,7 +56,7 @@ import org.apache.spark.util.IntParam */ object RecoverableNetworkWordCount { - def createContext(ip: String, port: Int, outputPath: String) = { + def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String) = { // If you do not see this printed, that means the StreamingContext has been loaded // from the new checkpoint @@ -66,6 +66,7 @@ object RecoverableNetworkWordCount { val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount") // Create the context with a 1 second batch size val ssc = new StreamingContext(sparkConf, Seconds(1)) + ssc.checkpoint(checkpointDirectory) // Create a socket stream on target ip:port and count the // words in input stream of \n delimited text (eg. generated by 'nc') @@ -101,7 +102,7 @@ object RecoverableNetworkWordCount { val Array(ip, IntParam(port), checkpointDirectory, outputPath) = args val ssc = StreamingContext.getOrCreate(checkpointDirectory, () => { - createContext(ip, port, outputPath) + createContext(ip, port, outputPath, checkpointDirectory) }) ssc.start() ssc.awaitTermination() |