From 6c98c29ae0033556fd4424f41d1de005c509e511 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 24 Oct 2014 15:06:15 -0700 Subject: [SPARK-4080] Only throw IOException from [write|read][Object|External] If classes implementing Serializable or Externalizable interfaces throw exceptions other than IOException or ClassNotFoundException from their (de)serialization methods, then this results in an unhelpful "IOException: unexpected exception type" rather than the actual exception that produced the (de)serialization error. This patch fixes this by adding a utility method that re-wraps any uncaught exceptions in IOException (unless they are already instances of IOException). Author: Josh Rosen Closes #2932 from JoshRosen/SPARK-4080 and squashes the following commits: cd3a9be [Josh Rosen] [SPARK-4080] Only throw IOException from [write|read][Object|External]. --- core/src/main/scala/org/apache/spark/Accumulators.scala | 3 ++- core/src/main/scala/org/apache/spark/Partitioner.scala | 4 ++-- .../scala/org/apache/spark/SerializableWritable.scala | 5 +++-- .../scala/org/apache/spark/broadcast/HttpBroadcast.scala | 4 ++-- .../org/apache/spark/broadcast/TorrentBroadcast.scala | 6 +++--- .../org/apache/spark/deploy/master/ApplicationInfo.scala | 3 ++- .../scala/org/apache/spark/deploy/master/DriverInfo.scala | 3 ++- .../scala/org/apache/spark/deploy/master/WorkerInfo.scala | 2 +- .../main/scala/org/apache/spark/rdd/CartesianRDD.scala | 3 ++- .../main/scala/org/apache/spark/rdd/CoGroupedRDD.scala | 3 ++- .../main/scala/org/apache/spark/rdd/CoalescedRDD.scala | 3 ++- .../org/apache/spark/rdd/ParallelCollectionRDD.scala | 4 ++-- .../org/apache/spark/rdd/PartitionerAwareUnionRDD.scala | 3 ++- core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala | 3 ++- .../scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala | 3 ++- .../main/scala/org/apache/spark/scheduler/MapStatus.scala | 9 +++++---- .../scala/org/apache/spark/scheduler/TaskResult.scala | 4 ++-- .../org/apache/spark/serializer/JavaSerializer.scala | 4 ++-- .../scala/org/apache/spark/storage/BlockManagerId.scala | 4 ++-- .../org/apache/spark/storage/BlockManagerMessages.scala | 6 ++++-- .../scala/org/apache/spark/storage/StorageLevel.scala | 5 +++-- .../scala/org/apache/spark/util/SerializableBuffer.scala | 4 ++-- core/src/main/scala/org/apache/spark/util/Utils.scala | 15 +++++++++++++++ 23 files changed, 66 insertions(+), 37 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala index 12f2fe031c..2301caafb0 100644 --- a/core/src/main/scala/org/apache/spark/Accumulators.scala +++ b/core/src/main/scala/org/apache/spark/Accumulators.scala @@ -24,6 +24,7 @@ import scala.collection.mutable.Map import scala.reflect.ClassTag import org.apache.spark.serializer.JavaSerializer +import org.apache.spark.util.Utils /** * A data type that can be accumulated, ie has an commutative and associative "add" operation, @@ -126,7 +127,7 @@ class Accumulable[R, T] ( } // Called by Java when deserializing an object - private def readObject(in: ObjectInputStream) { + private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { in.defaultReadObject() value_ = zero deserialized = true diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index 37053bb6f3..e53a78ead2 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -204,7 +204,7 @@ class RangePartitioner[K : Ordering : ClassTag, V]( } @throws(classOf[IOException]) - private def writeObject(out: ObjectOutputStream) { + private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException { val sfactory = SparkEnv.get.serializer sfactory match { case js: JavaSerializer => out.defaultWriteObject() @@ -222,7 +222,7 @@ class RangePartitioner[K : Ordering : ClassTag, V]( } @throws(classOf[IOException]) - private def readObject(in: ObjectInputStream) { + private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { val sfactory = SparkEnv.get.serializer sfactory match { case js: JavaSerializer => in.defaultReadObject() diff --git a/core/src/main/scala/org/apache/spark/SerializableWritable.scala b/core/src/main/scala/org/apache/spark/SerializableWritable.scala index e50b9ac229..55cb25946c 100644 --- a/core/src/main/scala/org/apache/spark/SerializableWritable.scala +++ b/core/src/main/scala/org/apache/spark/SerializableWritable.scala @@ -24,18 +24,19 @@ import org.apache.hadoop.io.ObjectWritable import org.apache.hadoop.io.Writable import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.util.Utils @DeveloperApi class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable { def value = t override def toString = t.toString - private def writeObject(out: ObjectOutputStream) { + private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException { out.defaultWriteObject() new ObjectWritable(t).write(out) } - private def readObject(in: ObjectInputStream) { + private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { in.defaultReadObject() val ow = new ObjectWritable() ow.setConf(new Configuration()) diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala index 4cd4f4f96f..7dade04273 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala @@ -72,13 +72,13 @@ private[spark] class HttpBroadcast[T: ClassTag]( } /** Used by the JVM when serializing this object. */ - private def writeObject(out: ObjectOutputStream) { + private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException { assertValid() out.defaultWriteObject() } /** Used by the JVM when deserializing this object. */ - private def readObject(in: ObjectInputStream) { + private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { in.defaultReadObject() HttpBroadcast.synchronized { SparkEnv.get.blockManager.getSingle(blockId) match { diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 99af2e9608..75e64c1bf4 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -28,7 +28,7 @@ import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException} import org.apache.spark.io.CompressionCodec import org.apache.spark.serializer.Serializer import org.apache.spark.storage.{BroadcastBlockId, StorageLevel} -import org.apache.spark.util.ByteBufferInputStream +import org.apache.spark.util.{ByteBufferInputStream, Utils} import org.apache.spark.util.io.ByteArrayChunkOutputStream /** @@ -152,13 +152,13 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) } /** Used by the JVM when serializing this object. */ - private def writeObject(out: ObjectOutputStream) { + private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException { assertValid() out.defaultWriteObject() } /** Used by the JVM when deserializing this object. */ - private def readObject(in: ObjectInputStream) { + private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { in.defaultReadObject() TorrentBroadcast.synchronized { setConf(SparkEnv.get.conf) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index c3ca43f8d0..6ba395be1c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -25,6 +25,7 @@ import scala.collection.mutable.ArrayBuffer import akka.actor.ActorRef import org.apache.spark.deploy.ApplicationDescription +import org.apache.spark.util.Utils private[spark] class ApplicationInfo( val startTime: Long, @@ -46,7 +47,7 @@ private[spark] class ApplicationInfo( init() - private def readObject(in: java.io.ObjectInputStream): Unit = { + private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException { in.defaultReadObject() init() } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala index 80b570a44a..2ac2118688 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala @@ -20,6 +20,7 @@ package org.apache.spark.deploy.master import java.util.Date import org.apache.spark.deploy.DriverDescription +import org.apache.spark.util.Utils private[spark] class DriverInfo( val startTime: Long, @@ -36,7 +37,7 @@ private[spark] class DriverInfo( init() - private def readObject(in: java.io.ObjectInputStream): Unit = { + private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException { in.defaultReadObject() init() } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala index c5fa9cf7d7..d221b0f6cc 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala @@ -50,7 +50,7 @@ private[spark] class WorkerInfo( def coresFree: Int = cores - coresUsed def memoryFree: Int = memory - memoryUsed - private def readObject(in: java.io.ObjectInputStream) : Unit = { + private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException { in.defaultReadObject() init() } diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala index 4908711d17..1cbd684224 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala @@ -22,6 +22,7 @@ import java.io.{IOException, ObjectOutputStream} import scala.reflect.ClassTag import org.apache.spark._ +import org.apache.spark.util.Utils private[spark] class CartesianPartition( @@ -36,7 +37,7 @@ class CartesianPartition( override val index: Int = idx @throws(classOf[IOException]) - private def writeObject(oos: ObjectOutputStream) { + private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException { // Update the reference to parent split at the time of task serialization s1 = rdd1.partitions(s1Index) s2 = rdd2.partitions(s2Index) diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala index fabb882cdd..ffc0a8a6d6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala @@ -27,6 +27,7 @@ import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap, CompactBuffer} +import org.apache.spark.util.Utils import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.ShuffleHandle @@ -39,7 +40,7 @@ private[spark] case class NarrowCoGroupSplitDep( ) extends CoGroupSplitDep { @throws(classOf[IOException]) - private def writeObject(oos: ObjectOutputStream) { + private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException { // Update the reference to parent split at the time of task serialization split = rdd.partitions(splitIndex) oos.defaultWriteObject() diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala index 11ebafbf6d..9fab1d78ab 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala @@ -25,6 +25,7 @@ import scala.language.existentials import scala.reflect.ClassTag import org.apache.spark._ +import org.apache.spark.util.Utils /** * Class that captures a coalesced RDD by essentially keeping track of parent partitions @@ -42,7 +43,7 @@ private[spark] case class CoalescedRDDPartition( var parents: Seq[Partition] = parentsIndices.map(rdd.partitions(_)) @throws(classOf[IOException]) - private def writeObject(oos: ObjectOutputStream) { + private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException { // Update the reference to parent partition at the time of task serialization parents = parentsIndices.map(rdd.partitions(_)) oos.defaultWriteObject() diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala index 66c71bf7e8..87b22de6ae 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala @@ -48,7 +48,7 @@ private[spark] class ParallelCollectionPartition[T: ClassTag]( override def index: Int = slice @throws(classOf[IOException]) - private def writeObject(out: ObjectOutputStream): Unit = { + private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException { val sfactory = SparkEnv.get.serializer @@ -67,7 +67,7 @@ private[spark] class ParallelCollectionPartition[T: ClassTag]( } @throws(classOf[IOException]) - private def readObject(in: ObjectInputStream): Unit = { + private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { val sfactory = SparkEnv.get.serializer sfactory match { diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala index 0c2cd7a247..92b0641d0f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala @@ -22,6 +22,7 @@ import java.io.{IOException, ObjectOutputStream} import scala.reflect.ClassTag import org.apache.spark.{OneToOneDependency, Partition, SparkContext, TaskContext} +import org.apache.spark.util.Utils /** * Class representing partitions of PartitionerAwareUnionRDD, which maintains the list of @@ -38,7 +39,7 @@ class PartitionerAwareUnionRDDPartition( override def hashCode(): Int = idx @throws(classOf[IOException]) - private def writeObject(oos: ObjectOutputStream) { + private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException { // Update the reference to parent partition at the time of task serialization parents = rdds.map(_.partitions(index)).toArray oos.defaultWriteObject() diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala index 0c97eb0aaa..aece683ff3 100644 --- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala @@ -24,6 +24,7 @@ import scala.reflect.ClassTag import org.apache.spark.{Dependency, Partition, RangeDependency, SparkContext, TaskContext} import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.util.Utils /** * Partition for UnionRDD. @@ -48,7 +49,7 @@ private[spark] class UnionPartition[T: ClassTag]( override val index: Int = idx @throws(classOf[IOException]) - private def writeObject(oos: ObjectOutputStream) { + private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException { // Update the reference to parent split at the time of task serialization parentPartition = rdd.partitions(parentRddPartitionIndex) oos.defaultWriteObject() diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala index f3d30f6c9b..996f2cd3f3 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala @@ -22,6 +22,7 @@ import java.io.{IOException, ObjectOutputStream} import scala.reflect.ClassTag import org.apache.spark.{OneToOneDependency, Partition, SparkContext, TaskContext} +import org.apache.spark.util.Utils private[spark] class ZippedPartitionsPartition( idx: Int, @@ -34,7 +35,7 @@ private[spark] class ZippedPartitionsPartition( def partitions = partitionValues @throws(classOf[IOException]) - private def writeObject(oos: ObjectOutputStream) { + private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException { // Update the reference to parent split at the time of task serialization partitionValues = rdds.map(rdd => rdd.partitions(idx)) oos.defaultWriteObject() diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala index 2ab5d9637b..01d5943d77 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala @@ -22,6 +22,7 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput} import org.roaringbitmap.RoaringBitmap import org.apache.spark.storage.BlockManagerId +import org.apache.spark.util.Utils /** * Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the @@ -105,13 +106,13 @@ private[spark] class CompressedMapStatus( MapStatus.decompressSize(compressedSizes(reduceId)) } - override def writeExternal(out: ObjectOutput): Unit = { + override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { loc.writeExternal(out) out.writeInt(compressedSizes.length) out.write(compressedSizes) } - override def readExternal(in: ObjectInput): Unit = { + override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { loc = BlockManagerId(in) val len = in.readInt() compressedSizes = new Array[Byte](len) @@ -152,13 +153,13 @@ private[spark] class HighlyCompressedMapStatus private ( } } - override def writeExternal(out: ObjectOutput): Unit = { + override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { loc.writeExternal(out) emptyBlocks.writeExternal(out) out.writeLong(avgSize) } - override def readExternal(in: ObjectInput): Unit = { + override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { loc = BlockManagerId(in) emptyBlocks = new RoaringBitmap() emptyBlocks.readExternal(in) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala index d49d8fb887..11c19eeb6e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala @@ -42,7 +42,7 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long def this() = this(null.asInstanceOf[ByteBuffer], null, null) - override def writeExternal(out: ObjectOutput) { + override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { out.writeInt(valueBytes.remaining); Utils.writeByteBuffer(valueBytes, out) @@ -55,7 +55,7 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long out.writeObject(metrics) } - override def readExternal(in: ObjectInput) { + override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { val blen = in.readInt() val byteVal = new Array[Byte](blen) diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala index 554a33ce7f..662a7b9124 100644 --- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala @@ -117,11 +117,11 @@ class JavaSerializer(conf: SparkConf) extends Serializer with Externalizable { new JavaSerializerInstance(counterReset, classLoader) } - override def writeExternal(out: ObjectOutput) { + override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { out.writeInt(counterReset) } - override def readExternal(in: ObjectInput) { + override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { counterReset = in.readInt() } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala index 1422850943..259f423c73 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala @@ -61,13 +61,13 @@ class BlockManagerId private ( def isDriver: Boolean = (executorId == "") - override def writeExternal(out: ObjectOutput) { + override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { out.writeUTF(executorId_) out.writeUTF(host_) out.writeInt(port_) } - override def readExternal(in: ObjectInput) { + override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { executorId_ = in.readUTF() host_ = in.readUTF() port_ = in.readInt() diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala index 3db5dd9774..291ddfcc11 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala @@ -21,6 +21,8 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput} import akka.actor.ActorRef +import org.apache.spark.util.Utils + private[spark] object BlockManagerMessages { ////////////////////////////////////////////////////////////////////////////////// // Messages from the master to slaves. @@ -65,7 +67,7 @@ private[spark] object BlockManagerMessages { def this() = this(null, null, null, 0, 0, 0) // For deserialization only - override def writeExternal(out: ObjectOutput) { + override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { blockManagerId.writeExternal(out) out.writeUTF(blockId.name) storageLevel.writeExternal(out) @@ -74,7 +76,7 @@ private[spark] object BlockManagerMessages { out.writeLong(tachyonSize) } - override def readExternal(in: ObjectInput) { + override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { blockManagerId = BlockManagerId(in) blockId = BlockId(in.readUTF()) storageLevel = StorageLevel(in) diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala index 1e35abaab5..56edc4fe2e 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala @@ -20,6 +20,7 @@ package org.apache.spark.storage import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput} import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.util.Utils /** * :: DeveloperApi :: @@ -97,12 +98,12 @@ class StorageLevel private( ret } - override def writeExternal(out: ObjectOutput) { + override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { out.writeByte(toInt) out.writeByte(_replication) } - override def readExternal(in: ObjectInput) { + override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { val flags = in.readByte() _useDisk = (flags & 8) != 0 _useMemory = (flags & 4) != 0 diff --git a/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala b/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala index 2b452ad33b..770ff9d5ad 100644 --- a/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala @@ -29,7 +29,7 @@ private[spark] class SerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable { def value = buffer - private def readObject(in: ObjectInputStream) { + private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException { val length = in.readInt() buffer = ByteBuffer.allocate(length) var amountRead = 0 @@ -44,7 +44,7 @@ class SerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable buffer.rewind() // Allow us to read it later } - private def writeObject(out: ObjectOutputStream) { + private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException { out.writeInt(buffer.limit()) if (Channels.newChannel(out).write(buffer) != buffer.limit()) { throw new IOException("Could not fully write buffer to output stream") diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 65bdbaae65..e1dc492387 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -969,6 +969,21 @@ private[spark] object Utils extends Logging { } } + /** + * Execute a block of code that evaluates to Unit, re-throwing any non-fatal uncaught + * exceptions as IOException. This is used when implementing Externalizable and Serializable's + * read and write methods, since Java's serializer will not report non-IOExceptions properly; + * see SPARK-4080 for more context. + */ + def tryOrIOException(block: => Unit) { + try { + block + } catch { + case e: IOException => throw e + case NonFatal(t) => throw new IOException(t) + } + } + /** Default filtering function for finding call sites using `getCallSite`. */ private def coreExclusionFunction(className: String): Boolean = { // A regular expression to match classes of the "core" Spark API that we want to skip when -- cgit v1.2.3