From e4bb845238d0df48f8258e925caf9af5a107af46 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 10 Jan 2014 12:17:09 -0800 Subject: Updated docs based on Patrick's comments in PR 383. --- .../main/scala/org/apache/spark/streaming/Checkpoint.scala | 13 +++++++++---- .../spark/streaming/api/java/JavaStreamingContext.scala | 14 +++++++------- 2 files changed, 16 insertions(+), 11 deletions(-) (limited to 'streaming/src') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 62b225382e..1249ef4c3d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -43,8 +43,9 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) val pendingTimes = ssc.scheduler.getPendingTimes() val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf) val sparkConf = ssc.conf - - // do not save these configurations + + // These should be unset when a checkpoint is deserialized, + // otherwise the SparkContext won't initialize correctly. sparkConf.remove("spark.hostPort").remove("spark.driver.host").remove("spark.driver.port") def validate() { @@ -102,8 +103,12 @@ object Checkpoint extends Logging { * Convenience class to handle the writing of graph checkpoint to file */ private[streaming] -class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDir: String, hadoopConf: Configuration) - extends Logging { +class CheckpointWriter( + jobGenerator: JobGenerator, + conf: SparkConf, + checkpointDir: String, + hadoopConf: Configuration + ) extends Logging { val MAX_ATTEMPTS = 3 val executor = Executors.newFixedThreadPool(1) val compressionCodec = CompressionCodec.createCodec(conf) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index d96e9ac7b7..523173d45a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -489,15 +489,15 @@ class JavaStreamingContext(val ssc: StreamingContext) { } /** - * JavaStreamingContext object contains a number of static utility functions. + * JavaStreamingContext object contains a number of utility functions. */ object JavaStreamingContext { /** * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be - * recreated from the checkpoint data. If the data does not exist, then the StreamingContext - * will be created by called the provided `creatingFunc`. + * recreated from the checkpoint data. If the data does not exist, then the provided factory + * will be used to create a JavaStreamingContext. * * @param checkpointPath Checkpoint directory used in an earlier JavaStreamingContext program * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext @@ -515,8 +515,8 @@ object JavaStreamingContext { /** * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be - * recreated from the checkpoint data. If the data does not exist, then the StreamingContext - * will be created by called the provided `creatingFunc`. + * recreated from the checkpoint data. If the data does not exist, then the provided factory + * will be used to create a JavaStreamingContext. * * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext @@ -537,8 +537,8 @@ object JavaStreamingContext { /** * Either recreate a StreamingContext from checkpoint data or create a new StreamingContext. * If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be - * recreated from the checkpoint data. If the data does not exist, then the StreamingContext - * will be created by called the provided `creatingFunc`. + * recreated from the checkpoint data. If the data does not exist, then the provided factory + * will be used to create a JavaStreamingContext. * * @param checkpointPath Checkpoint directory used in an earlier StreamingContext program * @param factory JavaStreamingContextFactory object to create a new JavaStreamingContext -- cgit v1.2.3