aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main
diff options
context:
space:
mode:
authormaxwell <maxwellzdm@gmail.com>2015-10-27 01:31:28 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-10-27 01:31:28 -0700
commit17f499920776e0e995434cfa300ff2ff38658fa8 (patch)
tree70c5388f947c850ac764eb4da6bf472a619834e8 /streaming/src/main
parent8f888eea1aef5a28916ec406a99fc19648681ecf (diff)
downloadspark-17f499920776e0e995434cfa300ff2ff38658fa8.tar.gz
spark-17f499920776e0e995434cfa300ff2ff38658fa8.tar.bz2
spark-17f499920776e0e995434cfa300ff2ff38658fa8.zip
[SPARK-5569][STREAMING] fix ObjectInputStreamWithLoader for supporting load array classes.
When use Kafka DirectStream API to create checkpoint and restore saved checkpoint when restart, ClassNotFound exception would occur. The reason for this error is that ObjectInputStreamWithLoader extends the ObjectInputStream class and override its resolveClass method. But Instead of Using Class.forName(desc,false,loader), Spark uses loader.loadClass(desc) to instance the class, which do not works with array class. For example: Class.forName("[Lorg.apache.spark.streaming.kafka.OffsetRange.",false,loader) works well while loader.loadClass("[Lorg.apache.spark.streaming.kafka.OffsetRange") would throw an class not found exception. details of the difference between Class.forName and loader.loadClass can be found here. http://bugs.java.com/view_bug.do?bug_id=6446627 Author: maxwell <maxwellzdm@gmail.com> Author: DEMING ZHU <deming.zhu@linecorp.com> Closes #8955 from maxwellzdm/master.
Diffstat (limited to 'streaming/src/main')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala4
1 files changed, 3 insertions, 1 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 8a6050f522..b7de6dde61 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -352,7 +352,9 @@ class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoade
override def resolveClass(desc: ObjectStreamClass): Class[_] = {
try {
- return loader.loadClass(desc.getName())
+ // scalastyle:off classforname
+ return Class.forName(desc.getName(), false, loader)
+ // scalastyle:on classforname
} catch {
case e: Exception =>
}