diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-09 13:42:04 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-09 13:42:04 -0800 |
commit | 6f713e2a3e56185368b66fb087637dec112a1f5d (patch) | |
tree | 201400e576fb2dd27ff5362e91de23df4401f69d /examples | |
parent | a17cc602ac79b22457ed457023493fe82e9d39df (diff) | |
download | spark-6f713e2a3e56185368b66fb087637dec112a1f5d.tar.gz spark-6f713e2a3e56185368b66fb087637dec112a1f5d.tar.bz2 spark-6f713e2a3e56185368b66fb087637dec112a1f5d.zip |
Changed the way StreamingContext finds and reads checkpoint files, and added JavaStreamingContext.getOrCreate.
Diffstat (limited to 'examples')
3 files changed, 42 insertions, 10 deletions
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java index def87c199b..d8d6046914 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java @@ -41,17 +41,17 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext; public class JavaNetworkWordCount { public static void main(String[] args) { if (args.length < 3) { - System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" + + System.err.println("Usage: JavaNetworkWordCount <master> <hostname> <port>\n" + "In local mode, <master> should be 'local[n]' with n > 1"); System.exit(1); } // Create the context with a 1 second batch size - JavaStreamingContext ssc = new JavaStreamingContext(args[0], "NetworkWordCount", + JavaStreamingContext ssc = new JavaStreamingContext(args[0], "JavaNetworkWordCount", new Duration(1000), System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); // Create a NetworkInputDStream on target ip:port and count the - // words in input stream of \n delimited test (eg. generated by 'nc') + // words in input stream of \n delimited text (eg. 
generated by 'nc') JavaDStream<String> lines = ssc.socketTextStream(args[1], Integer.parseInt(args[2])); JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override @@ -74,6 +74,5 @@ public class JavaNetworkWordCount { wordCounts.print(); ssc.start(); - } } diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala index e2487dca5f..5ad4875980 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala @@ -44,7 +44,7 @@ object NetworkWordCount { System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) // Create a NetworkInputDStream on target ip:port and count the - // words in input stream of \n delimited test (eg. generated by 'nc') + // words in input stream of \n delimited text (eg. generated by 'nc') val lines = ssc.socketTextStream(args(1), args(2).toInt) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala index 0e5f39f772..739f805e87 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.streaming.examples import org.apache.spark.streaming.{Time, Seconds, StreamingContext} @@ -8,20 +25,37 @@ import org.apache.spark.rdd.RDD import com.google.common.io.Files import java.nio.charset.Charset +/** + * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. + * Usage: RecoverableNetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-file> + * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1. + * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data. 
+ * <checkpoint-directory> directory in a Hadoop compatible file system to which checkpoint + * data will be saved; this must be a fault-tolerant file system + * like HDFS for the system to recover from driver failures + * <output-file> file to which the word counts will be written + * To run this on your local machine, you need to first run a Netcat server + * `$ nc -lk 9999` + * and then run the example + * `$ ./run-example org.apache.spark.streaming.examples.RecoverableNetworkWordCount local[2] localhost 9999 <checkpoint-directory> <output-file>` + */ + object RecoverableNetworkWordCount { def createContext(master: String, ip: String, port: Int, outputPath: String) = { + // If you do not see this printed, that means the StreamingContext has been loaded + // from the new checkpoint + println("Creating new context") val outputFile = new File(outputPath) if (outputFile.exists()) outputFile.delete() // Create the context with a 1 second batch size - println("Creating new context") val ssc = new StreamingContext(master, "RecoverableNetworkWordCount", Seconds(1), System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) // Create a NetworkInputDStream on target ip:port and count the - // words in input stream of \n delimited test (eg. generated by 'nc') + // words in input stream of \n delimited text (eg. 
generated by 'nc') val lines = ssc.socketTextStream(ip, port) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) @@ -39,10 +73,10 @@ object RecoverableNetworkWordCount { System.err.println("You arguments were " + args.mkString("[", ", ", "]")) System.err.println( """ - |Usage: RecoverableNetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-directory> + |Usage: RecoverableNetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-file> | |In local mode, <master> should be 'local[n]' with n > 1 - |Both <checkpoint-directory> and <output-directory> should be full paths + |Both <checkpoint-directory> and <output-file> should be full paths """.stripMargin ) System.exit(1) @@ -53,6 +87,5 @@ object RecoverableNetworkWordCount { createContext(master, ip, port, outputPath) }) ssc.start() - } } |