From 6c98c29ae0033556fd4424f41d1de005c509e511 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 24 Oct 2014 15:06:15 -0700 Subject: [SPARK-4080] Only throw IOException from [write|read][Object|External] If classes implementing Serializable or Externalizable interfaces throw exceptions other than IOException or ClassNotFoundException from their (de)serialization methods, then this results in an unhelpful "IOException: unexpected exception type" rather than the actual exception that produced the (de)serialization error. This patch fixes this by adding a utility method that re-wraps any uncaught exceptions in IOException (unless they are already instances of IOException). Author: Josh Rosen Closes #2932 from JoshRosen/SPARK-4080 and squashes the following commits: cd3a9be [Josh Rosen] [SPARK-4080] Only throw IOException from [write|read][Object|External]. --- core/src/main/scala/org/apache/spark/Accumulators.scala | 3 ++- core/src/main/scala/org/apache/spark/Partitioner.scala | 4 ++-- .../scala/org/apache/spark/SerializableWritable.scala | 5 +++-- .../scala/org/apache/spark/broadcast/HttpBroadcast.scala | 4 ++-- .../org/apache/spark/broadcast/TorrentBroadcast.scala | 6 +++--- .../org/apache/spark/deploy/master/ApplicationInfo.scala | 3 ++- .../scala/org/apache/spark/deploy/master/DriverInfo.scala | 3 ++- .../scala/org/apache/spark/deploy/master/WorkerInfo.scala | 2 +- .../main/scala/org/apache/spark/rdd/CartesianRDD.scala | 3 ++- .../main/scala/org/apache/spark/rdd/CoGroupedRDD.scala | 3 ++- .../main/scala/org/apache/spark/rdd/CoalescedRDD.scala | 3 ++- .../org/apache/spark/rdd/ParallelCollectionRDD.scala | 4 ++-- .../org/apache/spark/rdd/PartitionerAwareUnionRDD.scala | 3 ++- core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala | 3 ++- .../scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala | 3 ++- .../main/scala/org/apache/spark/scheduler/MapStatus.scala | 9 +++++---- .../scala/org/apache/spark/scheduler/TaskResult.scala | 4 ++-- .../org/apache/spark/serializer/JavaSerializer.scala | 4 ++-- .../scala/org/apache/spark/storage/BlockManagerId.scala | 4 ++-- .../org/apache/spark/storage/BlockManagerMessages.scala | 6 ++++-- .../scala/org/apache/spark/storage/StorageLevel.scala | 5 +++-- .../scala/org/apache/spark/util/SerializableBuffer.scala | 4 ++-- core/src/main/scala/org/apache/spark/util/Utils.scala | 15 +++++++++++++++ .../apache/spark/streaming/flume/FlumeInputDStream.scala | 4 ++-- .../scala/org/apache/spark/streaming/DStreamGraph.scala | 5 +++-- .../apache/spark/streaming/api/python/PythonDStream.scala | 5 +++-- .../org/apache/spark/streaming/dstream/DStream.scala | 6 +++--- .../spark/streaming/dstream/DStreamCheckpointData.scala | 5 +++-- .../apache/spark/streaming/dstream/FileInputDStream.scala | 4 ++-- .../scala/org/apache/spark/streaming/TestSuiteBase.scala | 4 ++-- 30 files changed, 84 insertions(+), 52 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala index 12f2fe031c..2301caafb0 100644 --- a/core/src/main/scala/org/apache/spark/Accumulators.scala +++ b/core/src/main/scala/org/apache/spark/Accumulators.scala @@ -24,6 +24,7 @@ import scala.collection.mutable.Map import scala.reflect.ClassTag import org.apache.spark.serializer.JavaSerializer +import org.apache.spark.util.Utils /** * A data type that can be accumulated, ie has an commutative and associative "add" operation, @@ -126,7 +127,7 @@ class Accumulable[R, T] ( } // Called by Java when deserializing an object - private def readObject(in: ObjectInputStream) { + private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { in.defaultReadObject() value_ = zero deserialized = true diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index 37053bb6f3..e53a78ead2 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -204,7 +204,7 @@ class RangePartitioner[K : Ordering : ClassTag, V]( } @throws(classOf[IOException]) - private def writeObject(out: ObjectOutputStream) { + private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException { val sfactory = SparkEnv.get.serializer sfactory match { case js: JavaSerializer => out.defaultWriteObject() @@ -222,7 +222,7 @@ class RangePartitioner[K : Ordering : ClassTag, V]( } @throws(classOf[IOException]) - private def readObject(in: ObjectInputStream) { + private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { val sfactory = SparkEnv.get.serializer sfactory match { case js: JavaSerializer => in.defaultReadObject() diff --git a/core/src/main/scala/org/apache/spark/SerializableWritable.scala b/core/src/main/scala/org/apache/spark/SerializableWritable.scala index e50b9ac229..55cb25946c 100644 --- a/core/src/main/scala/org/apache/spark/SerializableWritable.scala +++ b/core/src/main/scala/org/apache/spark/SerializableWritable.scala @@ -24,18 +24,19 @@ import org.apache.hadoop.io.ObjectWritable import org.apache.hadoop.io.Writable import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.util.Utils @DeveloperApi class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable { def value = t override def toString = t.toString - private def writeObject(out: ObjectOutputStream) { + private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException { out.defaultWriteObject() new ObjectWritable(t).write(out) } - private def readObject(in: ObjectInputStream) { + private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { in.defaultReadObject() val ow = new ObjectWritable() ow.setConf(new Configuration()) diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala index 4cd4f4f96f..7dade04273 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala @@ -72,13 +72,13 @@ private[spark] class HttpBroadcast[T: ClassTag]( } /** Used by the JVM when serializing this object. */ - private def writeObject(out: ObjectOutputStream) { + private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException { assertValid() out.defaultWriteObject() } /** Used by the JVM when deserializing this object. */ - private def readObject(in: ObjectInputStream) { + private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { in.defaultReadObject() HttpBroadcast.synchronized { SparkEnv.get.blockManager.getSingle(blockId) match { diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 99af2e9608..75e64c1bf4 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -28,7 +28,7 @@ import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException} import org.apache.spark.io.CompressionCodec import org.apache.spark.serializer.Serializer import org.apache.spark.storage.{BroadcastBlockId, StorageLevel} -import org.apache.spark.util.ByteBufferInputStream +import org.apache.spark.util.{ByteBufferInputStream, Utils} import org.apache.spark.util.io.ByteArrayChunkOutputStream /** @@ -152,13 +152,13 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) } /** Used by the JVM when serializing this object. */ - private def writeObject(out: ObjectOutputStream) { + private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException { assertValid() out.defaultWriteObject() } /** Used by the JVM when deserializing this object. */ - private def readObject(in: ObjectInputStream) { + private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { in.defaultReadObject() TorrentBroadcast.synchronized { setConf(SparkEnv.get.conf) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index c3ca43f8d0..6ba395be1c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -25,6 +25,7 @@ import scala.collection.mutable.ArrayBuffer import akka.actor.ActorRef import org.apache.spark.deploy.ApplicationDescription +import org.apache.spark.util.Utils private[spark] class ApplicationInfo( val startTime: Long, @@ -46,7 +47,7 @@ private[spark] class ApplicationInfo( init() - private def readObject(in: java.io.ObjectInputStream): Unit = { + private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException { in.defaultReadObject() init() } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala index 80b570a44a..2ac2118688 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala @@ -20,6 +20,7 @@ package org.apache.spark.deploy.master import java.util.Date import org.apache.spark.deploy.DriverDescription +import org.apache.spark.util.Utils private[spark] class DriverInfo( val startTime: Long, @@ -36,7 +37,7 @@ private[spark] class DriverInfo( init() - private def readObject(in: java.io.ObjectInputStream): Unit = { + private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException { in.defaultReadObject() init() } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala index c5fa9cf7d7..d221b0f6cc 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala @@ -50,7 +50,7 @@ private[spark] class WorkerInfo( def coresFree: Int = cores - coresUsed def memoryFree: Int = memory - memoryUsed - private def readObject(in: java.io.ObjectInputStream) : Unit = { + private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException { in.defaultReadObject() init() } diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala index 4908711d17..1cbd684224 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala @@ -22,6 +22,7 @@ import java.io.{IOException, ObjectOutputStream} import scala.reflect.ClassTag import org.apache.spark._ +import org.apache.spark.util.Utils private[spark] class CartesianPartition( @@ -36,7 +37,7 @@ class CartesianPartition( override val index: Int = idx @throws(classOf[IOException]) - private def writeObject(oos: ObjectOutputStream) { + private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException { // Update the reference to parent split at the time of task serialization s1 = rdd1.partitions(s1Index) s2 = rdd2.partitions(s2Index) diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala index fabb882cdd..ffc0a8a6d6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala @@ -27,6 +27,7 @@ import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap, CompactBuffer} +import org.apache.spark.util.Utils import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.ShuffleHandle @@ -39,7 +40,7 @@ private[spark] case class NarrowCoGroupSplitDep( ) extends CoGroupSplitDep { @throws(classOf[IOException]) - private def writeObject(oos: ObjectOutputStream) { + private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException { // Update the reference to parent split at the time of task serialization split = rdd.partitions(splitIndex) oos.defaultWriteObject() diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala index 11ebafbf6d..9fab1d78ab 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala @@ -25,6 +25,7 @@ import scala.language.existentials import scala.reflect.ClassTag import org.apache.spark._ +import org.apache.spark.util.Utils /** * Class that captures a coalesced RDD by essentially keeping track of parent partitions @@ -42,7 +43,7 @@ private[spark] case class CoalescedRDDPartition( var parents: Seq[Partition] = parentsIndices.map(rdd.partitions(_)) @throws(classOf[IOException]) - private def writeObject(oos: ObjectOutputStream) { + private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException { // Update the reference to parent partition at the time of task serialization parents = parentsIndices.map(rdd.partitions(_)) oos.defaultWriteObject() diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala index 66c71bf7e8..87b22de6ae 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala @@ -48,7 +48,7 @@ private[spark] class ParallelCollectionPartition[T: ClassTag]( override def index: Int = slice @throws(classOf[IOException]) - private def writeObject(out: ObjectOutputStream): Unit = { + private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException { val sfactory = SparkEnv.get.serializer @@ -67,7 +67,7 @@ private[spark] class ParallelCollectionPartition[T: ClassTag]( } @throws(classOf[IOException]) - private def readObject(in: ObjectInputStream): Unit = { + private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { val sfactory = SparkEnv.get.serializer sfactory match { diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala index 0c2cd7a247..92b0641d0f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala @@ -22,6 +22,7 @@ import java.io.{IOException, ObjectOutputStream} import scala.reflect.ClassTag import org.apache.spark.{OneToOneDependency, Partition, SparkContext, TaskContext} +import org.apache.spark.util.Utils /** * Class representing partitions of PartitionerAwareUnionRDD, which maintains the list of @@ -38,7 +39,7 @@ class PartitionerAwareUnionRDDPartition( override def hashCode(): Int = idx @throws(classOf[IOException]) - private def writeObject(oos: ObjectOutputStream) { + private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException { // Update the reference to parent partition at the time of task serialization parents = rdds.map(_.partitions(index)).toArray oos.defaultWriteObject() diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala index 0c97eb0aaa..aece683ff3 100644 --- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala @@ -24,6 +24,7 @@ import scala.reflect.ClassTag import org.apache.spark.{Dependency, Partition, RangeDependency, SparkContext, TaskContext} import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.util.Utils /** * Partition for UnionRDD. @@ -48,7 +49,7 @@ private[spark] class UnionPartition[T: ClassTag]( override val index: Int = idx @throws(classOf[IOException]) - private def writeObject(oos: ObjectOutputStream) { + private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException { // Update the reference to parent split at the time of task serialization parentPartition = rdd.partitions(parentRddPartitionIndex) oos.defaultWriteObject() diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala index f3d30f6c9b..996f2cd3f3 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala @@ -22,6 +22,7 @@ import java.io.{IOException, ObjectOutputStream} import scala.reflect.ClassTag import org.apache.spark.{OneToOneDependency, Partition, SparkContext, TaskContext} +import org.apache.spark.util.Utils private[spark] class ZippedPartitionsPartition( idx: Int, @@ -34,7 +35,7 @@ private[spark] class ZippedPartitionsPartition( def partitions = partitionValues @throws(classOf[IOException]) - private def writeObject(oos: ObjectOutputStream) { + private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException { // Update the reference to parent split at the time of task serialization partitionValues = rdds.map(rdd => rdd.partitions(idx)) oos.defaultWriteObject() diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala index 2ab5d9637b..01d5943d77 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala @@ -22,6 +22,7 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput} import org.roaringbitmap.RoaringBitmap import org.apache.spark.storage.BlockManagerId +import org.apache.spark.util.Utils /** * Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the @@ -105,13 +106,13 @@ private[spark] class CompressedMapStatus( MapStatus.decompressSize(compressedSizes(reduceId)) } - override def writeExternal(out: ObjectOutput): Unit = { + override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { loc.writeExternal(out) out.writeInt(compressedSizes.length) out.write(compressedSizes) } - override def readExternal(in: ObjectInput): Unit = { + override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { loc = BlockManagerId(in) val len = in.readInt() compressedSizes = new Array[Byte](len) @@ -152,13 +153,13 @@ private[spark] class HighlyCompressedMapStatus private ( } } - override def writeExternal(out: ObjectOutput): Unit = { + override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { loc.writeExternal(out) emptyBlocks.writeExternal(out) out.writeLong(avgSize) } - override def readExternal(in: ObjectInput): Unit = { + override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { loc = BlockManagerId(in) emptyBlocks = new RoaringBitmap() emptyBlocks.readExternal(in) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala index d49d8fb887..11c19eeb6e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala @@ -42,7 +42,7 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long def this() = this(null.asInstanceOf[ByteBuffer], null, null) - override def writeExternal(out: ObjectOutput) { + override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { out.writeInt(valueBytes.remaining); Utils.writeByteBuffer(valueBytes, out) @@ -55,7 +55,7 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long out.writeObject(metrics) } - override def readExternal(in: ObjectInput) { + override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { val blen = in.readInt() val byteVal = new Array[Byte](blen) diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala index 554a33ce7f..662a7b9124 100644 --- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala @@ -117,11 +117,11 @@ class JavaSerializer(conf: SparkConf) extends Serializer with Externalizable { new JavaSerializerInstance(counterReset, classLoader) } - override def writeExternal(out: ObjectOutput) { + override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { out.writeInt(counterReset) } - override def readExternal(in: ObjectInput) { + override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { counterReset = in.readInt() } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala index 1422850943..259f423c73 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala @@ -61,13 +61,13 @@ class BlockManagerId private ( def isDriver: Boolean = (executorId == "") - override def writeExternal(out: ObjectOutput) { + override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { out.writeUTF(executorId_) out.writeUTF(host_) out.writeInt(port_) } - override def readExternal(in: ObjectInput) { + override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { executorId_ = in.readUTF() host_ = in.readUTF() port_ = in.readInt() diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index 3db5dd9774..291ddfcc11 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -21,6 +21,8 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput} import akka.actor.ActorRef +import org.apache.spark.util.Utils + private[spark] object BlockManagerMessages { ////////////////////////////////////////////////////////////////////////////////// // Messages from the master to slaves. @@ -65,7 +67,7 @@ private[spark] object BlockManagerMessages { def this() = this(null, null, null, 0, 0, 0) // For deserialization only - override def writeExternal(out: ObjectOutput) { + override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { blockManagerId.writeExternal(out) out.writeUTF(blockId.name) storageLevel.writeExternal(out) @@ -74,7 +76,7 @@ private[spark] object BlockManagerMessages { out.writeLong(tachyonSize) } - override def readExternal(in: ObjectInput) { + override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { blockManagerId = BlockManagerId(in) blockId = BlockId(in.readUTF()) storageLevel = StorageLevel(in) diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala index 1e35abaab5..56edc4fe2e 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala @@ -20,6 +20,7 @@ package org.apache.spark.storage import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput} import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.util.Utils /** * :: DeveloperApi :: @@ -97,12 +98,12 @@ class StorageLevel private( ret } - override def writeExternal(out: ObjectOutput) { + override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { out.writeByte(toInt) out.writeByte(_replication) } - override def readExternal(in: ObjectInput) { + override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { val flags = in.readByte() _useDisk = (flags & 8) != 0 _useMemory = (flags & 4) != 0 diff --git a/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala b/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala index 2b452ad33b..770ff9d5ad 100644 --- a/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala @@ -29,7 +29,7 @@ private[spark] class SerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable { def value = buffer - private def readObject(in: ObjectInputStream) { + private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { val length = in.readInt() buffer = ByteBuffer.allocate(length) var amountRead = 0 @@ -44,7 +44,7 @@ class SerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable buffer.rewind() // Allow us to read it later } - private def writeObject(out: ObjectOutputStream) { + private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException { out.writeInt(buffer.limit()) if (Channels.newChannel(out).write(buffer) != buffer.limit()) { throw new IOException("Could not fully write buffer to output stream") diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 65bdbaae65..e1dc492387 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -969,6 +969,21 @@ private[spark] object Utils extends Logging { } } + /** + * Execute a block of code that evaluates to Unit, re-throwing any non-fatal uncaught + * exceptions as IOException. This is used when implementing Externalizable and Serializable's + * read and write methods, since Java's serializer will not report non-IOExceptions properly; + * see SPARK-4080 for more context. + */ + def tryOrIOException(block: => Unit) { + try { + block + } catch { + case e: IOException => throw e + case NonFatal(t) => throw new IOException(t) + } + } + /** Default filtering function for finding call sites using `getCallSite`. */ private def coreExclusionFunction(className: String): Boolean = { // A regular expression to match classes of the "core" Spark API that we want to skip when diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala index 4b2ea45fb8..2de2a7926b 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala @@ -66,7 +66,7 @@ class SparkFlumeEvent() extends Externalizable { var event : AvroFlumeEvent = new AvroFlumeEvent() /* De-serialize from bytes. */ - def readExternal(in: ObjectInput) { + def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { val bodyLength = in.readInt() val bodyBuff = new Array[Byte](bodyLength) in.readFully(bodyBuff) @@ -93,7 +93,7 @@ class SparkFlumeEvent() extends Externalizable { } /* Serialize to bytes. */ - def writeExternal(out: ObjectOutput) { + def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { val body = event.getBody.array() out.writeInt(body.length) out.write(body) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index b4adf0e965..e59c24adb8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -22,6 +22,7 @@ import java.io.{ObjectInputStream, IOException, ObjectOutputStream} import org.apache.spark.Logging import org.apache.spark.streaming.scheduler.Job import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream, InputDStream} +import org.apache.spark.util.Utils final private[streaming] class DStreamGraph extends Serializable with Logging { @@ -160,7 +161,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { } @throws(classOf[IOException]) - private def writeObject(oos: ObjectOutputStream) { + private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException { logDebug("DStreamGraph.writeObject used") this.synchronized { checkpointInProgress = true @@ -172,7 +173,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { } @throws(classOf[IOException]) - private def readObject(ois: ObjectInputStream) { + private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException { logDebug("DStreamGraph.readObject used") this.synchronized { checkpointInProgress = true diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala index 213dff6a76..7053f47ec6 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala @@ -33,6 +33,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Interval, Duration, Time} import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.api.java._ +import org.apache.spark.util.Utils /** @@ -73,13 +74,13 @@ private[python] class TransformFunction(@transient var pfunc: PythonTransformFun pfunc.call(time.milliseconds, rdds) } - private def writeObject(out: ObjectOutputStream): Unit = { + private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException { val bytes = PythonTransformFunctionSerializer.serialize(pfunc) out.writeInt(bytes.length) out.write(bytes) } - private def readObject(in: ObjectInputStream): Unit = { + private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { val length = in.readInt() val bytes = new Array[Byte](length) in.readFully(bytes) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 65f7ccd318..eabd61d713 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -31,7 +31,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.scheduler.Job -import org.apache.spark.util.{CallSite, MetadataCleaner} +import org.apache.spark.util.{CallSite, MetadataCleaner, Utils} /** * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous @@ -400,7 +400,7 @@ abstract class DStream[T: ClassTag] ( } @throws(classOf[IOException]) - private def writeObject(oos: ObjectOutputStream) { + private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException { logDebug(this.getClass().getSimpleName + ".writeObject used") if (graph != null) { graph.synchronized { @@ -423,7 +423,7 @@ abstract class DStream[T: ClassTag] ( } @throws(classOf[IOException]) - private def readObject(ois: ObjectInputStream) { + private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException { logDebug(this.getClass().getSimpleName + ".readObject used") ois.defaultReadObject() generatedRDDs = new HashMap[Time, RDD[T]] () diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala index f33c0ceafd..0dc72790fb 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.FileSystem import org.apache.spark.Logging import org.apache.spark.streaming.Time +import org.apache.spark.util.Utils private[streaming] class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) @@ -119,7 +120,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) } @throws(classOf[IOException]) - private def writeObject(oos: ObjectOutputStream) { + private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException { logDebug(this.getClass().getSimpleName + ".writeObject used") if (dstream.context.graph != null) { dstream.context.graph.synchronized { @@ -142,7 +143,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T]) } @throws(classOf[IOException]) - private def readObject(ois: ObjectInputStream) { + private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException { logDebug(this.getClass().getSimpleName + ".readObject used") ois.defaultReadObject() timeToOldestCheckpointFileTime = new HashMap[Time, Time] diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala index 9eecbfaef3..8152b7542a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala @@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.spark.rdd.RDD import org.apache.spark.rdd.UnionRDD import org.apache.spark.streaming.{StreamingContext, Time} -import org.apache.spark.util.TimeStampedHashMap +import org.apache.spark.util.{TimeStampedHashMap, Utils} private[streaming] @@ -151,7 +151,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas } @throws(classOf[IOException]) - private def readObject(ois: ObjectInputStream) { + private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException { logDebug(this.getClass().getSimpleName + ".readObject used") ois.defaultReadObject() generatedRDDs = new HashMap[Time, RDD[(K,V)]] () diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index 9327ff4822..2154c24abd 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -73,7 +73,7 @@ class TestOutputStream[T: ClassTag](parent: DStream[T], // This is to clear the output buffer every it is read from a checkpoint @throws(classOf[IOException]) - private def readObject(ois: ObjectInputStream) { + private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException { ois.defaultReadObject() output.clear() } @@ -95,7 +95,7 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T], // This is to clear the output buffer every it is read from a checkpoint @throws(classOf[IOException]) - private def readObject(ois: ObjectInputStream) { + private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException { ois.defaultReadObject() output.clear() } -- cgit v1.2.3