From e4bb845238d0df48f8258e925caf9af5a107af46 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Fri, 10 Jan 2014 12:17:09 -0800
Subject: Updated docs based on Patrick's comments in PR 383.

---
 .../org/apache/spark/util/TimeStampedHashMap.scala |  4 +-
 .../streaming/examples/NetworkWordCount.scala      |  3 +-
 .../examples/RecoverableNetworkWordCount.scala     | 49 +++++++++++++++++-----
 .../org/apache/spark/streaming/Checkpoint.scala    | 13 ++++--
 .../streaming/api/java/JavaStreamingContext.scala  | 14 +++----
 5 files changed, 58 insertions(+), 25 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
index dde504fc52..8e07a0f29a 100644
--- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala
@@ -27,8 +27,8 @@ import org.apache.spark.Logging
 /**
  * This is a custom implementation of scala.collection.mutable.Map which stores the insertion
  * timestamp along with each key-value pair. If specified, the timestamp of each pair can be
- * updated every it is accessed. Key-value pairs whose timestamp are older than a particular
- * threshold time can them be removed using the clearOldValues method. This is intended to
+ * updated every time it is accessed. Key-value pairs whose timestamp are older than a particular
+ * threshold time can then be removed using the clearOldValues method. This is intended to
  * be a drop-in replacement of scala.collection.mutable.HashMap.
  * @param updateTimeStampOnGet When enabled, the timestamp of a pair will be
  *                             updated when it is accessed
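For reference, the semantics the corrected scaladoc describes (a per-pair
insertion timestamp, an optional refresh-on-get, and clearOldValues) can be
sketched as below. This is a simplified illustration of the idea, not the
actual TimeStampedHashMap code in core:

    import scala.collection.concurrent.TrieMap

    // Simplified sketch: a map that stores an insertion timestamp with each
    // key-value pair and can drop entries older than a threshold time.
    class TimestampedMap[K, V](updateTimeStampOnGet: Boolean = false) {
      private val internal = TrieMap[K, (V, Long)]()

      def put(key: K, value: V): Unit =
        internal.put(key, (value, System.currentTimeMillis))

      def get(key: K): Option[V] = {
        val entry = internal.get(key)
        if (updateTimeStampOnGet) {
          // Refresh the timestamp so recently accessed pairs survive cleanup.
          entry.foreach { case (v, _) => internal.put(key, (v, System.currentTimeMillis)) }
        }
        entry.map(_._1)
      }

      // Remove all pairs whose timestamp is older than threshTime.
      def clearOldValues(threshTime: Long): Unit =
        internal.foreach { case (k, (_, t)) => if (t < threshTime) internal.remove(k) }
    }

A concurrent map is used here so that clearOldValues can run safely while
other threads read and write, which is the setting the real class is used in.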
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
index aba1704825..4b896eaccb 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala
@@ -21,7 +21,8 @@ import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.streaming.StreamingContext._
 
 /**
- * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
+ * Counts words in text encoded with UTF8 received from the network every second.
+ *
  * Usage: NetworkWordCount <master> <hostname> <port>
  *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
  *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
index 739f805e87..d51e6e9418 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/RecoverableNetworkWordCount.scala
@@ -26,18 +26,41 @@ import com.google.common.io.Files
 import java.nio.charset.Charset
 
 /**
- * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
- * Usage: NetworkWordCount <master> <hostname> <port> <checkpoint-directory>
+ * Counts words in text encoded with UTF8 received from the network every second.
+ *
+ * Usage: NetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-file>
  *   <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
  *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
- *   <checkpoint-directory> directory in a Hadoop compatible file system to which checkpoint
- *                          data will be saved to; this must be a fault-tolerant file system
- *                          like HDFS for the system to recover from driver failures
+ *   <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
+ *   <output-file> file to which the word counts will be appended
+ *
+ * In local mode, <master> should be 'local[n]' with n > 1
+ * <checkpoint-directory> and <output-file> must be absolute paths
+ *
+ *
  * To run this on your local machine, you need to first run a Netcat server
- *    `$ nc -lk 9999`
- * and then run the example
- *    `$ ./run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999`
+ *
+ *      `$ nc -lk 9999`
+ *
+ * and run the example as
+ *
+ *      `$ ./run-example org.apache.spark.streaming.examples.RecoverableNetworkWordCount \
+ *              local[2] localhost 9999 ~/checkpoint/ ~/out`
+ *
+ * If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create
+ * a new StreamingContext (will print "Creating new context" to the console). Otherwise, if
+ * checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
+ * the checkpoint data.
+ *
+ * To run this example in a local standalone cluster with automatic driver recovery,
+ *
+ *      `$ ./spark-class org.apache.spark.deploy.Client -s launch <cluster-url> <path-to-examples-jar> \
+ *              org.apache.spark.streaming.examples.RecoverableNetworkWordCount <cluster-url> \
+ *              localhost 9999 ~/checkpoint ~/out`
+ *
+ * <path-to-examples-jar> would typically be <spark-dir>/examples/target/scala-XX/spark-examples....jar
+ *
+ * Refer to the online documentation for more details.
  */
 object RecoverableNetworkWordCount {
@@ -52,7 +75,7 @@ object RecoverableNetworkWordCount {
 
     // Create the context with a 1 second batch size
     val ssc = new StreamingContext(master, "RecoverableNetworkWordCount", Seconds(1),
-      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+      System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
 
     // Create a NetworkInputDStream on target ip:port and count the
     // words in input stream of \n delimited text (eg. generated by 'nc')
@@ -74,9 +97,13 @@ object RecoverableNetworkWordCount {
       System.err.println(
         """
          |Usage: RecoverableNetworkWordCount <master> <hostname> <port> <checkpoint-directory> <output-file>
+         |  <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+         |  <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
+         |  <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
+         |  <output-file> file to which the word counts will be appended
          |
          |In local mode, <master> should be 'local[n]' with n > 1
-         |Both <checkpoint-directory> and <output-file> should be full paths
+         |Both <checkpoint-directory> and <output-file> must be absolute paths
         """.stripMargin
       )
       System.exit(1)
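The recovery behaviour this example documents (create a new StreamingContext
on first run, recreate it from checkpoint data on restart) boils down to the
getOrCreate pattern, here via StreamingContext.getOrCreate, the Scala
counterpart of the JavaStreamingContext.getOrCreate API documented later in
this patch. A condensed sketch under those assumptions; it omits the
example's output-file handling and simplifies argument parsing:

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._

    object RecoverySketch {
      def main(args: Array[String]) {
        val Array(master, ip, port, checkpointDir) = args

        // Invoked only when no checkpoint data exists in checkpointDir.
        def createContext(): StreamingContext = {
          println("Creating new context")
          val ssc = new StreamingContext(master, "RecoverySketch", Seconds(1))
          ssc.checkpoint(checkpointDir)
          val lines = ssc.socketTextStream(ip, port.toInt)
          lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
          ssc
        }

        // Recreate the StreamingContext (and its DStream lineage) from the
        // checkpoint data if it exists, otherwise build a fresh context.
        val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
        ssc.start()
        ssc.awaitTermination()
      }
    }

The key point is that all DStream setup lives inside the creating function:
on recovery the whole computation is restored from the checkpoint, so the
function is never called and "Creating new context" is not printed.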
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 62b225382e..1249ef4c3d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -43,8 +43,9 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   val pendingTimes = ssc.scheduler.getPendingTimes()
   val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
   val sparkConf = ssc.conf
-
-  // do not save these configurations
+
+  // These should be unset when a checkpoint is deserialized,
+  // otherwise the SparkContext won't initialize correctly.
   sparkConf.remove("spark.hostPort").remove("spark.driver.host").remove("spark.driver.port")
 
   def validate() {
@@ -102,8 +103,12 @@ object Checkpoint extends Logging {
  * Convenience class to handle the writing of graph checkpoint to file
  */
 private[streaming]
-class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDir: String, hadoopConf: Configuration)
-  extends Logging {
+class CheckpointWriter(
+    jobGenerator: JobGenerator,
+    conf: SparkConf,
+    checkpointDir: String,
+    hadoopConf: Configuration
+  ) extends Logging {
   val MAX_ATTEMPTS = 3
   val executor = Executors.newFixedThreadPool(1)
   val compressionCodec = CompressionCodec.createCodec(conf)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index d96e9ac7b7..523173d45a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -489,15 +489,15 @@ class JavaStreamingContext(val ssc: StreamingContext) {
 }
 
 /**
- * JavaStreamingContext object contains a number of static utility functions.
+ * JavaStreamingContext object contains a number of utility functions.
  */
 object JavaStreamingContext {
 
   /**
    * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
    * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
-   * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
-   * will be created by called the provided `creatingFunc`.
+   * recreated from the checkpoint data. If the data does not exist, then the provided factory
+   * will be used to create a JavaStreamingContext.
    *
    * @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program
    * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext
@@ -515,8 +515,8 @@ object JavaStreamingContext {
   /**
    * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
    * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
-   * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
-   * will be created by called the provided `creatingFunc`.
+   * recreated from the checkpoint data. If the data does not exist, then the provided factory
+   * will be used to create a JavaStreamingContext.
    *
    * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
    * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext
@@ -537,8 +537,8 @@ object JavaStreamingContext {
   /**
    * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
    * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
-   * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
-   * will be created by called the provided `creatingFunc`.
+   * recreated from the checkpoint data. If the data does not exist, then the provided factory
+   * will be used to create a JavaStreamingContext.
    *
    * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
    * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext
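A usage sketch of the factory-based API documented above, written in Scala to
match the rest of this patch. It uses only the pieces this diff documents
(JavaStreamingContextFactory and JavaStreamingContext.getOrCreate); the
object name and argument handling are illustrative:

    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaStreamingContextFactory}

    object JavaGetOrCreateSketch {
      def main(args: Array[String]) {
        val checkpointPath = args(0)  // e.g. a directory on an HDFS-compatible file system

        // The factory is invoked only when no checkpoint data exists under
        // checkpointPath; otherwise the context is recreated from the checkpoint
        // and create() is never called.
        val factory = new JavaStreamingContextFactory {
          def create(): JavaStreamingContext = {
            val jssc = new JavaStreamingContext("local[2]", "JavaGetOrCreateSketch", Seconds(1))
            jssc.checkpoint(checkpointPath)
            // DStream setup for the new context would go here.
            jssc
          }
        }

        val jssc = JavaStreamingContext.getOrCreate(checkpointPath, factory)
        jssc.start()
        jssc.awaitTermination()
      }
    }

The factory interface exists because Java has no lightweight function values
here; it plays the role the creating function plays in the Scala API.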