author     Tathagata Das <tathagata.das1565@gmail.com>  2014-01-10 12:58:07 +0000
committer  Tathagata Das <tathagata.das1565@gmail.com>  2014-01-10 12:58:07 +0000
commit     4f609f79015732a91a83c5625d357c4edfc7c962 (patch)
tree       366ab7b26e58f3219acbc84d4d5bc18eddd6b782
parent     d7ec73ac761cfd7823bc34fac7e56c8392bdf8dc (diff)
Removed spark.hostPort and other settings from SparkConf before saving to checkpoint.
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala | 24
1 file changed, 6 insertions(+), 18 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 7366d8a7a4..62b225382e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -43,6 +43,9 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
val pendingTimes = ssc.scheduler.getPendingTimes()
val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
val sparkConf = ssc.conf
+
+ // Do not save these settings: they refer to the driver process that wrote the checkpoint and would be stale on recovery
+ sparkConf.remove("spark.hostPort").remove("spark.driver.host").remove("spark.driver.port")
def validate() {
assert(master != null, "Checkpoint.master is null")
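Note that removing the keys on ssc.conf mutates the running context's configuration in place. A defensive alternative (a sketch only, not the committed change; checkpointSafeConf is a hypothetical helper, and it assumes SparkConf exposes clone alongside the remove method used above) would strip the ephemeral endpoint settings from a copy instead:

  import org.apache.spark.SparkConf

  // Hypothetical helper: copy the conf, then drop settings that only make
  // sense for the driver process that wrote the checkpoint, leaving the
  // live context's conf untouched.
  def checkpointSafeConf(conf: SparkConf): SparkConf = {
    val copy = conf.clone
    Seq("spark.hostPort", "spark.driver.host", "spark.driver.port")
      .foreach(copy.remove)
    copy
  }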
@@ -73,11 +76,7 @@ object Checkpoint extends Logging {
def sortFunc(path1: Path, path2: Path): Boolean = {
val (time1, bk1) = path1.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
val (time2, bk2) = path2.getName match { case REGEX(x, y) => (x.toLong, !y.isEmpty) }
- logInfo("Path 1: " + path1 + " -> " + time1 + ", " + bk1)
- logInfo("Path 2: " + path2 + " -> " + time2 + ", " + bk2)
- val precede = (time1 < time2) || (time1 == time2 && bk1)
- logInfo(precede.toString)
- precede
+ (time1 < time2) || (time1 == time2 && bk1)
}
val path = new Path(checkpointDir)
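For reference, the ordering the slimmed-down sortFunc encodes can be restated as a self-contained sketch. The checkpoint-<time>[.bk] naming and the regex groups are assumed from the surrounding code; CheckpointName is a stand-in for the real REGEX field:

  // Older timestamps sort first; at equal timestamps the ".bk" backup
  // precedes the regular checkpoint file.
  val CheckpointName = """checkpoint-(\d+)(\.bk)?""".r

  def precedes(name1: String, name2: String): Boolean = {
    val CheckpointName(t1, bk1) = name1  // bk1 is null when no ".bk" suffix matched
    val CheckpointName(t2, _)   = name2
    (t1.toLong < t2.toLong) || (t1.toLong == t2.toLong && bk1 != null)
  }

  // Seq("checkpoint-2000", "checkpoint-1000", "checkpoint-1000.bk").sortWith(precedes)
  // => List(checkpoint-1000.bk, checkpoint-1000, checkpoint-2000)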
@@ -85,12 +84,8 @@ object Checkpoint extends Logging {
val statuses = fs.listStatus(path)
if (statuses != null) {
val paths = statuses.map(_.getPath)
- logInfo("Paths = " + paths.map(_.getName).mkString(", "))
val filtered = paths.filter(p => REGEX.findFirstIn(p.toString).nonEmpty)
- logInfo("Filtered paths = " + filtered.map(_.getName).mkString(", "))
- val sorted = filtered.sortWith(sortFunc)
- logInfo("Sorted paths = " + sorted.map(_.getName).mkString(", "))
- sorted
+ filtered.sortWith(sortFunc)
} else {
logWarning("Listing " + path + " returned null")
Seq.empty
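With the logging gone, the enclosing method reduces to list, filter, sort. A condensed sketch of that pipeline, assuming the Hadoop FileSystem API; the regex and ordering are passed in as parameters here rather than read from the real fields:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}
  import scala.util.matching.Regex

  // Enumerate files under the checkpoint directory, keep names matching the
  // pattern, and return them in the order given by the predicate. listStatus
  // may return null for a missing directory, hence the Option wrapper.
  def checkpointFiles(dir: String, pattern: Regex,
                      before: (Path, Path) => Boolean): Seq[Path] = {
    val path = new Path(dir)
    val fs: FileSystem = path.getFileSystem(new Configuration())
    Option(fs.listStatus(path)) match {
      case Some(statuses) =>
        statuses.map(_.getPath)
          .filter(p => pattern.findFirstIn(p.toString).nonEmpty)
          .sortWith(before)
          .toSeq
      case None => Seq.empty
    }
  }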
@@ -112,16 +107,9 @@ class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDi
val MAX_ATTEMPTS = 3
val executor = Executors.newFixedThreadPool(1)
val compressionCodec = CompressionCodec.createCodec(conf)
- // The file to which we actually write - and then "move" to file
- // val writeFile = new Path(file.getParent, file.getName + ".next")
- // The file to which existing checkpoint is backed up (i.e. "moved")
- // val bakFile = new Path(file.getParent, file.getName + ".bk")
-
private var stopped = false
private var fs_ : FileSystem = _
- // Removed code which validates whether there is only one CheckpointWriter per path 'file' since
- // I did not notice any errors - reintroduce it ?
class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
def run() {
var attempts = 0
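The handler runs on a fixed pool of one thread, so writes are serialized and each is retried up to MAX_ATTEMPTS times. A minimal standalone sketch of that pattern; submitWrite and the write callback are hypothetical names, not the class's actual API:

  import java.util.concurrent.Executors

  val writerPool = Executors.newFixedThreadPool(1)  // at most one write in flight
  val MaxAttempts = 3

  // Retry the write a bounded number of times before giving up.
  def submitWrite(bytes: Array[Byte])(write: Array[Byte] => Unit): Unit =
    writerPool.execute(new Runnable {
      def run(): Unit = {
        var attempts = 0
        var done = false
        while (!done && attempts < MaxAttempts) {
          attempts += 1
          try {
            write(bytes)
            done = true
          } catch {
            case e: Exception => () // a real implementation would log and retry
          }
        }
      }
    })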
@@ -189,7 +177,7 @@ class CheckpointWriter(jobGenerator: JobGenerator, conf: SparkConf, checkpointDi
bos.close()
try {
executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray))
- logInfo("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue")
+ logDebug("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue")
} catch {
case rej: RejectedExecutionException =>
logError("Could not submit checkpoint task to the thread pool executor", rej)