From 17f499920776e0e995434cfa300ff2ff38658fa8 Mon Sep 17 00:00:00 2001 From: maxwell Date: Tue, 27 Oct 2015 01:31:28 -0700 Subject: [SPARK-5569][STREAMING] fix ObjectInputStreamWithLoader for supporting load array classes. When use Kafka DirectStream API to create checkpoint and restore saved checkpoint when restart, ClassNotFound exception would occur. The reason for this error is that ObjectInputStreamWithLoader extends the ObjectInputStream class and override its resolveClass method. But Instead of Using Class.forName(desc,false,loader), Spark uses loader.loadClass(desc) to instance the class, which do not works with array class. For example: Class.forName("[Lorg.apache.spark.streaming.kafka.OffsetRange.",false,loader) works well while loader.loadClass("[Lorg.apache.spark.streaming.kafka.OffsetRange") would throw an class not found exception. details of the difference between Class.forName and loader.loadClass can be found here. http://bugs.java.com/view_bug.do?bug_id=6446627 Author: maxwell Author: DEMING ZHU Closes #8955 from maxwellzdm/master. --- .../org/apache/spark/streaming/Checkpoint.scala | 4 ++- .../apache/spark/streaming/CheckpointSuite.scala | 35 ++++++++++++++++++++-- 2 files changed, 36 insertions(+), 3 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 8a6050f522..b7de6dde61 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -352,7 +352,9 @@ class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoade override def resolveClass(desc: ObjectStreamClass): Class[_] = { try { - return loader.loadClass(desc.getName()) + // scalastyle:off classforname + return Class.forName(desc.getName(), false, loader) + // scalastyle:on classforname } catch { case e: Exception => } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index a6956533c0..84f5294aa3 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -17,7 +17,8 @@ package org.apache.spark.streaming -import java.io.File +import java.io.{ObjectOutputStream, ByteArrayOutputStream, ByteArrayInputStream, File} +import org.apache.spark.TestUtils import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} import scala.reflect.ClassTag @@ -34,7 +35,7 @@ import org.scalatest.time.SpanSugar._ import org.apache.spark.streaming.dstream.{DStream, FileInputDStream} import org.apache.spark.streaming.scheduler.{ConstantEstimator, RateTestInputDStream, RateTestReceiver} -import org.apache.spark.util.{Clock, ManualClock, Utils} +import org.apache.spark.util.{MutableURLClassLoader, Clock, ManualClock, Utils} /** * This test suites tests the checkpointing functionality of DStreams - @@ -579,6 +580,36 @@ class CheckpointSuite extends TestSuiteBase { } } + // This tests whether spark can deserialize array object + // refer to SPARK-5569 + test("recovery from checkpoint contains array object") { + // create a class which is invisible to app class loader + val jar = TestUtils.createJarWithClasses( + classNames = Seq("testClz"), + toStringValue = "testStringValue" + ) + + // invisible to current class loader + val appClassLoader = getClass.getClassLoader + intercept[ClassNotFoundException](appClassLoader.loadClass("testClz")) + + // visible to mutableURLClassLoader + val loader = new MutableURLClassLoader( + Array(jar), appClassLoader) + assert(loader.loadClass("testClz").newInstance().toString == "testStringValue") + + // create and serialize Array[testClz] + // scalastyle:off classforname + val arrayObj = Class.forName("[LtestClz;", false, loader) + // scalastyle:on classforname + val bos = new ByteArrayOutputStream() + new ObjectOutputStream(bos).writeObject(arrayObj) + + // deserialize the Array[testClz] + val ois = new ObjectInputStreamWithLoader( + new ByteArrayInputStream(bos.toByteArray), loader) + assert(ois.readObject().asInstanceOf[Class[_]].getName == "[LtestClz;") + } /** * Tests a streaming operation under checkpointing, by restarting the operation -- cgit v1.2.3