diff options
author | maxwell <maxwellzdm@gmail.com> | 2015-10-27 01:31:28 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-10-27 01:31:28 -0700 |
commit | 17f499920776e0e995434cfa300ff2ff38658fa8 (patch) | |
tree | 70c5388f947c850ac764eb4da6bf472a619834e8 /streaming/src/main | |
parent | 8f888eea1aef5a28916ec406a99fc19648681ecf (diff) | |
download | spark-17f499920776e0e995434cfa300ff2ff38658fa8.tar.gz spark-17f499920776e0e995434cfa300ff2ff38658fa8.tar.bz2 spark-17f499920776e0e995434cfa300ff2ff38658fa8.zip |
[SPARK-5569][STREAMING] fix ObjectInputStreamWithLoader for supporting load array classes.
When use Kafka DirectStream API to create checkpoint and restore saved checkpoint when restart,
ClassNotFound exception would occur.
The reason for this error is that ObjectInputStreamWithLoader extends the ObjectInputStream class and override its resolveClass method. But Instead of Using Class.forName(desc,false,loader), Spark uses loader.loadClass(desc) to instance the class, which do not works with array class.
For example:
Class.forName("[Lorg.apache.spark.streaming.kafka.OffsetRange.",false,loader) works well while loader.loadClass("[Lorg.apache.spark.streaming.kafka.OffsetRange") would throw an class not found exception.
details of the difference between Class.forName and loader.loadClass can be found here.
http://bugs.java.com/view_bug.do?bug_id=6446627
Author: maxwell <maxwellzdm@gmail.com>
Author: DEMING ZHU <deming.zhu@linecorp.com>
Closes #8955 from maxwellzdm/master.
Diffstat (limited to 'streaming/src/main')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala | 4 |
1 files changed, 3 insertions, 1 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala index 8a6050f522..b7de6dde61 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala @@ -352,7 +352,9 @@ class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoade override def resolveClass(desc: ObjectStreamClass): Class[_] = { try { - return loader.loadClass(desc.getName()) + // scalastyle:off classforname + return Class.forName(desc.getName(), false, loader) + // scalastyle:on classforname } catch { case e: Exception => } |