aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-01-02 19:07:22 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-01-02 19:07:22 -0800
commita1b8dd53e3474dae2c49b30bc9719c7f6b98c7cc (patch)
tree71a8001723003d3247c5773d90701a154ee94208 /examples
parenta8729770f5dc944444b9996716c2f6a26485a819 (diff)
downloadspark-a1b8dd53e3474dae2c49b30bc9719c7f6b98c7cc.tar.gz
spark-a1b8dd53e3474dae2c49b30bc9719c7f6b98c7cc.tar.bz2
spark-a1b8dd53e3474dae2c49b30bc9719c7f6b98c7cc.zip
Added StreamingContext.getOrCreate to for automatic recovery, and added RecoverableNetworkWordCount example to use it.
Diffstat (limited to 'examples')
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala58
1 files changed, 58 insertions, 0 deletions
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
new file mode 100644
index 0000000000..0e5f39f772
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
@@ -0,0 +1,58 @@
+package org.apache.spark.streaming.examples
+
+import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.util.IntParam
+import java.io.File
+import org.apache.spark.rdd.RDD
+import com.google.common.io.Files
+import java.nio.charset.Charset
+
+object RecoverableNetworkWordCount {
+
+ def createContext(master: String, ip: String, port: Int, outputPath: String) = {
+
+ val outputFile = new File(outputPath)
+ if (outputFile.exists()) outputFile.delete()
+
+ // Create the context with a 1 second batch size
+ println("Creating new context")
+ val ssc = new StreamingContext(master, "RecoverableNetworkWordCount", Seconds(1),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+
+ // Create a NetworkInputDStream on target ip:port and count the
+ // words in input stream of \n delimited test (eg. generated by 'nc')
+ val lines = ssc.socketTextStream(ip, port)
+ val words = lines.flatMap(_.split(" "))
+ val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
+ wordCounts.foreach((rdd: RDD[(String, Int)], time: Time) => {
+ val counts = "Counts at time " + time + " " + rdd.collect().mkString("[", ", ", "]")
+ println(counts)
+ println("Appending to " + outputFile.getAbsolutePath)
+ Files.append(counts + "\n", outputFile, Charset.defaultCharset())
+ })
+ ssc
+ }
+
+ def main(args: Array[String]) {
+ if (args.length != 5) {
+ System.err.println("You arguments were " + args.mkString("[", ", ", "]"))
+ System.err.println(
+ """
+ |Usage: RecoverableNetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-directory>
+ |
+ |In local mode, <master> should be 'local[n]' with n > 1
+ |Both <checkpoint-directory> and <output-directory> should be full paths
+ """.stripMargin
+ )
+ System.exit(1)
+ }
+ val Array(master, ip, IntParam(port), checkpointDirectory, outputPath) = args
+ val ssc = StreamingContext.getOrCreate(checkpointDirectory,
+ () => {
+ createContext(master, ip, port, outputPath)
+ })
+ ssc.start()
+
+ }
+}