aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authormaxwell <maxwellzdm@gmail.com>2015-10-27 01:31:28 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-10-27 01:31:28 -0700
commit17f499920776e0e995434cfa300ff2ff38658fa8 (patch)
tree70c5388f947c850ac764eb4da6bf472a619834e8 /streaming
parent8f888eea1aef5a28916ec406a99fc19648681ecf (diff)
downloadspark-17f499920776e0e995434cfa300ff2ff38658fa8.tar.gz
spark-17f499920776e0e995434cfa300ff2ff38658fa8.tar.bz2
spark-17f499920776e0e995434cfa300ff2ff38658fa8.zip
[SPARK-5569][STREAMING] fix ObjectInputStreamWithLoader for supporting load array classes.
When use Kafka DirectStream API to create checkpoint and restore saved checkpoint when restart, ClassNotFound exception would occur. The reason for this error is that ObjectInputStreamWithLoader extends the ObjectInputStream class and override its resolveClass method. But Instead of Using Class.forName(desc,false,loader), Spark uses loader.loadClass(desc) to instance the class, which do not works with array class. For example: Class.forName("[Lorg.apache.spark.streaming.kafka.OffsetRange.",false,loader) works well while loader.loadClass("[Lorg.apache.spark.streaming.kafka.OffsetRange") would throw an class not found exception. details of the difference between Class.forName and loader.loadClass can be found here. http://bugs.java.com/view_bug.do?bug_id=6446627 Author: maxwell <maxwellzdm@gmail.com> Author: DEMING ZHU <deming.zhu@linecorp.com> Closes #8955 from maxwellzdm/master.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala4
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala35
2 files changed, 36 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 8a6050f522..b7de6dde61 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -352,7 +352,9 @@ class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoade
override def resolveClass(desc: ObjectStreamClass): Class[_] = {
try {
- return loader.loadClass(desc.getName())
+ // scalastyle:off classforname
+ return Class.forName(desc.getName(), false, loader)
+ // scalastyle:on classforname
} catch {
case e: Exception =>
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index a6956533c0..84f5294aa3 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -17,7 +17,8 @@
package org.apache.spark.streaming
-import java.io.File
+import java.io.{ObjectOutputStream, ByteArrayOutputStream, ByteArrayInputStream, File}
+import org.apache.spark.TestUtils
import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.reflect.ClassTag
@@ -34,7 +35,7 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
import org.apache.spark.streaming.scheduler.{ConstantEstimator, RateTestInputDStream, RateTestReceiver}
-import org.apache.spark.util.{Clock, ManualClock, Utils}
+import org.apache.spark.util.{MutableURLClassLoader, Clock, ManualClock, Utils}
/**
* This test suites tests the checkpointing functionality of DStreams -
@@ -579,6 +580,36 @@ class CheckpointSuite extends TestSuiteBase {
}
}
+ // This tests whether spark can deserialize array object
+ // refer to SPARK-5569
+ test("recovery from checkpoint contains array object") {
+ // create a class which is invisible to app class loader
+ val jar = TestUtils.createJarWithClasses(
+ classNames = Seq("testClz"),
+ toStringValue = "testStringValue"
+ )
+
+ // invisible to current class loader
+ val appClassLoader = getClass.getClassLoader
+ intercept[ClassNotFoundException](appClassLoader.loadClass("testClz"))
+
+ // visible to mutableURLClassLoader
+ val loader = new MutableURLClassLoader(
+ Array(jar), appClassLoader)
+ assert(loader.loadClass("testClz").newInstance().toString == "testStringValue")
+
+ // create and serialize Array[testClz]
+ // scalastyle:off classforname
+ val arrayObj = Class.forName("[LtestClz;", false, loader)
+ // scalastyle:on classforname
+ val bos = new ByteArrayOutputStream()
+ new ObjectOutputStream(bos).writeObject(arrayObj)
+
+ // deserialize the Array[testClz]
+ val ois = new ObjectInputStreamWithLoader(
+ new ByteArrayInputStream(bos.toByteArray), loader)
+ assert(ois.readObject().asInstanceOf[Class[_]].getName == "[LtestClz;")
+ }
/**
* Tests a streaming operation under checkpointing, by restarting the operation