aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/main')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala6
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala9
2 files changed, 11 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 3985e1a3d9..27024ecfd9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -321,7 +321,7 @@ object CheckpointReader extends Logging {
// Try to read the checkpoint files in the order
logInfo("Checkpoint files found: " + checkpointFiles.mkString(","))
- val compressionCodec = CompressionCodec.createCodec(conf)
+ var readError: Exception = null
checkpointFiles.foreach(file => {
logInfo("Attempting to load checkpoint from file " + file)
try {
@@ -332,13 +332,15 @@ object CheckpointReader extends Logging {
return Some(cp)
} catch {
case e: Exception =>
+ readError = e
logWarning("Error reading checkpoint from file " + file, e)
}
})
// If none of checkpoint files could be read, then throw exception
if (!ignoreReadError) {
- throw new SparkException(s"Failed to read checkpoint from directory $checkpointPath")
+ throw new SparkException(
+ s"Failed to read checkpoint from directory $checkpointPath", readError)
}
None
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
index a2f5d82a79..bab78a3536 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.dstream
-import java.io.{NotSerializableException, ObjectOutputStream}
+import java.io.{NotSerializableException, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable.{ArrayBuffer, Queue}
import scala.reflect.ClassTag
@@ -37,8 +37,13 @@ class QueueInputDStream[T: ClassTag](
override def stop() { }
+ private def readObject(in: ObjectInputStream): Unit = {
+ throw new NotSerializableException("queueStream doesn't support checkpointing. " +
+ "Please don't use queueStream when checkpointing is enabled.")
+ }
+
private def writeObject(oos: ObjectOutputStream): Unit = {
- throw new NotSerializableException("queueStream doesn't support checkpointing")
+ logWarning("queueStream doesn't support checkpointing")
}
override def compute(validTime: Time): Option[RDD[T]] = {