diff options
author | Josh Rosen <joshrosen@databricks.com> | 2014-10-24 15:06:15 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2014-10-24 15:06:15 -0700 |
commit | 6c98c29ae0033556fd4424f41d1de005c509e511 (patch) | |
tree | eeaae15f90955ef04a1794f7c966960bcdbbf3fd /streaming | |
parent | 3a906c6631a914da8ede3111c63f89a0dac3f369 (diff) | |
download | spark-6c98c29ae0033556fd4424f41d1de005c509e511.tar.gz spark-6c98c29ae0033556fd4424f41d1de005c509e511.tar.bz2 spark-6c98c29ae0033556fd4424f41d1de005c509e511.zip |
[SPARK-4080] Only throw IOException from [write|read][Object|External]
If classes implementing Serializable or Externalizable interfaces throw
exceptions other than IOException or ClassNotFoundException from their
(de)serialization methods, then this results in an unhelpful
"IOException: unexpected exception type" rather than the actual exception that
produced the (de)serialization error.
This patch fixes this by adding a utility method that re-wraps any uncaught
exceptions in IOException (unless they are already instances of IOException).
Author: Josh Rosen <joshrosen@databricks.com>
Closes #2932 from JoshRosen/SPARK-4080 and squashes the following commits:
cd3a9be [Josh Rosen] [SPARK-4080] Only throw IOException from [write|read][Object|External].
Diffstat (limited to 'streaming')
6 files changed, 16 insertions, 13 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index b4adf0e965..e59c24adb8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -22,6 +22,7 @@ import java.io.{ObjectInputStream, IOException, ObjectOutputStream} import org.apache.spark.Logging import org.apache.spark.streaming.scheduler.Job import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream, InputDStream} +import org.apache.spark.util.Utils final private[streaming] class DStreamGraph extends Serializable with Logging { @@ -160,7 +161,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { } @throws(classOf[IOException]) - private def writeObject(oos: ObjectOutputStream) { + private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException { logDebug("DStreamGraph.writeObject used") this.synchronized { checkpointInProgress = true @@ -172,7 +173,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { } @throws(classOf[IOException]) - private def readObject(ois: ObjectInputStream) { + private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException { logDebug("DStreamGraph.readObject used") this.synchronized { checkpointInProgress = true diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala index 213dff6a76..7053f47ec6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala @@ -33,6 +33,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Interval, Duration, Time} import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.api.java._ +import org.apache.spark.util.Utils /** @@ -73,13 +74,13 @@ private[python] class TransformFunction(@transient var pfunc: PythonTransformFun pfunc.call(time.milliseconds, rdds) } - private def writeObject(out: ObjectOutputStream): Unit = { + private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException { val bytes = PythonTransformFunctionSerializer.serialize(pfunc) out.writeInt(bytes.length) out.write(bytes) } - private def readObject(in: ObjectInputStream): Unit = { + private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { val length = in.readInt() val bytes = new Array[Byte](length) in.readFully(bytes) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 65f7ccd318..eabd61d713 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -31,7 +31,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.scheduler.Job -import org.apache.spark.util.{CallSite, MetadataCleaner} +import org.apache.spark.util.{CallSite, MetadataCleaner, Utils} /** * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous @@ -400,7 +400,7 @@ abstract class DStream[T: ClassTag] ( } @throws(classOf[IOException]) - private def writeObject(oos: ObjectOutputStream) { + private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException { logDebug(this.getClass().getSimpleName + ".writeObject used") if (graph != null) { graph.synchronized { @@ -423,7 +423,7 @@ abstract class DStream[T: ClassTag] ( } @throws(classOf[IOException]) - private def readObject(ois: ObjectInputStream) { + private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException { logDebug(this.getClass().getSimpleName + ".readObject used") ois.defaultReadObject() generatedRDDs = new HashMap[Time, RDD[T]] () diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala index f33c0ceafd..0dc72790fb 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.FileSystem import org.apache.spark.Logging import org.apache.spark.streaming.Time +import org.apache.spark.util.Utils private[streaming] class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) @@ -119,7 +120,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) } @throws(classOf[IOException]) - private def writeObject(oos: ObjectOutputStream) { + private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException { logDebug(this.getClass().getSimpleName + ".writeObject used") if (dstream.context.graph != null) { dstream.context.graph.synchronized { @@ -142,7 +143,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) } @throws(classOf[IOException]) - private def readObject(ois: ObjectInputStream) { + private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException { logDebug(this.getClass().getSimpleName + ".readObject used") ois.defaultReadObject() timeToOldestCheckpointFileTime = new HashMap[Time, Time] diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 9eecbfaef3..8152b7542a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.spark.rdd.RDD import org.apache.spark.rdd.UnionRDD import org.apache.spark.streaming.{StreamingContext, Time} -import org.apache.spark.util.TimeStampedHashMap +import org.apache.spark.util.{TimeStampedHashMap, Utils} private[streaming] @@ -151,7 +151,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas } @throws(classOf[IOException]) - private def readObject(ois: ObjectInputStream) { + private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException { logDebug(this.getClass().getSimpleName + ".readObject used") ois.defaultReadObject() generatedRDDs = new HashMap[Time, RDD[(K,V)]] () diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 9327ff4822..2154c24abd 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -73,7 +73,7 @@ class TestOutputStream[T: ClassTag](parent: DStream[T], // This is to clear the output buffer every it is read from a checkpoint @throws(classOf[IOException]) - private def readObject(ois: ObjectInputStream) { + private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException { ois.defaultReadObject() output.clear() } @@ -95,7 +95,7 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T], // This is to clear the output buffer every it is read from a checkpoint @throws(classOf[IOException]) - private def readObject(ois: ObjectInputStream) { + private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException { ois.defaultReadObject() output.clear() } |