author    Josh Rosen <joshrosen@databricks.com>  2014-10-24 15:06:15 -0700
committer Josh Rosen <joshrosen@databricks.com>  2014-10-24 15:06:15 -0700
commit    6c98c29ae0033556fd4424f41d1de005c509e511
tree      eeaae15f90955ef04a1794f7c966960bcdbbf3fd /streaming
parent    3a906c6631a914da8ede3111c63f89a0dac3f369
[SPARK-4080] Only throw IOException from [write|read][Object|External]
If classes implementing the Serializable or Externalizable interfaces throw exceptions other than IOException or ClassNotFoundException from their (de)serialization methods, the result is an unhelpful "IOException: unexpected exception type" rather than the actual exception that produced the (de)serialization error. This patch fixes that by adding a utility method that re-wraps any uncaught exception in an IOException (unless it is already an IOException).

Author: Josh Rosen <joshrosen@databricks.com>

Closes #2932 from JoshRosen/SPARK-4080 and squashes the following commits:

cd3a9be [Josh Rosen] [SPARK-4080] Only throw IOException from [write|read][Object|External].
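To see the failure mode concretely, here is a minimal, self-contained sketch (the class Broken and its exception message are hypothetical, not part of this patch) of how JDK serialization masks a checked, non-IO exception thrown from writeObject:

    import java.io.{ByteArrayOutputStream, IOException, ObjectOutputStream}

    // Hypothetical class whose writeObject throws a checked, non-IO exception,
    // triggering the masking behavior the commit message describes.
    class Broken extends Serializable {
      @throws(classOf[IOException])
      private def writeObject(oos: ObjectOutputStream): Unit = {
        throw new Exception("the real cause")
      }
    }

    object UnexpectedExceptionDemo {
      def main(args: Array[String]): Unit = {
        val oos = new ObjectOutputStream(new ByteArrayOutputStream())
        try {
          oos.writeObject(new Broken())
        } catch {
          // Prints "java.io.IOException: unexpected exception type"; the
          // original exception survives only in the cause chain.
          case e: IOException => println(s"$e caused by ${e.getCause}")
        }
      }
    }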
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala                  | 5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala      | 5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala               | 6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala | 5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala      | 4
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala                 | 4
6 files changed, 16 insertions(+), 13 deletions(-)
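Each call site in the diffs below delegates to the new Utils.tryOrIOException helper, whose definition lives in core's Utils.scala and is therefore outside this streaming-limited diffstat. A minimal sketch of the pattern, based on the commit description (the enclosing object name here is hypothetical, and the actual definition in Utils.scala may differ in detail):

    import java.io.IOException
    import scala.util.control.NonFatal

    object SerializationUtils {
      // Sketch of Utils.tryOrIOException as described above: run the block,
      // let IOExceptions propagate unchanged, and re-wrap anything else in an
      // IOException so the real cause is reported instead of being masked.
      def tryOrIOException(block: => Unit): Unit = {
        try {
          block
        } catch {
          case e: IOException => throw e
          case NonFatal(t) => throw new IOException(t)
        }
      }
    }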
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index b4adf0e965..e59c24adb8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -22,6 +22,7 @@ import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
import org.apache.spark.Logging
import org.apache.spark.streaming.scheduler.Job
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream, InputDStream}
+import org.apache.spark.util.Utils
final private[streaming] class DStreamGraph extends Serializable with Logging {
@@ -160,7 +161,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
}
@throws(classOf[IOException])
- private def writeObject(oos: ObjectOutputStream) {
+ private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
logDebug("DStreamGraph.writeObject used")
this.synchronized {
checkpointInProgress = true
@@ -172,7 +173,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
}
@throws(classOf[IOException])
- private def readObject(ois: ObjectInputStream) {
+ private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
logDebug("DStreamGraph.readObject used")
this.synchronized {
checkpointInProgress = true
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 213dff6a76..7053f47ec6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -33,6 +33,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Interval, Duration, Time}
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.api.java._
+import org.apache.spark.util.Utils
/**
@@ -73,13 +74,13 @@ private[python] class TransformFunction(@transient var pfunc: PythonTransformFun
pfunc.call(time.milliseconds, rdds)
}
- private def writeObject(out: ObjectOutputStream): Unit = {
+ private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
val bytes = PythonTransformFunctionSerializer.serialize(pfunc)
out.writeInt(bytes.length)
out.write(bytes)
}
- private def readObject(in: ObjectInputStream): Unit = {
+ private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
val length = in.readInt()
val bytes = new Array[Byte](length)
in.readFully(bytes)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 65f7ccd318..eabd61d713 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -31,7 +31,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.scheduler.Job
-import org.apache.spark.util.{CallSite, MetadataCleaner}
+import org.apache.spark.util.{CallSite, MetadataCleaner, Utils}
/**
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
@@ -400,7 +400,7 @@ abstract class DStream[T: ClassTag] (
}
@throws(classOf[IOException])
- private def writeObject(oos: ObjectOutputStream) {
+ private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
logDebug(this.getClass().getSimpleName + ".writeObject used")
if (graph != null) {
graph.synchronized {
@@ -423,7 +423,7 @@ abstract class DStream[T: ClassTag] (
}
@throws(classOf[IOException])
- private def readObject(ois: ObjectInputStream) {
+ private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
logDebug(this.getClass().getSimpleName + ".readObject used")
ois.defaultReadObject()
generatedRDDs = new HashMap[Time, RDD[T]] ()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index f33c0ceafd..0dc72790fb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.Logging
import org.apache.spark.streaming.Time
+import org.apache.spark.util.Utils
private[streaming]
class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
@@ -119,7 +120,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
}
@throws(classOf[IOException])
- private def writeObject(oos: ObjectOutputStream) {
+ private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
logDebug(this.getClass().getSimpleName + ".writeObject used")
if (dstream.context.graph != null) {
dstream.context.graph.synchronized {
@@ -142,7 +143,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
}
@throws(classOf[IOException])
- private def readObject(ois: ObjectInputStream) {
+ private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
logDebug(this.getClass().getSimpleName + ".readObject used")
ois.defaultReadObject()
timeToOldestCheckpointFileTime = new HashMap[Time, Time]
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 9eecbfaef3..8152b7542a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.UnionRDD
import org.apache.spark.streaming.{StreamingContext, Time}
-import org.apache.spark.util.TimeStampedHashMap
+import org.apache.spark.util.{TimeStampedHashMap, Utils}
private[streaming]
@@ -151,7 +151,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
}
@throws(classOf[IOException])
- private def readObject(ois: ObjectInputStream) {
+ private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
logDebug(this.getClass().getSimpleName + ".readObject used")
ois.defaultReadObject()
generatedRDDs = new HashMap[Time, RDD[(K,V)]] ()
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 9327ff4822..2154c24abd 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -73,7 +73,7 @@ class TestOutputStream[T: ClassTag](parent: DStream[T],
// This is to clear the output buffer every time it is read from a checkpoint
@throws(classOf[IOException])
- private def readObject(ois: ObjectInputStream) {
+ private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
ois.defaultReadObject()
output.clear()
}
@@ -95,7 +95,7 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
// This is to clear the output buffer every time it is read from a checkpoint
@throws(classOf[IOException])
- private def readObject(ois: ObjectInputStream) {
+ private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
ois.defaultReadObject()
output.clear()
}