author    Josh Rosen <joshrosen@databricks.com>    2014-10-24 15:06:15 -0700
committer Josh Rosen <joshrosen@databricks.com>    2014-10-24 15:06:15 -0700
commit    6c98c29ae0033556fd4424f41d1de005c509e511 (patch)
tree      eeaae15f90955ef04a1794f7c966960bcdbbf3fd
parent    3a906c6631a914da8ede3111c63f89a0dac3f369 (diff)
[SPARK-4080] Only throw IOException from [write|read][Object|External]
If classes implementing Serializable or Externalizable interfaces throw
exceptions other than IOException or ClassNotFoundException from their
(de)serialization methods, then this results in an unhelpful "IOException:
unexpected exception type" rather than the actual exception that produced the
(de)serialization error. This patch fixes this by adding a utility method that
re-wraps any uncaught exceptions in IOException (unless they are already
instances of IOException).

Author: Josh Rosen <joshrosen@databricks.com>

Closes #2932 from JoshRosen/SPARK-4080 and squashes the following commits:

cd3a9be [Josh Rosen] [SPARK-4080] Only throw IOException from [write|read][Object|External].
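For context, a minimal repro sketch of the failure mode (illustrative only, not
part of the patch): Scala methods may throw checked exceptions without
declaring them, while Java's serialization machinery rethrows (roughly) only
IOException, ClassNotFoundException, RuntimeException, and Error from a
readObject/writeObject hook; anything else is reported as the opaque
"unexpected exception type" IOException, with the real failure buried as its
cause.

    import java.io._

    // A checked exception; Scala lets readObject throw it undeclared.
    class AppException(msg: String) extends Exception(msg)

    class Fragile extends Serializable {
      private def readObject(in: ObjectInputStream): Unit = {
        in.defaultReadObject()
        throw new AppException("real cause") // neither IOException nor ClassNotFoundException
      }
    }

    object Repro {
      def main(args: Array[String]): Unit = {
        val buf = new ByteArrayOutputStream()
        val oos = new ObjectOutputStream(buf)
        oos.writeObject(new Fragile)
        oos.close()
        val ois = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
        ois.readObject() // throws "java.io.IOException: unexpected exception type"
      }
    }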
-rw-r--r--  core/src/main/scala/org/apache/spark/Accumulators.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/Partitioner.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/SerializableWritable.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala | 9
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/StorageLevel.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala | 15
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala | 5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala | 5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala | 6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala | 5
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala | 4
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala | 4
30 files changed, 84 insertions(+), 52 deletions(-)
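The change itself is mechanical across all 30 files; schematically (an
illustrative composite, not any single file), each serialization hook's body is
wrapped so that non-fatal failures surface as IOException with the original
exception attached as the cause:

    // Before: a non-IO exception escaping here is reported opaquely.
    private def readObject(in: ObjectInputStream) {
      in.defaultReadObject()
      // ... rebuild transient state ...
    }

    // After: the same body runs inside Utils.tryOrIOException.
    private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
      in.defaultReadObject()
      // ... rebuild transient state ...
    }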
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 12f2fe031c..2301caafb0 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.Map
import scala.reflect.ClassTag
import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.util.Utils
/**
* A data type that can be accumulated, ie has an commutative and associative "add" operation,
@@ -126,7 +127,7 @@ class Accumulable[R, T] (
}
// Called by Java when deserializing an object
- private def readObject(in: ObjectInputStream) {
+ private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
value_ = zero
deserialized = true
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 37053bb6f3..e53a78ead2 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -204,7 +204,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
}
@throws(classOf[IOException])
- private def writeObject(out: ObjectOutputStream) {
+ private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
val sfactory = SparkEnv.get.serializer
sfactory match {
case js: JavaSerializer => out.defaultWriteObject()
@@ -222,7 +222,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
}
@throws(classOf[IOException])
- private def readObject(in: ObjectInputStream) {
+ private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
val sfactory = SparkEnv.get.serializer
sfactory match {
case js: JavaSerializer => in.defaultReadObject()
diff --git a/core/src/main/scala/org/apache/spark/SerializableWritable.scala b/core/src/main/scala/org/apache/spark/SerializableWritable.scala
index e50b9ac229..55cb25946c 100644
--- a/core/src/main/scala/org/apache/spark/SerializableWritable.scala
+++ b/core/src/main/scala/org/apache/spark/SerializableWritable.scala
@@ -24,18 +24,19 @@ import org.apache.hadoop.io.ObjectWritable
import org.apache.hadoop.io.Writable
import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.Utils
@DeveloperApi
class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable {
def value = t
override def toString = t.toString
- private def writeObject(out: ObjectOutputStream) {
+ private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
out.defaultWriteObject()
new ObjectWritable(t).write(out)
}
- private def readObject(in: ObjectInputStream) {
+ private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
val ow = new ObjectWritable()
ow.setConf(new Configuration())
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 4cd4f4f96f..7dade04273 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -72,13 +72,13 @@ private[spark] class HttpBroadcast[T: ClassTag](
}
/** Used by the JVM when serializing this object. */
- private def writeObject(out: ObjectOutputStream) {
+ private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
assertValid()
out.defaultWriteObject()
}
/** Used by the JVM when deserializing this object. */
- private def readObject(in: ObjectInputStream) {
+ private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
HttpBroadcast.synchronized {
SparkEnv.get.blockManager.getSingle(blockId) match {
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 99af2e9608..75e64c1bf4 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -28,7 +28,7 @@ import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.serializer.Serializer
import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
-import org.apache.spark.util.ByteBufferInputStream
+import org.apache.spark.util.{ByteBufferInputStream, Utils}
import org.apache.spark.util.io.ByteArrayChunkOutputStream
/**
@@ -152,13 +152,13 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
}
/** Used by the JVM when serializing this object. */
- private def writeObject(out: ObjectOutputStream) {
+ private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
assertValid()
out.defaultWriteObject()
}
/** Used by the JVM when deserializing this object. */
- private def readObject(in: ObjectInputStream) {
+ private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
TorrentBroadcast.synchronized {
setConf(SparkEnv.get.conf)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index c3ca43f8d0..6ba395be1c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable.ArrayBuffer
import akka.actor.ActorRef
import org.apache.spark.deploy.ApplicationDescription
+import org.apache.spark.util.Utils
private[spark] class ApplicationInfo(
val startTime: Long,
@@ -46,7 +47,7 @@ private[spark] class ApplicationInfo(
init()
- private def readObject(in: java.io.ObjectInputStream): Unit = {
+ private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
init()
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
index 80b570a44a..2ac2118688 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.master
import java.util.Date
import org.apache.spark.deploy.DriverDescription
+import org.apache.spark.util.Utils
private[spark] class DriverInfo(
val startTime: Long,
@@ -36,7 +37,7 @@ private[spark] class DriverInfo(
init()
- private def readObject(in: java.io.ObjectInputStream): Unit = {
+ private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
init()
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index c5fa9cf7d7..d221b0f6cc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -50,7 +50,7 @@ private[spark] class WorkerInfo(
def coresFree: Int = cores - coresUsed
def memoryFree: Int = memory - memoryUsed
- private def readObject(in: java.io.ObjectInputStream) : Unit = {
+ private def readObject(in: java.io.ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
init()
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
index 4908711d17..1cbd684224 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -22,6 +22,7 @@ import java.io.{IOException, ObjectOutputStream}
import scala.reflect.ClassTag
import org.apache.spark._
+import org.apache.spark.util.Utils
private[spark]
class CartesianPartition(
@@ -36,7 +37,7 @@ class CartesianPartition(
override val index: Int = idx
@throws(classOf[IOException])
- private def writeObject(oos: ObjectOutputStream) {
+ private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
// Update the reference to parent split at the time of task serialization
s1 = rdd1.partitions(s1Index)
s2 = rdd2.partitions(s2Index)
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index fabb882cdd..ffc0a8a6d6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -27,6 +27,7 @@ import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv
import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap, CompactBuffer}
+import org.apache.spark.util.Utils
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.ShuffleHandle
@@ -39,7 +40,7 @@ private[spark] case class NarrowCoGroupSplitDep(
) extends CoGroupSplitDep {
@throws(classOf[IOException])
- private def writeObject(oos: ObjectOutputStream) {
+ private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
// Update the reference to parent split at the time of task serialization
split = rdd.partitions(splitIndex)
oos.defaultWriteObject()
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index 11ebafbf6d..9fab1d78ab 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -25,6 +25,7 @@ import scala.language.existentials
import scala.reflect.ClassTag
import org.apache.spark._
+import org.apache.spark.util.Utils
/**
* Class that captures a coalesced RDD by essentially keeping track of parent partitions
@@ -42,7 +43,7 @@ private[spark] case class CoalescedRDDPartition(
var parents: Seq[Partition] = parentsIndices.map(rdd.partitions(_))
@throws(classOf[IOException])
- private def writeObject(oos: ObjectOutputStream) {
+ private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
// Update the reference to parent partition at the time of task serialization
parents = parentsIndices.map(rdd.partitions(_))
oos.defaultWriteObject()
diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
index 66c71bf7e8..87b22de6ae 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -48,7 +48,7 @@ private[spark] class ParallelCollectionPartition[T: ClassTag](
override def index: Int = slice
@throws(classOf[IOException])
- private def writeObject(out: ObjectOutputStream): Unit = {
+ private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
val sfactory = SparkEnv.get.serializer
@@ -67,7 +67,7 @@ private[spark] class ParallelCollectionPartition[T: ClassTag](
}
@throws(classOf[IOException])
- private def readObject(in: ObjectInputStream): Unit = {
+ private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
val sfactory = SparkEnv.get.serializer
sfactory match {
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
index 0c2cd7a247..92b0641d0f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
@@ -22,6 +22,7 @@ import java.io.{IOException, ObjectOutputStream}
import scala.reflect.ClassTag
import org.apache.spark.{OneToOneDependency, Partition, SparkContext, TaskContext}
+import org.apache.spark.util.Utils
/**
* Class representing partitions of PartitionerAwareUnionRDD, which maintains the list of
@@ -38,7 +39,7 @@ class PartitionerAwareUnionRDDPartition(
override def hashCode(): Int = idx
@throws(classOf[IOException])
- private def writeObject(oos: ObjectOutputStream) {
+ private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
// Update the reference to parent partition at the time of task serialization
parents = rdds.map(_.partitions(index)).toArray
oos.defaultWriteObject()
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index 0c97eb0aaa..aece683ff3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -24,6 +24,7 @@ import scala.reflect.ClassTag
import org.apache.spark.{Dependency, Partition, RangeDependency, SparkContext, TaskContext}
import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.Utils
/**
* Partition for UnionRDD.
@@ -48,7 +49,7 @@ private[spark] class UnionPartition[T: ClassTag](
override val index: Int = idx
@throws(classOf[IOException])
- private def writeObject(oos: ObjectOutputStream) {
+ private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
// Update the reference to parent split at the time of task serialization
parentPartition = rdd.partitions(parentRddPartitionIndex)
oos.defaultWriteObject()
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index f3d30f6c9b..996f2cd3f3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -22,6 +22,7 @@ import java.io.{IOException, ObjectOutputStream}
import scala.reflect.ClassTag
import org.apache.spark.{OneToOneDependency, Partition, SparkContext, TaskContext}
+import org.apache.spark.util.Utils
private[spark] class ZippedPartitionsPartition(
idx: Int,
@@ -34,7 +35,7 @@ private[spark] class ZippedPartitionsPartition(
def partitions = partitionValues
@throws(classOf[IOException])
- private def writeObject(oos: ObjectOutputStream) {
+ private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
// Update the reference to parent split at the time of task serialization
partitionValues = rdds.map(rdd => rdd.partitions(idx))
oos.defaultWriteObject()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 2ab5d9637b..01d5943d77 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -22,6 +22,7 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
import org.roaringbitmap.RoaringBitmap
import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.Utils
/**
* Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
@@ -105,13 +106,13 @@ private[spark] class CompressedMapStatus(
MapStatus.decompressSize(compressedSizes(reduceId))
}
- override def writeExternal(out: ObjectOutput): Unit = {
+ override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
loc.writeExternal(out)
out.writeInt(compressedSizes.length)
out.write(compressedSizes)
}
- override def readExternal(in: ObjectInput): Unit = {
+ override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
loc = BlockManagerId(in)
val len = in.readInt()
compressedSizes = new Array[Byte](len)
@@ -152,13 +153,13 @@ private[spark] class HighlyCompressedMapStatus private (
}
}
- override def writeExternal(out: ObjectOutput): Unit = {
+ override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
loc.writeExternal(out)
emptyBlocks.writeExternal(out)
out.writeLong(avgSize)
}
- override def readExternal(in: ObjectInput): Unit = {
+ override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
loc = BlockManagerId(in)
emptyBlocks = new RoaringBitmap()
emptyBlocks.readExternal(in)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
index d49d8fb887..11c19eeb6e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -42,7 +42,7 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long
def this() = this(null.asInstanceOf[ByteBuffer], null, null)
- override def writeExternal(out: ObjectOutput) {
+ override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
out.writeInt(valueBytes.remaining);
Utils.writeByteBuffer(valueBytes, out)
@@ -55,7 +55,7 @@ class DirectTaskResult[T](var valueBytes: ByteBuffer, var accumUpdates: Map[Long
out.writeObject(metrics)
}
- override def readExternal(in: ObjectInput) {
+ override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
val blen = in.readInt()
val byteVal = new Array[Byte](blen)
diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
index 554a33ce7f..662a7b9124 100644
--- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -117,11 +117,11 @@ class JavaSerializer(conf: SparkConf) extends Serializer with Externalizable {
new JavaSerializerInstance(counterReset, classLoader)
}
- override def writeExternal(out: ObjectOutput) {
+ override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
out.writeInt(counterReset)
}
- override def readExternal(in: ObjectInput) {
+ override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
counterReset = in.readInt()
}
}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index 1422850943..259f423c73 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -61,13 +61,13 @@ class BlockManagerId private (
def isDriver: Boolean = (executorId == "<driver>")
- override def writeExternal(out: ObjectOutput) {
+ override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
out.writeUTF(executorId_)
out.writeUTF(host_)
out.writeInt(port_)
}
- override def readExternal(in: ObjectInput) {
+ override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
executorId_ = in.readUTF()
host_ = in.readUTF()
port_ = in.readInt()
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 3db5dd9774..291ddfcc11 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -21,6 +21,8 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput}
import akka.actor.ActorRef
+import org.apache.spark.util.Utils
+
private[spark] object BlockManagerMessages {
//////////////////////////////////////////////////////////////////////////////////
// Messages from the master to slaves.
@@ -65,7 +67,7 @@ private[spark] object BlockManagerMessages {
def this() = this(null, null, null, 0, 0, 0) // For deserialization only
- override def writeExternal(out: ObjectOutput) {
+ override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
blockManagerId.writeExternal(out)
out.writeUTF(blockId.name)
storageLevel.writeExternal(out)
@@ -74,7 +76,7 @@ private[spark] object BlockManagerMessages {
out.writeLong(tachyonSize)
}
- override def readExternal(in: ObjectInput) {
+ override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
blockManagerId = BlockManagerId(in)
blockId = BlockId(in.readUTF())
storageLevel = StorageLevel(in)
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 1e35abaab5..56edc4fe2e 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -20,6 +20,7 @@ package org.apache.spark.storage
import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.Utils
/**
* :: DeveloperApi ::
@@ -97,12 +98,12 @@ class StorageLevel private(
ret
}
- override def writeExternal(out: ObjectOutput) {
+ override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
out.writeByte(toInt)
out.writeByte(_replication)
}
- override def readExternal(in: ObjectInput) {
+ override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
val flags = in.readByte()
_useDisk = (flags & 8) != 0
_useMemory = (flags & 4) != 0
diff --git a/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala b/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala
index 2b452ad33b..770ff9d5ad 100644
--- a/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/SerializableBuffer.scala
@@ -29,7 +29,7 @@ private[spark]
class SerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable {
def value = buffer
- private def readObject(in: ObjectInputStream) {
+ private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
val length = in.readInt()
buffer = ByteBuffer.allocate(length)
var amountRead = 0
@@ -44,7 +44,7 @@ class SerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable
buffer.rewind() // Allow us to read it later
}
- private def writeObject(out: ObjectOutputStream) {
+ private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
out.writeInt(buffer.limit())
if (Channels.newChannel(out).write(buffer) != buffer.limit()) {
throw new IOException("Could not fully write buffer to output stream")
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 65bdbaae65..e1dc492387 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -969,6 +969,21 @@ private[spark] object Utils extends Logging {
}
}
+ /**
+ * Execute a block of code that evaluates to Unit, re-throwing any non-fatal uncaught
+ * exceptions as IOException. This is used when implementing Externalizable and Serializable's
+ * read and write methods, since Java's serializer will not report non-IOExceptions properly;
+ * see SPARK-4080 for more context.
+ */
+ def tryOrIOException(block: => Unit) {
+ try {
+ block
+ } catch {
+ case e: IOException => throw e
+ case NonFatal(t) => throw new IOException(t)
+ }
+ }
+
/** Default filtering function for finding call sites using `getCallSite`. */
private def coreExclusionFunction(className: String): Boolean = {
// A regular expression to match classes of the "core" Spark API that we want to skip when
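As a usage illustration of the new helper (a hypothetical class, not from the
patch): IOExceptions pass through untouched, other non-fatal throwables are
rewrapped, and fatal errors such as OutOfMemoryError still propagate because
scala.util.control.NonFatal does not match them.

    // Utils is private[spark], so a caller must live under org.apache.spark.
    package org.apache.spark.demo

    import java.io.{ObjectInputStream, ObjectOutputStream}
    import org.apache.spark.util.Utils

    class CachedView(val name: String) extends Serializable {
      @transient private var cache: Map[String, Int] = Map.empty

      private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
        out.defaultWriteObject()
      }

      private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
        in.defaultReadObject()
        cache = Map.empty // any failure here now surfaces as an IOException
      }
    }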
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 4b2ea45fb8..2de2a7926b 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -66,7 +66,7 @@ class SparkFlumeEvent() extends Externalizable {
var event : AvroFlumeEvent = new AvroFlumeEvent()
/* De-serialize from bytes. */
- def readExternal(in: ObjectInput) {
+ def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
val bodyLength = in.readInt()
val bodyBuff = new Array[Byte](bodyLength)
in.readFully(bodyBuff)
@@ -93,7 +93,7 @@ class SparkFlumeEvent() extends Externalizable {
}
/* Serialize to bytes. */
- def writeExternal(out: ObjectOutput) {
+ def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
val body = event.getBody.array()
out.writeInt(body.length)
out.write(body)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index b4adf0e965..e59c24adb8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -22,6 +22,7 @@ import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
import org.apache.spark.Logging
import org.apache.spark.streaming.scheduler.Job
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream, InputDStream}
+import org.apache.spark.util.Utils
final private[streaming] class DStreamGraph extends Serializable with Logging {
@@ -160,7 +161,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
}
@throws(classOf[IOException])
- private def writeObject(oos: ObjectOutputStream) {
+ private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
logDebug("DStreamGraph.writeObject used")
this.synchronized {
checkpointInProgress = true
@@ -172,7 +173,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
}
@throws(classOf[IOException])
- private def readObject(ois: ObjectInputStream) {
+ private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
logDebug("DStreamGraph.readObject used")
this.synchronized {
checkpointInProgress = true
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
index 213dff6a76..7053f47ec6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala
@@ -33,6 +33,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Interval, Duration, Time}
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.api.java._
+import org.apache.spark.util.Utils
/**
@@ -73,13 +74,13 @@ private[python] class TransformFunction(@transient var pfunc: PythonTransformFun
pfunc.call(time.milliseconds, rdds)
}
- private def writeObject(out: ObjectOutputStream): Unit = {
+ private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
val bytes = PythonTransformFunctionSerializer.serialize(pfunc)
out.writeInt(bytes.length)
out.write(bytes)
}
- private def readObject(in: ObjectInputStream): Unit = {
+ private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
val length = in.readInt()
val bytes = new Array[Byte](length)
in.readFully(bytes)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
index 65f7ccd318..eabd61d713 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -31,7 +31,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.scheduler.Job
-import org.apache.spark.util.{CallSite, MetadataCleaner}
+import org.apache.spark.util.{CallSite, MetadataCleaner, Utils}
/**
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
@@ -400,7 +400,7 @@ abstract class DStream[T: ClassTag] (
}
@throws(classOf[IOException])
- private def writeObject(oos: ObjectOutputStream) {
+ private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
logDebug(this.getClass().getSimpleName + ".writeObject used")
if (graph != null) {
graph.synchronized {
@@ -423,7 +423,7 @@ abstract class DStream[T: ClassTag] (
}
@throws(classOf[IOException])
- private def readObject(ois: ObjectInputStream) {
+ private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
logDebug(this.getClass().getSimpleName + ".readObject used")
ois.defaultReadObject()
generatedRDDs = new HashMap[Time, RDD[T]] ()
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index f33c0ceafd..0dc72790fb 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.Logging
import org.apache.spark.streaming.Time
+import org.apache.spark.util.Utils
private[streaming]
class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
@@ -119,7 +120,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
}
@throws(classOf[IOException])
- private def writeObject(oos: ObjectOutputStream) {
+ private def writeObject(oos: ObjectOutputStream): Unit = Utils.tryOrIOException {
logDebug(this.getClass().getSimpleName + ".writeObject used")
if (dstream.context.graph != null) {
dstream.context.graph.synchronized {
@@ -142,7 +143,7 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
}
@throws(classOf[IOException])
- private def readObject(ois: ObjectInputStream) {
+ private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
logDebug(this.getClass().getSimpleName + ".readObject used")
ois.defaultReadObject()
timeToOldestCheckpointFileTime = new HashMap[Time, Time]
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 9eecbfaef3..8152b7542a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.UnionRDD
import org.apache.spark.streaming.{StreamingContext, Time}
-import org.apache.spark.util.TimeStampedHashMap
+import org.apache.spark.util.{TimeStampedHashMap, Utils}
private[streaming]
@@ -151,7 +151,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
}
@throws(classOf[IOException])
- private def readObject(ois: ObjectInputStream) {
+ private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
logDebug(this.getClass().getSimpleName + ".readObject used")
ois.defaultReadObject()
generatedRDDs = new HashMap[Time, RDD[(K,V)]] ()
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 9327ff4822..2154c24abd 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -73,7 +73,7 @@ class TestOutputStream[T: ClassTag](parent: DStream[T],
// This is to clear the output buffer every it is read from a checkpoint
@throws(classOf[IOException])
- private def readObject(ois: ObjectInputStream) {
+ private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
ois.defaultReadObject()
output.clear()
}
@@ -95,7 +95,7 @@ class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
// This is to clear the output buffer every it is read from a checkpoint
@throws(classOf[IOException])
- private def readObject(ois: ObjectInputStream) {
+ private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
ois.defaultReadObject()
output.clear()
}