diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-02 19:07:22 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-02 19:07:22 -0800 |
commit | a1b8dd53e3474dae2c49b30bc9719c7f6b98c7cc (patch) | |
tree | 71a8001723003d3247c5773d90701a154ee94208 /examples | |
parent | a8729770f5dc944444b9996716c2f6a26485a819 (diff) | |
download | spark-a1b8dd53e3474dae2c49b30bc9719c7f6b98c7cc.tar.gz spark-a1b8dd53e3474dae2c49b30bc9719c7f6b98c7cc.tar.bz2 spark-a1b8dd53e3474dae2c49b30bc9719c7f6b98c7cc.zip |
Added StreamingContext.getOrCreate to for automatic recovery, and added RecoverableNetworkWordCount example to use it.
Diffstat (limited to 'examples')
-rw-r--r-- | examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala | 58 |
1 files changed, 58 insertions, 0 deletions
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala new file mode 100644 index 0000000000..0e5f39f772 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala @@ -0,0 +1,58 @@ +package org.apache.spark.streaming.examples + +import org.apache.spark.streaming.{Time, Seconds, StreamingContext} +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.util.IntParam +import java.io.File +import org.apache.spark.rdd.RDD +import com.google.common.io.Files +import java.nio.charset.Charset + +object RecoverableNetworkWordCount { + + def createContext(master: String, ip: String, port: Int, outputPath: String) = { + + val outputFile = new File(outputPath) + if (outputFile.exists()) outputFile.delete() + + // Create the context with a 1 second batch size + println("Creating new context") + val ssc = new StreamingContext(master, "RecoverableNetworkWordCount", Seconds(1), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) + + // Create a NetworkInputDStream on target ip:port and count the + // words in input stream of \n delimited test (eg. generated by 'nc') + val lines = ssc.socketTextStream(ip, port) + val words = lines.flatMap(_.split(" ")) + val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) + wordCounts.foreach((rdd: RDD[(String, Int)], time: Time) => { + val counts = "Counts at time " + time + " " + rdd.collect().mkString("[", ", ", "]") + println(counts) + println("Appending to " + outputFile.getAbsolutePath) + Files.append(counts + "\n", outputFile, Charset.defaultCharset()) + }) + ssc + } + + def main(args: Array[String]) { + if (args.length != 5) { + System.err.println("You arguments were " + args.mkString("[", ", ", "]")) + System.err.println( + """ + |Usage: RecoverableNetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-directory> + | + |In local mode, <master> should be 'local[n]' with n > 1 + |Both <checkpoint-directory> and <output-directory> should be full paths + """.stripMargin + ) + System.exit(1) + } + val Array(master, ip, IntParam(port), checkpointDirectory, outputPath) = args + val ssc = StreamingContext.getOrCreate(checkpointDirectory, + () => { + createContext(master, ip, port, outputPath) + }) + ssc.start() + + } +} |