aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-01-10 12:17:09 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-01-10 12:17:09 -0800
commite4bb845238d0df48f8258e925caf9af5a107af46 (patch)
tree3a11ad9abe691584cc64be3ca31403305f831686 /streaming/src
parent2213a5a47fb2d73e3fdebec24356c69ea2968b81 (diff)
downloadspark-e4bb845238d0df48f8258e925caf9af5a107af46.tar.gz
spark-e4bb845238d0df48f8258e925caf9af5a107af46.tar.bz2
spark-e4bb845238d0df48f8258e925caf9af5a107af46.zip
Updated docs based on Patrick's comments in PR 383.
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala13
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala14
2 files changed, 16 insertions, 11 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 62b225382e..1249ef4c3d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -43,8 +43,9 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
val pendingTimes = ssc.scheduler.getPendingTimes()
val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
val sparkConf = ssc.conf
-
- // do not save these configurations
+
+ // These should be unset when a checkpoint is deserialized,
+ // otherwise the SparkContext won't initialize correctly.
sparkConf.remove("spark.hostPort").remove("spark.driver.host").remove("spark.driver.port")
def validate() {
@@ -102,8 +103,12 @@ object Checkpoint extends Logging {
* Convenience class to handle the writing of graph checkpoint to file
*/
private[streaming]
-class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDir: String, hadoopConf: Configuration)
- extends Logging {
+class CheckpointWriter(
+ jobGenerator: JobGenerator,
+ conf: SparkConf,
+ checkpointDir: String,
+ hadoopConf: Configuration
+ ) extends Logging {
val MAX_ATTEMPTS = 3
val executor = Executors.newFixedThreadPool(1)
val compressionCodec = CompressionCodec.createCodec(conf)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index d96e9ac7b7..523173d45a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -489,15 +489,15 @@ class JavaStreamingContext(val ssc: StreamingContext) {
}
/**
- * JavaStreamingContext object contains a number of static utility functions.
+ * JavaStreamingContext object contains a number of utility functions.
*/
object JavaStreamingContext {
/**
* Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
* If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
- * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
- * will be created by called the provided `creatingFunc`.
+ * recreated from the checkpoint data. If the data does not exist, then the provided factory
+ * will be used to create a JavaStreamingContext.
*
* @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program
* @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext
@@ -515,8 +515,8 @@ object JavaStreamingContext {
/**
* Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
* If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
- * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
- * will be created by called the provided `creatingFunc`.
+ * recreated from the checkpoint data. If the data does not exist, then the provided factory
+ * will be used to create a JavaStreamingContext.
*
* @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
* @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext
@@ -537,8 +537,8 @@ object JavaStreamingContext {
/**
* Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
* If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
- * recreated from the checkpoint data. If the data does not exist, then the StreamingContext
- * will be created by called the provided `creatingFunc`.
+ * recreated from the checkpoint data. If the data does not exist, then the provided factory
+ * will be used to create a JavaStreamingContext.
*
* @param checkpointPath Checkpoint directory used in an earlier StreamingContext program
* @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext