author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-14 22:20:14 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-14 22:20:14 -0800 |
commit | 1f4718c4805082cb6d6fa5af7c3529c6a79ae4e0 (patch) | |
tree | b5ece0cae499b1041d179ed34b038004fbd21e77 /streaming | |
parent | f8bd828c7ccf1ff69bc35bf95d07183cb35a7c72 (diff) | |
Changed SparkConf to not be serializable, and fixed unit-test log paths in the log4j.properties of external modules.
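The heart of the change is in Checkpoint.scala below: instead of holding a live SparkConf (no longer serializable), the checkpoint captures the conf as plain key-value pairs and rebuilds a fresh SparkConf on demand, dropping the driver host/port entries that must be re-resolved after recovery. A minimal sketch of the same pattern, using only the public SparkConf API (the ConfSnapshot name is illustrative, not part of this commit):

import org.apache.spark.SparkConf

// Capture the conf as serializable (String, String) pairs; the SparkConf
// object itself is deliberately kept out of the serialized state.
class ConfSnapshot(conf: SparkConf) extends Serializable {
  private val sparkConfPairs: Array[(String, String)] = conf.getAll

  // Rebuild a fresh conf on every call; loadDefaults = false keeps
  // system properties of the recovering JVM from leaking in.
  def sparkConf: SparkConf = {
    new SparkConf(loadDefaults = false).setAll(sparkConfPairs)
      .remove("spark.driver.host")
      .remove("spark.driver.port")
  }
}

Because the stored pairs are immutable strings, deserializing a checkpoint can never hand back a half-mutated conf, and each call to sparkConf yields an independent copy.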
Diffstat (limited to 'streaming')
4 files changed, 53 insertions, 13 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 5046a1d53f..4d778dc4d4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -42,11 +42,13 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   val checkpointDuration = ssc.checkpointDuration
   val pendingTimes = ssc.scheduler.getPendingTimes().toArray
   val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
-  val sparkConf = ssc.conf
+  val sparkConfPairs = ssc.conf.getAll
 
-  // These should be unset when a checkpoint is deserialized,
-  // otherwise the SparkContext won't initialize correctly.
-  sparkConf.remove("spark.driver.host").remove("spark.driver.port")
+  def sparkConf = {
+    new SparkConf(false).setAll(sparkConfPairs)
+      .remove("spark.driver.host")
+      .remove("spark.driver.port")
+  }
 
   def validate() {
     assert(master != null, "Checkpoint.master is null")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 8faa79f8c7..0683113bd0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -163,8 +163,10 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
     logDebug("DStreamGraph.writeObject used")
     this.synchronized {
       checkpointInProgress = true
+      logDebug("Enabled checkpoint mode")
       oos.defaultWriteObject()
       checkpointInProgress = false
+      logDebug("Disabled checkpoint mode")
     }
   }
 
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 38bad5ac80..906a16e508 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.dstream
 import scala.collection.mutable.HashMap
 import scala.reflect.ClassTag
-import java.io.{ObjectInputStream, IOException}
+import java.io.{ObjectOutputStream, ObjectInputStream, IOException}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.fs.FileSystem
 
 import org.apache.spark.Logging
@@ -118,7 +118,31 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
   }
 
   @throws(classOf[IOException])
+  private def writeObject(oos: ObjectOutputStream) {
+    logDebug(this.getClass().getSimpleName + ".writeObject used")
+    if (dstream.context.graph != null) {
+      dstream.context.graph.synchronized {
+        if (dstream.context.graph.checkpointInProgress) {
+          oos.defaultWriteObject()
+        } else {
+          val msg = "Object of " + this.getClass.getName + " is being serialized " +
+            " possibly as a part of closure of an RDD operation. This is because " +
+            " the DStream object is being referred to from within the closure. " +
+            " Please rewrite the RDD operation inside this DStream to avoid this. " +
+            " This has been enforced to avoid bloating of Spark tasks " +
+            " with unnecessary objects."
+          throw new java.io.NotSerializableException(msg)
+        }
+      }
+    } else {
+      throw new java.io.NotSerializableException(
+        "Graph is unexpectedly null when DStream is being serialized.")
+    }
+  }
+
+  @throws(classOf[IOException])
   private def readObject(ois: ObjectInputStream) {
+    logDebug(this.getClass().getSimpleName + ".readObject used")
     ois.defaultReadObject()
     timeToOldestCheckpointFileTime = new HashMap[Time, Time]
     timeToCheckpointFile = new HashMap[Time, String]
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 89daf47586..831e7c1471 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -151,17 +151,29 @@ class CheckpointSuite extends TestSuiteBase {
     val value = "myvalue"
     System.setProperty(key, value)
     ssc = new StreamingContext(master, framework, batchDuration)
+    val originalConf = ssc.conf
+
     val cp = new Checkpoint(ssc, Time(1000))
-    assert(!cp.sparkConf.contains("spark.driver.host"))
-    assert(!cp.sparkConf.contains("spark.driver.port"))
-    assert(!cp.sparkConf.contains("spark.hostPort"))
-    assert(cp.sparkConf.get(key) === value)
+    val cpConf = cp.sparkConf
+    assert(cpConf.get("spark.master") === originalConf.get("spark.master"))
+    assert(cpConf.get("spark.app.name") === originalConf.get("spark.app.name"))
+    assert(cpConf.get(key) === value)
     ssc.stop()
+
+    // Serialize/deserialize to simulate write to storage and reading it back
     val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
-    assert(!newCp.sparkConf.contains("spark.driver.host"))
-    assert(!newCp.sparkConf.contains("spark.driver.port"))
-    assert(!newCp.sparkConf.contains("spark.hostPort"))
-    assert(newCp.sparkConf.get(key) === value)
+
+    val newCpConf = newCp.sparkConf
+    assert(newCpConf.get("spark.master") === originalConf.get("spark.master"))
+    assert(newCpConf.get("spark.app.name") === originalConf.get("spark.app.name"))
+    assert(newCpConf.get(key) === value)
+    assert(!newCpConf.contains("spark.driver.host"))
+    assert(!newCpConf.contains("spark.driver.port"))
+
+    // Check if all the parameters have been restored
+    ssc = new StreamingContext(null, newCp, null)
+    val restoredConf = ssc.conf
+    assert(restoredConf.get(key) === value)
   }
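The writeObject guard added to DStreamCheckpointData generalizes to any object that should only be serialized during a deliberate checkpoint, never by accidental capture in a task closure. A self-contained sketch of that guard under illustrative names (CheckpointOwner and GuardedState are not Spark API):

import java.io.{ByteArrayOutputStream, IOException, NotSerializableException, ObjectOutputStream}

// The owner flips checkpointInProgress while it deliberately serializes
// its members, just as DStreamGraph.writeObject does in the diff above.
class CheckpointOwner {
  @volatile var checkpointInProgress = false
}

// @transient keeps the back-reference to the owner out of the stream.
class GuardedState(@transient private val owner: CheckpointOwner) extends Serializable {
  @throws(classOf[IOException])
  private def writeObject(oos: ObjectOutputStream) {
    owner.synchronized {
      if (owner.checkpointInProgress) {
        oos.defaultWriteObject()  // deliberate checkpoint write: allowed
      } else {
        // A stray reference (e.g. captured in an RDD closure) fails fast
        // here instead of silently bloating serialized tasks.
        throw new NotSerializableException(
          getClass.getName + " may only be serialized during checkpointing")
      }
    }
  }
}

object GuardDemo extends App {
  val owner = new CheckpointOwner
  val state = new GuardedState(owner)
  val oos = new ObjectOutputStream(new ByteArrayOutputStream())
  owner.checkpointInProgress = true  // open the checkpoint window
  oos.writeObject(state)             // succeeds; with the flag unset it throws
}

Synchronizing on the owner is what makes the flag reliable: it pairs with the owner's own synchronized writeObject, so the checkpoint window cannot close while a guarded member is mid-write.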