aboutsummaryrefslogtreecommitdiff
path: root/examples/src
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-01-09 13:42:04 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-01-09 13:42:04 -0800
commit6f713e2a3e56185368b66fb087637dec112a1f5d (patch)
tree201400e576fb2dd27ff5362e91de23df4401f69d /examples/src
parenta17cc602ac79b22457ed457023493fe82e9d39df (diff)
downloadspark-6f713e2a3e56185368b66fb087637dec112a1f5d.tar.gz
spark-6f713e2a3e56185368b66fb087637dec112a1f5d.tar.bz2
spark-6f713e2a3e56185368b66fb087637dec112a1f5d.zip
Changed the way StreamingContext finds and reads checkpoint files, and added JavaStreamingContext.getOrCreate.
Diffstat (limited to 'examples/src')
-rw-r--r--examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java7
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala2
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala43
3 files changed, 42 insertions, 10 deletions
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
index def87c199b..d8d6046914 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
@@ -41,17 +41,17 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
public class JavaNetworkWordCount {
public static void main(String[] args) {
if (args.length < 3) {
- System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
+ System.err.println("Usage: JavaNetworkWordCount <master> <hostname> <port>\n" +
"In local mode, <master> should be 'local[n]' with n > 1");
System.exit(1);
}
// Create the context with a 1 second batch size
- JavaStreamingContext ssc = new JavaStreamingContext(args[0], "NetworkWordCount",
+ JavaStreamingContext ssc = new JavaStreamingContext(args[0], "JavaNetworkWordCount",
new Duration(1000), System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
// Create a NetworkInputDStream on target ip:port and count the
- // words in input stream of \n delimited test (eg. generated by 'nc')
+ // words in input stream of \n delimited text (eg. generated by 'nc')
JavaDStream<String> lines = ssc.socketTextStream(args[1], Integer.parseInt(args[2]));
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
@@ -74,6 +74,5 @@ public class JavaNetworkWordCount {
wordCounts.print();
ssc.start();
-
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
index e2487dca5f..5ad4875980 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
@@ -44,7 +44,7 @@ object NetworkWordCount {
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
// Create a NetworkInputDStream on target ip:port and count the
- // words in input stream of \n delimited test (eg. generated by 'nc')
+ // words in input stream of \n delimited text (eg. generated by 'nc')
val lines = ssc.socketTextStream(args(1), args(2).toInt)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
index 0e5f39f772..739f805e87 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.spark.streaming.examples
import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
@@ -8,20 +25,37 @@ import org.apache.spark.rdd.RDD
import com.google.common.io.Files
import java.nio.charset.Charset
+/**
+ * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ * Usage: NetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-directory>
+ * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ * <checkpoint-directory> directory in a Hadoop compatible file system to which checkpoint
+ * data will be saved to; this must be a fault-tolerant file system
+ * like HDFS for the system to recover from driver failures
+ * <checkpoint-
+ * To run this on your local machine, you need to first run a Netcat server
+ * `$ nc -lk 9999`
+ * and then run the example
+ * `$ ./run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999`
+ */
+
object RecoverableNetworkWordCount {
def createContext(master: String, ip: String, port: Int, outputPath: String) = {
+ // If you do not see this printed, that means the StreamingContext has been loaded
+ // from the new checkpoint
+ println("Creating new context")
val outputFile = new File(outputPath)
if (outputFile.exists()) outputFile.delete()
// Create the context with a 1 second batch size
- println("Creating new context")
val ssc = new StreamingContext(master, "RecoverableNetworkWordCount", Seconds(1),
System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
// Create a NetworkInputDStream on target ip:port and count the
- // words in input stream of \n delimited test (eg. generated by 'nc')
+ // words in input stream of \n delimited text (eg. generated by 'nc')
val lines = ssc.socketTextStream(ip, port)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
@@ -39,10 +73,10 @@ object RecoverableNetworkWordCount {
System.err.println("You arguments were " + args.mkString("[", ", ", "]"))
System.err.println(
"""
- |Usage: RecoverableNetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-directory>
+ |Usage: RecoverableNetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-file>
|
|In local mode, <master> should be 'local[n]' with n > 1
- |Both <checkpoint-directory> and <output-directory> should be full paths
+ |Both <checkpoint-directory> and <output-file> should be full paths
""".stripMargin
)
System.exit(1)
@@ -53,6 +87,5 @@ object RecoverableNetworkWordCount {
createContext(master, ip, port, outputPath)
})
ssc.start()
-
}
}