author    Tathagata Das <tathagata.das1565@gmail.com>  2014-01-10 12:17:09 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>  2014-01-10 12:17:09 -0800
commit    e4bb845238d0df48f8258e925caf9af5a107af46
tree      3a11ad9abe691584cc64be3ca31403305f831686
parent    2213a5a47fb2d73e3fdebec24356c69ea2968b81
Updated docs based on Patrick's comments in PR 383.
core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala                             |  4
examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala             |  3
examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala  | 49
streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala                           | 13
streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala        | 14
5 files changed, 58 insertions(+), 25 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
index dde504fc52..8e07a0f29a 100644
--- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
@@ -27,8 +27,8 @@ import org.apache.spark.Logging
/**
* This is a custom implementation of scala.collection.mutable.Map which stores the insertion
* timestamp along with each key-value pair. If specified, the timestamp of each pair can be
- * updated every it is accessed. Key-value pairs whose timestamp are older than a particular
- * threshold time can them be removed using the clearOldValues method. This is intended to
+ * updated every time it is accessed. Key-value pairs whose timestamps are older than a particular
+ * threshold time can then be removed using the clearOldValues method. This is intended to
* be a drop-in replacement of scala.collection.mutable.HashMap.
* @param updateTimeStampOnGet When enabled, the timestamp of a pair will be
* updated when it is accessed
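The corrected scaladoc describes the map's two behaviors: refreshing a pair's timestamp on access, and evicting pairs older than a threshold. As a rough illustration of how the two fit together (not part of this patch; it assumes the 0.9-era constructor flag and the clearOldValues method named in the scaladoc):

```scala
import org.apache.spark.util.TimeStampedHashMap

// Refresh an entry's timestamp whenever it is read...
val cache = new TimeStampedHashMap[String, Int](updateTimeStampOnGet = true)
cache("a") = 1
cache.get("a") // access bumps the timestamp of "a"

// ...then drop every pair whose timestamp is older than one hour.
cache.clearOldValues(System.currentTimeMillis() - 60 * 60 * 1000)
```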
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
index aba1704825..4b896eaccb 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
@@ -21,7 +21,8 @@ import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
/**
- * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ * Counts words in UTF8-encoded text received from the network every second.
+ *
* Usage: NetworkWordCount <master> <hostname> <port>
* <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
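For context, the pipeline this scaladoc describes is roughly the following (a sketch against the 0.9-era streaming API, not code from this patch):

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

// Count words arriving on localhost:9999 and print the counts each 1-second batch.
val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
```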
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
index 739f805e87..d51e6e9418 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
@@ -26,18 +26,41 @@ import com.google.common.io.Files
import java.nio.charset.Charset
/**
- * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
- * Usage: NetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-directory>
+ * Counts words in UTF8-encoded text received from the network every second.
+ *
+ * Usage: RecoverableNetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-file>
* <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
- * <checkpoint-directory> directory in a Hadoop compatible file system to which checkpoint
- * data will be saved to; this must be a fault-tolerant file system
- * like HDFS for the system to recover from driver failures
- * <checkpoint-
+ * <checkpoint-directory> directory in an HDFS-compatible file system to which checkpoint data will be saved
+ * <output-file> file to which the word counts will be appended
+ *
+ * In local mode, <master> should be 'local[n]' with n > 1
+ * <checkpoint-directory> and <output-file> must be absolute paths
+ *
* To run this on your local machine, you need to first run a Netcat server
- * `$ nc -lk 9999`
- * and then run the example
- * `$ ./run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999`
+ *
+ * `$ nc -lk 9999`
+ *
+ * and run the example as
+ *
+ * `$ ./run-example org.apache.spark.streaming.examples.RecoverableNetworkWordCount \
+ * local[2] localhost 9999 ~/checkpoint/ ~/out`
+ *
+ * If the directory ~/checkpoint/ does not exist (e.g. running for the first time), the example
+ * will create a new StreamingContext (printing "Creating new context" to the console). Otherwise,
+ * if checkpoint data exists in ~/checkpoint/, it will recreate the StreamingContext from
+ * the checkpoint data.
+ *
+ * To run this example in a local standalone cluster with automatic driver recovery,
+ *
+ * `$ ./spark-class org.apache.spark.deploy.Client -s launch <cluster-url> <path-to-examples-jar> \
+ * org.apache.spark.streaming.examples.RecoverableNetworkWordCount <cluster-url> \
+ * localhost 9999 ~/checkpoint ~/out`
+ *
+ * <path-to-examples-jar> would typically be <spark-dir>/examples/target/scala-XX/spark-examples....jar
+ *
+ * Refer to the online documentation for more details.
*/
object RecoverableNetworkWordCount {
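The recovery behavior described in the scaladoc is driven by StreamingContext.getOrCreate. A minimal sketch of the pattern (the path and pipeline body are illustrative, not copied from this patch):

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}

def createContext(): StreamingContext = {
  println("Creating new context")
  val ssc = new StreamingContext("local[2]", "RecoverableNetworkWordCount", Seconds(1))
  ssc.checkpoint("/tmp/checkpoint") // must be an absolute path, per the usage notes
  // ... build the DStream pipeline here ...
  ssc
}

// Recreate the context from /tmp/checkpoint if checkpoint data exists there;
// otherwise invoke createContext to build a fresh one.
val ssc = StreamingContext.getOrCreate("/tmp/checkpoint", createContext _)
ssc.start()
```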
@@ -52,7 +75,7 @@ object RecoverableNetworkWordCount {
// Create the context with a 1 second batch size
val ssc = new StreamingContext(master, "RecoverableNetworkWordCount", Seconds(1),
- System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+ System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
// Create a NetworkInputDStream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
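The constructor change above replaces the SPARK_EXAMPLES_JAR environment variable with StreamingContext.jarOfClass, which locates the jar containing a given class. Roughly (assuming the 0.9-era signature, which returns a Seq[String] of matching jar paths):

```scala
import org.apache.spark.streaming.StreamingContext

object JarLookupExample {
  // Find the jar that contains this class, so nothing depends on the
  // SPARK_EXAMPLES_JAR environment variable being set.
  val exampleJars: Seq[String] = StreamingContext.jarOfClass(this.getClass)
}
```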
@@ -74,9 +97,13 @@ object RecoverableNetworkWordCount {
System.err.println(
"""
|Usage: RecoverableNetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-file>
+ | <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ | <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+ | <checkpoint-directory> directory in an HDFS-compatible file system to which checkpoint data will be saved
+ | <output-file> file to which the word counts will be appended
|
|In local mode, <master> should be 'local[n]' with n > 1
- |Both <checkpoint-directory> and <output-file> should be full paths
+ |Both <checkpoint-directory> and <output-file> must be absolute paths
""".stripMargin
)
System.exit(1)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 62b225382e..1249ef4c3d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -43,8 +43,9 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
val pendingTimes = ssc.scheduler.getPendingTimes()
val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
val sparkConf = ssc.conf
-
- // do not save these configurations
+
+ // These should be unset when a checkpoint is deserialized,
+ // otherwise the SparkContext won't initialize correctly.
sparkConf.remove("spark.hostPort").remove("spark.driver.host").remove("spark.driver.port")
def validate() {
@@ -102,8 +103,12 @@ object Checkpoint extends Logging {
* Convenience class to handle the writing of graph checkpoint to file
*/
private[streaming]
-class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDir: String, hadoopConf: Configuration)
- extends Logging {
+class CheckpointWriter(
+ jobGenerator: JobGenerator,
+ conf: SparkConf,
+ checkpointDir: String,
+ hadoopConf: Configuration
+ ) extends Logging {
val MAX_ATTEMPTS = 3
val executor = Executors.newFixedThreadPool(1)
val compressionCodec = CompressionCodec.createCodec(conf)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index d96e9ac7b7..523173d45a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -489,15 +489,15 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
- * JavaStreamingContext object contains a number of static utility functions.
+ * JavaStreamingContext object contains a number of utility functions.
*/
object JavaStreamingContext {
/**
* Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
* If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
- * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
- * will be created by called the provided `creatingFunc`.
+ * recreated from the checkpoint data. If the data does not exist, then the provided factory
+ * will be used to create a JavaStreamingContext.
*
* @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program
* @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext
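The factory-based overloads mirror the Scala getOrCreate pattern. A sketch of using the documented overload from Scala (the app name and checkpoint path are illustrative):

```scala
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaStreamingContextFactory}

val factory = new JavaStreamingContextFactory {
  // Invoked only when no usable checkpoint data is found in the directory.
  override def create(): JavaStreamingContext = {
    val jssc = new JavaStreamingContext("local[2]", "JavaRecoverableWordCount", Seconds(1))
    jssc.checkpoint("/tmp/checkpoint")
    jssc
  }
}

// Recreate from checkpoint data if present; otherwise call the factory.
val jssc = JavaStreamingContext.getOrCreate("/tmp/checkpoint", factory)
jssc.start()
```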
@@ -515,8 +515,8 @@ object JavaStreamingContext {
/**
* Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
* If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
- * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
- * will be created by called the provided `creatingFunc`.
+ * recreated from the checkpoint data. If the data does not exist, then the provided factory
+ * will be used to create a JavaStreamingContext.
*
* @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
* @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext
@@ -537,8 +537,8 @@ object JavaStreamingContext {
/**
* Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
* If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
- * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
- * will be created by called the provided `creatingFunc`.
+ * recreated from the checkpoint data. If the data does not exist, then the provided factory
+ * will be used to create a JavaStreamingContext.
*
* @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
* @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext