From 4f609f79015732a91a83c5625d357c4edfc7c962 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 10 Jan 2014 12:58:07 +0000 Subject: Removed spark.hostPort and other settings from SparkConf before saving to checkpoint. --- .../org/apache/spark/streaming/Checkpoint.scala | 24 ++++++---------------- 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 7366d8a7a4..62b225382e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -43,6 +43,9 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time) val pendingTimes = ssc.scheduler.getPendingTimes() val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf) val sparkConf = ssc.conf + + // do not save these configurations + sparkConf.remove("spark.hostPort").remove("spark.driver.host").remove("spark.driver.port") def validate() { assert(master != null, "Checkpoint.master is null") @@ -73,11 +76,7 @@ object Checkpoint extends Logging { def sortFunc(path1: Path, path2: Path): Boolean = { val (time1, bk1) = path1.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) } val (time2, bk2) = path2.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) } - logInfo("Path 1: " + path1 + " -> " + time1 + ", " + bk1) - logInfo("Path 2: " + path2 + " -> " + time2 + ", " + bk2) - val precede = (time1 < time2) || (time1 == time2 && bk1) - logInfo(precede.toString) - precede + (time1 < time2) || (time1 == time2 && bk1) } val path = new Path(checkpointDir) @@ -85,12 +84,8 @@ object Checkpoint extends Logging { val statuses = fs.listStatus(path) if (statuses != null) { val paths = statuses.map(_.getPath) - logInfo("Paths = " + paths.map(_.getName).mkString(", ")) val filtered = paths.filter(p => REGEX.findFirstIn(p.toString).nonEmpty) - 
logInfo("Filtered paths = " + filtered.map(_.getName).mkString(", ")) - val sorted = filtered.sortWith(sortFunc) - logInfo("Sorted paths = " + sorted.map(_.getName).mkString(", ")) - sorted + filtered.sortWith(sortFunc) } else { logWarning("Listing " + path + " returned null") Seq.empty @@ -112,16 +107,9 @@ class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDi val MAX_ATTEMPTS = 3 val executor = Executors.newFixedThreadPool(1) val compressionCodec = CompressionCodec.createCodec(conf) - // The file to which we actually write - and then "move" to file - // val writeFile = new Path(file.getParent, file.getName + ".next") - // The file to which existing checkpoint is backed up (i.e. "moved") - // val bakFile = new Path(file.getParent, file.getName + ".bk") - private var stopped = false private var fs_ : FileSystem = _ - // Removed code which validates whether there is only one CheckpointWriter per path 'file' since - // I did not notice any errors - reintroduce it ? class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable { def run() { var attempts = 0 @@ -189,7 +177,7 @@ class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDi bos.close() try { executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray)) - logInfo("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue") + logDebug("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue") } catch { case rej: RejectedExecutionException => logError("Could not submit checkpoint task to the thread pool executor", rej) -- cgit v1.2.3