path: root/streaming/src
author     Tathagata Das <tathagata.das1565@gmail.com>    2014-01-14 22:20:14 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>    2014-01-14 22:20:14 -0800
commit     1f4718c4805082cb6d6fa5af7c3529c6a79ae4e0 (patch)
tree       b5ece0cae499b1041d179ed34b038004fbd21e77 /streaming/src
parent     f8bd828c7ccf1ff69bc35bf95d07183cb35a7c72 (diff)
Changed SparkConf to not be serializable. And also fixed unit-test log paths in log4j.properties of external modules.
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala                    | 10
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala                  |  2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala | 26
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala               | 28
4 files changed, 53 insertions(+), 13 deletions(-)
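
The Checkpoint.scala diff below carries the core of the change: since SparkConf is no longer meant to be serialized, the checkpoint captures the conf's key-value pairs (ssc.conf.getAll) and rebuilds a fresh SparkConf whenever sparkConf is accessed. A standalone sketch of that pattern follows; ConfSnapshot, the demo object, and the demo settings are hypothetical, while SparkConf and its getAll/setAll/remove/contains calls are the real API used in the diff:

import org.apache.spark.SparkConf

// Hypothetical illustration of the pattern applied in Checkpoint.scala below:
// hold plain (key, value) pairs, which serialize cleanly, and rebuild a
// SparkConf from them on every access.
class ConfSnapshot(conf: SparkConf) extends Serializable {
  private val pairs: Array[(String, String)] = conf.getAll

  // SparkConf(false) skips loading defaults from spark.* system properties,
  // so the rebuilt conf contains exactly what was captured.
  def sparkConf: SparkConf = {
    new SparkConf(false).setAll(pairs)
      .remove("spark.driver.host")   // driver-specific settings must not survive
      .remove("spark.driver.port")   // into a context recovered on another driver
  }
}

object ConfSnapshotDemo extends App {
  val original = new SparkConf(false)
    .setMaster("local[2]")
    .setAppName("checkpoint-demo")
    .set("spark.driver.host", "old-driver-host")

  val restored = new ConfSnapshot(original).sparkConf
  assert(restored.get("spark.master") == "local[2]")
  assert(restored.get("spark.app.name") == "checkpoint-demo")
  assert(!restored.contains("spark.driver.host"))
  println("rebuilt " + restored.getAll.length + " settings, driver host dropped")
}
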
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 5046a1d53f..4d778dc4d4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -42,11 +42,13 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
   val checkpointDuration = ssc.checkpointDuration
   val pendingTimes = ssc.scheduler.getPendingTimes().toArray
   val delaySeconds = MetadataCleaner.getDelaySeconds(ssc.conf)
-  val sparkConf = ssc.conf
+  val sparkConfPairs = ssc.conf.getAll
 
-  // These should be unset when a checkpoint is deserialized,
-  // otherwise the SparkContext won't initialize correctly.
-  sparkConf.remove("spark.driver.host").remove("spark.driver.port")
+  def sparkConf = {
+    new SparkConf(false).setAll(sparkConfPairs)
+      .remove("spark.driver.host")
+      .remove("spark.driver.port")
+  }
 
   def validate() {
     assert(master != null, "Checkpoint.master is null")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 8faa79f8c7..0683113bd0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -163,8 +163,10 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
logDebug("DStreamGraph.writeObject used")
this.synchronized {
checkpointInProgress = true
+ logDebug("Enabled checkpoint mode")
oos.defaultWriteObject()
checkpointInProgress = false
+ logDebug("Disabled checkpoint mode")
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 38bad5ac80..906a16e508 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.dstream
import scala.collection.mutable.HashMap
import scala.reflect.ClassTag
-import java.io.{ObjectInputStream, IOException}
+import java.io.{ObjectOutputStream, ObjectInputStream, IOException}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.Logging
@@ -118,7 +118,31 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
   }
 
   @throws(classOf[IOException])
+  private def writeObject(oos: ObjectOutputStream) {
+    logDebug(this.getClass().getSimpleName + ".writeObject used")
+    if (dstream.context.graph != null) {
+      dstream.context.graph.synchronized {
+        if (dstream.context.graph.checkpointInProgress) {
+          oos.defaultWriteObject()
+        } else {
+          val msg = "Object of " + this.getClass.getName + " is being serialized " +
+            " possibly as a part of closure of an RDD operation. This is because " +
+            " the DStream object is being referred to from within the closure. " +
+            " Please rewrite the RDD operation inside this DStream to avoid this. " +
+            " This has been enforced to avoid bloating of Spark tasks " +
+            " with unnecessary objects."
+          throw new java.io.NotSerializableException(msg)
+        }
+      }
+    } else {
+      throw new java.io.NotSerializableException(
+        "Graph is unexpectedly null when DStream is being serialized.")
+    }
+  }
+
+  @throws(classOf[IOException])
   private def readObject(ois: ObjectInputStream) {
+    logDebug(this.getClass().getSimpleName + ".readObject used")
     ois.defaultReadObject()
     timeToOldestCheckpointFileTime = new HashMap[Time, Time]
     timeToCheckpointFile = new HashMap[Time, String]
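
The writeObject guard added above is the counterpart of the DStreamGraph change earlier in this patch: the graph sets checkpointInProgress around its own defaultWriteObject call, so a DStreamCheckpointData (and through it a DStream) can only be serialized during a checkpoint write, not when it is accidentally captured inside an RDD closure. A simplified, self-contained sketch of that guard follows; Graph, CheckpointData, and SerializationGuardDemo are hypothetical stand-ins, not the Spark classes:

import java.io.{ByteArrayOutputStream, IOException, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-ins for DStreamGraph / DStreamCheckpointData, stripped to the guard.
class Graph extends Serializable {
  var checkpointInProgress = false
}

class CheckpointData(@transient private val graph: Graph) extends Serializable {
  val payload = "state that is fine to checkpoint"

  // Java serialization calls this reflectively; it only proceeds while the
  // owning graph has flagged that a checkpoint write is in progress.
  @throws(classOf[IOException])
  private def writeObject(oos: ObjectOutputStream) {
    if (graph != null && graph.synchronized(graph.checkpointInProgress)) {
      oos.defaultWriteObject()
    } else {
      throw new NotSerializableException(
        "serialized outside of a checkpoint, probably captured in a task closure")
    }
  }
}

object SerializationGuardDemo extends App {
  def serialize(o: AnyRef): Unit = {
    val oos = new ObjectOutputStream(new ByteArrayOutputStream())
    try oos.writeObject(o) finally oos.close()
  }

  val graph = new Graph
  val data = new CheckpointData(graph)

  // Rejected: nothing has enabled checkpoint mode.
  try serialize(data) catch {
    case e: NotSerializableException => println("rejected: " + e.getMessage)
  }

  // Accepted: the graph wraps serialization in the flag, as DStreamGraph.writeObject does.
  graph.synchronized {
    graph.checkpointInProgress = true
    serialize(data)
    graph.checkpointInProgress = false
  }
  println("accepted while checkpoint was in progress")
}
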
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 89daf47586..831e7c1471 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -151,17 +151,29 @@ class CheckpointSuite extends TestSuiteBase {
     val value = "myvalue"
     System.setProperty(key, value)
     ssc = new StreamingContext(master, framework, batchDuration)
+    val originalConf = ssc.conf
+
     val cp = new Checkpoint(ssc, Time(1000))
-    assert(!cp.sparkConf.contains("spark.driver.host"))
-    assert(!cp.sparkConf.contains("spark.driver.port"))
-    assert(!cp.sparkConf.contains("spark.hostPort"))
-    assert(cp.sparkConf.get(key) === value)
+    val cpConf = cp.sparkConf
+    assert(cpConf.get("spark.master") === originalConf.get("spark.master"))
+    assert(cpConf.get("spark.app.name") === originalConf.get("spark.app.name"))
+    assert(cpConf.get(key) === value)
     ssc.stop()
+
+    // Serialize/deserialize to simulate write to storage and reading it back
     val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
-    assert(!newCp.sparkConf.contains("spark.driver.host"))
-    assert(!newCp.sparkConf.contains("spark.driver.port"))
-    assert(!newCp.sparkConf.contains("spark.hostPort"))
-    assert(newCp.sparkConf.get(key) === value)
+
+    val newCpConf = newCp.sparkConf
+    assert(newCpConf.get("spark.master") === originalConf.get("spark.master"))
+    assert(newCpConf.get("spark.app.name") === originalConf.get("spark.app.name"))
+    assert(newCpConf.get(key) === value)
+    assert(!newCpConf.contains("spark.driver.host"))
+    assert(!newCpConf.contains("spark.driver.port"))
+
+    // Check if all the parameters have been restored
+    ssc = new StreamingContext(null, newCp, null)
+    val restoredConf = ssc.conf
+    assert(restoredConf.get(key) === value)
   }
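
The closing assertions above mirror driver recovery: a StreamingContext built from the deserialized Checkpoint gets its conf rebuilt from the stored pairs, minus spark.driver.host and spark.driver.port. A rough application-level sketch of the same round trip, assuming the checkpoint-recovering StreamingContext(path) constructor and SparkContext.getConf; the directory, key, and object names are made up:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical recovery flow; nothing here is invoked automatically.
object CheckpointRecoverySketch {
  val checkpointDir = "/tmp/streaming-checkpoint"   // made-up path

  def firstRun(): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("recovery-sketch")
      .set("spark.mykey", "myvalue")                // user setting captured in the checkpoint
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir)
    // ... define DStreams with output operations, then ssc.start() / ssc.awaitTermination() ...
  }

  def recover(): Unit = {
    // Rebuilds the context, and its conf, from the checkpoint files written by firstRun().
    val ssc = new StreamingContext(checkpointDir)
    // Driver-specific settings were dropped, but user settings come back.
    assert(ssc.sparkContext.getConf.get("spark.mykey") == "myvalue")
  }
}
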