diff options
-rw-r--r-- | core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala | 15 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala | 3 |
2 files changed, 12 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala index c5f6062a92..1baa0e009f 100644 --- a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala @@ -27,7 +27,8 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.util.ByteBufferInputStream import org.apache.spark.util.Utils -private[spark] class JavaSerializationStream(out: OutputStream, counterReset: Int) +private[spark] class JavaSerializationStream( + out: OutputStream, counterReset: Int, extraDebugInfo: Boolean) extends SerializationStream { private val objOut = new ObjectOutputStream(out) private var counter = 0 @@ -42,7 +43,7 @@ private[spark] class JavaSerializationStream(out: OutputStream, counterReset: In try { objOut.writeObject(t) } catch { - case e: NotSerializableException => + case e: NotSerializableException if extraDebugInfo => throw SerializationDebugger.improveException(t, e) } counter += 1 @@ -69,7 +70,8 @@ extends DeserializationStream { } -private[spark] class JavaSerializerInstance(counterReset: Int, defaultClassLoader: ClassLoader) +private[spark] class JavaSerializerInstance( + counterReset: Int, extraDebugInfo: Boolean, defaultClassLoader: ClassLoader) extends SerializerInstance { override def serialize[T: ClassTag](t: T): ByteBuffer = { @@ -93,7 +95,7 @@ private[spark] class JavaSerializerInstance(counterReset: Int, defaultClassLoade } override def serializeStream(s: OutputStream): SerializationStream = { - new JavaSerializationStream(s, counterReset) + new JavaSerializationStream(s, counterReset, extraDebugInfo) } override def deserializeStream(s: InputStream): DeserializationStream = { @@ -116,17 +118,20 @@ private[spark] class JavaSerializerInstance(counterReset: Int, defaultClassLoade @DeveloperApi class JavaSerializer(conf: SparkConf) extends Serializer with Externalizable { private var counterReset = conf.getInt("spark.serializer.objectStreamReset", 100) + private var extraDebugInfo = conf.getBoolean("spark.serializer.extraDebugInfo", true) override def newInstance(): SerializerInstance = { val classLoader = defaultClassLoader.getOrElse(Thread.currentThread.getContextClassLoader) - new JavaSerializerInstance(counterReset, classLoader) + new JavaSerializerInstance(counterReset, extraDebugInfo, classLoader) } override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { out.writeInt(counterReset) + out.writeBoolean(extraDebugInfo) } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { counterReset = in.readInt() + extraDebugInfo = in.readBoolean() } } diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala b/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala index cea7d2a864..cecb992579 100644 --- a/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala +++ b/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala @@ -30,7 +30,8 @@ private[serializer] object SerializationDebugger extends Logging { /** * Improve the given NotSerializableException with the serialization path leading from the given - * object to the problematic object. + * object to the problematic object. This is turned off automatically if + * `sun.io.serialization.extendedDebugInfo` flag is turned on for the JVM. */ def improveException(obj: Any, e: NotSerializableException): NotSerializableException = { if (enableDebugging && reflect != null) { |