aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorcomcmipi <pitonak@fns.uniba.sk>2014-11-10 12:33:48 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-11-10 12:34:20 -0800
commit254b135705b65a6a12937257bef5565d4f84919a (patch)
tree5d3833de1968bec521e762fe8d4825efbbd79a57
parentcdcf5467ac1274632610c34454088ba1ba1d460e (diff)
downloadspark-254b135705b65a6a12937257bef5565d4f84919a.tar.gz
spark-254b135705b65a6a12937257bef5565d4f84919a.tar.bz2
spark-254b135705b65a6a12937257bef5565d4f84919a.zip
Update RecoverableNetworkWordCount.scala
Trying this example, I missed the moment when the checkpoint was iniciated Author: comcmipi <pitonak@fns.uniba.sk> Closes #2735 from comcmipi/patch-1 and squashes the following commits: b6d8001 [comcmipi] Update RecoverableNetworkWordCount.scala 96fe274 [comcmipi] Update RecoverableNetworkWordCount.scala (cherry picked from commit 0340c56a921d4eb4bc9058e25e926721f8df594c) Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala5
1 files changed, 3 insertions, 2 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
index eb48db85d3..19427e629f 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
@@ -56,7 +56,7 @@ import org.apache.spark.util.IntParam
*/
object RecoverableNetworkWordCount {
- def createContext(ip: String, port: Int, outputPath: String) = {
+ def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String) = {
// If you do not see this printed, that means the StreamingContext has been loaded
// from the new checkpoint
@@ -66,6 +66,7 @@ object RecoverableNetworkWordCount {
val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount")
// Create the context with a 1 second batch size
val ssc = new StreamingContext(sparkConf, Seconds(1))
+ ssc.checkpoint(checkpointDirectory)
// Create a socket stream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
@@ -101,7 +102,7 @@ object RecoverableNetworkWordCount {
val Array(ip, IntParam(port), checkpointDirectory, outputPath) = args
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
() => {
- createContext(ip, port, outputPath)
+ createContext(ip, port, outputPath, checkpointDirectory)
})
ssc.start()
ssc.awaitTermination()