aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala4
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala35
2 files changed, 36 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 8a6050f522..b7de6dde61 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -352,7 +352,9 @@ class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoade
override def resolveClass(desc: ObjectStreamClass): Class[_] = {
try {
- return loader.loadClass(desc.getName())
+ // scalastyle:off classforname
+ return Class.forName(desc.getName(), false, loader)
+ // scalastyle:on classforname
} catch {
case e: Exception =>
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index a6956533c0..84f5294aa3 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -17,7 +17,8 @@
package org.apache.spark.streaming
-import java.io.File
+import java.io.{ObjectOutputStream, ByteArrayOutputStream, ByteArrayInputStream, File}
+import org.apache.spark.TestUtils
import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.reflect.ClassTag
@@ -34,7 +35,7 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
import org.apache.spark.streaming.scheduler.{ConstantEstimator, RateTestInputDStream, RateTestReceiver}
-import org.apache.spark.util.{Clock, ManualClock, Utils}
+import org.apache.spark.util.{MutableURLClassLoader, Clock, ManualClock, Utils}
/**
* This test suites tests the checkpointing functionality of DStreams -
@@ -579,6 +580,36 @@ class CheckpointSuite extends TestSuiteBase {
}
}
+ // This tests whether spark can deserialize array object
+ // refer to SPARK-5569
+ test("recovery from checkpoint contains array object") {
+ // create a class which is invisible to app class loader
+ val jar = TestUtils.createJarWithClasses(
+ classNames = Seq("testClz"),
+ toStringValue = "testStringValue"
+ )
+
+ // invisible to current class loader
+ val appClassLoader = getClass.getClassLoader
+ intercept[ClassNotFoundException](appClassLoader.loadClass("testClz"))
+
+ // visible to mutableURLClassLoader
+ val loader = new MutableURLClassLoader(
+ Array(jar), appClassLoader)
+ assert(loader.loadClass("testClz").newInstance().toString == "testStringValue")
+
+ // create and serialize Array[testClz]
+ // scalastyle:off classforname
+ val arrayObj = Class.forName("[LtestClz;", false, loader)
+ // scalastyle:on classforname
+ val bos = new ByteArrayOutputStream()
+ new ObjectOutputStream(bos).writeObject(arrayObj)
+
+ // deserialize the Array[testClz]
+ val ois = new ObjectInputStreamWithLoader(
+ new ByteArrayInputStream(bos.toByteArray), loader)
+ assert(ois.readObject().asInstanceOf[Class[_]].getName == "[LtestClz;")
+ }
/**
* Tests a streaming operation under checkpointing, by restarting the operation